merlimat closed pull request #2116: Added optional key in pulsar IO
URL: https://github.com/apache/incubator-pulsar/pull/2116
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub no longer shows its diff
once it is merged, so the full diff is supplied below:

diff --git 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index bda7f6526f..edfb1c4459 100644
--- 
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ 
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -44,7 +44,6 @@
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.io.core.RecordContext;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 504cc6161b..5ea8bfef5e 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -20,6 +20,13 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.collect.Maps;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
@@ -32,22 +39,15 @@
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 
-import com.google.common.collect.Maps;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-
-public class MessageImpl<T> extends MessageRecordImpl<T, MessageId> {
+public class MessageImpl<T> implements Message<T> {
 
+    protected MessageId messageId;
     private MessageMetadata.Builder msgMetadataBuilder;
     private ClientCnx cnx;
     private ByteBuf payload;
@@ -86,7 +86,7 @@
             Schema<T> schema) {
         this(messageId, msgMetadata, payload, null, cnx, schema);
     }
-    
+
     MessageImpl(MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf 
payload,
             Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, 
Schema<T> schema) {
         this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
@@ -330,7 +330,7 @@ public boolean hasReplicateTo() {
     void setMessageId(MessageIdImpl messageId) {
         this.messageId = messageId;
     }
-    
+
     @Override
     public Optional<EncryptionContext> getEncryptionCtx() {
         return encryptionCtx;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index f7f1f9b964..4f5ac13f7d 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -26,10 +26,11 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.EncryptionContext;
 
-public class TopicMessageImpl<T> extends MessageRecordImpl<T, 
TopicMessageIdImpl> {
+public class TopicMessageImpl<T> implements Message<T> {
 
     private final String topicName;
     private final Message<T> msg;
+    private final TopicMessageIdImpl messageId;
 
     TopicMessageImpl(String topicName,
                      Message<T> msg) {
@@ -109,9 +110,9 @@ public String getKey() {
     public T getValue() {
         return msg.getValue();
     }
-    
+
     @Override
     public Optional<EncryptionContext> getEncryptionCtx() {
-        return (msg instanceof MessageImpl) ? ((MessageImpl) 
msg).getEncryptionCtx() : Optional.empty();
+        return msg.getEncryptionCtx();
     }
 }
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
deleted file mode 100644
index 7df373acf9..0000000000
--- 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static org.mockito.Mockito.CALLS_REAL_METHODS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertSame;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pulsar.client.api.MessageId;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-public class MessageRecordImplTest {
-
-    private MessageRecordImpl<byte[], MessageId> message;
-    private MessageId messageId;
-
-    @BeforeMethod
-    public void setup() {
-        this.messageId = mock(MessageId.class);
-        this.message = mock(MessageRecordImpl.class, CALLS_REAL_METHODS);
-        when(message.getMessageId()).thenReturn(messageId);
-    }
-
-    @Test
-    public void testAck() throws Exception {
-        final AtomicReference<MessageId> msgIdRef = new AtomicReference<>();
-        final CountDownLatch ackLatch = new CountDownLatch(1);
-
-        this.message.setAckFunction(msgId -> {
-            msgIdRef.set(msgId);
-            ackLatch.countDown();
-        });
-
-        this.message.ack();
-        ackLatch.await();
-
-        assertSame(messageId, msgIdRef.get());
-    }
-
-}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ba0de385b2..4c29232b85 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -324,7 +324,7 @@ private void processResult(Record srcRecord,
 
     private void sendOutputMessage(Record srcRecord, Object output) {
         try {
-            this.sink.write(srcRecord, output);
+            this.sink.write(new SinkRecord<>(srcRecord, output));
         } catch (Exception e) {
             log.info("Encountered exception in sink write: ", e);
             throw new RuntimeException(e);
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
similarity index 53%
rename from 
pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
rename to 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index ea1d892c18..4e2edbec23 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -16,65 +16,66 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.functions.instance;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.io.core.Record;
 
-/**
- * Abstract class that implements message api and connect record api.
- */
-public abstract class MessageRecordImpl<T, M extends MessageId> implements 
Message<T>, Record<T> {
+@Data
+@AllArgsConstructor
+public class SinkRecord<T> implements Record<T> {
 
-    protected M messageId;
-    protected java.util.function.Consumer<M> ackFunction;
+    private final Record<T> sourceRecord;
+    private final T value;
 
-    public void setAckFunction(java.util.function.Consumer<M> ackFunction) {
-        this.ackFunction = ackFunction;
+    public Record<T> getSourceRecord() {
+        return sourceRecord;
     }
 
     @Override
-    public String getPartitionId() {
-        if (null != messageId) {
-            if (messageId instanceof MessageIdImpl) {
-                return String.valueOf(((MessageIdImpl) 
messageId).getPartitionIndex());
-            } else {
-                return "";
-            }
-        }
-        return "";
+    public Optional<String> getKey() {
+        return sourceRecord.getKey();
     }
 
     @Override
-    public long getRecordSequence() {
-        return getSequenceId();
+    public T getValue() {
+        return value;
     }
 
     @Override
-    public void ack() {
-        if (null != ackFunction) {
-            ackFunction.accept((M) getMessageId());
-        }
+    public Optional<String> getPartitionId() {
+        return sourceRecord.getPartitionId();
     }
 
     @Override
-    public void fail() {
-        // no-op
+    public Optional<Long> getRecordSequence() {
+        return sourceRecord.getRecordSequence();
+    }
+
+    @Override
+    public Optional<EncryptionContext> getEncryptionCtx() {
+        return sourceRecord.getEncryptionCtx();
     }
 
     @Override
     public Map<String, String> getProperties() {
-        return Collections.emptyMap();
+        return sourceRecord.getProperties();
     }
 
     @Override
-    public Optional<EncryptionContext> getEncryptionCtx() {
-        return Optional.empty();
+    public void ack() {
+        sourceRecord.ack();
+    }
+
+    @Override
+    public void fail() {
+        sourceRecord.fail();
     }
+
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 7aebd93b1f..2170879ce3 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -19,10 +19,13 @@
 package org.apache.pulsar.functions.sink;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Base64;
+import java.util.Map;
+
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
@@ -35,18 +38,18 @@
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.instance.SinkRecord;
 import 
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
 import 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
 import org.apache.pulsar.functions.instance.producers.Producers;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 
-import java.util.Base64;
-import java.util.Map;
+import net.jodah.typetools.TypeResolver;
 
 @Slf4j
 public class PulsarSink<T> implements Sink<T> {
@@ -57,16 +60,16 @@
 
     private PulsarSinkProcessor pulsarSinkProcessor;
 
-    private interface PulsarSinkProcessor {
+    private interface PulsarSinkProcessor<T> {
         void initializeOutputProducer(String outputTopic) throws Exception;
 
         void sendOutputMessage(MessageBuilder outputMsgBuilder,
-                               RecordContext recordContext) throws Exception;
+                               Record<T> recordContext) throws Exception;
 
         void close() throws Exception;
     }
 
-    private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor 
{
+    private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor<T> {
         private Producer<byte[]> producer;
 
         @Override
@@ -77,7 +80,7 @@ public void initializeOutputProducer(String outputTopic) 
throws Exception {
 
         @Override
         public void sendOutputMessage(MessageBuilder outputMsgBuilder,
-                                      RecordContext recordContext) throws 
Exception {
+                                      Record<T> recordContext) throws 
Exception {
             Message<byte[]> outputMsg = outputMsgBuilder.build();
             this.producer.sendAsync(outputMsg);
         }
@@ -94,7 +97,7 @@ public void close() throws Exception {
         }
     }
 
-    private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
+    private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor<T> {
         private Producer<byte[]> producer;
 
         @Override
@@ -105,7 +108,7 @@ public void initializeOutputProducer(String outputTopic) 
throws Exception {
 
         @Override
         public void sendOutputMessage(MessageBuilder outputMsgBuilder,
-                                      RecordContext recordContext) throws 
Exception {
+                                      Record<T> recordContext) throws 
Exception {
             Message<byte[]> outputMsg = outputMsgBuilder.build();
             this.producer.sendAsync(outputMsg).thenAccept(messageId -> 
recordContext.ack());
         }
@@ -122,7 +125,7 @@ public void close() throws Exception {
         }
     }
 
-    private class PulsarSinkEffectivelyOnceProcessor implements 
PulsarSinkProcessor, ConsumerEventListener {
+    private class PulsarSinkEffectivelyOnceProcessor implements 
PulsarSinkProcessor<T>, ConsumerEventListener {
 
         @Getter(AccessLevel.PACKAGE)
         protected Producers outputProducer;
@@ -134,15 +137,16 @@ public void initializeOutputProducer(String outputTopic) 
throws Exception {
         }
 
         @Override
-        public void sendOutputMessage(MessageBuilder outputMsgBuilder, 
RecordContext recordContext)
+        public void sendOutputMessage(MessageBuilder outputMsgBuilder, 
Record<T> recordContext)
                 throws Exception {
 
             // assign sequence id to output message for idempotent producing
-            outputMsgBuilder = outputMsgBuilder
-                    .setSequenceId(recordContext.getRecordSequence());
+            if (recordContext.getRecordSequence().isPresent()) {
+                
outputMsgBuilder.setSequenceId(recordContext.getRecordSequence().get());
+            }
 
             // currently on PulsarRecord
-            Producer producer = 
outputProducer.getProducer(recordContext.getPartitionId());
+            Producer producer = 
outputProducer.getProducer(recordContext.getPartitionId().get());
 
             org.apache.pulsar.client.api.Message outputMsg = 
outputMsgBuilder.build();
             producer.sendAsync(outputMsg)
@@ -211,29 +215,37 @@ public void open(Map<String, Object> config, SinkContext 
sinkContext) throws Exc
     }
 
     @Override
-    public void write(RecordContext recordContext, T value) throws Exception {
+    public void write(Record<T> record) throws Exception {
 
         byte[] output;
         try {
-            output = this.outputSerDe.serialize(value);
+            output = this.outputSerDe.serialize(record.getValue());
         } catch (Exception e) {
             //TODO Add serialization exception stats
             throw new RuntimeException("Error occured when attempting to 
serialize output:", e);
         }
+
         MessageBuilder msgBuilder = MessageBuilder.create();
+        if (record.getKey().isPresent()) {
+            msgBuilder.setKey(record.getKey().get());
+        }
+
         msgBuilder.setContent(output);
-        if (recordContext instanceof PulsarRecord) {
-            PulsarRecord pulsarRecord = (PulsarRecord) recordContext;
+
+        if (!record.getProperties().isEmpty()) {
+            msgBuilder.setProperties(record.getProperties());
+        }
+
+        SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
+        if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
+            PulsarRecord<T> pulsarRecord = (PulsarRecord<T>) 
sinkRecord.getSourceRecord();
             // forward user properties to sink-topic
-            if (pulsarRecord.getProperties() != null) {
-                msgBuilder.setProperties(pulsarRecord.getProperties());
-            }
             msgBuilder.setProperty("__pfn_input_topic__", 
pulsarRecord.getTopicName()).setProperty(
                     "__pfn_input_msg_id__",
                     new 
String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
         }
 
-        this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, recordContext);
+        this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, record);
     }
 
     @Override
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 113cf82060..7a6d11b1bd 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -18,35 +18,72 @@
  */
 package org.apache.pulsar.functions.source;
 
+import java.util.Map;
+import java.util.Optional;
+
 import lombok.Builder;
-import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
 
-import java.util.Map;
-import java.util.Optional;
-
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.Record;
 
-@Data
 @Builder
 @Getter
 @ToString
 @EqualsAndHashCode
 public class PulsarRecord<T> implements Record<T> {
 
-    private String partitionId;
-    private long recordSequence;
-    private T value;
-    private MessageId messageId;
-    private String topicName;
-    private Map<String, String> properties;
-    private Optional<EncryptionContext> encryptionCtx = Optional.empty();
-    private Runnable failFunction;
-    private Runnable ackFunction;
+    private final String topicName;
+    private final int partition;
+
+    // TODO: When we switch to schema for functions, we should just rely on 
the message object value
+    private final T value;
+    private final Message<byte[]> message;
+
+    private final Runnable failFunction;
+    private final Runnable ackFunction;
+
+    @Override
+    public Optional<String> getKey() {
+        if (message.hasKey()) {
+            return Optional.of(message.getKey());
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public Optional<String> getPartitionId() {
+        return Optional.of(String.format("%s-%s", topicName, partition));
+    }
+
+    @Override
+    public Optional<Long> getRecordSequence() {
+        return Optional.of(Utils.getSequenceId(message.getMessageId()));
+    }
+
+    /**
+     * Retrieves encryption-context that is attached to record.
+     *
+     * @return {@link Optional}<{@link EncryptionContext}>
+     */
+    public Optional<EncryptionContext> getEncryptionCtx() {
+        return message.getEncryptionCtx();
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return message.getProperties();
+    }
+
+    public MessageId getMessageId() {
+        return message.getMessageId();
+    }
 
     @Override
     public void ack() {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 680d754265..d5a1df9ada 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -54,7 +54,7 @@
     private boolean isTopicsPattern;
 
     @Getter
-    private org.apache.pulsar.client.api.Consumer inputConsumer;
+    private org.apache.pulsar.client.api.Consumer<byte[]> inputConsumer;
 
     public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig 
pulsarConfig) {
         this.pulsarClient = pulsarClient;
@@ -69,17 +69,17 @@ public void open(Map<String, Object> config, SourceContext 
sourceContext) throws
         // Setup pulsar consumer
         ConsumerBuilder<byte[]> consumerBuilder = 
this.pulsarClient.newConsumer()
                 //consume message even if can't decrypt and deliver it along 
with encryption-ctx
-                .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)  
+                .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
                 
.subscriptionName(this.pulsarSourceConfig.getSubscriptionName())
                 
.subscriptionType(this.pulsarSourceConfig.getSubscriptionType());
 
         if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) {
-            
consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());    
+            
consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());
             isTopicsPattern = true;
         }else {
-            consumerBuilder.topics(new 
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));    
+            consumerBuilder.topics(new 
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));
         }
-        
+
         if (pulsarSourceConfig.getTimeoutMs() != null) {
             consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), 
TimeUnit.MILLISECONDS);
         }
@@ -88,21 +88,21 @@ public void open(Map<String, Object> config, SourceContext 
sourceContext) throws
 
     @Override
     public Record<T> read() throws Exception {
-        org.apache.pulsar.client.api.Message<T> message = 
this.inputConsumer.receive();
+        org.apache.pulsar.client.api.Message<byte[]> message = 
this.inputConsumer.receive();
 
         String topicName;
-        String partitionId;
+        int partitionId;
 
         // If more than one topics are being read than the Message return by 
the consumer will be TopicMessageImpl
         // If there is only topic being read then the Message returned by the 
consumer wil be MessageImpl
         if (message instanceof TopicMessageImpl) {
-            topicName = ((TopicMessageImpl) message).getTopicName();
+            topicName = ((TopicMessageImpl<?>) message).getTopicName();
             TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) 
message.getMessageId();
             MessageIdImpl messageId = (MessageIdImpl) 
topicMessageId.getInnerMessageId();
-            partitionId = Long.toString(messageId.getPartitionIndex());
+            partitionId = messageId.getPartitionIndex();
         } else {
             topicName = 
this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet().iterator().next();
-            partitionId = Long.toString(((MessageIdImpl) 
message.getMessageId()).getPartitionIndex());
+            partitionId = ((MessageIdImpl) 
message.getMessageId()).getPartitionIndex();
         }
 
         Object object;
@@ -130,14 +130,10 @@ public void open(Map<String, Object> config, 
SourceContext sourceContext) throws
             throw new RuntimeException("Error in casting input to expected 
type:", e);
         }
 
-        PulsarRecord<T> pulsarMessage = (PulsarRecord<T>) 
PulsarRecord.builder()
+        return PulsarRecord.<T>builder()
                 .value(input)
-                .messageId(message.getMessageId())
-                .partitionId(String.format("%s-%s", topicName, partitionId))
-                .recordSequence(Utils.getSequenceId(message.getMessageId()))
+                .message(message)
                 .topicName(topicName)
-                .properties(message.getProperties())
-                .encryptionCtx(message.getEncryptionCtx())
                 .ackFunction(() -> {
                     if (pulsarSourceConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                         inputConsumer.acknowledgeCumulativeAsync(message);
@@ -150,7 +146,6 @@ public void open(Map<String, Object> config, SourceContext 
sourceContext) throws
                     }
                 })
                 .build();
-        return pulsarMessage;
     }
 
     @Override
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 0bac0a68b2..415912a7eb 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -29,6 +29,9 @@
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.testng.Assert.assertEquals;
 
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -65,7 +68,7 @@
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -77,9 +80,6 @@
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-import com.google.gson.Gson;
-
 /**
  * Unit test of {@link FunctionApiV2Resource}.
  */
@@ -98,7 +98,7 @@ public String process(String input, Context context) throws 
Exception {
             return input;
         }
     }
-    
+
     public static final class TestSink implements Sink<byte[]> {
 
         @Override
@@ -110,7 +110,7 @@ public void open(Map config, SinkContext sinkContext) 
throws Exception {
         }
 
         @Override
-        public void write(RecordContext inputRecordContext, byte[] value) 
throws Exception {
+        public void write(Record<byte[]> record) throws Exception {
         }
     }
 
@@ -670,7 +670,7 @@ public void testUpdateFunctionWithUrl() throws IOException {
 
         String fileLocation = 
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         String filePackageUrl = "file://" + fileLocation;
-        
+
         SinkSpec sinkSpec = SinkSpec.newBuilder()
                 .setTopic(outputTopic)
                 .setSerDeClassName(outputSerdeClassName).build();
@@ -681,14 +681,14 @@ public void testUpdateFunctionWithUrl() throws 
IOException {
                 .setParallelism(parallelism)
                 
.setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
                         
.putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
-        
+
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(function))).thenReturn(true);
         RequestResult rr = new RequestResult()
                 .setSuccess(true)
                 .setMessage("function registered");
             CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
             
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-        
+
         Response response = resource.updateFunction(
             tenant,
             namespace,
@@ -697,10 +697,10 @@ public void testUpdateFunctionWithUrl() throws 
IOException {
             null,
             filePackageUrl,
             
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
-        
+
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
-    
+
     @Test
     public void testUpdateFunctionFailure() throws Exception {
         mockStatic(Utils.class);
@@ -990,7 +990,7 @@ public void testListFunctionsSuccess() throws Exception {
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(new Gson().toJson(functions), response.getEntity());
     }
-    
+
     @Test
     public void testDownloadFunctionHttpUrl() throws Exception {
         String jarHttpUrl = 
"http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
@@ -1006,7 +1006,7 @@ public void testDownloadFunctionHttpUrl() throws 
Exception {
             pkgFile.delete();
         }
     }
-    
+
     @Test
     public void testDownloadFunctionFile() throws Exception {
         String fileLocation = 
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
@@ -1022,7 +1022,7 @@ public void testDownloadFunctionFile() throws Exception {
             pkgFile.delete();
         }
     }
-    
+
     @Test
     public void testRegisterFunctionFileUrlWithValidSinkClass() throws 
IOException {
         Configurator.setRootLevel(Level.DEBUG);
diff --git 
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
 
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
index ba296161fd..6ea657e659 100644
--- 
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
+++ 
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
@@ -31,22 +31,23 @@
 import com.aerospike.client.listener.WriteListener;
 import com.aerospike.client.policy.ClientPolicy;
 import com.aerospike.client.policy.WritePolicy;
-import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.SimpleSink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingDeque;
 
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A Simple abstract class for Aerospike sink
  * Users need to implement extractKeyValue function to use this sink
  */
-public abstract class AerospikeAbstractSink<K, V> extends SimpleSink<byte[]> {
+public abstract class AerospikeAbstractSink<K, V> implements Sink<byte[]> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AerospikeAbstractSink.class);
 
@@ -55,6 +56,7 @@
     private AerospikeClient client;
     private WritePolicy writePolicy;
     private BlockingQueue<AWriteListener> queue;
+    private NioEventLoops eventLoops;
     private EventLoop eventLoop;
 
     @Override
@@ -74,18 +76,25 @@ public void open(Map<String, Object> config, SinkContext 
sinkContext) throws Exc
         for (int i = 0; i < aerospikeSinkConfig.getMaxConcurrentRequests(); 
++i) {
             queue.put(new AWriteListener(queue));
         }
-        eventLoop = new NioEventLoops(new EventPolicy(), 1).next();
+
+        eventLoops = new NioEventLoops(new EventPolicy(), 1);
+        eventLoop = eventLoops.next();
     }
 
     @Override
     public void close() throws Exception {
-        client.close();
+        if (client != null) {
+            client.close();
+        }
+
+        if (eventLoops != null) {
+            eventLoops.close();
+        }
         LOG.info("Connection Closed");
     }
 
     @Override
-    public CompletableFuture<Void> write(byte[] record) {
-        CompletableFuture<Void> future = new CompletableFuture<>();
+    public void write(Record<byte[]> record) {
         KeyValue<K, V> keyValue = extractKeyValue(record);
         Key key = new Key(aerospikeSinkConfig.getKeyspace(), 
aerospikeSinkConfig.getKeySet(), keyValue.getKey().toString());
         Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), 
Value.getAsBlob(keyValue.getValue()));
@@ -93,12 +102,11 @@ public void close() throws Exception {
         try {
             listener = queue.take();
         } catch (InterruptedException ex) {
-            future.completeExceptionally(ex);
-            return future;
+            record.fail();
+            return;
         }
-        listener.setFuture(future);
+        listener.setContext(record);
         client.put(eventLoop, listener, writePolicy, key, bin);
-        return future;
     }
 
     private void createClient() {
@@ -121,21 +129,21 @@ private void createClient() {
     }
 
     private class AWriteListener implements WriteListener {
-        private CompletableFuture<Void> future;
+        private Record<byte[]> context;
         private BlockingQueue<AWriteListener> queue;
 
         public AWriteListener(BlockingQueue<AWriteListener> queue) {
             this.queue = queue;
         }
 
-        public void setFuture(CompletableFuture<Void> future) {
-            this.future = future;
+        public void setContext(Record<byte[]> record) {
+            this.context = record;
         }
 
         @Override
         public void onSuccess(Key key) {
-            if (future != null) {
-                future.complete(null);
+            if (context != null) {
+                context.ack();
             }
             try {
                 queue.put(this);
@@ -146,8 +154,8 @@ public void onSuccess(Key key) {
 
         @Override
         public void onFailure(AerospikeException e) {
-            if (future != null) {
-                future.completeExceptionally(e);
+            if (context != null) {
+                context.fail();
             }
             try {
                 queue.put(this);
@@ -157,5 +165,5 @@ public void onFailure(AerospikeException e) {
         }
     }
 
-    public abstract KeyValue<K, V> extractKeyValue(byte[] message);
+    public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> message);
 }
\ No newline at end of file
diff --git 
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
 
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
index 8bf0174603..40ffd9035b 100644
--- 
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
+++ 
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.io.aerospike;
 
 import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
 
 /**
  * Aerospike sink that treats incoming messages on the input topic as Strings
@@ -27,7 +28,8 @@
  */
 public class AerospikeStringSink extends AerospikeAbstractSink<String, String> 
{
     @Override
-    public KeyValue<String, String> extractKeyValue(byte[] message) {
-        return new KeyValue<>(new String(message), new String(message));
+    public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
+        String key = record.getKey().orElseGet(() -> new 
String(record.getValue()));
+        return new KeyValue<>(key, new String(record.getValue()));
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
 
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
index c8a9ca51a3..fd76c0c630 100644
--- 
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
+++ 
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
@@ -29,17 +29,17 @@
 import com.google.common.util.concurrent.Futures;
 
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.SimpleSink;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 
 /**
  * A Simple abstract class for Cassandra sink
  * Users need to implement extractKeyValue function to use this sink
  */
-public abstract class CassandraAbstractSink<K, V> extends SimpleSink<byte[]> {
+public abstract class CassandraAbstractSink<K, V> implements Sink<byte[]> {
 
     // ----- Runtime fields
     private Cluster cluster;
@@ -69,24 +69,22 @@ public void close() throws Exception {
     }
 
     @Override
-    public CompletableFuture<Void> write(byte[] record) {
+    public void write(Record<byte[]> record) {
         KeyValue<K, V> keyValue = extractKeyValue(record);
         BoundStatement bound = statement.bind(keyValue.getKey(), 
keyValue.getValue());
         ResultSetFuture future = session.executeAsync(bound);
-        CompletableFuture<Void> completable = new CompletableFuture<Void>();
         Futures.addCallback(future,
                 new FutureCallback<ResultSet>() {
                     @Override
                     public void onSuccess(ResultSet result) {
-                        completable.complete(null);
+                        record.ack();
                     }
 
                     @Override
                     public void onFailure(Throwable t) {
-                        completable.completeExceptionally(t);
+                        record.fail();
                     }
                 });
-        return completable;
     }
 
     private void createClient(String roots) {
@@ -107,5 +105,5 @@ private void createClient(String roots) {
         session.execute("USE " + cassandraSinkConfig.getKeyspace());
     }
 
-    public abstract KeyValue<K, V> extractKeyValue(byte[] message);
+    public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> record);
 }
\ No newline at end of file
diff --git 
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
 
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
index 5c0cb16c84..c2e72b8b3e 100644
--- 
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
+++ 
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.io.cassandra;
 
 import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
 
 /**
  * Cassandra sink that treats incoming messages on the input topic as Strings
@@ -27,7 +28,8 @@
  */
 public class CassandraStringSink extends CassandraAbstractSink<String, String> 
{
     @Override
-    public KeyValue<String, String> extractKeyValue(byte[] message) {
-        return new KeyValue<>(new String(message), new String(message));
+    public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
+        String key = record.getKey().orElseGet(() -> new 
String(record.getValue()));
+        return new KeyValue<>(key, new String(record.getValue()));
     }
 }
\ No newline at end of file
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
index 08c78bfdef..504d5f4f6b 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
@@ -18,14 +18,74 @@
  */
 package org.apache.pulsar.io.core;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.pulsar.common.api.EncryptionContext;
+
 /**
- * Pulsar Connect's Record interface. Record encapsulates the
- * information about a record being read from a Source.
+ * Pulsar Connect's Record interface. Record encapsulates the information about a record being read from a Source.
  */
-public interface Record<T> extends RecordContext {
+public interface Record<T> {
+
+    /**
+     * Return a key if the record has one associated
+     */
+    Optional<String> getKey();
+
     /**
      * Retrieves the actual data of the record
+     *
      * @return The record data
      */
     T getValue();
+
+    /**
+     * Retrieves the partition information of the record, if any.
+     *
+     * @return The partition id of the record, if any
+     */
+    default Optional<String> getPartitionId() {
+        return Optional.empty();
+    }
+
+    /**
+     * Retrieves the sequence of the record from a source partition.
+     *
+     * @return Sequence Id associated with the record
+     */
+    default Optional<Long> getRecordSequence() {
+        return Optional.empty();
+    }
+
+    /**
+     * Retrieves encryption-context that is attached to record.
+     *
+     * @return {@link Optional}<{@link EncryptionContext}>
+     */
+    default public Optional<EncryptionContext> getEncryptionCtx() {
+        return Optional.empty();
+    }
+
+    /**
+     * Retrieves user-defined properties attached to record.
+     *
+     * @return Map of user-properties
+     */
+    default Map<String, String> getProperties() {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Acknowledge that this record is fully processed
+     */
+    default void ack() {
+    }
+
+    /**
+     * To indicate that this record has failed to be processed
+     */
+    default void fail() {
+    }
 }
diff --git 
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
deleted file mode 100644
index 5e1212562b..0000000000
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.core;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.pulsar.common.api.EncryptionContext;
-
-/**
- * A source context that can be used by the runtime to interact with source.
- */
-public interface RecordContext {
-
-    /**
-     * Retrieves the partition information if any of the record.
-     * @return The partition id where the
-     */
-    default String getPartitionId() { return null; }
-
-    /**
-     * Retrieves the sequence of the record from a source partition.
-     * @return Sequence Id associated with the record
-     */
-    default long getRecordSequence() { return -1L; }
-    
-    /**
-     * Retrieves user-properties attached to record. 
-     * 
-     * @return Map of user-properties
-     */
-    default Map<String, String> getProperties() { return 
Collections.emptyMap();}
-    
-    /**
-     * Retrieves encryption-context that is attached to record. 
-     * 
-     * @return {@link Optional}<{@link EncryptionContext}>
-     */
-    default Optional<EncryptionContext> getEncryptionCtx() { return 
Optional.empty();}
-
-    /**
-     * Acknowledge that this record is fully processed
-     */
-    default void ack() {}
-
-    /**
-     * To indicate that this record has failed to be processed
-     */
-    default void fail() {}
-
-}
diff --git 
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
deleted file mode 100644
index 352a0593a4..0000000000
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.core;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * A simpler version of the Sink interface users can extend for use cases to
- * don't require fine grained delivery control
- */
-public abstract class SimpleSink<T> implements Sink<T> {
-
-    @Override
-    public void write(RecordContext inputRecordContext, T value) throws 
Exception {
-        write(value)
-                .thenAccept(ignored -> inputRecordContext.ack())
-                .exceptionally(cause -> {
-                    inputRecordContext.fail();
-                    return null;
-                });
-    }
-
-    /**
-     * Attempt to publish a type safe collection of messages
-     *
-     * @param value output value
-     * @return Completable future fo async publish request
-     */
-    public abstract CompletableFuture<Void> write(T value);
-}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
index 31a044b7f2..51d635ac31 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
@@ -23,7 +23,7 @@
 /**
  * Generic sink interface users can implement to run Sink on top of Pulsar 
Functions
  */
-public interface Sink<T> extends AutoCloseable{
+public interface Sink<T> extends AutoCloseable {
     /**
      * Open connector with configuration
      *
@@ -32,12 +32,12 @@
      * @throws Exception IO type exceptions when opening a connector
      */
     void open(final Map<String, Object> config, SinkContext sinkContext) 
throws Exception;
-    
+
     /**
      * Write a message to Sink
-     * @param inputRecordContext Context of value
-     * @param value value to write to sink
+     * @param record record to write to sink, carrying the context of the
+     *               input record from the source
      * @throws Exception
      */
-    void write(RecordContext inputRecordContext, T value) throws Exception;
+    void write(Record<T> record) throws Exception;
 }
diff --git 
a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java 
b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
index 4fb4a7d4e1..bb1100cf80 100644
--- a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
@@ -38,7 +38,7 @@ public void open(Map<String, Object> config, SinkContext 
sinkContext) throws Exc
         }
 
         @Override
-        public void write(RecordContext inputRecordContext, String value) 
throws Exception {
+        public void write(Record<String> record) throws Exception {
 
         }
 
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index c597489adc..d6acae83c7 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -19,47 +19,45 @@
 
 package org.apache.pulsar.io.kafka;
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.SimpleSink;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
  * A Simple abstract class for Kafka sink
  * Users need to implement extractKeyValue function to use this sink
  */
-public abstract class KafkaAbstractSink<K, V> extends SimpleSink<byte[]> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaAbstractSink.class);
+@Slf4j
+public abstract class KafkaAbstractSink<K, V> implements Sink<byte[]> {
 
     private Producer<K, V> producer;
     private Properties props = new Properties();
     private KafkaSinkConfig kafkaSinkConfig;
 
     @Override
-    public CompletableFuture<Void> write(byte[] message) {
-        KeyValue<K, V> keyValue = extractKeyValue(message);
+    public void write(Record<byte[]> sourceRecord) {
+        KeyValue<K, V> keyValue = extractKeyValue(sourceRecord);
         ProducerRecord<K, V> record = new 
ProducerRecord<>(kafkaSinkConfig.getTopic(), keyValue.getKey(), 
keyValue.getValue());
-        LOG.debug("Record sending to kafka, record={}.", record);
-        Future f = producer.send(record);
-        return CompletableFuture.supplyAsync(() -> {
-            try {
-                f.get();
-                return null;
-            } catch (InterruptedException|ExecutionException e) {
-                throw new RuntimeException(e);
+        if (log.isDebugEnabled()) {
+            log.debug("Record sending to kafka, record={}.", record);
+        }
+
+        producer.send(record, (metadata, exception) -> {
+            if (exception == null) {
+                sourceRecord.ack();
+            } else {
+                sourceRecord.fail();
             }
         });
     }
@@ -68,7 +66,7 @@
     public void close() throws IOException {
         if (producer != null) {
             producer.close();
-            LOG.info("Kafka sink stopped.");
+            log.info("Kafka sink stopped.");
         }
     }
 
@@ -93,8 +91,8 @@ public void open(Map<String, Object> config, SinkContext 
sinkContext) throws Exc
 
         producer = new KafkaProducer<>(props);
 
-        LOG.info("Kafka sink started.");
+        log.info("Kafka sink started.");
     }
 
-    public abstract KeyValue<K, V> extractKeyValue(byte[] message);
+    public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> message);
 }
\ No newline at end of file
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index fe8a6a70ef..bbafc8e5eb 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -33,6 +33,7 @@
 
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -44,7 +45,7 @@
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaAbstractSource.class);
 
-    private Consumer<byte[], byte[]> consumer;
+    private Consumer<String, byte[]> consumer;
     private Properties props;
     private KafkaSourceConfig kafkaSourceConfig;
     Thread runnerThread;
@@ -98,12 +99,12 @@ public void start() {
             consumer = new KafkaConsumer<>(props);
             consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<byte[], byte[]> consumerRecords;
+            ConsumerRecords<String, byte[]> consumerRecords;
             while(true){
                 consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new 
CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<byte[], byte[]> consumerRecord : 
consumerRecords) {
+                for (ConsumerRecord<String, byte[]> consumerRecord : 
consumerRecords) {
                     LOG.debug("Record received from kafka, key: {}. value: 
{}", consumerRecord.key(), consumerRecord.value());
                     KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, 
extractValue(consumerRecord));
                     consume(record);
@@ -125,27 +126,32 @@ public void start() {
         runnerThread.start();
     }
 
-    public abstract V extractValue(ConsumerRecord<byte[], byte[]> record);
+    public abstract V extractValue(ConsumerRecord<String, byte[]> record);
 
     static private class KafkaRecord<V> implements Record<V> {
-        private final ConsumerRecord<byte[], byte[]> record;
+        private final ConsumerRecord<String, byte[]> record;
         private final V value;
         @Getter
-        private final CompletableFuture<Void> completableFuture = new 
CompletableFuture();
+        private final CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
 
-        public KafkaRecord(ConsumerRecord<byte[], byte[]> record,
+        public KafkaRecord(ConsumerRecord<String, byte[]> record,
                            V value) {
             this.record = record;
             this.value = value;
         }
         @Override
-        public String getPartitionId() {
-            return Integer.toString(record.partition());
+        public Optional<String> getPartitionId() {
+            return Optional.of(Integer.toString(record.partition()));
         }
 
         @Override
-        public long getRecordSequence() {
-            return record.offset();
+        public Optional<Long> getRecordSequence() {
+            return Optional.of(record.offset());
+        }
+
+        @Override
+        public Optional<String> getKey() {
+            return Optional.ofNullable(record.key());
         }
 
         @Override
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
index 775b5bec0c..aab44748cb 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.io.kafka;
 
 import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
 
 /**
  * Kafka sink that treats incoming messages on the input topic as Strings
@@ -27,7 +28,7 @@
  */
 public class KafkaStringSink extends KafkaAbstractSink<String, String> {
     @Override
-    public KeyValue<String, String> extractKeyValue(byte[] message) {
-        return new KeyValue<>(new String(message), new String(message));
+    public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
+        return new KeyValue<>(record.getKey().orElse(null), new 
String(record.getValue()));
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
index e31b75f4a3..802d943e8d 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
@@ -27,7 +27,7 @@
  */
 public class KafkaStringSource extends KafkaAbstractSource<String> {
     @Override
-    public String extractValue(ConsumerRecord<byte[], byte[]> record) {
+    public String extractValue(ConsumerRecord<String, byte[]> record) {
         return new String(record.value());
     }
 }
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index dbe1e756e5..c3b6cdcd26 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -24,22 +24,6 @@
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.pulsar.io.core.RecordContext;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
@@ -55,17 +39,32 @@
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
- * A Kinesis sink which can be configured by {@link KinesisSinkConfig}. 
- * <pre> 
+ * A Kinesis sink which can be configured by {@link KinesisSinkConfig}.
+ * <pre>
  * {@link KinesisSinkConfig} accepts
  * 1. <b>awsEndpoint:</b> kinesis end-point url can be found at : 
https://docs.aws.amazon.com/general/latest/gr/rande.html
  * 2. <b>awsRegion:</b> appropriate aws region eg: us-west-1, us-west-2
  * 3. <b>awsKinesisStreamName:</b> kinesis stream name
  * 4. <b>awsCredentialPluginName:</b> Fully-Qualified class name of 
implementation of {@link AwsCredentialProviderPlugin}.
  *    - It is a factory class which creates an {@link AWSCredentialsProvider} 
that will be used by {@link KinesisProducer}
- *    - If it is empty then {@link KinesisSink} creates default {@link 
AWSCredentialsProvider} 
- *      which accepts json-map of credentials in awsCredentialPluginParam 
+ *    - If it is empty then {@link KinesisSink} creates default {@link 
AWSCredentialsProvider}
+ *      which accepts json-map of credentials in awsCredentialPluginParam
  *      eg: awsCredentialPluginParam = 
{"accessKey":"my-access-key","secretKey":"my-secret-key"}
  * 5. <b>awsCredentialPluginParam:</b> json-parameters to initialize {@link 
AwsCredentialProviderPlugin}
  * 6. messageFormat: enum:["ONLY_RAW_PAYLOAD","FULL_MESSAGE_IN_JSON"]
@@ -76,9 +75,9 @@
  *   Example:
  *   
{"payloadBase64":"cGF5bG9hZA==","properties":{"prop1":"value"},"encryptionCtx":{"keysMapBase64":{"key1":"dGVzdDE=","key2":"dGVzdDI="},"keysMetadataMap":{"key1":{"ckms":"cmks-1","version":"v1"},"key2":{"ckms":"cmks-2","version":"v2"}},"metadata":{"ckms":"cmks-1","version":"v1"},"encParamBase64":"cGFyYW0=","algorithm":"algo","compressionType":"LZ4","uncompressedMessageSize":10,"batchSize":10}}
  * </pre>
- * 
- * 
- * 
+ *
+ *
+ *
  */
 public class KinesisSink implements Sink<byte[]> {
 
@@ -94,20 +93,18 @@
     public static final String SECRET_KEY_NAME = "secretKey";
 
     @Override
-    public void write(RecordContext inputRecordContext, byte[] value) throws 
Exception {
-        String partitionedKey = 
StringUtils.isNotBlank(inputRecordContext.getPartitionId())
-                ? inputRecordContext.getPartitionId()
-                : defaultPartitionedKey;
+    public void write(Record<byte[]> record) throws Exception {
+        String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
         partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
                 ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
                 : partitionedKey; // partitionedKey Length must be at least 
one, and at most 256
         ListenableFuture<UserRecordResult> addRecordResult = 
kinesisProducer.addUserRecord(this.streamName,
                 partitionedKey,
-                createKinesisMessage(kinesisSinkConfig.getMessageFormat(), 
inputRecordContext, value));
+                createKinesisMessage(kinesisSinkConfig.getMessageFormat(), 
record));
         addCallback(addRecordResult,
-                ProducerSendCallback.create(this.streamName, 
inputRecordContext, System.nanoTime()), directExecutor());
+                ProducerSendCallback.create(this.streamName, record, 
System.nanoTime()), directExecutor());
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Published message to kinesis stream {} with size {}", 
streamName, value.length);
+            LOG.debug("Published message to kinesis stream {} with size {}", 
streamName, record.getValue().length);
         }
     }
 
@@ -151,13 +148,13 @@ protected AWSCredentialsProvider 
createCredentialProvider(String awsCredentialPl
         if (isNotBlank(awsCredentialPluginName)) {
             return createCredentialProviderWithPlugin(awsCredentialPluginName, 
awsCredentialPluginParam);
         } else {
-            return defaultCredentialProvider(awsCredentialPluginParam);        
    
+            return defaultCredentialProvider(awsCredentialPluginParam);
         }
     }
 
     private static final class ProducerSendCallback implements 
FutureCallback<UserRecordResult> {
 
-        private RecordContext resultContext;
+        private Record<byte[]> resultContext;
         private String streamName;
         private long startTime = 0;
         private final Handle<ProducerSendCallback> recyclerHandle;
@@ -166,9 +163,9 @@ private ProducerSendCallback(Handle<ProducerSendCallback> 
recyclerHandle) {
             this.recyclerHandle = recyclerHandle;
         }
 
-        static ProducerSendCallback create(String streamName, RecordContext 
result, long startTime) {
+        static ProducerSendCallback create(String streamName, Record<byte[]> 
resultContext, long startTime) {
             ProducerSendCallback sendCallback = RECYCLER.get();
-            sendCallback.resultContext = result;
+            sendCallback.resultContext = resultContext;
             sendCallback.streamName = streamName;
             sendCallback.startTime = startTime;
             return sendCallback;
@@ -210,7 +207,7 @@ public void onFailure(Throwable exception) {
     /**
      * Creates a instance of credential provider which can return {@link 
AWSCredentials} or {@link BasicAWSCredentials}
      * based on IAM user/roles.
-     * 
+     *
      * @param pluginFQClassName
      * @param param
      * @return
@@ -234,7 +231,7 @@ public static AWSCredentialsProvider 
createCredentialProviderWithPlugin(String p
     /**
      * It creates a default credential provider which takes accessKey and 
secretKey form configuration and creates
      * {@link AWSCredentials}
-     * 
+     *
      * @param awsCredentialPluginParam
      * @return
      */
@@ -274,15 +271,15 @@ public void refresh() {
         };
     }
 
-    public static ByteBuffer createKinesisMessage(MessageFormat msgFormat, 
RecordContext recordCtx, byte[] data) {
+    public static ByteBuffer createKinesisMessage(MessageFormat msgFormat, 
Record<byte[]> record) {
         if (MessageFormat.FULL_MESSAGE_IN_JSON.equals(msgFormat)) {
-            return ByteBuffer.wrap(Utils.serializeRecordToJson(recordCtx, 
data).getBytes());
+            return 
ByteBuffer.wrap(Utils.serializeRecordToJson(record).getBytes());
         } else if (MessageFormat.FULL_MESSAGE_IN_FB.equals(msgFormat)) {
-            return Utils.serializeRecordToFlatBuffer(recordCtx, data);
+            return Utils.serializeRecordToFlatBuffer(record);
         } else {
             // send raw-message
-            return ByteBuffer.wrap(data);
+            return ByteBuffer.wrap(record.getValue());
         }
     }
-    
+
 }
\ No newline at end of file
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
index 469151eeb9..a7382b80e0 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
@@ -22,20 +22,21 @@
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Base64.getEncoder;
 
+import com.google.gson.JsonObject;
+
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 
 import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.kinesis.fbs.EncryptionCtx;
 import org.apache.pulsar.io.kinesis.fbs.EncryptionKey;
 import org.apache.pulsar.io.kinesis.fbs.KeyValue;
 import org.apache.pulsar.io.kinesis.fbs.Message;
 
 import com.google.flatbuffers.FlatBufferBuilder;
-import com.google.gson.JsonObject;
 
 public class Utils {
 
@@ -49,25 +50,25 @@
     private static final String UNCPRESSED_MSG_SIZE_FIELD = 
"uncompressedMessageSize";
     private static final String BATCH_SIZE_FIELD = "batchSize";
     private static final String ENCRYPTION_CTX_FIELD = "encryptionCtx";
-    
+
     private static final FlatBufferBuilder DEFAULT_FB_BUILDER = new 
FlatBufferBuilder(0);
 
     /**
      * Serialize record to flat-buffer. it's not a thread-safe method.
-     * 
-     * @param inputRecordContext
+     *
+     * @param record
      * @param data
      * @return
      */
-    public static ByteBuffer serializeRecordToFlatBuffer(RecordContext 
inputRecordContext, byte[] data) {
+    public static ByteBuffer serializeRecordToFlatBuffer(Record<byte[]> 
record) {
         DEFAULT_FB_BUILDER.clear();
-        return serializeRecordToFlatBuffer(DEFAULT_FB_BUILDER, 
inputRecordContext, data);
+        return serializeRecordToFlatBuffer(DEFAULT_FB_BUILDER, record);
     }
-    
-    public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder 
builder, RecordContext inputRecordContext, byte[] data) {
-        checkNotNull(inputRecordContext, "record-context can't be null");
-        Optional<EncryptionContext> encryptionCtx = 
inputRecordContext.getEncryptionCtx();
-        Map<String, String> properties = inputRecordContext.getProperties();
+
+    public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder 
builder, Record<byte[]> record) {
+        checkNotNull(record, "record-context can't be null");
+        Optional<EncryptionContext> encryptionCtx = record.getEncryptionCtx();
+        Map<String, String> properties = record.getProperties();
 
         int encryptionCtxOffset = -1;
         int propertiesOffset = -1;
@@ -86,7 +87,7 @@ public static ByteBuffer 
serializeRecordToFlatBuffer(FlatBufferBuilder builder,
             encryptionCtxOffset = createEncryptionCtxOffset(builder, 
encryptionCtx);
         }
 
-        int payloadOffset = Message.createPayloadVector(builder, data);
+        int payloadOffset = Message.createPayloadVector(builder, 
record.getValue());
         Message.startMessage(builder);
         Message.addPayload(builder, payloadOffset);
         if (encryptionCtxOffset != -1) {
@@ -98,11 +99,11 @@ public static ByteBuffer 
serializeRecordToFlatBuffer(FlatBufferBuilder builder,
         int endMessage = Message.endMessage(builder);
         builder.finish(endMessage);
         ByteBuffer bb = builder.dataBuffer();
-        
+
         // to avoid copying of data, use same byte[] wrapped by ByteBuffer. 
But, ByteBuffer.array() returns entire array
         // so, it requires to read from offset:
         // builder.sizedByteArray()=>copies buffer: sizedByteArray(space, 
bb.capacity() - space)
-        int space = bb.capacity() - builder.offset(); 
+        int space = bb.capacity() - builder.offset();
         return ByteBuffer.wrap(bb.array(), space, bb.capacity() - space);
     }
 
@@ -133,7 +134,7 @@ private static int createEncryptionCtxOffset(final 
FlatBufferBuilder builder, Op
             EncryptionKey.addKey(builder, key);
             EncryptionKey.addValue(builder, value);
             if(metadataOffset!=-1) {
-                EncryptionKey.addMetadata(builder, metadataOffset);            
    
+                EncryptionKey.addMetadata(builder, metadataOffset);
             }
             keysOffsets[keyIndex++] = EncryptionKey.endEncryptionKey(builder);
         }
@@ -156,29 +157,31 @@ private static int createEncryptionCtxOffset(final 
FlatBufferBuilder builder, Op
         }
         return EncryptionCtx.createEncryptionCtx(builder, keysOffset, param, 
algo, compressionType,
                 ctx.getUncompressedMessageSize(), batchSize, 
ctx.getBatchSize().isPresent());
-    
+
     }
 
     /**
      * Serializes sink-record into json format. It encodes encryption-keys, 
encryption-param and payload in base64
      * format so, it can be sent in json.
-     * 
+     *
      * @param inputRecordContext
      * @param data
      * @return
      */
-    public static String serializeRecordToJson(RecordContext 
inputRecordContext, byte[] data) {
-        checkNotNull(inputRecordContext, "record-context can't be null");
+    public static String serializeRecordToJson(Record<byte[]> record) {
+        checkNotNull(record, "record can't be null");
+
         JsonObject result = new JsonObject();
-        result.addProperty(PAYLOAD_FIELD, getEncoder().encodeToString(data));
-        if (inputRecordContext.getProperties() != null) {
+        result.addProperty(PAYLOAD_FIELD, 
getEncoder().encodeToString(record.getValue()));
+        if (record.getProperties() != null) {
             JsonObject properties = new JsonObject();
-            inputRecordContext.getProperties().entrySet()
+            record.getProperties().entrySet()
                     .forEach(e -> properties.addProperty(e.getKey(), 
e.getValue()));
             result.add(PROPERTIES_FIELD, properties);
         }
-        if (inputRecordContext.getEncryptionCtx().isPresent()) {
-            EncryptionContext encryptionCtx = 
inputRecordContext.getEncryptionCtx().get();
+
+        if (record.getEncryptionCtx().isPresent()) {
+            EncryptionContext encryptionCtx = record.getEncryptionCtx().get();
             JsonObject encryptionCtxJson = new JsonObject();
             JsonObject keyBase64Map = new JsonObject();
             JsonObject keyMetadataMap = new JsonObject();
diff --git 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
index b9f5c8a6b1..45aa0c7237 100644
--- 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
+++ 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -20,16 +20,22 @@
 
 import static java.util.Base64.getDecoder;
 
+import com.google.gson.Gson;
+
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Optional;
 
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.kinesis.fbs.KeyValue;
 import org.apache.pulsar.io.kinesis.fbs.Message;
 import org.testng.Assert;
@@ -37,12 +43,6 @@
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
-import com.google.gson.Gson;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-
 /**
  * Unit test of {@link UtilsTest}.
  */
@@ -52,7 +52,7 @@
     public Object[][] encryptionProvider() {
         return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
     }
-    
+
     @Test
     public void testJsonSerialization() throws Exception {
 
@@ -75,9 +75,9 @@ public void testJsonSerialization() throws Exception {
         Map<String, String> metadata2 = Maps.newHashMap();
         metadata2.put("version", "v2");
         metadata2.put("ckms", "cmks-2");
-        RecordContext recordCtx = createRecordContext(algo, keyNames, 
keyValues, param.getBytes(), metadata1, metadata2,
+        Record<byte[]> recordCtx = createRecord(data, algo, keyNames, 
keyValues, param.getBytes(), metadata1, metadata2,
                 batchSize, compressionMsgSize, properties, true);
-        String json = Utils.serializeRecordToJson(recordCtx, data);
+        String json = Utils.serializeRecordToJson(recordCtx);
 
         // deserialize from json and assert
         KinesisMessageResponse kinesisJsonResponse = 
deSerializeRecordFromJson(json);
@@ -118,9 +118,9 @@ public void testFbSerialization(boolean isEncryption) 
throws Exception {
             Map<String, String> metadata2 = Maps.newHashMap();
             metadata2.put("version", "v2");
             metadata2.put("ckms", "cmks-2");
-            RecordContext recordCtx = createRecordContext(algo, keyNames, 
keyValues, param.getBytes(), metadata1,
+            Record<byte[]> record = createRecord(data, algo, keyNames, 
keyValues, param.getBytes(), metadata1,
                     metadata2, batchSize, compressionMsgSize, properties, 
isEncryption);
-            ByteBuffer flatBuffer = 
Utils.serializeRecordToFlatBuffer(recordCtx, data);
+            ByteBuffer flatBuffer = Utils.serializeRecordToFlatBuffer(record);
 
             Message kinesisJsonResponse = Message.getRootAsMessage(flatBuffer);
             byte[] fbPayloadBytes = new 
byte[kinesisJsonResponse.payloadLength()];
@@ -164,7 +164,7 @@ public void testFbSerialization(boolean isEncryption) 
throws Exception {
                 Assert.assertEquals(param.getBytes(), paramBytes);
                 Assert.assertEquals(algo, encryptionCtxDeser.algo());
             }
-            
+
             Map<String, String> fbproperties = Maps.newHashMap();
             for (int i = 0; i < kinesisJsonResponse.propertiesLength(); i++) {
                 KeyValue property = kinesisJsonResponse.properties(i);
@@ -175,7 +175,7 @@ public void testFbSerialization(boolean isEncryption) 
throws Exception {
         }
     }
 
-    private RecordContext createRecordContext(String algo, String[] keyNames, 
byte[][] keyValues, byte[] param,
+    private Record<byte[]> createRecord(byte[] data, String algo, String[] 
keyNames, byte[][] keyValues, byte[] param,
             Map<String, String> metadata1, Map<String, String> metadata2, int 
batchSize, int compressionMsgSize,
             Map<String, String> properties, boolean isEncryption) {
         EncryptionContext ctx = null;
@@ -198,14 +198,16 @@ private RecordContext createRecordContext(String algo, 
String[] keyNames, byte[]
             ctx.setKeys(keys);
             ctx.setParam(param);
         }
-        return new RecordContextImpl(properties, Optional.ofNullable(ctx)); 
+        return new RecordImpl(data, properties, Optional.ofNullable(ctx));
     }
 
-    class RecordContextImpl implements RecordContext {
+    class RecordImpl implements Record<byte[]> {
+        byte[] data;
         Map<String, String> properties;
         Optional<EncryptionContext> ectx;
 
-        public RecordContextImpl(Map<String, String> properties, 
Optional<EncryptionContext> ectx) {
+        public RecordImpl(byte[] data, Map<String, String> properties, 
Optional<EncryptionContext> ectx) {
+            this.data = data;
             this.properties = properties;
             this.ectx = ectx;
         }
@@ -217,6 +219,16 @@ public RecordContextImpl(Map<String, String> properties, 
Optional<EncryptionCont
         public Optional<EncryptionContext> getEncryptionCtx() {
             return ectx;
         }
+
+        @Override
+        public Optional<String> getKey() {
+            return Optional.empty();
+        }
+
+        @Override
+        public byte[] getValue() {
+            return data;
+        }
     }
 
     public static KinesisMessageResponse deSerializeRecordFromJson(String 
jsonRecord) {
diff --git 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index dab3fe96eb..c05828fac1 100644
--- 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++ 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -25,15 +25,19 @@
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.DefaultConsumer;
 import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
+import lombok.Data;
+
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Map;
-
 /**
  * A simple connector to consume messages from a RabbitMQ queue
  */
@@ -82,20 +86,13 @@ public RabbitMQConsumer(RabbitMQSource source, Channel 
channel) {
 
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
-            source.consume(new RabbitMQRecord(body));
+            source.consume(new 
RabbitMQRecord(Optional.ofNullable(envelope.getRoutingKey()), body));
         }
     }
 
+    @Data
     static private class RabbitMQRecord implements Record<byte[]> {
-        private final byte[] data;
-
-        public RabbitMQRecord(byte[] data) {
-            this.data = data;
-        }
-
-        @Override
-        public byte[] getValue() {
-            return data;
-        }
+        private final Optional<String> key;
+        private final byte[] value;
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 56c52e7ca4..494ae48e5b 100644
--- 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -33,6 +33,7 @@
 import java.io.InputStream;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.Record;
@@ -165,6 +166,12 @@ public TwitterRecord(String tweet) {
             this.tweet = tweet;
         }
 
+        @Override
+        public Optional<String> getKey() {
+            // TODO: Could use user or tweet ID as key here
+            return Optional.empty();
+        }
+
         @Override
         public String getValue() {
             return tweet;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to