This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ea6eae7 Added optional key in pulsar IO (#2116)
ea6eae7 is described below
commit ea6eae7d22e94fd2a998813ec9b2af42d9035fc0
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Jul 13 15:54:07 2018 -0700
Added optional key in Pulsar IO (#2116)
* Added optional key in Pulsar IO
* Unified Record and RecordContext interfaces
* Fixed Pulsar sink
* Fixed test compilation
* Fixed imports
* Removed wrong import
---
.../apache/pulsar/admin/cli/CmdFunctionsTest.java | 1 -
.../org/apache/pulsar/client/impl/MessageImpl.java | 22 +++---
.../pulsar/client/impl/TopicMessageImpl.java | 7 +-
.../pulsar/client/impl/MessageRecordImplTest.java | 60 ----------------
.../functions/instance/JavaInstanceRunnable.java | 2 +-
.../pulsar/functions/instance/SinkRecord.java | 65 +++++++++---------
.../apache/pulsar/functions/sink/PulsarSink.java | 58 +++++++++-------
.../pulsar/functions/source/PulsarRecord.java | 65 ++++++++++++++----
.../pulsar/functions/source/PulsarSource.java | 29 ++++----
.../rest/api/v2/FunctionApiV2ResourceTest.java | 28 ++++----
.../pulsar/io/aerospike/AerospikeAbstractSink.java | 54 ++++++++-------
.../pulsar/io/aerospike/AerospikeStringSink.java | 6 +-
.../pulsar/io/cassandra/CassandraAbstractSink.java | 16 ++---
.../pulsar/io/cassandra/CassandraStringSink.java | 6 +-
.../java/org/apache/pulsar/io/core/Record.java | 66 +++++++++++++++++-
.../org/apache/pulsar/io/core/RecordContext.java | 68 -------------------
.../java/org/apache/pulsar/io/core/SimpleSink.java | 47 -------------
.../main/java/org/apache/pulsar/io/core/Sink.java | 10 +--
.../java/org/apache/pulsar/io/core/SinkTest.java | 2 +-
.../apache/pulsar/io/kafka/KafkaAbstractSink.java | 50 +++++++-------
.../pulsar/io/kafka/KafkaAbstractSource.java | 28 +++++---
.../apache/pulsar/io/kafka/KafkaStringSink.java | 5 +-
.../apache/pulsar/io/kafka/KafkaStringSource.java | 2 +-
.../org/apache/pulsar/io/kinesis/KinesisSink.java | 79 +++++++++++-----------
.../java/org/apache/pulsar/io/kinesis/Utils.java | 53 ++++++++-------
.../org/apache/pulsar/io/kinesis/UtilsTest.java | 46 ++++++++-----
.../apache/pulsar/io/rabbitmq/RabbitMQSource.java | 25 +++----
.../apache/pulsar/io/twitter/TwitterFireHose.java | 7 ++
28 files changed, 434 insertions(+), 473 deletions(-)
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index bda7f65..edfb1c4 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -44,7 +44,6 @@ import
org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.io.core.RecordContext;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 504cc61..5ea8bfe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -20,6 +20,13 @@ package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkNotNull;
+import com.google.common.collect.Maps;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
@@ -32,22 +39,15 @@ import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
-import com.google.common.collect.Maps;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-
-public class MessageImpl<T> extends MessageRecordImpl<T, MessageId> {
+public class MessageImpl<T> implements Message<T> {
+ protected MessageId messageId;
private MessageMetadata.Builder msgMetadataBuilder;
private ClientCnx cnx;
private ByteBuf payload;
@@ -86,7 +86,7 @@ public class MessageImpl<T> extends MessageRecordImpl<T,
MessageId> {
Schema<T> schema) {
this(messageId, msgMetadata, payload, null, cnx, schema);
}
-
+
MessageImpl(MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf
payload,
Optional<EncryptionContext> encryptionCtx, ClientCnx cnx,
Schema<T> schema) {
this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
@@ -330,7 +330,7 @@ public class MessageImpl<T> extends MessageRecordImpl<T,
MessageId> {
void setMessageId(MessageIdImpl messageId) {
this.messageId = messageId;
}
-
+
@Override
public Optional<EncryptionContext> getEncryptionCtx() {
return encryptionCtx;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index f7f1f9b..4f5ac13 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -26,10 +26,11 @@ import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.EncryptionContext;
-public class TopicMessageImpl<T> extends MessageRecordImpl<T,
TopicMessageIdImpl> {
+public class TopicMessageImpl<T> implements Message<T> {
private final String topicName;
private final Message<T> msg;
+ private final TopicMessageIdImpl messageId;
TopicMessageImpl(String topicName,
Message<T> msg) {
@@ -109,9 +110,9 @@ public class TopicMessageImpl<T> extends
MessageRecordImpl<T, TopicMessageIdImpl
public T getValue() {
return msg.getValue();
}
-
+
@Override
public Optional<EncryptionContext> getEncryptionCtx() {
- return (msg instanceof MessageImpl) ? ((MessageImpl)
msg).getEncryptionCtx() : Optional.empty();
+ return msg.getEncryptionCtx();
}
}
diff --git
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
deleted file mode 100644
index 7df373a..0000000
---
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static org.mockito.Mockito.CALLS_REAL_METHODS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertSame;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pulsar.client.api.MessageId;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-public class MessageRecordImplTest {
-
- private MessageRecordImpl<byte[], MessageId> message;
- private MessageId messageId;
-
- @BeforeMethod
- public void setup() {
- this.messageId = mock(MessageId.class);
- this.message = mock(MessageRecordImpl.class, CALLS_REAL_METHODS);
- when(message.getMessageId()).thenReturn(messageId);
- }
-
- @Test
- public void testAck() throws Exception {
- final AtomicReference<MessageId> msgIdRef = new AtomicReference<>();
- final CountDownLatch ackLatch = new CountDownLatch(1);
-
- this.message.setAckFunction(msgId -> {
- msgIdRef.set(msgId);
- ackLatch.countDown();
- });
-
- this.message.ack();
- ackLatch.await();
-
- assertSame(messageId, msgIdRef.get());
- }
-
-}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ba0de38..4c29232 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -324,7 +324,7 @@ public class JavaInstanceRunnable implements AutoCloseable,
Runnable {
private void sendOutputMessage(Record srcRecord, Object output) {
try {
- this.sink.write(srcRecord, output);
+ this.sink.write(new SinkRecord<>(srcRecord, output));
} catch (Exception e) {
log.info("Encountered exception in sink write: ", e);
throw new RuntimeException(e);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
similarity index 53%
rename from
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
rename to
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index ea1d892..4e2edbe 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -16,65 +16,66 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.functions.instance;
-import java.util.Collections;
import java.util.Map;
import java.util.Optional;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.io.core.Record;
-/**
- * Abstract class that implements message api and connect record api.
- */
-public abstract class MessageRecordImpl<T, M extends MessageId> implements
Message<T>, Record<T> {
+@Data
+@AllArgsConstructor
+public class SinkRecord<T> implements Record<T> {
- protected M messageId;
- protected java.util.function.Consumer<M> ackFunction;
+ private final Record<T> sourceRecord;
+ private final T value;
- public void setAckFunction(java.util.function.Consumer<M> ackFunction) {
- this.ackFunction = ackFunction;
+ public Record<T> getSourceRecord() {
+ return sourceRecord;
}
@Override
- public String getPartitionId() {
- if (null != messageId) {
- if (messageId instanceof MessageIdImpl) {
- return String.valueOf(((MessageIdImpl)
messageId).getPartitionIndex());
- } else {
- return "";
- }
- }
- return "";
+ public Optional<String> getKey() {
+ return sourceRecord.getKey();
}
@Override
- public long getRecordSequence() {
- return getSequenceId();
+ public T getValue() {
+ return value;
}
@Override
- public void ack() {
- if (null != ackFunction) {
- ackFunction.accept((M) getMessageId());
- }
+ public Optional<String> getPartitionId() {
+ return sourceRecord.getPartitionId();
}
@Override
- public void fail() {
- // no-op
+ public Optional<Long> getRecordSequence() {
+ return sourceRecord.getRecordSequence();
+ }
+
+ @Override
+ public Optional<EncryptionContext> getEncryptionCtx() {
+ return sourceRecord.getEncryptionCtx();
}
@Override
public Map<String, String> getProperties() {
- return Collections.emptyMap();
+ return sourceRecord.getProperties();
}
@Override
- public Optional<EncryptionContext> getEncryptionCtx() {
- return Optional.empty();
+ public void ack() {
+ sourceRecord.ack();
+ }
+
+ @Override
+ public void fail() {
+ sourceRecord.fail();
}
+
}
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 7aebd93..2170879 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -19,10 +19,13 @@
package org.apache.pulsar.functions.sink;
import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Base64;
+import java.util.Map;
+
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
@@ -35,18 +38,18 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.api.utils.DefaultSerDe;
import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.instance.SinkRecord;
import
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
import
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
import org.apache.pulsar.functions.instance.producers.Producers;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
-import java.util.Base64;
-import java.util.Map;
+import net.jodah.typetools.TypeResolver;
@Slf4j
public class PulsarSink<T> implements Sink<T> {
@@ -57,16 +60,16 @@ public class PulsarSink<T> implements Sink<T> {
private PulsarSinkProcessor pulsarSinkProcessor;
- private interface PulsarSinkProcessor {
+ private interface PulsarSinkProcessor<T> {
void initializeOutputProducer(String outputTopic) throws Exception;
void sendOutputMessage(MessageBuilder outputMsgBuilder,
- RecordContext recordContext) throws Exception;
+ Record<T> recordContext) throws Exception;
void close() throws Exception;
}
- private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor
{
+ private class PulsarSinkAtMostOnceProcessor implements
PulsarSinkProcessor<T> {
private Producer<byte[]> producer;
@Override
@@ -77,7 +80,7 @@ public class PulsarSink<T> implements Sink<T> {
@Override
public void sendOutputMessage(MessageBuilder outputMsgBuilder,
- RecordContext recordContext) throws
Exception {
+ Record<T> recordContext) throws
Exception {
Message<byte[]> outputMsg = outputMsgBuilder.build();
this.producer.sendAsync(outputMsg);
}
@@ -94,7 +97,7 @@ public class PulsarSink<T> implements Sink<T> {
}
}
- private class PulsarSinkAtLeastOnceProcessor implements
PulsarSinkProcessor {
+ private class PulsarSinkAtLeastOnceProcessor implements
PulsarSinkProcessor<T> {
private Producer<byte[]> producer;
@Override
@@ -105,7 +108,7 @@ public class PulsarSink<T> implements Sink<T> {
@Override
public void sendOutputMessage(MessageBuilder outputMsgBuilder,
- RecordContext recordContext) throws
Exception {
+ Record<T> recordContext) throws
Exception {
Message<byte[]> outputMsg = outputMsgBuilder.build();
this.producer.sendAsync(outputMsg).thenAccept(messageId ->
recordContext.ack());
}
@@ -122,7 +125,7 @@ public class PulsarSink<T> implements Sink<T> {
}
}
- private class PulsarSinkEffectivelyOnceProcessor implements
PulsarSinkProcessor, ConsumerEventListener {
+ private class PulsarSinkEffectivelyOnceProcessor implements
PulsarSinkProcessor<T>, ConsumerEventListener {
@Getter(AccessLevel.PACKAGE)
protected Producers outputProducer;
@@ -134,15 +137,16 @@ public class PulsarSink<T> implements Sink<T> {
}
@Override
- public void sendOutputMessage(MessageBuilder outputMsgBuilder,
RecordContext recordContext)
+ public void sendOutputMessage(MessageBuilder outputMsgBuilder,
Record<T> recordContext)
throws Exception {
// assign sequence id to output message for idempotent producing
- outputMsgBuilder = outputMsgBuilder
- .setSequenceId(recordContext.getRecordSequence());
+ if (recordContext.getRecordSequence().isPresent()) {
+
outputMsgBuilder.setSequenceId(recordContext.getRecordSequence().get());
+ }
// currently on PulsarRecord
- Producer producer =
outputProducer.getProducer(recordContext.getPartitionId());
+ Producer producer =
outputProducer.getProducer(recordContext.getPartitionId().get());
org.apache.pulsar.client.api.Message outputMsg =
outputMsgBuilder.build();
producer.sendAsync(outputMsg)
@@ -211,29 +215,37 @@ public class PulsarSink<T> implements Sink<T> {
}
@Override
- public void write(RecordContext recordContext, T value) throws Exception {
+ public void write(Record<T> record) throws Exception {
byte[] output;
try {
- output = this.outputSerDe.serialize(value);
+ output = this.outputSerDe.serialize(record.getValue());
} catch (Exception e) {
//TODO Add serialization exception stats
throw new RuntimeException("Error occured when attempting to
serialize output:", e);
}
+
MessageBuilder msgBuilder = MessageBuilder.create();
+ if (record.getKey().isPresent()) {
+ msgBuilder.setKey(record.getKey().get());
+ }
+
msgBuilder.setContent(output);
- if (recordContext instanceof PulsarRecord) {
- PulsarRecord pulsarRecord = (PulsarRecord) recordContext;
+
+ if (!record.getProperties().isEmpty()) {
+ msgBuilder.setProperties(record.getProperties());
+ }
+
+ SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
+ if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
+ PulsarRecord<T> pulsarRecord = (PulsarRecord<T>)
sinkRecord.getSourceRecord();
// forward user properties to sink-topic
- if (pulsarRecord.getProperties() != null) {
- msgBuilder.setProperties(pulsarRecord.getProperties());
- }
msgBuilder.setProperty("__pfn_input_topic__",
pulsarRecord.getTopicName()).setProperty(
"__pfn_input_msg_id__",
new
String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
}
- this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, recordContext);
+ this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, record);
}
@Override
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 113cf82..7a6d11b 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -18,35 +18,72 @@
*/
package org.apache.pulsar.functions.source;
+import java.util.Map;
+import java.util.Optional;
+
import lombok.Builder;
-import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
-import java.util.Map;
-import java.util.Optional;
-
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.Record;
-@Data
@Builder
@Getter
@ToString
@EqualsAndHashCode
public class PulsarRecord<T> implements Record<T> {
- private String partitionId;
- private long recordSequence;
- private T value;
- private MessageId messageId;
- private String topicName;
- private Map<String, String> properties;
- private Optional<EncryptionContext> encryptionCtx = Optional.empty();
- private Runnable failFunction;
- private Runnable ackFunction;
+ private final String topicName;
+ private final int partition;
+
+ // TODO: When we switch to schema for functions, we should just rely on
the message object value
+ private final T value;
+ private final Message<byte[]> message;
+
+ private final Runnable failFunction;
+ private final Runnable ackFunction;
+
+ @Override
+ public Optional<String> getKey() {
+ if (message.hasKey()) {
+ return Optional.of(message.getKey());
+ } else {
+ return Optional.empty();
+ }
+ }
+
+ @Override
+ public Optional<String> getPartitionId() {
+ return Optional.of(String.format("%s-%s", topicName, partition));
+ }
+
+ @Override
+ public Optional<Long> getRecordSequence() {
+ return Optional.of(Utils.getSequenceId(message.getMessageId()));
+ }
+
+ /**
+ * Retrieves encryption-context that is attached to record.
+ *
+ * @return {@link Optional}<{@link EncryptionContext}>
+ */
+ public Optional<EncryptionContext> getEncryptionCtx() {
+ return message.getEncryptionCtx();
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ return message.getProperties();
+ }
+
+ public MessageId getMessageId() {
+ return message.getMessageId();
+ }
@Override
public void ack() {
diff --git
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 680d754..d5a1df9 100644
---
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -54,7 +54,7 @@ public class PulsarSource<T> implements Source<T> {
private boolean isTopicsPattern;
@Getter
- private org.apache.pulsar.client.api.Consumer inputConsumer;
+ private org.apache.pulsar.client.api.Consumer<byte[]> inputConsumer;
public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig
pulsarConfig) {
this.pulsarClient = pulsarClient;
@@ -69,17 +69,17 @@ public class PulsarSource<T> implements Source<T> {
// Setup pulsar consumer
ConsumerBuilder<byte[]> consumerBuilder =
this.pulsarClient.newConsumer()
//consume message even if can't decrypt and deliver it along
with encryption-ctx
- .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
+ .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
.subscriptionName(this.pulsarSourceConfig.getSubscriptionName())
.subscriptionType(this.pulsarSourceConfig.getSubscriptionType());
if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) {
-
consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());
+
consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());
isTopicsPattern = true;
}else {
- consumerBuilder.topics(new
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));
+ consumerBuilder.topics(new
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));
}
-
+
if (pulsarSourceConfig.getTimeoutMs() != null) {
consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(),
TimeUnit.MILLISECONDS);
}
@@ -88,21 +88,21 @@ public class PulsarSource<T> implements Source<T> {
@Override
public Record<T> read() throws Exception {
- org.apache.pulsar.client.api.Message<T> message =
this.inputConsumer.receive();
+ org.apache.pulsar.client.api.Message<byte[]> message =
this.inputConsumer.receive();
String topicName;
- String partitionId;
+ int partitionId;
// If more than one topics are being read than the Message return by
the consumer will be TopicMessageImpl
// If there is only topic being read then the Message returned by the
consumer wil be MessageImpl
if (message instanceof TopicMessageImpl) {
- topicName = ((TopicMessageImpl) message).getTopicName();
+ topicName = ((TopicMessageImpl<?>) message).getTopicName();
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl)
message.getMessageId();
MessageIdImpl messageId = (MessageIdImpl)
topicMessageId.getInnerMessageId();
- partitionId = Long.toString(messageId.getPartitionIndex());
+ partitionId = messageId.getPartitionIndex();
} else {
topicName =
this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet().iterator().next();
- partitionId = Long.toString(((MessageIdImpl)
message.getMessageId()).getPartitionIndex());
+ partitionId = ((MessageIdImpl)
message.getMessageId()).getPartitionIndex();
}
Object object;
@@ -130,14 +130,10 @@ public class PulsarSource<T> implements Source<T> {
throw new RuntimeException("Error in casting input to expected
type:", e);
}
- PulsarRecord<T> pulsarMessage = (PulsarRecord<T>)
PulsarRecord.builder()
+ return PulsarRecord.<T>builder()
.value(input)
- .messageId(message.getMessageId())
- .partitionId(String.format("%s-%s", topicName, partitionId))
- .recordSequence(Utils.getSequenceId(message.getMessageId()))
+ .message(message)
.topicName(topicName)
- .properties(message.getProperties())
- .encryptionCtx(message.getEncryptionCtx())
.ackFunction(() -> {
if (pulsarSourceConfig.getProcessingGuarantees() ==
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
inputConsumer.acknowledgeCumulativeAsync(message);
@@ -150,7 +146,6 @@ public class PulsarSource<T> implements Source<T> {
}
})
.build();
- return pulsarMessage;
}
@Override
diff --git
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 0bac0a6..415912a 100644
---
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -29,6 +29,9 @@ import static org.powermock.api.mockito.PowerMockito.doThrow;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -65,7 +68,7 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -77,9 +80,6 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.ObjectFactory;
import org.testng.annotations.Test;
-import com.google.common.collect.Lists;
-import com.google.gson.Gson;
-
/**
* Unit test of {@link FunctionApiV2Resource}.
*/
@@ -98,7 +98,7 @@ public class FunctionApiV2ResourceTest {
return input;
}
}
-
+
public static final class TestSink implements Sink<byte[]> {
@Override
@@ -110,7 +110,7 @@ public class FunctionApiV2ResourceTest {
}
@Override
- public void write(RecordContext inputRecordContext, byte[] value)
throws Exception {
+ public void write(Record<byte[]> record) throws Exception {
}
}
@@ -670,7 +670,7 @@ public class FunctionApiV2ResourceTest {
String fileLocation =
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
String filePackageUrl = "file://" + fileLocation;
-
+
SinkSpec sinkSpec = SinkSpec.newBuilder()
.setTopic(outputTopic)
.setSerDeClassName(outputSerdeClassName).build();
@@ -681,14 +681,14 @@ public class FunctionApiV2ResourceTest {
.setParallelism(parallelism)
.setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
.putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
-
+
when(mockedManager.containsFunction(eq(tenant), eq(namespace),
eq(function))).thenReturn(true);
RequestResult rr = new RequestResult()
.setSuccess(true)
.setMessage("function registered");
CompletableFuture<RequestResult> requestResult =
CompletableFuture.completedFuture(rr);
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-
+
Response response = resource.updateFunction(
tenant,
namespace,
@@ -697,10 +697,10 @@ public class FunctionApiV2ResourceTest {
null,
filePackageUrl,
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
-
+
assertEquals(Status.OK.getStatusCode(), response.getStatus());
}
-
+
@Test
public void testUpdateFunctionFailure() throws Exception {
mockStatic(Utils.class);
@@ -990,7 +990,7 @@ public class FunctionApiV2ResourceTest {
assertEquals(Status.OK.getStatusCode(), response.getStatus());
assertEquals(new Gson().toJson(functions), response.getEntity());
}
-
+
@Test
public void testDownloadFunctionHttpUrl() throws Exception {
String jarHttpUrl =
"http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
@@ -1006,7 +1006,7 @@ public class FunctionApiV2ResourceTest {
pkgFile.delete();
}
}
-
+
@Test
public void testDownloadFunctionFile() throws Exception {
String fileLocation =
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
@@ -1022,7 +1022,7 @@ public class FunctionApiV2ResourceTest {
pkgFile.delete();
}
}
-
+
@Test
public void testRegisterFunctionFileUrlWithValidSinkClass() throws
IOException {
Configurator.setRootLevel(Level.DEBUG);
diff --git
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
index ba29616..6ea657e 100644
---
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
+++
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
@@ -31,22 +31,23 @@ import com.aerospike.client.async.NioEventLoops;
import com.aerospike.client.listener.WriteListener;
import com.aerospike.client.policy.ClientPolicy;
import com.aerospike.client.policy.WritePolicy;
-import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.SimpleSink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingDeque;
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A Simple abstract class for Aerospike sink
* Users need to implement extractKeyValue function to use this sink
*/
-public abstract class AerospikeAbstractSink<K, V> extends SimpleSink<byte[]> {
+public abstract class AerospikeAbstractSink<K, V> implements Sink<byte[]> {
private static final Logger LOG =
LoggerFactory.getLogger(AerospikeAbstractSink.class);
@@ -55,6 +56,7 @@ public abstract class AerospikeAbstractSink<K, V> extends
SimpleSink<byte[]> {
private AerospikeClient client;
private WritePolicy writePolicy;
private BlockingQueue<AWriteListener> queue;
+ private NioEventLoops eventLoops;
private EventLoop eventLoop;
@Override
@@ -74,18 +76,25 @@ public abstract class AerospikeAbstractSink<K, V> extends
SimpleSink<byte[]> {
for (int i = 0; i < aerospikeSinkConfig.getMaxConcurrentRequests();
++i) {
queue.put(new AWriteListener(queue));
}
- eventLoop = new NioEventLoops(new EventPolicy(), 1).next();
+
+ eventLoops = new NioEventLoops(new EventPolicy(), 1);
+ eventLoop = eventLoops.next();
}
@Override
public void close() throws Exception {
- client.close();
+ if (client != null) {
+ client.close();
+ }
+
+ if (eventLoops != null) {
+ eventLoops.close();
+ }
LOG.info("Connection Closed");
}
@Override
- public CompletableFuture<Void> write(byte[] record) {
- CompletableFuture<Void> future = new CompletableFuture<>();
+ public void write(Record<byte[]> record) {
KeyValue<K, V> keyValue = extractKeyValue(record);
Key key = new Key(aerospikeSinkConfig.getKeyspace(),
aerospikeSinkConfig.getKeySet(), keyValue.getKey().toString());
Bin bin = new Bin(aerospikeSinkConfig.getColumnName(),
Value.getAsBlob(keyValue.getValue()));
@@ -93,12 +102,11 @@ public abstract class AerospikeAbstractSink<K, V> extends
SimpleSink<byte[]> {
try {
listener = queue.take();
} catch (InterruptedException ex) {
- future.completeExceptionally(ex);
- return future;
+ record.fail();
+ return;
}
- listener.setFuture(future);
+ listener.setContext(record);
client.put(eventLoop, listener, writePolicy, key, bin);
- return future;
}
private void createClient() {
@@ -121,21 +129,21 @@ public abstract class AerospikeAbstractSink<K, V> extends
SimpleSink<byte[]> {
}
private class AWriteListener implements WriteListener {
- private CompletableFuture<Void> future;
+ private Record<byte[]> context;
private BlockingQueue<AWriteListener> queue;
public AWriteListener(BlockingQueue<AWriteListener> queue) {
this.queue = queue;
}
- public void setFuture(CompletableFuture<Void> future) {
- this.future = future;
+ public void setContext(Record<byte[]> record) {
+ this.context = record;
}
@Override
public void onSuccess(Key key) {
- if (future != null) {
- future.complete(null);
+ if (context != null) {
+ context.ack();
}
try {
queue.put(this);
@@ -146,8 +154,8 @@ public abstract class AerospikeAbstractSink<K, V> extends
SimpleSink<byte[]> {
@Override
public void onFailure(AerospikeException e) {
- if (future != null) {
- future.completeExceptionally(e);
+ if (context != null) {
+ context.fail();
}
try {
queue.put(this);
@@ -157,5 +165,5 @@ public abstract class AerospikeAbstractSink<K, V> extends
SimpleSink<byte[]> {
}
}
- public abstract KeyValue<K, V> extractKeyValue(byte[] message);
+ public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> message);
}
\ No newline at end of file
diff --git
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
index 8bf0174..40ffd90 100644
---
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
+++
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.io.aerospike;
import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
/**
* Aerospike sink that treats incoming messages on the input topic as Strings
@@ -27,7 +28,8 @@ import org.apache.pulsar.io.core.KeyValue;
*/
public class AerospikeStringSink extends AerospikeAbstractSink<String, String>
{
@Override
- public KeyValue<String, String> extractKeyValue(byte[] message) {
- return new KeyValue<>(new String(message), new String(message));
+ public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
+ String key = record.getKey().orElseGet(() -> new
String(record.getValue()));
+ return new KeyValue<>(key, new String(record.getValue()));
}
}
\ No newline at end of file
diff --git
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
index c8a9ca5..fd76c0c 100644
---
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
+++
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
@@ -29,17 +29,17 @@ import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import java.util.Map;
-import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.SimpleSink;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
/**
* A Simple abstract class for Cassandra sink
* Users need to implement extractKeyValue function to use this sink
*/
-public abstract class CassandraAbstractSink<K, V> extends SimpleSink<byte[]> {
+public abstract class CassandraAbstractSink<K, V> implements Sink<byte[]> {
// ----- Runtime fields
private Cluster cluster;
@@ -69,24 +69,22 @@ public abstract class CassandraAbstractSink<K, V> extends
SimpleSink<byte[]> {
}
@Override
- public CompletableFuture<Void> write(byte[] record) {
+ public void write(Record<byte[]> record) {
KeyValue<K, V> keyValue = extractKeyValue(record);
BoundStatement bound = statement.bind(keyValue.getKey(),
keyValue.getValue());
ResultSetFuture future = session.executeAsync(bound);
- CompletableFuture<Void> completable = new CompletableFuture<Void>();
Futures.addCallback(future,
new FutureCallback<ResultSet>() {
@Override
public void onSuccess(ResultSet result) {
- completable.complete(null);
+ record.ack();
}
@Override
public void onFailure(Throwable t) {
- completable.completeExceptionally(t);
+ record.fail();
}
});
- return completable;
}
private void createClient(String roots) {
@@ -107,5 +105,5 @@ public abstract class CassandraAbstractSink<K, V> extends
SimpleSink<byte[]> {
session.execute("USE " + cassandraSinkConfig.getKeyspace());
}
- public abstract KeyValue<K, V> extractKeyValue(byte[] message);
+ public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> record);
}
\ No newline at end of file
diff --git
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
index 5c0cb16..c2e72b8 100644
---
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
+++
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.io.cassandra;
import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
/**
* Cassandra sink that treats incoming messages on the input topic as Strings
@@ -27,7 +28,8 @@ import org.apache.pulsar.io.core.KeyValue;
*/
public class CassandraStringSink extends CassandraAbstractSink<String, String>
{
@Override
- public KeyValue<String, String> extractKeyValue(byte[] message) {
- return new KeyValue<>(new String(message), new String(message));
+ public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
+ String key = record.getKey().orElseGet(() -> new
String(record.getValue()));
+ return new KeyValue<>(key, new String(record.getValue()));
}
}
\ No newline at end of file
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
index 08c78bf..504d5f4 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
@@ -18,14 +18,74 @@
*/
package org.apache.pulsar.io.core;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.pulsar.common.api.EncryptionContext;
+
/**
- * Pulsar Connect's Record interface. Record encapsulates the
- * information about a record being read from a Source.
+ * Pulsar Connect's Record interface. Record encapsulates the information
about a record being read from a Source.
*/
-public interface Record<T> extends RecordContext {
+public interface Record<T> {
+
+ /**
+ * Return the key associated with this record, if it has one.
+ */
+ Optional<String> getKey();
+
/**
* Retrieves the actual data of the record
+ *
* @return The record data
*/
T getValue();
+
+ /**
+ * Retrieves the partition information of the record, if any.
+ *
+ * @return The id of the partition the record belongs to, if any
+ */
+ default Optional<String> getPartitionId() {
+ return Optional.empty();
+ }
+
+ /**
+ * Retrieves the sequence of the record from a source partition.
+ *
+ * @return Sequence Id associated with the record
+ */
+ default Optional<Long> getRecordSequence() {
+ return Optional.empty();
+ }
+
+ /**
+ * Retrieves encryption-context that is attached to record.
+ *
+ * @return {@link Optional}<{@link EncryptionContext}>
+ */
+ default public Optional<EncryptionContext> getEncryptionCtx() {
+ return Optional.empty();
+ }
+
+ /**
+ * Retrieves user-defined properties attached to record.
+ *
+ * @return Map of user-properties
+ */
+ default Map<String, String> getProperties() {
+ return Collections.emptyMap();
+ }
+
+ /**
+ * Acknowledge that this record is fully processed
+ */
+ default void ack() {
+ }
+
+ /**
+ * Indicate that this record has failed to be processed.
+ */
+ default void fail() {
+ }
}
diff --git
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
deleted file mode 100644
index 5e12125..0000000
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.core;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.pulsar.common.api.EncryptionContext;
-
-/**
- * A source context that can be used by the runtime to interact with source.
- */
-public interface RecordContext {
-
- /**
- * Retrieves the partition information if any of the record.
- * @return The partition id where the
- */
- default String getPartitionId() { return null; }
-
- /**
- * Retrieves the sequence of the record from a source partition.
- * @return Sequence Id associated with the record
- */
- default long getRecordSequence() { return -1L; }
-
- /**
- * Retrieves user-properties attached to record.
- *
- * @return Map of user-properties
- */
- default Map<String, String> getProperties() { return
Collections.emptyMap();}
-
- /**
- * Retrieves encryption-context that is attached to record.
- *
- * @return {@link Optional}<{@link EncryptionContext}>
- */
- default Optional<EncryptionContext> getEncryptionCtx() { return
Optional.empty();}
-
- /**
- * Acknowledge that this record is fully processed
- */
- default void ack() {}
-
- /**
- * To indicate that this record has failed to be processed
- */
- default void fail() {}
-
-}
diff --git
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
deleted file mode 100644
index 352a059..0000000
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.core;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * A simpler version of the Sink interface users can extend for use cases to
- * don't require fine grained delivery control
- */
-public abstract class SimpleSink<T> implements Sink<T> {
-
- @Override
- public void write(RecordContext inputRecordContext, T value) throws
Exception {
- write(value)
- .thenAccept(ignored -> inputRecordContext.ack())
- .exceptionally(cause -> {
- inputRecordContext.fail();
- return null;
- });
- }
-
- /**
- * Attempt to publish a type safe collection of messages
- *
- * @param value output value
- * @return Completable future fo async publish request
- */
- public abstract CompletableFuture<Void> write(T value);
-}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
index 31a044b..51d635a 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
@@ -23,7 +23,7 @@ import java.util.Map;
/**
* Generic sink interface users can implement to run Sink on top of Pulsar
Functions
*/
-public interface Sink<T> extends AutoCloseable{
+public interface Sink<T> extends AutoCloseable {
/**
* Open connector with configuration
*
@@ -32,12 +32,12 @@ public interface Sink<T> extends AutoCloseable{
* @throws Exception IO type exceptions when opening a connector
*/
void open(final Map<String, Object> config, SinkContext sinkContext)
throws Exception;
-
+
/**
* Write a message to Sink
- * @param inputRecordContext Context of value
- * @param value value to write to sink
+ * @param record record to write to sink, carrying both the value and its source context
+ *        (sinks typically call {@link Record#ack()} or {@link Record#fail()} when done)
* @throws Exception
*/
- void write(RecordContext inputRecordContext, T value) throws Exception;
+ void write(Record<T> record) throws Exception;
}
diff --git
a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
index 4fb4a7d..bb1100c 100644
--- a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
@@ -38,7 +38,7 @@ public class SinkTest {
}
@Override
- public void write(RecordContext inputRecordContext, String value)
throws Exception {
+ public void write(Record<String> record) throws Exception {
}
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index c597489..d6acae8 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -19,47 +19,45 @@
package org.apache.pulsar.io.kafka;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import lombok.extern.slf4j.Slf4j;
+
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.SimpleSink;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
/**
* A Simple abstract class for Kafka sink
* Users need to implement extractKeyValue function to use this sink
*/
-public abstract class KafkaAbstractSink<K, V> extends SimpleSink<byte[]> {
-
- private static final Logger LOG =
LoggerFactory.getLogger(KafkaAbstractSink.class);
+@Slf4j
+public abstract class KafkaAbstractSink<K, V> implements Sink<byte[]> {
private Producer<K, V> producer;
private Properties props = new Properties();
private KafkaSinkConfig kafkaSinkConfig;
@Override
- public CompletableFuture<Void> write(byte[] message) {
- KeyValue<K, V> keyValue = extractKeyValue(message);
+ public void write(Record<byte[]> sourceRecord) {
+ KeyValue<K, V> keyValue = extractKeyValue(sourceRecord);
ProducerRecord<K, V> record = new
ProducerRecord<>(kafkaSinkConfig.getTopic(), keyValue.getKey(),
keyValue.getValue());
- LOG.debug("Record sending to kafka, record={}.", record);
- Future f = producer.send(record);
- return CompletableFuture.supplyAsync(() -> {
- try {
- f.get();
- return null;
- } catch (InterruptedException|ExecutionException e) {
- throw new RuntimeException(e);
+ if (log.isDebugEnabled()) {
+ log.debug("Record sending to kafka, record={}.", record);
+ }
+
+ producer.send(record, (metadata, exception) -> {
+ if (exception == null) {
+ sourceRecord.ack();
+ } else {
+ sourceRecord.fail();
}
});
}
@@ -68,7 +66,7 @@ public abstract class KafkaAbstractSink<K, V> extends
SimpleSink<byte[]> {
public void close() throws IOException {
if (producer != null) {
producer.close();
- LOG.info("Kafka sink stopped.");
+ log.info("Kafka sink stopped.");
}
}
@@ -93,8 +91,8 @@ public abstract class KafkaAbstractSink<K, V> extends
SimpleSink<byte[]> {
producer = new KafkaProducer<>(props);
- LOG.info("Kafka sink started.");
+ log.info("Kafka sink started.");
}
- public abstract KeyValue<K, V> extractKeyValue(byte[] message);
+ public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> message);
}
\ No newline at end of file
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index fe8a6a7..bbafc8e 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -44,7 +45,7 @@ public abstract class KafkaAbstractSource<V> extends
PushSource<V> {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaAbstractSource.class);
- private Consumer<byte[], byte[]> consumer;
+ private Consumer<String, byte[]> consumer;
private Properties props;
private KafkaSourceConfig kafkaSourceConfig;
Thread runnerThread;
@@ -98,12 +99,12 @@ public abstract class KafkaAbstractSource<V> extends
PushSource<V> {
consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic()));
LOG.info("Kafka source started.");
- ConsumerRecords<byte[], byte[]> consumerRecords;
+ ConsumerRecords<String, byte[]> consumerRecords;
while(true){
consumerRecords = consumer.poll(1000);
CompletableFuture<?>[] futures = new
CompletableFuture<?>[consumerRecords.count()];
int index = 0;
- for (ConsumerRecord<byte[], byte[]> consumerRecord :
consumerRecords) {
+ for (ConsumerRecord<String, byte[]> consumerRecord :
consumerRecords) {
LOG.debug("Record received from kafka, key: {}. value:
{}", consumerRecord.key(), consumerRecord.value());
KafkaRecord<V> record = new KafkaRecord<>(consumerRecord,
extractValue(consumerRecord));
consume(record);
@@ -125,27 +126,32 @@ public abstract class KafkaAbstractSource<V> extends
PushSource<V> {
runnerThread.start();
}
- public abstract V extractValue(ConsumerRecord<byte[], byte[]> record);
+ public abstract V extractValue(ConsumerRecord<String, byte[]> record);
static private class KafkaRecord<V> implements Record<V> {
- private final ConsumerRecord<byte[], byte[]> record;
+ private final ConsumerRecord<String, byte[]> record;
private final V value;
@Getter
- private final CompletableFuture<Void> completableFuture = new
CompletableFuture();
+ private final CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
- public KafkaRecord(ConsumerRecord<byte[], byte[]> record,
+ public KafkaRecord(ConsumerRecord<String, byte[]> record,
V value) {
this.record = record;
this.value = value;
}
@Override
- public String getPartitionId() {
- return Integer.toString(record.partition());
+ public Optional<String> getPartitionId() {
+ return Optional.of(Integer.toString(record.partition()));
}
@Override
- public long getRecordSequence() {
- return record.offset();
+ public Optional<Long> getRecordSequence() {
+ return Optional.of(record.offset());
+ }
+
+ @Override
+ public Optional<String> getKey() {
+ return Optional.ofNullable(record.key());
}
@Override
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
index 775b5be..aab4474 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.io.kafka;
import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
/**
* Kafka sink that treats incoming messages on the input topic as Strings
@@ -27,7 +28,7 @@ import org.apache.pulsar.io.core.KeyValue;
*/
public class KafkaStringSink extends KafkaAbstractSink<String, String> {
@Override
- public KeyValue<String, String> extractKeyValue(byte[] message) {
- return new KeyValue<>(new String(message), new String(message));
+ public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
+ return new KeyValue<>(record.getKey().orElse(null), new
String(record.getValue()));
}
}
\ No newline at end of file
diff --git
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
index e31b75f..802d943 100644
---
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
+++
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
@@ -27,7 +27,7 @@ import org.apache.kafka.clients.consumer.*;
*/
public class KafkaStringSource extends KafkaAbstractSource<String> {
@Override
- public String extractValue(ConsumerRecord<byte[], byte[]> record) {
+ public String extractValue(ConsumerRecord<String, byte[]> record) {
return new String(record.value());
}
}
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index dbe1e75..c3b6cdc 100644
---
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -24,22 +24,6 @@ import static
com.google.common.util.concurrent.Futures.addCallback;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.pulsar.io.core.RecordContext;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
@@ -55,17 +39,32 @@ import com.google.gson.reflect.TypeToken;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
- * A Kinesis sink which can be configured by {@link KinesisSinkConfig}.
- * <pre>
+ * A Kinesis sink which can be configured by {@link KinesisSinkConfig}.
+ * <pre>
* {@link KinesisSinkConfig} accepts
* 1. <b>awsEndpoint:</b> kinesis end-point url can be found at :
https://docs.aws.amazon.com/general/latest/gr/rande.html
* 2. <b>awsRegion:</b> appropriate aws region eg: us-west-1, us-west-2
* 3. <b>awsKinesisStreamName:</b> kinesis stream name
* 4. <b>awsCredentialPluginName:</b> Fully-Qualified class name of
implementation of {@link AwsCredentialProviderPlugin}.
* - It is a factory class which creates an {@link AWSCredentialsProvider}
that will be used by {@link KinesisProducer}
- * - If it is empty then {@link KinesisSink} creates default {@link
AWSCredentialsProvider}
- * which accepts json-map of credentials in awsCredentialPluginParam
+ * - If it is empty then {@link KinesisSink} creates default {@link
AWSCredentialsProvider}
+ * which accepts json-map of credentials in awsCredentialPluginParam
* eg: awsCredentialPluginParam =
{"accessKey":"my-access-key","secretKey":"my-secret-key"}
* 5. <b>awsCredentialPluginParam:</b> json-parameters to initialize {@link
AwsCredentialProviderPlugin}
* 6. messageFormat: enum:["ONLY_RAW_PAYLOAD","FULL_MESSAGE_IN_JSON"]
@@ -76,9 +75,9 @@ import io.netty.util.Recycler.Handle;
* Example:
*
{"payloadBase64":"cGF5bG9hZA==","properties":{"prop1":"value"},"encryptionCtx":{"keysMapBase64":{"key1":"dGVzdDE=","key2":"dGVzdDI="},"keysMetadataMap":{"key1":{"ckms":"cmks-1","version":"v1"},"key2":{"ckms":"cmks-2","version":"v2"}},"metadata":{"ckms":"cmks-1","version":"v1"},"encParamBase64":"cGFyYW0=","algorithm":"algo","compressionType":"LZ4","uncompressedMessageSize":10,"batchSize":10}}
* </pre>
- *
- *
- *
+ *
+ *
+ *
*/
public class KinesisSink implements Sink<byte[]> {
@@ -94,20 +93,18 @@ public class KinesisSink implements Sink<byte[]> {
public static final String SECRET_KEY_NAME = "secretKey";
@Override
- public void write(RecordContext inputRecordContext, byte[] value) throws
Exception {
- String partitionedKey =
StringUtils.isNotBlank(inputRecordContext.getPartitionId())
- ? inputRecordContext.getPartitionId()
- : defaultPartitionedKey;
+ public void write(Record<byte[]> record) throws Exception {
+ String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
: partitionedKey; // partitionedKey Length must be at least
one, and at most 256
ListenableFuture<UserRecordResult> addRecordResult =
kinesisProducer.addUserRecord(this.streamName,
partitionedKey,
- createKinesisMessage(kinesisSinkConfig.getMessageFormat(),
inputRecordContext, value));
+ createKinesisMessage(kinesisSinkConfig.getMessageFormat(),
record));
addCallback(addRecordResult,
- ProducerSendCallback.create(this.streamName,
inputRecordContext, System.nanoTime()), directExecutor());
+ ProducerSendCallback.create(this.streamName, record,
System.nanoTime()), directExecutor());
if (LOG.isDebugEnabled()) {
- LOG.debug("Published message to kinesis stream {} with size {}",
streamName, value.length);
+ LOG.debug("Published message to kinesis stream {} with size {}",
streamName, record.getValue().length);
}
}
@@ -151,13 +148,13 @@ public class KinesisSink implements Sink<byte[]> {
if (isNotBlank(awsCredentialPluginName)) {
return createCredentialProviderWithPlugin(awsCredentialPluginName,
awsCredentialPluginParam);
} else {
- return defaultCredentialProvider(awsCredentialPluginParam);
+ return defaultCredentialProvider(awsCredentialPluginParam);
}
}
private static final class ProducerSendCallback implements
FutureCallback<UserRecordResult> {
- private RecordContext resultContext;
+ private Record<byte[]> resultContext;
private String streamName;
private long startTime = 0;
private final Handle<ProducerSendCallback> recyclerHandle;
@@ -166,9 +163,9 @@ public class KinesisSink implements Sink<byte[]> {
this.recyclerHandle = recyclerHandle;
}
- static ProducerSendCallback create(String streamName, RecordContext
result, long startTime) {
+ static ProducerSendCallback create(String streamName, Record<byte[]>
resultContext, long startTime) {
ProducerSendCallback sendCallback = RECYCLER.get();
- sendCallback.resultContext = result;
+ sendCallback.resultContext = resultContext;
sendCallback.streamName = streamName;
sendCallback.startTime = startTime;
return sendCallback;
@@ -210,7 +207,7 @@ public class KinesisSink implements Sink<byte[]> {
/**
* Creates a instance of credential provider which can return {@link
AWSCredentials} or {@link BasicAWSCredentials}
* based on IAM user/roles.
- *
+ *
* @param pluginFQClassName
* @param param
* @return
@@ -234,7 +231,7 @@ public class KinesisSink implements Sink<byte[]> {
/**
* It creates a default credential provider which takes accessKey and
secretKey form configuration and creates
* {@link AWSCredentials}
- *
+ *
* @param awsCredentialPluginParam
* @return
*/
@@ -274,15 +271,15 @@ public class KinesisSink implements Sink<byte[]> {
};
}
- public static ByteBuffer createKinesisMessage(MessageFormat msgFormat,
RecordContext recordCtx, byte[] data) {
+ public static ByteBuffer createKinesisMessage(MessageFormat msgFormat,
Record<byte[]> record) {
if (MessageFormat.FULL_MESSAGE_IN_JSON.equals(msgFormat)) {
- return ByteBuffer.wrap(Utils.serializeRecordToJson(recordCtx,
data).getBytes());
+ return
ByteBuffer.wrap(Utils.serializeRecordToJson(record).getBytes());
} else if (MessageFormat.FULL_MESSAGE_IN_FB.equals(msgFormat)) {
- return Utils.serializeRecordToFlatBuffer(recordCtx, data);
+ return Utils.serializeRecordToFlatBuffer(record);
} else {
// send raw-message
- return ByteBuffer.wrap(data);
+ return ByteBuffer.wrap(record.getValue());
}
}
-
+
}
\ No newline at end of file
diff --git
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
index 469151e..a7382b8 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
@@ -22,20 +22,21 @@ package org.apache.pulsar.io.kinesis;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Base64.getEncoder;
+import com.google.gson.JsonObject;
+
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.kinesis.fbs.EncryptionCtx;
import org.apache.pulsar.io.kinesis.fbs.EncryptionKey;
import org.apache.pulsar.io.kinesis.fbs.KeyValue;
import org.apache.pulsar.io.kinesis.fbs.Message;
import com.google.flatbuffers.FlatBufferBuilder;
-import com.google.gson.JsonObject;
public class Utils {
@@ -49,25 +50,25 @@ public class Utils {
private static final String UNCPRESSED_MSG_SIZE_FIELD =
"uncompressedMessageSize";
private static final String BATCH_SIZE_FIELD = "batchSize";
private static final String ENCRYPTION_CTX_FIELD = "encryptionCtx";
-
+
private static final FlatBufferBuilder DEFAULT_FB_BUILDER = new
FlatBufferBuilder(0);
/**
* Serialize record to flat-buffer. it's not a thread-safe method.
- *
- * @param inputRecordContext
+ *
+ * @param record
* @param data
* @return
*/
- public static ByteBuffer serializeRecordToFlatBuffer(RecordContext
inputRecordContext, byte[] data) {
+ public static ByteBuffer serializeRecordToFlatBuffer(Record<byte[]>
record) {
DEFAULT_FB_BUILDER.clear();
- return serializeRecordToFlatBuffer(DEFAULT_FB_BUILDER,
inputRecordContext, data);
+ return serializeRecordToFlatBuffer(DEFAULT_FB_BUILDER, record);
}
-
- public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder
builder, RecordContext inputRecordContext, byte[] data) {
- checkNotNull(inputRecordContext, "record-context can't be null");
- Optional<EncryptionContext> encryptionCtx =
inputRecordContext.getEncryptionCtx();
- Map<String, String> properties = inputRecordContext.getProperties();
+
+ public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder
builder, Record<byte[]> record) {
+ checkNotNull(record, "record-context can't be null");
+ Optional<EncryptionContext> encryptionCtx = record.getEncryptionCtx();
+ Map<String, String> properties = record.getProperties();
int encryptionCtxOffset = -1;
int propertiesOffset = -1;
@@ -86,7 +87,7 @@ public class Utils {
encryptionCtxOffset = createEncryptionCtxOffset(builder,
encryptionCtx);
}
- int payloadOffset = Message.createPayloadVector(builder, data);
+ int payloadOffset = Message.createPayloadVector(builder,
record.getValue());
Message.startMessage(builder);
Message.addPayload(builder, payloadOffset);
if (encryptionCtxOffset != -1) {
@@ -98,11 +99,11 @@ public class Utils {
int endMessage = Message.endMessage(builder);
builder.finish(endMessage);
ByteBuffer bb = builder.dataBuffer();
-
+
// to avoid copying of data, use same byte[] wrapped by ByteBuffer.
But, ByteBuffer.array() returns entire array
// so, it requires to read from offset:
// builder.sizedByteArray()=>copies buffer: sizedByteArray(space,
bb.capacity() - space)
- int space = bb.capacity() - builder.offset();
+ int space = bb.capacity() - builder.offset();
return ByteBuffer.wrap(bb.array(), space, bb.capacity() - space);
}
@@ -133,7 +134,7 @@ public class Utils {
EncryptionKey.addKey(builder, key);
EncryptionKey.addValue(builder, value);
if(metadataOffset!=-1) {
- EncryptionKey.addMetadata(builder, metadataOffset);
+ EncryptionKey.addMetadata(builder, metadataOffset);
}
keysOffsets[keyIndex++] = EncryptionKey.endEncryptionKey(builder);
}
@@ -156,29 +157,31 @@ public class Utils {
}
return EncryptionCtx.createEncryptionCtx(builder, keysOffset, param,
algo, compressionType,
ctx.getUncompressedMessageSize(), batchSize,
ctx.getBatchSize().isPresent());
-
+
}
/**
* Serializes sink-record into json format. It encodes encryption-keys,
encryption-param and payload in base64
* format so, it can be sent in json.
- *
+ *
* @param inputRecordContext
* @param data
* @return
*/
- public static String serializeRecordToJson(RecordContext
inputRecordContext, byte[] data) {
- checkNotNull(inputRecordContext, "record-context can't be null");
+ public static String serializeRecordToJson(Record<byte[]> record) {
+ checkNotNull(record, "record can't be null");
+
JsonObject result = new JsonObject();
- result.addProperty(PAYLOAD_FIELD, getEncoder().encodeToString(data));
- if (inputRecordContext.getProperties() != null) {
+ result.addProperty(PAYLOAD_FIELD,
getEncoder().encodeToString(record.getValue()));
+ if (record.getProperties() != null) {
JsonObject properties = new JsonObject();
- inputRecordContext.getProperties().entrySet()
+ record.getProperties().entrySet()
.forEach(e -> properties.addProperty(e.getKey(),
e.getValue()));
result.add(PROPERTIES_FIELD, properties);
}
- if (inputRecordContext.getEncryptionCtx().isPresent()) {
- EncryptionContext encryptionCtx =
inputRecordContext.getEncryptionCtx().get();
+
+ if (record.getEncryptionCtx().isPresent()) {
+ EncryptionContext encryptionCtx = record.getEncryptionCtx().get();
JsonObject encryptionCtxJson = new JsonObject();
JsonObject keyBase64Map = new JsonObject();
JsonObject keyMetadataMap = new JsonObject();
diff --git
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
index b9f5c8a..45aa0c7 100644
---
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
+++
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -20,16 +20,22 @@ package org.apache.pulsar.io.kinesis;
import static java.util.Base64.getDecoder;
+import com.google.gson.Gson;
+
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.kinesis.fbs.KeyValue;
import org.apache.pulsar.io.kinesis.fbs.Message;
import org.testng.Assert;
@@ -37,12 +43,6 @@ import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import org.testng.collections.Maps;
-import com.google.gson.Gson;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-
/**
* Unit test of {@link UtilsTest}.
*/
@@ -52,7 +52,7 @@ public class UtilsTest {
public Object[][] encryptionProvider() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}
-
+
@Test
public void testJsonSerialization() throws Exception {
@@ -75,9 +75,9 @@ public class UtilsTest {
Map<String, String> metadata2 = Maps.newHashMap();
metadata2.put("version", "v2");
metadata2.put("ckms", "cmks-2");
- RecordContext recordCtx = createRecordContext(algo, keyNames,
keyValues, param.getBytes(), metadata1, metadata2,
+ Record<byte[]> recordCtx = createRecord(data, algo, keyNames,
keyValues, param.getBytes(), metadata1, metadata2,
batchSize, compressionMsgSize, properties, true);
- String json = Utils.serializeRecordToJson(recordCtx, data);
+ String json = Utils.serializeRecordToJson(recordCtx);
// deserialize from json and assert
KinesisMessageResponse kinesisJsonResponse =
deSerializeRecordFromJson(json);
@@ -118,9 +118,9 @@ public class UtilsTest {
Map<String, String> metadata2 = Maps.newHashMap();
metadata2.put("version", "v2");
metadata2.put("ckms", "cmks-2");
- RecordContext recordCtx = createRecordContext(algo, keyNames,
keyValues, param.getBytes(), metadata1,
+ Record<byte[]> record = createRecord(data, algo, keyNames,
keyValues, param.getBytes(), metadata1,
metadata2, batchSize, compressionMsgSize, properties,
isEncryption);
- ByteBuffer flatBuffer =
Utils.serializeRecordToFlatBuffer(recordCtx, data);
+ ByteBuffer flatBuffer = Utils.serializeRecordToFlatBuffer(record);
Message kinesisJsonResponse = Message.getRootAsMessage(flatBuffer);
byte[] fbPayloadBytes = new
byte[kinesisJsonResponse.payloadLength()];
@@ -164,7 +164,7 @@ public class UtilsTest {
Assert.assertEquals(param.getBytes(), paramBytes);
Assert.assertEquals(algo, encryptionCtxDeser.algo());
}
-
+
Map<String, String> fbproperties = Maps.newHashMap();
for (int i = 0; i < kinesisJsonResponse.propertiesLength(); i++) {
KeyValue property = kinesisJsonResponse.properties(i);
@@ -175,7 +175,7 @@ public class UtilsTest {
}
}
- private RecordContext createRecordContext(String algo, String[] keyNames,
byte[][] keyValues, byte[] param,
+ private Record<byte[]> createRecord(byte[] data, String algo, String[]
keyNames, byte[][] keyValues, byte[] param,
Map<String, String> metadata1, Map<String, String> metadata2, int
batchSize, int compressionMsgSize,
Map<String, String> properties, boolean isEncryption) {
EncryptionContext ctx = null;
@@ -198,14 +198,16 @@ public class UtilsTest {
ctx.setKeys(keys);
ctx.setParam(param);
}
- return new RecordContextImpl(properties, Optional.ofNullable(ctx));
+ return new RecordImpl(data, properties, Optional.ofNullable(ctx));
}
- class RecordContextImpl implements RecordContext {
+ class RecordImpl implements Record<byte[]> {
+ byte[] data;
Map<String, String> properties;
Optional<EncryptionContext> ectx;
- public RecordContextImpl(Map<String, String> properties,
Optional<EncryptionContext> ectx) {
+ public RecordImpl(byte[] data, Map<String, String> properties,
Optional<EncryptionContext> ectx) {
+ this.data = data;
this.properties = properties;
this.ectx = ectx;
}
@@ -217,6 +219,16 @@ public class UtilsTest {
public Optional<EncryptionContext> getEncryptionCtx() {
return ectx;
}
+
+ @Override
+ public Optional<String> getKey() {
+ return Optional.empty();
+ }
+
+ @Override
+ public byte[] getValue() {
+ return data;
+ }
}
public static KinesisMessageResponse deSerializeRecordFromJson(String
jsonRecord) {
diff --git
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index dab3fe9..c05828f 100644
---
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -25,15 +25,19 @@ import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
+import lombok.Data;
+
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.Record;
import org.apache.pulsar.io.core.SourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.Map;
-
/**
* A simple connector to consume messages from a RabbitMQ queue
*/
@@ -82,20 +86,13 @@ public class RabbitMQSource extends PushSource<byte[]> {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
- source.consume(new RabbitMQRecord(body));
+ source.consume(new
RabbitMQRecord(Optional.ofNullable(envelope.getRoutingKey()), body));
}
}
+ @Data
static private class RabbitMQRecord implements Record<byte[]> {
- private final byte[] data;
-
- public RabbitMQRecord(byte[] data) {
- this.data = data;
- }
-
- @Override
- public byte[] getValue() {
- return data;
- }
+ private final Optional<String> key;
+ private final byte[] value;
}
}
\ No newline at end of file
diff --git
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 56c52e7..494ae48 100644
---
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Map;
+import java.util.Optional;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.Record;
@@ -166,6 +167,12 @@ public class TwitterFireHose extends PushSource<String> {
}
@Override
+ public Optional<String> getKey() {
+ // TODO: Could use user or tweet ID as key here
+ return Optional.empty();
+ }
+
+ @Override
public String getValue() {
return tweet;
}