merlimat closed pull request #2116: Added optional key in pulsar IO
URL: https://github.com/apache/incubator-pulsar/pull/2116
This PR was merged from a forked repository. Because GitHub does not show
the original diff of a fork-based (foreign) pull request once it has been
merged, the diff is reproduced below for the sake of provenance:
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index bda7f6526f..edfb1c4459 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -44,7 +44,6 @@
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.io.core.RecordContext;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 504cc6161b..5ea8bfef5e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -20,6 +20,13 @@
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.collect.Maps;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
@@ -32,22 +39,15 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
-import com.google.common.collect.Maps;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-
-public class MessageImpl<T> extends MessageRecordImpl<T, MessageId> {
+public class MessageImpl<T> implements Message<T> {
+ protected MessageId messageId;
private MessageMetadata.Builder msgMetadataBuilder;
private ClientCnx cnx;
private ByteBuf payload;
@@ -86,7 +86,7 @@
Schema<T> schema) {
this(messageId, msgMetadata, payload, null, cnx, schema);
}
-
+
MessageImpl(MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf
payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema) {
this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
@@ -330,7 +330,7 @@ public boolean hasReplicateTo() {
void setMessageId(MessageIdImpl messageId) {
this.messageId = messageId;
}
-
+
@Override
public Optional<EncryptionContext> getEncryptionCtx() {
return encryptionCtx;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index f7f1f9b964..4f5ac13f7d 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -26,10 +26,11 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.EncryptionContext;
-public class TopicMessageImpl<T> extends MessageRecordImpl<T,
TopicMessageIdImpl> {
+public class TopicMessageImpl<T> implements Message<T> {
private final String topicName;
private final Message<T> msg;
+ private final TopicMessageIdImpl messageId;
TopicMessageImpl(String topicName,
Message<T> msg) {
@@ -109,9 +110,9 @@ public String getKey() {
public T getValue() {
return msg.getValue();
}
-
+
@Override
public Optional<EncryptionContext> getEncryptionCtx() {
- return (msg instanceof MessageImpl) ? ((MessageImpl)
msg).getEncryptionCtx() : Optional.empty();
+ return msg.getEncryptionCtx();
}
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
deleted file mode 100644
index 7df373acf9..0000000000
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static org.mockito.Mockito.CALLS_REAL_METHODS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertSame;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pulsar.client.api.MessageId;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-public class MessageRecordImplTest {
-
- private MessageRecordImpl<byte[], MessageId> message;
- private MessageId messageId;
-
- @BeforeMethod
- public void setup() {
- this.messageId = mock(MessageId.class);
- this.message = mock(MessageRecordImpl.class, CALLS_REAL_METHODS);
- when(message.getMessageId()).thenReturn(messageId);
- }
-
- @Test
- public void testAck() throws Exception {
- final AtomicReference<MessageId> msgIdRef = new AtomicReference<>();
- final CountDownLatch ackLatch = new CountDownLatch(1);
-
- this.message.setAckFunction(msgId -> {
- msgIdRef.set(msgId);
- ackLatch.countDown();
- });
-
- this.message.ack();
- ackLatch.await();
-
- assertSame(messageId, msgIdRef.get());
- }
-
-}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ba0de385b2..4c29232b85 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -324,7 +324,7 @@ private void processResult(Record srcRecord,
private void sendOutputMessage(Record srcRecord, Object output) {
try {
- this.sink.write(srcRecord, output);
+ this.sink.write(new SinkRecord<>(srcRecord, output));
} catch (Exception e) {
log.info("Encountered exception in sink write: ", e);
throw new RuntimeException(e);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
similarity index 53%
rename from
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
rename to
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index ea1d892c18..4e2edbec23 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -16,65 +16,66 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.functions.instance;
-import java.util.Collections;
import java.util.Map;
import java.util.Optional;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.io.core.Record;
-/**
- * Abstract class that implements message api and connect record api.
- */
-public abstract class MessageRecordImpl<T, M extends MessageId> implements
Message<T>, Record<T> {
+@Data
+@AllArgsConstructor
+public class SinkRecord<T> implements Record<T> {
- protected M messageId;
- protected java.util.function.Consumer<M> ackFunction;
+ private final Record<T> sourceRecord;
+ private final T value;
- public void setAckFunction(java.util.function.Consumer<M> ackFunction) {
- this.ackFunction = ackFunction;
+ public Record<T> getSourceRecord() {
+ return sourceRecord;
}
@Override
- public String getPartitionId() {
- if (null != messageId) {
- if (messageId instanceof MessageIdImpl) {
- return String.valueOf(((MessageIdImpl)
messageId).getPartitionIndex());
- } else {
- return "";
- }
- }
- return "";
+ public Optional<String> getKey() {
+ return sourceRecord.getKey();
}
@Override
- public long getRecordSequence() {
- return getSequenceId();
+ public T getValue() {
+ return value;
}
@Override
- public void ack() {
- if (null != ackFunction) {
- ackFunction.accept((M) getMessageId());
- }
+ public Optional<String> getPartitionId() {
+ return sourceRecord.getPartitionId();
}
@Override
- public void fail() {
- // no-op
+ public Optional<Long> getRecordSequence() {
+ return sourceRecord.getRecordSequence();
+ }
+
+ @Override
+ public Optional<EncryptionContext> getEncryptionCtx() {
+ return sourceRecord.getEncryptionCtx();
}
@Override
public Map<String, String> getProperties() {
- return Collections.emptyMap();
+ return sourceRecord.getProperties();
}
@Override
- public Optional<EncryptionContext> getEncryptionCtx() {
- return Optional.empty();
+ public void ack() {
+ sourceRecord.ack();
+ }
+
+ @Override
+ public void fail() {
+ sourceRecord.fail();
}
+
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 7aebd93b1f..2170879ce3 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -19,10 +19,13 @@
package org.apache.pulsar.functions.sink;
import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Base64;
+import java.util.Map;
+
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
@@ -35,18 +38,18 @@
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.instance.SinkRecord;
import
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
import
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
import org.apache.pulsar.functions.instance.producers.Producers;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
-import java.util.Base64;
-import java.util.Map;
+import net.jodah.typetools.TypeResolver;
@Slf4j
public class PulsarSink<T> implements Sink<T> {
@@ -57,16 +60,16 @@
private PulsarSinkProcessor pulsarSinkProcessor;
- private interface PulsarSinkProcessor {
+ private interface PulsarSinkProcessor<T> {
void initializeOutputProducer(String outputTopic) throws Exception;
void sendOutputMessage(MessageBuilder outputMsgBuilder,
- RecordContext recordContext) throws Exception;
+ Record<T> recordContext) throws Exception;
void close() throws Exception;
}
- private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor
{
+ private class PulsarSinkAtMostOnceProcessor implements
PulsarSinkProcessor<T> {
private Producer<byte[]> producer;
@Override
@@ -77,7 +80,7 @@ public void initializeOutputProducer(String outputTopic)
throws Exception {
@Override
public void sendOutputMessage(MessageBuilder outputMsgBuilder,
- RecordContext recordContext) throws
Exception {
+ Record<T> recordContext) throws
Exception {
Message<byte[]> outputMsg = outputMsgBuilder.build();
this.producer.sendAsync(outputMsg);
}
@@ -94,7 +97,7 @@ public void close() throws Exception {
}
}
- private class PulsarSinkAtLeastOnceProcessor implements
PulsarSinkProcessor {
+ private class PulsarSinkAtLeastOnceProcessor implements
PulsarSinkProcessor<T> {
private Producer<byte[]> producer;
@Override
@@ -105,7 +108,7 @@ public void initializeOutputProducer(String outputTopic)
throws Exception {
@Override
public void sendOutputMessage(MessageBuilder outputMsgBuilder,
- RecordContext recordContext) throws
Exception {
+ Record<T> recordContext) throws
Exception {
Message<byte[]> outputMsg = outputMsgBuilder.build();
this.producer.sendAsync(outputMsg).thenAccept(messageId ->
recordContext.ack());
}
@@ -122,7 +125,7 @@ public void close() throws Exception {
}
}
- private class PulsarSinkEffectivelyOnceProcessor implements
PulsarSinkProcessor, ConsumerEventListener {
+ private class PulsarSinkEffectivelyOnceProcessor implements
PulsarSinkProcessor<T>, ConsumerEventListener {
@Getter(AccessLevel.PACKAGE)
protected Producers outputProducer;
@@ -134,15 +137,16 @@ public void initializeOutputProducer(String outputTopic)
throws Exception {
}
@Override
- public void sendOutputMessage(MessageBuilder outputMsgBuilder,
RecordContext recordContext)
+ public void sendOutputMessage(MessageBuilder outputMsgBuilder,
Record<T> recordContext)
throws Exception {
// assign sequence id to output message for idempotent producing
- outputMsgBuilder = outputMsgBuilder
- .setSequenceId(recordContext.getRecordSequence());
+ if (recordContext.getRecordSequence().isPresent()) {
+
outputMsgBuilder.setSequenceId(recordContext.getRecordSequence().get());
+ }
// currently on PulsarRecord
- Producer producer =
outputProducer.getProducer(recordContext.getPartitionId());
+ Producer producer =
outputProducer.getProducer(recordContext.getPartitionId().get());
org.apache.pulsar.client.api.Message outputMsg =
outputMsgBuilder.build();
producer.sendAsync(outputMsg)
@@ -211,29 +215,37 @@ public void open(Map<String, Object> config, SinkContext
sinkContext) throws Exc
}
@Override
- public void write(RecordContext recordContext, T value) throws Exception {
+ public void write(Record<T> record) throws Exception {
byte[] output;
try {
- output = this.outputSerDe.serialize(value);
+ output = this.outputSerDe.serialize(record.getValue());
} catch (Exception e) {
//TODO Add serialization exception stats
throw new RuntimeException("Error occured when attempting to
serialize output:", e);
}
+
MessageBuilder msgBuilder = MessageBuilder.create();
+ if (record.getKey().isPresent()) {
+ msgBuilder.setKey(record.getKey().get());
+ }
+
msgBuilder.setContent(output);
- if (recordContext instanceof PulsarRecord) {
- PulsarRecord pulsarRecord = (PulsarRecord) recordContext;
+
+ if (!record.getProperties().isEmpty()) {
+ msgBuilder.setProperties(record.getProperties());
+ }
+
+ SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
+ if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
+ PulsarRecord<T> pulsarRecord = (PulsarRecord<T>)
sinkRecord.getSourceRecord();
// forward user properties to sink-topic
- if (pulsarRecord.getProperties() != null) {
- msgBuilder.setProperties(pulsarRecord.getProperties());
- }
msgBuilder.setProperty("__pfn_input_topic__",
pulsarRecord.getTopicName()).setProperty(
"__pfn_input_msg_id__",
new
String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
}
- this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, recordContext);
+ this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, record);
}
@Override
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 113cf82060..7a6d11b1bd 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -18,35 +18,72 @@
*/
package org.apache.pulsar.functions.source;
+import java.util.Map;
+import java.util.Optional;
+
import lombok.Builder;
-import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
-import java.util.Map;
-import java.util.Optional;
-
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.Record;
-@Data
@Builder
@Getter
@ToString
@EqualsAndHashCode
public class PulsarRecord<T> implements Record<T> {
- private String partitionId;
- private long recordSequence;
- private T value;
- private MessageId messageId;
- private String topicName;
- private Map<String, String> properties;
- private Optional<EncryptionContext> encryptionCtx = Optional.empty();
- private Runnable failFunction;
- private Runnable ackFunction;
+ private final String topicName;
+ private final int partition;
+
+ // TODO: When we switch to schema for functions, we should just rely on
the message object value
+ private final T value;
+ private final Message<byte[]> message;
+
+ private final Runnable failFunction;
+ private final Runnable ackFunction;
+
+ @Override
+ public Optional<String> getKey() {
+ if (message.hasKey()) {
+ return Optional.of(message.getKey());
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public Optional<String> getPartitionId() {
+ return Optional.of(String.format("%s-%s", topicName, partition));
+ }
+
+ @Override
+ public Optional<Long> getRecordSequence() {
+ return Optional.of(Utils.getSequenceId(message.getMessageId()));
+ }
+
+ /**
+ * Retrieves encryption-context that is attached to record.
+ *
+ * @return {@link Optional}<{@link EncryptionContext}>
+ */
+ public Optional<EncryptionContext> getEncryptionCtx() {
+ return message.getEncryptionCtx();
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return message.getProperties();
+ }
+
+ public MessageId getMessageId() {
+ return message.getMessageId();
+ }
@Override
public void ack() {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 680d754265..d5a1df9ada 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -54,7 +54,7 @@
private boolean isTopicsPattern;
@Getter
- private org.apache.pulsar.client.api.Consumer inputConsumer;
+ private org.apache.pulsar.client.api.Consumer<byte[]> inputConsumer;
public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig
pulsarConfig) {
this.pulsarClient = pulsarClient;
@@ -69,17 +69,17 @@ public void open(Map<String, Object> config, SourceContext
sourceContext) throws
// Setup pulsar consumer
ConsumerBuilder<byte[]> consumerBuilder =
this.pulsarClient.newConsumer()
//consume message even if can't decrypt and deliver it along
with encryption-ctx
- .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+ .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
.subscriptionName(this.pulsarSourceConfig.getSubscriptionName())
.subscriptionType(this.pulsarSourceConfig.getSubscriptionType());
if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) {
-
consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());
+
consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());
isTopicsPattern = true;
}else {
- consumerBuilder.topics(new
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));
+ consumerBuilder.topics(new
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));
}
-
+
if (pulsarSourceConfig.getTimeoutMs() != null) {
consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(),
TimeUnit.MILLISECONDS);
}
@@ -88,21 +88,21 @@ public void open(Map<String, Object> config, SourceContext
sourceContext) throws
@Override
public Record<T> read() throws Exception {
- org.apache.pulsar.client.api.Message<T> message =
this.inputConsumer.receive();
+ org.apache.pulsar.client.api.Message<byte[]> message =
this.inputConsumer.receive();
String topicName;
- String partitionId;
+ int partitionId;
// If more than one topics are being read than the Message return by
the consumer will be TopicMessageImpl
// If there is only topic being read then the Message returned by the
consumer wil be MessageImpl
if (message instanceof TopicMessageImpl) {
- topicName = ((TopicMessageImpl) message).getTopicName();
+ topicName = ((TopicMessageImpl<?>) message).getTopicName();
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl)
message.getMessageId();
MessageIdImpl messageId = (MessageIdImpl)
topicMessageId.getInnerMessageId();
- partitionId = Long.toString(messageId.getPartitionIndex());
+ partitionId = messageId.getPartitionIndex();
} else {
topicName =
this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet().iterator().next();
- partitionId = Long.toString(((MessageIdImpl)
message.getMessageId()).getPartitionIndex());
+ partitionId = ((MessageIdImpl)
message.getMessageId()).getPartitionIndex();
}
Object object;
@@ -130,14 +130,10 @@ public void open(Map<String, Object> config,
SourceContext sourceContext) throws
throw new RuntimeException("Error in casting input to expected
type:", e);
}
- PulsarRecord<T> pulsarMessage = (PulsarRecord<T>)
PulsarRecord.builder()
+ return PulsarRecord.<T>builder()
.value(input)
- .messageId(message.getMessageId())
- .partitionId(String.format("%s-%s", topicName, partitionId))
- .recordSequence(Utils.getSequenceId(message.getMessageId()))
+ .message(message)
.topicName(topicName)
- .properties(message.getProperties())
- .encryptionCtx(message.getEncryptionCtx())
.ackFunction(() -> {
if (pulsarSourceConfig.getProcessingGuarantees() ==
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
inputConsumer.acknowledgeCumulativeAsync(message);
@@ -150,7 +146,6 @@ public void open(Map<String, Object> config, SourceContext
sourceContext) throws
}
})
.build();
- return pulsarMessage;
}
@Override
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 0bac0a68b2..415912a7eb 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -29,6 +29,9 @@
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -65,7 +68,7 @@
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -77,9 +80,6 @@
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
-import com.google.common.collect.Lists;
-import com.google.gson.Gson;
-
/**
* Unit test of {@link FunctionApiV2Resource}.
*/
@@ -98,7 +98,7 @@ public String process(String input, Context context) throws
Exception {
return input;
}
}
-
+
public static final class TestSink implements Sink<byte[]> {
@Override
@@ -110,7 +110,7 @@ public void open(Map config, SinkContext sinkContext)
throws Exception {
}
@Override
- public void write(RecordContext inputRecordContext, byte[] value)
throws Exception {
+ public void write(Record<byte[]> record) throws Exception {
}
}
@@ -670,7 +670,7 @@ public void testUpdateFunctionWithUrl() throws IOException {
String fileLocation =
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
String filePackageUrl = "file://" + fileLocation;
-
+
SinkSpec sinkSpec = SinkSpec.newBuilder()
.setTopic(outputTopic)
.setSerDeClassName(outputSerdeClassName).build();
@@ -681,14 +681,14 @@ public void testUpdateFunctionWithUrl() throws
IOException {
.setParallelism(parallelism)
.setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
.putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
-
+
when(mockedManager.containsFunction(eq(tenant), eq(namespace),
eq(function))).thenReturn(true);
RequestResult rr = new RequestResult()
.setSuccess(true)
.setMessage("function registered");
CompletableFuture<RequestResult> requestResult =
CompletableFuture.completedFuture(rr);
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
+
Response response = resource.updateFunction(
tenant,
namespace,
@@ -697,10 +697,10 @@ public void testUpdateFunctionWithUrl() throws
IOException {
null,
filePackageUrl,
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
-
+
assertEquals(Status.OK.getStatusCode(), response.getStatus());
}
-
+
@Test
public void testUpdateFunctionFailure() throws Exception {
mockStatic(Utils.class);
@@ -990,7 +990,7 @@ public void testListFunctionsSuccess() throws Exception {
assertEquals(Status.OK.getStatusCode(), response.getStatus());
assertEquals(new Gson().toJson(functions), response.getEntity());
}
-
+
@Test
public void testDownloadFunctionHttpUrl() throws Exception {
String jarHttpUrl =
"http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
@@ -1006,7 +1006,7 @@ public void testDownloadFunctionHttpUrl() throws
Exception {
pkgFile.delete();
}
}
-
+
@Test
public void testDownloadFunctionFile() throws Exception {
String fileLocation =
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
@@ -1022,7 +1022,7 @@ public void testDownloadFunctionFile() throws Exception {
pkgFile.delete();
}
}
-
+
@Test
public void testRegisterFunctionFileUrlWithValidSinkClass() throws
IOException {
Configurator.setRootLevel(Level.DEBUG);
diff --git
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
index ba296161fd..6ea657e659 100644
---
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
+++
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
@@ -31,22 +31,23 @@
import com.aerospike.client.listener.WriteListener;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.WritePolicy;
-import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.SimpleSink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A Simple abstract class for Aerospike sink
* Users need to implement extractKeyValue function to use this sink
*/
-public abstract class AerospikeAbstractSink<K, V> extends SimpleSink<byte[]> {
+public abstract class AerospikeAbstractSink<K, V> implements Sink<byte[]> {
private static final Logger LOG =
LoggerFactory.getLogger(AerospikeAbstractSink.class);
@@ -55,6 +56,7 @@
private AerospikeClient client;
private WritePolicy writePolicy;
private BlockingQueue<AWriteListener> queue;
+ private NioEventLoops eventLoops;
private EventLoop eventLoop;
@Override
@@ -74,18 +76,25 @@ public void open(Map<String, Object> config, SinkContext
sinkContext) throws Exc
for (int i = 0; i < aerospikeSinkConfig.getMaxConcurrentRequests();
++i) {
queue.put(new AWriteListener(queue));
}
- eventLoop = new NioEventLoops(new EventPolicy(), 1).next();
+
+ eventLoops = new NioEventLoops(new EventPolicy(), 1);
+ eventLoop = eventLoops.next();
}
@Override
public void close() throws Exception {
- client.close();
+ if (client != null) {
+ client.close();
+ }
+
+ if (eventLoops != null) {
+ eventLoops.close();
+ }
LOG.info("Connection Closed");
}
@Override
- public CompletableFuture<Void> write(byte[] record) {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ public void write(Record<byte[]> record) {
KeyValue<K, V> keyValue = extractKeyValue(record);
Key key = new Key(aerospikeSinkConfig.getKeyspace(),
aerospikeSinkConfig.getKeySet(), keyValue.getKey().toString());
Bin bin = new Bin(aerospikeSinkConfig.getColumnName(),
Value.getAsBlob(keyValue.getValue()));
@@ -93,12 +102,11 @@ public void close() throws Exception {
try {
listener = queue.take();
} catch (InterruptedException ex) {
- future.completeExceptionally(ex);
- return future;
+ record.fail();
+ return;
}
- listener.setFuture(future);
+ listener.setContext(record);
client.put(eventLoop, listener, writePolicy, key, bin);
- return future;
}
private void createClient() {
@@ -121,21 +129,21 @@ private void createClient() {
}
private class AWriteListener implements WriteListener {
- private CompletableFuture<Void> future;
+ private Record<byte[]> context;
private BlockingQueue<AWriteListener> queue;
public AWriteListener(BlockingQueue<AWriteListener> queue) {
this.queue = queue;
}
- public void setFuture(CompletableFuture<Void> future) {
- this.future = future;
+ public void setContext(Record<byte[]> record) {
+ this.context = record;
}
@Override
public void onSuccess(Key key) {
- if (future != null) {
- future.complete(null);
+ if (context != null) {
+ context.ack();
}
try {
queue.put(this);
@@ -146,8 +154,8 @@ public void onSuccess(Key key) {
@Override
public void onFailure(AerospikeException e) {
- if (future != null) {
- future.completeExceptionally(e);
+ if (context != null) {
+ context.fail();
}
try {
queue.put(this);
@@ -157,5 +165,5 @@ public void onFailure(AerospikeException e) {
}
}
- public abstract KeyValue<K, V> extractKeyValue(byte[] message);
+ public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> message);
}
\ No newline at end of file
diff --git
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
index 8bf0174603..40ffd9035b 100644
---
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
+++
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.io.aerospike;
import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
/**
* Aerospike sink that treats incoming messages on the input topic as Strings
@@ -27,7 +28,8 @@
*/
public class AerospikeStringSink extends AerospikeAbstractSink<String, String>
{
@Override
- public KeyValue<String, String> extractKeyValue(byte[] message) {
- return new KeyValue<>(new String(message), new String(message));
+ public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
+ String key = record.getKey().orElseGet(() -> new
String(record.getValue()));
+ return new KeyValue<>(key, new String(record.getValue()));
}
}
\ No newline at end of file
diff --git
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
index c8a9ca51a3..fd76c0c630 100644
---
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
+++
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
@@ -29,17 +29,17 @@
import com.google.common.util.concurrent.Futures;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.SimpleSink;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
/**
* A Simple abstract class for Cassandra sink
* Users need to implement extractKeyValue function to use this sink
*/
-public abstract class CassandraAbstractSink<K, V> extends SimpleSink<byte[]> {
+public abstract class CassandraAbstractSink<K, V> implements Sink<byte[]> {
// ----- Runtime fields
private Cluster cluster;
@@ -69,24 +69,22 @@ public void close() throws Exception {
}
@Override
- public CompletableFuture<Void> write(byte[] record) {
+ public void write(Record<byte[]> record) {
KeyValue<K, V> keyValue = extractKeyValue(record);
BoundStatement bound = statement.bind(keyValue.getKey(),
keyValue.getValue());
ResultSetFuture future = session.executeAsync(bound);
- CompletableFuture<Void> completable = new CompletableFuture<Void>();
Futures.addCallback(future,
new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
- completable.complete(null);
+ record.ack();
}
@Override
public void onFailure(Throwable t) {
- completable.completeExceptionally(t);
+ record.fail();
}
});
- return completable;
}
private void createClient(String roots) {
@@ -107,5 +105,5 @@ private void createClient(String roots) {
session.execute("USE " + cassandraSinkConfig.getKeyspace());
}
- public abstract KeyValue<K, V> extractKeyValue(byte[] message);
+ public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> record);
}
\ No newline at end of file
diff --git
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
index 5c0cb16c84..c2e72b8b3e 100644
---
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
+++
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.io.cassandra;
import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
/**
* Cassandra sink that treats incoming messages on the input topic as Strings
@@ -27,7 +28,8 @@
*/
public class CassandraStringSink extends CassandraAbstractSink<String, String>
{
@Override
- public KeyValue<String, String> extractKeyValue(byte[] message) {
- return new KeyValue<>(new String(message), new String(message));
+ public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
+ String key = record.getKey().orElseGet(() -> new
String(record.getValue()));
+ return new KeyValue<>(key, new String(record.getValue()));
}
}
\ No newline at end of file
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
index 08c78bfdef..504d5f4f6b 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
@@ -18,14 +18,74 @@
*/
package org.apache.pulsar.io.core;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.pulsar.common.api.EncryptionContext;
+
/**
- * Pulsar Connect's Record interface. Record encapsulates the
- * information about a record being read from a Source.
+ * Pulsar Connect's Record interface. Record encapsulates the information
about a record being read from a Source.
*/
-public interface Record<T> extends RecordContext {
+public interface Record<T> {
+
+ /**
+ * Return a key if the record has one associated
+ */
+ Optional<String> getKey();
+
/**
* Retrieves the actual data of the record
+ *
* @return The record data
*/
T getValue();
+
+ /**
+ * Retrieves the partition information if any of the record.
+ *
+ * @return The partition id where the record originated, if any
+ */
+ default Optional<String> getPartitionId() {
+ return Optional.empty();
+ }
+
+ /**
+ * Retrieves the sequence of the record from a source partition.
+ *
+ * @return Sequence Id associated with the record
+ */
+ default Optional<Long> getRecordSequence() {
+ return Optional.empty();
+ }
+
+ /**
+ * Retrieves encryption-context that is attached to record.
+ *
+ * @return {@link Optional}<{@link EncryptionContext}>
+ */
+ default public Optional<EncryptionContext> getEncryptionCtx() {
+ return Optional.empty();
+ }
+
+ /**
+ * Retrieves user-defined properties attached to record.
+ *
+ * @return Map of user-properties
+ */
+ default Map<String, String> getProperties() {
+ return Collections.emptyMap();
+ }
+
+ /**
+ * Acknowledge that this record is fully processed
+ */
+ default void ack() {
+ }
+
+ /**
+ * To indicate that this record has failed to be processed
+ */
+ default void fail() {
+ }
}
diff --git
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
deleted file mode 100644
index 5e1212562b..0000000000
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.core;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.pulsar.common.api.EncryptionContext;
-
-/**
- * A source context that can be used by the runtime to interact with source.
- */
-public interface RecordContext {
-
- /**
- * Retrieves the partition information if any of the record.
- * @return The partition id where the
- */
- default String getPartitionId() { return null; }
-
- /**
- * Retrieves the sequence of the record from a source partition.
- * @return Sequence Id associated with the record
- */
- default long getRecordSequence() { return -1L; }
-
- /**
- * Retrieves user-properties attached to record.
- *
- * @return Map of user-properties
- */
- default Map<String, String> getProperties() { return
Collections.emptyMap();}
-
- /**
- * Retrieves encryption-context that is attached to record.
- *
- * @return {@link Optional}<{@link EncryptionContext}>
- */
- default Optional<EncryptionContext> getEncryptionCtx() { return
Optional.empty();}
-
- /**
- * Acknowledge that this record is fully processed
- */
- default void ack() {}
-
- /**
- * To indicate that this record has failed to be processed
- */
- default void fail() {}
-
-}
diff --git
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
deleted file mode 100644
index 352a0593a4..0000000000
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.core;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * A simpler version of the Sink interface users can extend for use cases to
- * don't require fine grained delivery control
- */
-public abstract class SimpleSink<T> implements Sink<T> {
-
- @Override
- public void write(RecordContext inputRecordContext, T value) throws
Exception {
- write(value)
- .thenAccept(ignored -> inputRecordContext.ack())
- .exceptionally(cause -> {
- inputRecordContext.fail();
- return null;
- });
- }
-
- /**
- * Attempt to publish a type safe collection of messages
- *
- * @param value output value
- * @return Completable future fo async publish request
- */
- public abstract CompletableFuture<Void> write(T value);
-}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
index 31a044b7f2..51d635ac31 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
@@ -23,7 +23,7 @@
/**
* Generic sink interface users can implement to run Sink on top of Pulsar
Functions
*/
-public interface Sink<T> extends AutoCloseable{
+public interface Sink<T> extends AutoCloseable {
/**
* Open connector with configuration
*
@@ -32,12 +32,12 @@
* @throws Exception IO type exceptions when opening a connector
*/
void open(final Map<String, Object> config, SinkContext sinkContext)
throws Exception;
-
+
/**
* Write a message to Sink
- * @param inputRecordContext Context of value
- * @param value value to write to sink
+ *
+ * @param record record to write to sink (carries the context of the input record from the source)
* @throws Exception
*/
- void write(RecordContext inputRecordContext, T value) throws Exception;
+ void write(Record<T> record) throws Exception;
}
diff --git
a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
index 4fb4a7d4e1..bb1100cf80 100644
--- a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
@@ -38,7 +38,7 @@ public void open(Map<String, Object> config, SinkContext
sinkContext) throws Exc
}
@Override
- public void write(RecordContext inputRecordContext, String value)
throws Exception {
+ public void write(Record<String> record) throws Exception {
}
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index c597489adc..d6acae83c7 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -19,47 +19,45 @@
package org.apache.pulsar.io.kafka;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.SimpleSink;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
/**
* A Simple abstract class for Kafka sink
* Users need to implement extractKeyValue function to use this sink
*/
-public abstract class KafkaAbstractSink<K, V> extends SimpleSink<byte[]> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(KafkaAbstractSink.class);
+@Slf4j
+public abstract class KafkaAbstractSink<K, V> implements Sink<byte[]> {
private Producer<K, V> producer;
private Properties props = new Properties();
private KafkaSinkConfig kafkaSinkConfig;
@Override
- public CompletableFuture<Void> write(byte[] message) {
- KeyValue<K, V> keyValue = extractKeyValue(message);
+ public void write(Record<byte[]> sourceRecord) {
+ KeyValue<K, V> keyValue = extractKeyValue(sourceRecord);
ProducerRecord<K, V> record = new
ProducerRecord<>(kafkaSinkConfig.getTopic(), keyValue.getKey(),
keyValue.getValue());
- LOG.debug("Record sending to kafka, record={}.", record);
- Future f = producer.send(record);
- return CompletableFuture.supplyAsync(() -> {
- try {
- f.get();
- return null;
- } catch (InterruptedException|ExecutionException e) {
- throw new RuntimeException(e);
+ if (log.isDebugEnabled()) {
+ log.debug("Record sending to kafka, record={}.", record);
+ }
+
+ producer.send(record, (metadata, exception) -> {
+ if (exception == null) {
+ sourceRecord.ack();
+ } else {
+ sourceRecord.fail();
}
});
}
@@ -68,7 +66,7 @@
public void close() throws IOException {
if (producer != null) {
producer.close();
- LOG.info("Kafka sink stopped.");
+ log.info("Kafka sink stopped.");
}
}
@@ -93,8 +91,8 @@ public void open(Map<String, Object> config, SinkContext
sinkContext) throws Exc
producer = new KafkaProducer<>(props);
- LOG.info("Kafka sink started.");
+ log.info("Kafka sink started.");
}
- public abstract KeyValue<K, V> extractKeyValue(byte[] message);
+ public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> message);
}
\ No newline at end of file
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index fe8a6a70ef..bbafc8e5eb 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -33,6 +33,7 @@
import java.util.Arrays;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -44,7 +45,7 @@
private static final Logger LOG =
LoggerFactory.getLogger(KafkaAbstractSource.class);
- private Consumer<byte[], byte[]> consumer;
+ private Consumer<String, byte[]> consumer;
private Properties props;
private KafkaSourceConfig kafkaSourceConfig;
Thread runnerThread;
@@ -98,12 +99,12 @@ public void start() {
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic()));
LOG.info("Kafka source started.");
- ConsumerRecords<byte[], byte[]> consumerRecords;
+ ConsumerRecords<String, byte[]> consumerRecords;
while(true){
consumerRecords = consumer.poll(1000);
CompletableFuture<?>[] futures = new
CompletableFuture<?>[consumerRecords.count()];
int index = 0;
- for (ConsumerRecord<byte[], byte[]> consumerRecord :
consumerRecords) {
+ for (ConsumerRecord<String, byte[]> consumerRecord :
consumerRecords) {
LOG.debug("Record received from kafka, key: {}. value:
{}", consumerRecord.key(), consumerRecord.value());
KafkaRecord<V> record = new KafkaRecord<>(consumerRecord,
extractValue(consumerRecord));
consume(record);
@@ -125,27 +126,32 @@ public void start() {
runnerThread.start();
}
- public abstract V extractValue(ConsumerRecord<byte[], byte[]> record);
+ public abstract V extractValue(ConsumerRecord<String, byte[]> record);
static private class KafkaRecord<V> implements Record<V> {
- private final ConsumerRecord<byte[], byte[]> record;
+ private final ConsumerRecord<String, byte[]> record;
private final V value;
@Getter
- private final CompletableFuture<Void> completableFuture = new
CompletableFuture();
+ private final CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- public KafkaRecord(ConsumerRecord<byte[], byte[]> record,
+ public KafkaRecord(ConsumerRecord<String, byte[]> record,
V value) {
this.record = record;
this.value = value;
}
@Override
- public String getPartitionId() {
- return Integer.toString(record.partition());
+ public Optional<String> getPartitionId() {
+ return Optional.of(Integer.toString(record.partition()));
}
@Override
- public long getRecordSequence() {
- return record.offset();
+ public Optional<Long> getRecordSequence() {
+ return Optional.of(record.offset());
+ }
+
+ @Override
+ public Optional<String> getKey() {
+ return Optional.ofNullable(record.key());
}
@Override
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
index 775b5bec0c..aab44748cb 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.io.kafka;
import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
/**
* Kafka sink that treats incoming messages on the input topic as Strings
@@ -27,7 +28,7 @@
*/
public class KafkaStringSink extends KafkaAbstractSink<String, String> {
@Override
- public KeyValue<String, String> extractKeyValue(byte[] message) {
- return new KeyValue<>(new String(message), new String(message));
+ public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
+ return new KeyValue<>(record.getKey().orElse(null), new
String(record.getValue()));
}
}
\ No newline at end of file
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
index e31b75f4a3..802d943e8d 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
@@ -27,7 +27,7 @@
*/
public class KafkaStringSource extends KafkaAbstractSource<String> {
@Override
- public String extractValue(ConsumerRecord<byte[], byte[]> record) {
+ public String extractValue(ConsumerRecord<String, byte[]> record) {
return new String(record.value());
}
}
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index dbe1e756e5..c3b6cdcd26 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -24,22 +24,6 @@
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.pulsar.io.core.RecordContext;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
@@ -55,17 +39,32 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
- * A Kinesis sink which can be configured by {@link KinesisSinkConfig}.
- * <pre>
+ * A Kinesis sink which can be configured by {@link KinesisSinkConfig}.
+ * <pre>
* {@link KinesisSinkConfig} accepts
* 1. <b>awsEndpoint:</b> kinesis end-point url can be found at :
https://docs.aws.amazon.com/general/latest/gr/rande.html
* 2. <b>awsRegion:</b> appropriate aws region eg: us-west-1, us-west-2
* 3. <b>awsKinesisStreamName:</b> kinesis stream name
* 4. <b>awsCredentialPluginName:</b> Fully-Qualified class name of
implementation of {@link AwsCredentialProviderPlugin}.
* - It is a factory class which creates an {@link AWSCredentialsProvider}
that will be used by {@link KinesisProducer}
- * - If it is empty then {@link KinesisSink} creates default {@link
AWSCredentialsProvider}
- * which accepts json-map of credentials in awsCredentialPluginParam
+ * - If it is empty then {@link KinesisSink} creates default {@link
AWSCredentialsProvider}
+ * which accepts json-map of credentials in awsCredentialPluginParam
* eg: awsCredentialPluginParam =
{"accessKey":"my-access-key","secretKey":"my-secret-key"}
* 5. <b>awsCredentialPluginParam:</b> json-parameters to initialize {@link
AwsCredentialProviderPlugin}
* 6. messageFormat: enum:["ONLY_RAW_PAYLOAD","FULL_MESSAGE_IN_JSON"]
@@ -76,9 +75,9 @@
* Example:
*
{"payloadBase64":"cGF5bG9hZA==","properties":{"prop1":"value"},"encryptionCtx":{"keysMapBase64":{"key1":"dGVzdDE=","key2":"dGVzdDI="},"keysMetadataMap":{"key1":{"ckms":"cmks-1","version":"v1"},"key2":{"ckms":"cmks-2","version":"v2"}},"metadata":{"ckms":"cmks-1","version":"v1"},"encParamBase64":"cGFyYW0=","algorithm":"algo","compressionType":"LZ4","uncompressedMessageSize":10,"batchSize":10}}
* </pre>
- *
- *
- *
+ *
+ *
+ *
*/
public class KinesisSink implements Sink<byte[]> {
@@ -94,20 +93,18 @@
public static final String SECRET_KEY_NAME = "secretKey";
@Override
- public void write(RecordContext inputRecordContext, byte[] value) throws
Exception {
- String partitionedKey =
StringUtils.isNotBlank(inputRecordContext.getPartitionId())
- ? inputRecordContext.getPartitionId()
- : defaultPartitionedKey;
+ public void write(Record<byte[]> record) throws Exception {
+ String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
: partitionedKey; // partitionedKey Length must be at least
one, and at most 256
ListenableFuture<UserRecordResult> addRecordResult =
kinesisProducer.addUserRecord(this.streamName,
partitionedKey,
- createKinesisMessage(kinesisSinkConfig.getMessageFormat(),
inputRecordContext, value));
+ createKinesisMessage(kinesisSinkConfig.getMessageFormat(),
record));
addCallback(addRecordResult,
- ProducerSendCallback.create(this.streamName,
inputRecordContext, System.nanoTime()), directExecutor());
+ ProducerSendCallback.create(this.streamName, record,
System.nanoTime()), directExecutor());
if (LOG.isDebugEnabled()) {
- LOG.debug("Published message to kinesis stream {} with size {}",
streamName, value.length);
+ LOG.debug("Published message to kinesis stream {} with size {}",
streamName, record.getValue().length);
}
}
@@ -151,13 +148,13 @@ protected AWSCredentialsProvider
createCredentialProvider(String awsCredentialPl
if (isNotBlank(awsCredentialPluginName)) {
return createCredentialProviderWithPlugin(awsCredentialPluginName,
awsCredentialPluginParam);
} else {
- return defaultCredentialProvider(awsCredentialPluginParam);
+ return defaultCredentialProvider(awsCredentialPluginParam);
}
}
private static final class ProducerSendCallback implements
FutureCallback<UserRecordResult> {
- private RecordContext resultContext;
+ private Record<byte[]> resultContext;
private String streamName;
private long startTime = 0;
private final Handle<ProducerSendCallback> recyclerHandle;
@@ -166,9 +163,9 @@ private ProducerSendCallback(Handle<ProducerSendCallback>
recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
- static ProducerSendCallback create(String streamName, RecordContext
result, long startTime) {
+ static ProducerSendCallback create(String streamName, Record<byte[]>
resultContext, long startTime) {
ProducerSendCallback sendCallback = RECYCLER.get();
- sendCallback.resultContext = result;
+ sendCallback.resultContext = resultContext;
sendCallback.streamName = streamName;
sendCallback.startTime = startTime;
return sendCallback;
@@ -210,7 +207,7 @@ public void onFailure(Throwable exception) {
/**
* Creates a instance of credential provider which can return {@link
AWSCredentials} or {@link BasicAWSCredentials}
* based on IAM user/roles.
- *
+ *
* @param pluginFQClassName
* @param param
* @return
@@ -234,7 +231,7 @@ public static AWSCredentialsProvider
createCredentialProviderWithPlugin(String p
/**
* It creates a default credential provider which takes accessKey and
secretKey form configuration and creates
* {@link AWSCredentials}
- *
+ *
* @param awsCredentialPluginParam
* @return
*/
@@ -274,15 +271,15 @@ public void refresh() {
};
}
- public static ByteBuffer createKinesisMessage(MessageFormat msgFormat,
RecordContext recordCtx, byte[] data) {
+ public static ByteBuffer createKinesisMessage(MessageFormat msgFormat,
Record<byte[]> record) {
if (MessageFormat.FULL_MESSAGE_IN_JSON.equals(msgFormat)) {
- return ByteBuffer.wrap(Utils.serializeRecordToJson(recordCtx,
data).getBytes());
+ return
ByteBuffer.wrap(Utils.serializeRecordToJson(record).getBytes());
} else if (MessageFormat.FULL_MESSAGE_IN_FB.equals(msgFormat)) {
- return Utils.serializeRecordToFlatBuffer(recordCtx, data);
+ return Utils.serializeRecordToFlatBuffer(record);
} else {
// send raw-message
- return ByteBuffer.wrap(data);
+ return ByteBuffer.wrap(record.getValue());
}
}
-
+
}
\ No newline at end of file
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
index 469151eeb9..a7382b80e0 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
@@ -22,20 +22,21 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Base64.getEncoder;
+import com.google.gson.JsonObject;
+
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.kinesis.fbs.EncryptionCtx;
import org.apache.pulsar.io.kinesis.fbs.EncryptionKey;
import org.apache.pulsar.io.kinesis.fbs.KeyValue;
import org.apache.pulsar.io.kinesis.fbs.Message;
import com.google.flatbuffers.FlatBufferBuilder;
-import com.google.gson.JsonObject;
public class Utils {
@@ -49,25 +50,25 @@
private static final String UNCPRESSED_MSG_SIZE_FIELD =
"uncompressedMessageSize";
private static final String BATCH_SIZE_FIELD = "batchSize";
private static final String ENCRYPTION_CTX_FIELD = "encryptionCtx";
-
+
private static final FlatBufferBuilder DEFAULT_FB_BUILDER = new
FlatBufferBuilder(0);
/**
* Serialize record to flat-buffer. it's not a thread-safe method.
- *
- * @param inputRecordContext
+ *
+ * @param record
* @param data
* @return
*/
- public static ByteBuffer serializeRecordToFlatBuffer(RecordContext
inputRecordContext, byte[] data) {
+ public static ByteBuffer serializeRecordToFlatBuffer(Record<byte[]>
record) {
DEFAULT_FB_BUILDER.clear();
- return serializeRecordToFlatBuffer(DEFAULT_FB_BUILDER,
inputRecordContext, data);
+ return serializeRecordToFlatBuffer(DEFAULT_FB_BUILDER, record);
}
-
- public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder
builder, RecordContext inputRecordContext, byte[] data) {
- checkNotNull(inputRecordContext, "record-context can't be null");
- Optional<EncryptionContext> encryptionCtx =
inputRecordContext.getEncryptionCtx();
- Map<String, String> properties = inputRecordContext.getProperties();
+
+ public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder
builder, Record<byte[]> record) {
+ checkNotNull(record, "record-context can't be null");
+ Optional<EncryptionContext> encryptionCtx = record.getEncryptionCtx();
+ Map<String, String> properties = record.getProperties();
int encryptionCtxOffset = -1;
int propertiesOffset = -1;
@@ -86,7 +87,7 @@ public static ByteBuffer
serializeRecordToFlatBuffer(FlatBufferBuilder builder,
encryptionCtxOffset = createEncryptionCtxOffset(builder,
encryptionCtx);
}
- int payloadOffset = Message.createPayloadVector(builder, data);
+ int payloadOffset = Message.createPayloadVector(builder,
record.getValue());
Message.startMessage(builder);
Message.addPayload(builder, payloadOffset);
if (encryptionCtxOffset != -1) {
@@ -98,11 +99,11 @@ public static ByteBuffer
serializeRecordToFlatBuffer(FlatBufferBuilder builder,
int endMessage = Message.endMessage(builder);
builder.finish(endMessage);
ByteBuffer bb = builder.dataBuffer();
-
+
// to avoid copying of data, use same byte[] wrapped by ByteBuffer.
But, ByteBuffer.array() returns entire array
// so, it requires to read from offset:
// builder.sizedByteArray()=>copies buffer: sizedByteArray(space,
bb.capacity() - space)
- int space = bb.capacity() - builder.offset();
+ int space = bb.capacity() - builder.offset();
return ByteBuffer.wrap(bb.array(), space, bb.capacity() - space);
}
@@ -133,7 +134,7 @@ private static int createEncryptionCtxOffset(final
FlatBufferBuilder builder, Op
EncryptionKey.addKey(builder, key);
EncryptionKey.addValue(builder, value);
if(metadataOffset!=-1) {
- EncryptionKey.addMetadata(builder, metadataOffset);
+ EncryptionKey.addMetadata(builder, metadataOffset);
}
keysOffsets[keyIndex++] = EncryptionKey.endEncryptionKey(builder);
}
@@ -156,29 +157,31 @@ private static int createEncryptionCtxOffset(final
FlatBufferBuilder builder, Op
}
return EncryptionCtx.createEncryptionCtx(builder, keysOffset, param,
algo, compressionType,
ctx.getUncompressedMessageSize(), batchSize,
ctx.getBatchSize().isPresent());
-
+
}
/**
* Serializes sink-record into json format. It encodes encryption-keys,
encryption-param and payload in base64
* format so, it can be sent in json.
- *
+ *
* @param inputRecordContext
* @param data
* @return
*/
- public static String serializeRecordToJson(RecordContext
inputRecordContext, byte[] data) {
- checkNotNull(inputRecordContext, "record-context can't be null");
+ public static String serializeRecordToJson(Record<byte[]> record) {
+ checkNotNull(record, "record can't be null");
+
JsonObject result = new JsonObject();
- result.addProperty(PAYLOAD_FIELD, getEncoder().encodeToString(data));
- if (inputRecordContext.getProperties() != null) {
+ result.addProperty(PAYLOAD_FIELD,
getEncoder().encodeToString(record.getValue()));
+ if (record.getProperties() != null) {
JsonObject properties = new JsonObject();
- inputRecordContext.getProperties().entrySet()
+ record.getProperties().entrySet()
.forEach(e -> properties.addProperty(e.getKey(),
e.getValue()));
result.add(PROPERTIES_FIELD, properties);
}
- if (inputRecordContext.getEncryptionCtx().isPresent()) {
- EncryptionContext encryptionCtx =
inputRecordContext.getEncryptionCtx().get();
+
+ if (record.getEncryptionCtx().isPresent()) {
+ EncryptionContext encryptionCtx = record.getEncryptionCtx().get();
JsonObject encryptionCtxJson = new JsonObject();
JsonObject keyBase64Map = new JsonObject();
JsonObject keyMetadataMap = new JsonObject();
diff --git
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
index b9f5c8a6b1..45aa0c7237 100644
---
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
+++
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -20,16 +20,22 @@
import static java.util.Base64.getDecoder;
+import com.google.gson.Gson;
+
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.kinesis.fbs.KeyValue;
import org.apache.pulsar.io.kinesis.fbs.Message;
import org.testng.Assert;
@@ -37,12 +43,6 @@
import org.testng.annotations.Test;
import org.testng.collections.Maps;
-import com.google.gson.Gson;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-
/**
* Unit test of {@link UtilsTest}.
*/
@@ -52,7 +52,7 @@
public Object[][] encryptionProvider() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}
-
+
@Test
public void testJsonSerialization() throws Exception {
@@ -75,9 +75,9 @@ public void testJsonSerialization() throws Exception {
Map<String, String> metadata2 = Maps.newHashMap();
metadata2.put("version", "v2");
metadata2.put("ckms", "cmks-2");
- RecordContext recordCtx = createRecordContext(algo, keyNames,
keyValues, param.getBytes(), metadata1, metadata2,
+ Record<byte[]> recordCtx = createRecord(data, algo, keyNames,
keyValues, param.getBytes(), metadata1, metadata2,
batchSize, compressionMsgSize, properties, true);
- String json = Utils.serializeRecordToJson(recordCtx, data);
+ String json = Utils.serializeRecordToJson(recordCtx);
// deserialize from json and assert
KinesisMessageResponse kinesisJsonResponse =
deSerializeRecordFromJson(json);
@@ -118,9 +118,9 @@ public void testFbSerialization(boolean isEncryption)
throws Exception {
Map<String, String> metadata2 = Maps.newHashMap();
metadata2.put("version", "v2");
metadata2.put("ckms", "cmks-2");
- RecordContext recordCtx = createRecordContext(algo, keyNames,
keyValues, param.getBytes(), metadata1,
+ Record<byte[]> record = createRecord(data, algo, keyNames,
keyValues, param.getBytes(), metadata1,
metadata2, batchSize, compressionMsgSize, properties,
isEncryption);
- ByteBuffer flatBuffer =
Utils.serializeRecordToFlatBuffer(recordCtx, data);
+ ByteBuffer flatBuffer = Utils.serializeRecordToFlatBuffer(record);
Message kinesisJsonResponse = Message.getRootAsMessage(flatBuffer);
byte[] fbPayloadBytes = new
byte[kinesisJsonResponse.payloadLength()];
@@ -164,7 +164,7 @@ public void testFbSerialization(boolean isEncryption)
throws Exception {
Assert.assertEquals(param.getBytes(), paramBytes);
Assert.assertEquals(algo, encryptionCtxDeser.algo());
}
-
+
Map<String, String> fbproperties = Maps.newHashMap();
for (int i = 0; i < kinesisJsonResponse.propertiesLength(); i++) {
KeyValue property = kinesisJsonResponse.properties(i);
@@ -175,7 +175,7 @@ public void testFbSerialization(boolean isEncryption)
throws Exception {
}
}
- private RecordContext createRecordContext(String algo, String[] keyNames,
byte[][] keyValues, byte[] param,
+ private Record<byte[]> createRecord(byte[] data, String algo, String[]
keyNames, byte[][] keyValues, byte[] param,
Map<String, String> metadata1, Map<String, String> metadata2, int
batchSize, int compressionMsgSize,
Map<String, String> properties, boolean isEncryption) {
EncryptionContext ctx = null;
@@ -198,14 +198,16 @@ private RecordContext createRecordContext(String algo,
String[] keyNames, byte[]
ctx.setKeys(keys);
ctx.setParam(param);
}
- return new RecordContextImpl(properties, Optional.ofNullable(ctx));
+ return new RecordImpl(data, properties, Optional.ofNullable(ctx));
}
- class RecordContextImpl implements RecordContext {
+ class RecordImpl implements Record<byte[]> {
+ byte[] data;
Map<String, String> properties;
Optional<EncryptionContext> ectx;
- public RecordContextImpl(Map<String, String> properties,
Optional<EncryptionContext> ectx) {
+ public RecordImpl(byte[] data, Map<String, String> properties,
Optional<EncryptionContext> ectx) {
+ this.data = data;
this.properties = properties;
this.ectx = ectx;
}
@@ -217,6 +219,16 @@ public RecordContextImpl(Map<String, String> properties,
Optional<EncryptionCont
public Optional<EncryptionContext> getEncryptionCtx() {
return ectx;
}
+
+ @Override
+ public Optional<String> getKey() {
+ return Optional.empty();
+ }
+
+ @Override
+ public byte[] getValue() {
+ return data;
+ }
}
public static KinesisMessageResponse deSerializeRecordFromJson(String
jsonRecord) {
diff --git
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index dab3fe96eb..c05828fac1 100644
---
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -25,15 +25,19 @@
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
+import lombok.Data;
+
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Map;
-
/**
* A simple connector to consume messages from a RabbitMQ queue
*/
@@ -82,20 +86,13 @@ public RabbitMQConsumer(RabbitMQSource source, Channel
channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
- source.consume(new RabbitMQRecord(body));
+ source.consume(new
RabbitMQRecord(Optional.ofNullable(envelope.getRoutingKey()), body));
}
}
+ @Data
static private class RabbitMQRecord implements Record<byte[]> {
- private final byte[] data;
-
- public RabbitMQRecord(byte[] data) {
- this.data = data;
- }
-
- @Override
- public byte[] getValue() {
- return data;
- }
+ private final Optional<String> key;
+ private final byte[] value;
}
}
\ No newline at end of file
diff --git
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 56c52e7ca4..494ae48e5b 100644
---
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -33,6 +33,7 @@
import java.io.InputStream;
import java.io.Serializable;
import java.util.Map;
+import java.util.Optional;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.Record;
@@ -165,6 +166,12 @@ public TwitterRecord(String tweet) {
this.tweet = tweet;
}
+ @Override
+ public Optional<String> getKey() {
+ // TODO: Could use user or tweet ID as key here
+ return Optional.empty();
+ }
+
@Override
public String getValue() {
return tweet;
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services