This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ea6eae7  Added optional key in pulsar IO (#2116)
ea6eae7 is described below

commit ea6eae7d22e94fd2a998813ec9b2af42d9035fc0
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Jul 13 15:54:07 2018 -0700

    Added optional key in pulsar IO (#2116)
    
    * Added optional key in pulsar IO
    
    * Unified Record and RecordContext interfaces
    
    * Fixed pulsar sink
    
    * Fixed test compilation
    
    * Fixed imports
    
    * Removed wrong import
---
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  |  1 -
 .../org/apache/pulsar/client/impl/MessageImpl.java | 22 +++---
 .../pulsar/client/impl/TopicMessageImpl.java       |  7 +-
 .../pulsar/client/impl/MessageRecordImplTest.java  | 60 ----------------
 .../functions/instance/JavaInstanceRunnable.java   |  2 +-
 .../pulsar/functions/instance/SinkRecord.java      | 65 +++++++++---------
 .../apache/pulsar/functions/sink/PulsarSink.java   | 58 +++++++++-------
 .../pulsar/functions/source/PulsarRecord.java      | 65 ++++++++++++++----
 .../pulsar/functions/source/PulsarSource.java      | 29 ++++----
 .../rest/api/v2/FunctionApiV2ResourceTest.java     | 28 ++++----
 .../pulsar/io/aerospike/AerospikeAbstractSink.java | 54 ++++++++-------
 .../pulsar/io/aerospike/AerospikeStringSink.java   |  6 +-
 .../pulsar/io/cassandra/CassandraAbstractSink.java | 16 ++---
 .../pulsar/io/cassandra/CassandraStringSink.java   |  6 +-
 .../java/org/apache/pulsar/io/core/Record.java     | 66 +++++++++++++++++-
 .../org/apache/pulsar/io/core/RecordContext.java   | 68 -------------------
 .../java/org/apache/pulsar/io/core/SimpleSink.java | 47 -------------
 .../main/java/org/apache/pulsar/io/core/Sink.java  | 10 +--
 .../java/org/apache/pulsar/io/core/SinkTest.java   |  2 +-
 .../apache/pulsar/io/kafka/KafkaAbstractSink.java  | 50 +++++++-------
 .../pulsar/io/kafka/KafkaAbstractSource.java       | 28 +++++---
 .../apache/pulsar/io/kafka/KafkaStringSink.java    |  5 +-
 .../apache/pulsar/io/kafka/KafkaStringSource.java  |  2 +-
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  | 79 +++++++++++-----------
 .../java/org/apache/pulsar/io/kinesis/Utils.java   | 53 ++++++++-------
 .../org/apache/pulsar/io/kinesis/UtilsTest.java    | 46 ++++++++-----
 .../apache/pulsar/io/rabbitmq/RabbitMQSource.java  | 25 +++----
 .../apache/pulsar/io/twitter/TwitterFireHose.java  |  7 ++
 28 files changed, 434 insertions(+), 473 deletions(-)

diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index bda7f65..edfb1c4 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -44,7 +44,6 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
-import org.apache.pulsar.io.core.RecordContext;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index 504cc61..5ea8bfe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -20,6 +20,13 @@ package org.apache.pulsar.client.impl;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import com.google.common.collect.Maps;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Collections;
@@ -32,22 +39,15 @@ import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SchemaSerializationException;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 
-import com.google.common.collect.Maps;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.util.Recycler;
-import io.netty.util.Recycler.Handle;
-
-public class MessageImpl<T> extends MessageRecordImpl<T, MessageId> {
+public class MessageImpl<T> implements Message<T> {
 
+    protected MessageId messageId;
     private MessageMetadata.Builder msgMetadataBuilder;
     private ClientCnx cnx;
     private ByteBuf payload;
@@ -86,7 +86,7 @@ public class MessageImpl<T> extends MessageRecordImpl<T, MessageId> {
             Schema<T> schema) {
         this(messageId, msgMetadata, payload, null, cnx, schema);
     }
-    
+
     MessageImpl(MessageIdImpl messageId, MessageMetadata msgMetadata, ByteBuf 
payload,
             Optional<EncryptionContext> encryptionCtx, ClientCnx cnx, 
Schema<T> schema) {
         this.msgMetadataBuilder = MessageMetadata.newBuilder(msgMetadata);
@@ -330,7 +330,7 @@ public class MessageImpl<T> extends MessageRecordImpl<T, MessageId> {
     void setMessageId(MessageIdImpl messageId) {
         this.messageId = messageId;
     }
-    
+
     @Override
     public Optional<EncryptionContext> getEncryptionCtx() {
         return encryptionCtx;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index f7f1f9b..4f5ac13 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -26,10 +26,11 @@ import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.EncryptionContext;
 
-public class TopicMessageImpl<T> extends MessageRecordImpl<T, 
TopicMessageIdImpl> {
+public class TopicMessageImpl<T> implements Message<T> {
 
     private final String topicName;
     private final Message<T> msg;
+    private final TopicMessageIdImpl messageId;
 
     TopicMessageImpl(String topicName,
                      Message<T> msg) {
@@ -109,9 +110,9 @@ public class TopicMessageImpl<T> extends MessageRecordImpl<T, TopicMessageIdImpl
     public T getValue() {
         return msg.getValue();
     }
-    
+
     @Override
     public Optional<EncryptionContext> getEncryptionCtx() {
-        return (msg instanceof MessageImpl) ? ((MessageImpl) 
msg).getEncryptionCtx() : Optional.empty();
+        return msg.getEncryptionCtx();
     }
 }
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
deleted file mode 100644
index 7df373a..0000000
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.client.impl;
-
-import static org.mockito.Mockito.CALLS_REAL_METHODS;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertSame;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.pulsar.client.api.MessageId;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-public class MessageRecordImplTest {
-
-    private MessageRecordImpl<byte[], MessageId> message;
-    private MessageId messageId;
-
-    @BeforeMethod
-    public void setup() {
-        this.messageId = mock(MessageId.class);
-        this.message = mock(MessageRecordImpl.class, CALLS_REAL_METHODS);
-        when(message.getMessageId()).thenReturn(messageId);
-    }
-
-    @Test
-    public void testAck() throws Exception {
-        final AtomicReference<MessageId> msgIdRef = new AtomicReference<>();
-        final CountDownLatch ackLatch = new CountDownLatch(1);
-
-        this.message.setAckFunction(msgId -> {
-            msgIdRef.set(msgId);
-            ackLatch.countDown();
-        });
-
-        this.message.ack();
-        ackLatch.await();
-
-        assertSame(messageId, msgIdRef.get());
-    }
-
-}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ba0de38..4c29232 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -324,7 +324,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
 
     private void sendOutputMessage(Record srcRecord, Object output) {
         try {
-            this.sink.write(srcRecord, output);
+            this.sink.write(new SinkRecord<>(srcRecord, output));
         } catch (Exception e) {
             log.info("Encountered exception in sink write: ", e);
             throw new RuntimeException(e);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
similarity index 53%
rename from pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
index ea1d892..4e2edbe 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/SinkRecord.java
@@ -16,65 +16,66 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.client.impl;
+package org.apache.pulsar.functions.instance;
 
-import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.io.core.Record;
 
-/**
- * Abstract class that implements message api and connect record api.
- */
-public abstract class MessageRecordImpl<T, M extends MessageId> implements 
Message<T>, Record<T> {
+@Data
+@AllArgsConstructor
+public class SinkRecord<T> implements Record<T> {
 
-    protected M messageId;
-    protected java.util.function.Consumer<M> ackFunction;
+    private final Record<T> sourceRecord;
+    private final T value;
 
-    public void setAckFunction(java.util.function.Consumer<M> ackFunction) {
-        this.ackFunction = ackFunction;
+    public Record<T> getSourceRecord() {
+        return sourceRecord;
     }
 
     @Override
-    public String getPartitionId() {
-        if (null != messageId) {
-            if (messageId instanceof MessageIdImpl) {
-                return String.valueOf(((MessageIdImpl) 
messageId).getPartitionIndex());
-            } else {
-                return "";
-            }
-        }
-        return "";
+    public Optional<String> getKey() {
+        return sourceRecord.getKey();
     }
 
     @Override
-    public long getRecordSequence() {
-        return getSequenceId();
+    public T getValue() {
+        return value;
     }
 
     @Override
-    public void ack() {
-        if (null != ackFunction) {
-            ackFunction.accept((M) getMessageId());
-        }
+    public Optional<String> getPartitionId() {
+        return sourceRecord.getPartitionId();
     }
 
     @Override
-    public void fail() {
-        // no-op
+    public Optional<Long> getRecordSequence() {
+        return sourceRecord.getRecordSequence();
+    }
+
+    @Override
+    public Optional<EncryptionContext> getEncryptionCtx() {
+        return sourceRecord.getEncryptionCtx();
     }
 
     @Override
     public Map<String, String> getProperties() {
-        return Collections.emptyMap();
+        return sourceRecord.getProperties();
     }
 
     @Override
-    public Optional<EncryptionContext> getEncryptionCtx() {
-        return Optional.empty();
+    public void ack() {
+        sourceRecord.ack();
+    }
+
+    @Override
+    public void fail() {
+        sourceRecord.fail();
     }
+
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 7aebd93..2170879 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -19,10 +19,13 @@
 package org.apache.pulsar.functions.sink;
 
 import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Base64;
+import java.util.Map;
+
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import net.jodah.typetools.TypeResolver;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
@@ -35,18 +38,18 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.instance.SinkRecord;
 import 
org.apache.pulsar.functions.instance.producers.AbstractOneOuputTopicProducers;
 import 
org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
 import org.apache.pulsar.functions.instance.producers.Producers;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 
-import java.util.Base64;
-import java.util.Map;
+import net.jodah.typetools.TypeResolver;
 
 @Slf4j
 public class PulsarSink<T> implements Sink<T> {
@@ -57,16 +60,16 @@ public class PulsarSink<T> implements Sink<T> {
 
     private PulsarSinkProcessor pulsarSinkProcessor;
 
-    private interface PulsarSinkProcessor {
+    private interface PulsarSinkProcessor<T> {
         void initializeOutputProducer(String outputTopic) throws Exception;
 
         void sendOutputMessage(MessageBuilder outputMsgBuilder,
-                               RecordContext recordContext) throws Exception;
+                               Record<T> recordContext) throws Exception;
 
         void close() throws Exception;
     }
 
-    private class PulsarSinkAtMostOnceProcessor implements PulsarSinkProcessor 
{
+    private class PulsarSinkAtMostOnceProcessor implements 
PulsarSinkProcessor<T> {
         private Producer<byte[]> producer;
 
         @Override
@@ -77,7 +80,7 @@ public class PulsarSink<T> implements Sink<T> {
 
         @Override
         public void sendOutputMessage(MessageBuilder outputMsgBuilder,
-                                      RecordContext recordContext) throws 
Exception {
+                                      Record<T> recordContext) throws 
Exception {
             Message<byte[]> outputMsg = outputMsgBuilder.build();
             this.producer.sendAsync(outputMsg);
         }
@@ -94,7 +97,7 @@ public class PulsarSink<T> implements Sink<T> {
         }
     }
 
-    private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor {
+    private class PulsarSinkAtLeastOnceProcessor implements 
PulsarSinkProcessor<T> {
         private Producer<byte[]> producer;
 
         @Override
@@ -105,7 +108,7 @@ public class PulsarSink<T> implements Sink<T> {
 
         @Override
         public void sendOutputMessage(MessageBuilder outputMsgBuilder,
-                                      RecordContext recordContext) throws 
Exception {
+                                      Record<T> recordContext) throws 
Exception {
             Message<byte[]> outputMsg = outputMsgBuilder.build();
             this.producer.sendAsync(outputMsg).thenAccept(messageId -> 
recordContext.ack());
         }
@@ -122,7 +125,7 @@ public class PulsarSink<T> implements Sink<T> {
         }
     }
 
-    private class PulsarSinkEffectivelyOnceProcessor implements 
PulsarSinkProcessor, ConsumerEventListener {
+    private class PulsarSinkEffectivelyOnceProcessor implements 
PulsarSinkProcessor<T>, ConsumerEventListener {
 
         @Getter(AccessLevel.PACKAGE)
         protected Producers outputProducer;
@@ -134,15 +137,16 @@ public class PulsarSink<T> implements Sink<T> {
         }
 
         @Override
-        public void sendOutputMessage(MessageBuilder outputMsgBuilder, 
RecordContext recordContext)
+        public void sendOutputMessage(MessageBuilder outputMsgBuilder, 
Record<T> recordContext)
                 throws Exception {
 
             // assign sequence id to output message for idempotent producing
-            outputMsgBuilder = outputMsgBuilder
-                    .setSequenceId(recordContext.getRecordSequence());
+            if (recordContext.getRecordSequence().isPresent()) {
+                
outputMsgBuilder.setSequenceId(recordContext.getRecordSequence().get());
+            }
 
             // currently on PulsarRecord
-            Producer producer = 
outputProducer.getProducer(recordContext.getPartitionId());
+            Producer producer = 
outputProducer.getProducer(recordContext.getPartitionId().get());
 
             org.apache.pulsar.client.api.Message outputMsg = 
outputMsgBuilder.build();
             producer.sendAsync(outputMsg)
@@ -211,29 +215,37 @@ public class PulsarSink<T> implements Sink<T> {
     }
 
     @Override
-    public void write(RecordContext recordContext, T value) throws Exception {
+    public void write(Record<T> record) throws Exception {
 
         byte[] output;
         try {
-            output = this.outputSerDe.serialize(value);
+            output = this.outputSerDe.serialize(record.getValue());
         } catch (Exception e) {
             //TODO Add serialization exception stats
             throw new RuntimeException("Error occured when attempting to 
serialize output:", e);
         }
+
         MessageBuilder msgBuilder = MessageBuilder.create();
+        if (record.getKey().isPresent()) {
+            msgBuilder.setKey(record.getKey().get());
+        }
+
         msgBuilder.setContent(output);
-        if (recordContext instanceof PulsarRecord) {
-            PulsarRecord pulsarRecord = (PulsarRecord) recordContext;
+
+        if (!record.getProperties().isEmpty()) {
+            msgBuilder.setProperties(record.getProperties());
+        }
+
+        SinkRecord<T> sinkRecord = (SinkRecord<T>) record;
+        if (sinkRecord.getSourceRecord() instanceof PulsarRecord) {
+            PulsarRecord<T> pulsarRecord = (PulsarRecord<T>) 
sinkRecord.getSourceRecord();
             // forward user properties to sink-topic
-            if (pulsarRecord.getProperties() != null) {
-                msgBuilder.setProperties(pulsarRecord.getProperties());
-            }
             msgBuilder.setProperty("__pfn_input_topic__", 
pulsarRecord.getTopicName()).setProperty(
                     "__pfn_input_msg_id__",
                     new 
String(Base64.getEncoder().encode(pulsarRecord.getMessageId().toByteArray())));
         }
 
-        this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, recordContext);
+        this.pulsarSinkProcessor.sendOutputMessage(msgBuilder, record);
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
index 113cf82..7a6d11b 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarRecord.java
@@ -18,35 +18,72 @@
  */
 package org.apache.pulsar.functions.source;
 
+import java.util.Map;
+import java.util.Optional;
+
 import lombok.Builder;
-import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.ToString;
 
-import java.util.Map;
-import java.util.Optional;
-
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.api.EncryptionContext;
+import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.Record;
 
-@Data
 @Builder
 @Getter
 @ToString
 @EqualsAndHashCode
 public class PulsarRecord<T> implements Record<T> {
 
-    private String partitionId;
-    private long recordSequence;
-    private T value;
-    private MessageId messageId;
-    private String topicName;
-    private Map<String, String> properties;
-    private Optional<EncryptionContext> encryptionCtx = Optional.empty();
-    private Runnable failFunction;
-    private Runnable ackFunction;
+    private final String topicName;
+    private final int partition;
+
+    // TODO: When we switch to schema for functions, we should just rely on 
the message object value
+    private final T value;
+    private final Message<byte[]> message;
+
+    private final Runnable failFunction;
+    private final Runnable ackFunction;
+
+    @Override
+    public Optional<String> getKey() {
+        if (message.hasKey()) {
+            return Optional.of(message.getKey());
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    @Override
+    public Optional<String> getPartitionId() {
+        return Optional.of(String.format("%s-%s", topicName, partition));
+    }
+
+    @Override
+    public Optional<Long> getRecordSequence() {
+        return Optional.of(Utils.getSequenceId(message.getMessageId()));
+    }
+
+    /**
+     * Retrieves encryption-context that is attached to record.
+     *
+     * @return {@link Optional}<{@link EncryptionContext}>
+     */
+    public Optional<EncryptionContext> getEncryptionCtx() {
+        return message.getEncryptionCtx();
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return message.getProperties();
+    }
+
+    public MessageId getMessageId() {
+        return message.getMessageId();
+    }
 
     @Override
     public void ack() {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 680d754..d5a1df9 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -54,7 +54,7 @@ public class PulsarSource<T> implements Source<T> {
     private boolean isTopicsPattern;
 
     @Getter
-    private org.apache.pulsar.client.api.Consumer inputConsumer;
+    private org.apache.pulsar.client.api.Consumer<byte[]> inputConsumer;
 
     public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig 
pulsarConfig) {
         this.pulsarClient = pulsarClient;
@@ -69,17 +69,17 @@ public class PulsarSource<T> implements Source<T> {
         // Setup pulsar consumer
         ConsumerBuilder<byte[]> consumerBuilder = 
this.pulsarClient.newConsumer()
                 //consume message even if can't decrypt and deliver it along 
with encryption-ctx
-                .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)  
+                .cryptoFailureAction(ConsumerCryptoFailureAction.CONSUME)
                 
.subscriptionName(this.pulsarSourceConfig.getSubscriptionName())
                 
.subscriptionType(this.pulsarSourceConfig.getSubscriptionType());
 
         if(isNotBlank(this.pulsarSourceConfig.getTopicsPattern())) {
-            
consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());    
+            
consumerBuilder.topicsPattern(this.pulsarSourceConfig.getTopicsPattern());
             isTopicsPattern = true;
         }else {
-            consumerBuilder.topics(new 
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));    
+            consumerBuilder.topics(new 
ArrayList<>(this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet()));
         }
-        
+
         if (pulsarSourceConfig.getTimeoutMs() != null) {
             consumerBuilder.ackTimeout(pulsarSourceConfig.getTimeoutMs(), 
TimeUnit.MILLISECONDS);
         }
@@ -88,21 +88,21 @@ public class PulsarSource<T> implements Source<T> {
 
     @Override
     public Record<T> read() throws Exception {
-        org.apache.pulsar.client.api.Message<T> message = 
this.inputConsumer.receive();
+        org.apache.pulsar.client.api.Message<byte[]> message = 
this.inputConsumer.receive();
 
         String topicName;
-        String partitionId;
+        int partitionId;
 
         // If more than one topics are being read than the Message return by 
the consumer will be TopicMessageImpl
         // If there is only topic being read then the Message returned by the 
consumer wil be MessageImpl
         if (message instanceof TopicMessageImpl) {
-            topicName = ((TopicMessageImpl) message).getTopicName();
+            topicName = ((TopicMessageImpl<?>) message).getTopicName();
             TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) 
message.getMessageId();
             MessageIdImpl messageId = (MessageIdImpl) 
topicMessageId.getInnerMessageId();
-            partitionId = Long.toString(messageId.getPartitionIndex());
+            partitionId = messageId.getPartitionIndex();
         } else {
             topicName = 
this.pulsarSourceConfig.getTopicSerdeClassNameMap().keySet().iterator().next();
-            partitionId = Long.toString(((MessageIdImpl) 
message.getMessageId()).getPartitionIndex());
+            partitionId = ((MessageIdImpl) 
message.getMessageId()).getPartitionIndex();
         }
 
         Object object;
@@ -130,14 +130,10 @@ public class PulsarSource<T> implements Source<T> {
             throw new RuntimeException("Error in casting input to expected 
type:", e);
         }
 
-        PulsarRecord<T> pulsarMessage = (PulsarRecord<T>) 
PulsarRecord.builder()
+        return PulsarRecord.<T>builder()
                 .value(input)
-                .messageId(message.getMessageId())
-                .partitionId(String.format("%s-%s", topicName, partitionId))
-                .recordSequence(Utils.getSequenceId(message.getMessageId()))
+                .message(message)
                 .topicName(topicName)
-                .properties(message.getProperties())
-                .encryptionCtx(message.getEncryptionCtx())
                 .ackFunction(() -> {
                     if (pulsarSourceConfig.getProcessingGuarantees() == 
FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE) {
                         inputConsumer.acknowledgeCumulativeAsync(message);
@@ -150,7 +146,6 @@ public class PulsarSource<T> implements Source<T> {
                     }
                 })
                 .build();
-        return pulsarMessage;
     }
 
     @Override
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 0bac0a6..415912a 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -29,6 +29,9 @@ import static org.powermock.api.mockito.PowerMockito.doThrow;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.testng.Assert.assertEquals;
 
+import com.google.common.collect.Lists;
+import com.google.gson.Gson;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -65,7 +68,7 @@ import org.apache.pulsar.functions.worker.WorkerConfig;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
@@ -77,9 +80,6 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
-import com.google.common.collect.Lists;
-import com.google.gson.Gson;
-
 /**
  * Unit test of {@link FunctionApiV2Resource}.
  */
@@ -98,7 +98,7 @@ public class FunctionApiV2ResourceTest {
             return input;
         }
     }
-    
+
     public static final class TestSink implements Sink<byte[]> {
 
         @Override
@@ -110,7 +110,7 @@ public class FunctionApiV2ResourceTest {
         }
 
         @Override
-        public void write(RecordContext inputRecordContext, byte[] value) 
throws Exception {
+        public void write(Record<byte[]> record) throws Exception {
         }
     }
 
@@ -670,7 +670,7 @@ public class FunctionApiV2ResourceTest {
 
         String fileLocation = 
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         String filePackageUrl = "file://" + fileLocation;
-        
+
         SinkSpec sinkSpec = SinkSpec.newBuilder()
                 .setTopic(outputTopic)
                 .setSerDeClassName(outputSerdeClassName).build();
@@ -681,14 +681,14 @@ public class FunctionApiV2ResourceTest {
                 .setParallelism(parallelism)
                 
.setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
                         
.putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
-        
+
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), 
eq(function))).thenReturn(true);
         RequestResult rr = new RequestResult()
                 .setSuccess(true)
                 .setMessage("function registered");
             CompletableFuture<RequestResult> requestResult = 
CompletableFuture.completedFuture(rr);
             
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
-        
+
         Response response = resource.updateFunction(
             tenant,
             namespace,
@@ -697,10 +697,10 @@ public class FunctionApiV2ResourceTest {
             null,
             filePackageUrl,
             
org.apache.pulsar.functions.utils.Utils.printJson(functionDetails));
-        
+
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
-    
+
     @Test
     public void testUpdateFunctionFailure() throws Exception {
         mockStatic(Utils.class);
@@ -990,7 +990,7 @@ public class FunctionApiV2ResourceTest {
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(new Gson().toJson(functions), response.getEntity());
     }
-    
+
     @Test
     public void testDownloadFunctionHttpUrl() throws Exception {
         String jarHttpUrl = 
"http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
@@ -1006,7 +1006,7 @@ public class FunctionApiV2ResourceTest {
             pkgFile.delete();
         }
     }
-    
+
     @Test
     public void testDownloadFunctionFile() throws Exception {
         String fileLocation = 
FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
@@ -1022,7 +1022,7 @@ public class FunctionApiV2ResourceTest {
             pkgFile.delete();
         }
     }
-    
+
     @Test
     public void testRegisterFunctionFileUrlWithValidSinkClass() throws 
IOException {
         Configurator.setRootLevel(Level.DEBUG);
diff --git 
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
 
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
index ba29616..6ea657e 100644
--- 
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
+++ 
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
@@ -31,22 +31,23 @@ import com.aerospike.client.async.NioEventLoops;
 import com.aerospike.client.listener.WriteListener;
 import com.aerospike.client.policy.ClientPolicy;
 import com.aerospike.client.policy.WritePolicy;
-import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.SimpleSink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingDeque;
 
+import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A Simple abstract class for Aerospike sink
  * Users need to implement extractKeyValue function to use this sink
  */
-public abstract class AerospikeAbstractSink<K, V> extends SimpleSink<byte[]> {
+public abstract class AerospikeAbstractSink<K, V> implements Sink<byte[]> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AerospikeAbstractSink.class);
 
@@ -55,6 +56,7 @@ public abstract class AerospikeAbstractSink<K, V> extends 
SimpleSink<byte[]> {
     private AerospikeClient client;
     private WritePolicy writePolicy;
     private BlockingQueue<AWriteListener> queue;
+    private NioEventLoops eventLoops;
     private EventLoop eventLoop;
 
     @Override
@@ -74,18 +76,25 @@ public abstract class AerospikeAbstractSink<K, V> extends 
SimpleSink<byte[]> {
         for (int i = 0; i < aerospikeSinkConfig.getMaxConcurrentRequests(); 
++i) {
             queue.put(new AWriteListener(queue));
         }
-        eventLoop = new NioEventLoops(new EventPolicy(), 1).next();
+
+        eventLoops = new NioEventLoops(new EventPolicy(), 1);
+        eventLoop = eventLoops.next();
     }
 
     @Override
     public void close() throws Exception {
-        client.close();
+        if (client != null) {
+            client.close();
+        }
+
+        if (eventLoops != null) {
+            eventLoops.close();
+        }
         LOG.info("Connection Closed");
     }
 
     @Override
-    public CompletableFuture<Void> write(byte[] record) {
-        CompletableFuture<Void> future = new CompletableFuture<>();
+    public void write(Record<byte[]> record) {
         KeyValue<K, V> keyValue = extractKeyValue(record);
         Key key = new Key(aerospikeSinkConfig.getKeyspace(), 
aerospikeSinkConfig.getKeySet(), keyValue.getKey().toString());
         Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), 
Value.getAsBlob(keyValue.getValue()));
@@ -93,12 +102,11 @@ public abstract class AerospikeAbstractSink<K, V> extends 
SimpleSink<byte[]> {
         try {
             listener = queue.take();
         } catch (InterruptedException ex) {
-            future.completeExceptionally(ex);
-            return future;
+            record.fail();
+            return;
         }
-        listener.setFuture(future);
+        listener.setContext(record);
         client.put(eventLoop, listener, writePolicy, key, bin);
-        return future;
     }
 
     private void createClient() {
@@ -121,21 +129,21 @@ public abstract class AerospikeAbstractSink<K, V> extends 
SimpleSink<byte[]> {
     }
 
     private class AWriteListener implements WriteListener {
-        private CompletableFuture<Void> future;
+        private Record<byte[]> context;
         private BlockingQueue<AWriteListener> queue;
 
         public AWriteListener(BlockingQueue<AWriteListener> queue) {
             this.queue = queue;
         }
 
-        public void setFuture(CompletableFuture<Void> future) {
-            this.future = future;
+        public void setContext(Record<byte[]> record) {
+            this.context = record;
         }
 
         @Override
         public void onSuccess(Key key) {
-            if (future != null) {
-                future.complete(null);
+            if (context != null) {
+                context.ack();
             }
             try {
                 queue.put(this);
@@ -146,8 +154,8 @@ public abstract class AerospikeAbstractSink<K, V> extends 
SimpleSink<byte[]> {
 
         @Override
         public void onFailure(AerospikeException e) {
-            if (future != null) {
-                future.completeExceptionally(e);
+            if (context != null) {
+                context.fail();
             }
             try {
                 queue.put(this);
@@ -157,5 +165,5 @@ public abstract class AerospikeAbstractSink<K, V> extends 
SimpleSink<byte[]> {
         }
     }
 
-    public abstract KeyValue<K, V> extractKeyValue(byte[] message);
+    public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> message);
 }
\ No newline at end of file
diff --git 
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
 
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
index 8bf0174..40ffd90 100644
--- 
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
+++ 
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeStringSink.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.io.aerospike;
 
 import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
 
 /**
  * Aerospike sink that treats incoming messages on the input topic as Strings
@@ -27,7 +28,8 @@ import org.apache.pulsar.io.core.KeyValue;
  */
 public class AerospikeStringSink extends AerospikeAbstractSink<String, String> 
{
     @Override
-    public KeyValue<String, String> extractKeyValue(byte[] message) {
-        return new KeyValue<>(new String(message), new String(message));
+    public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
+        String key = record.getKey().orElseGet(() -> new 
String(record.getValue()));
+        return new KeyValue<>(key, new String(record.getValue()));
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
 
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
index c8a9ca5..fd76c0c 100644
--- 
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
+++ 
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
@@ -29,17 +29,17 @@ import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.SimpleSink;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 
 /**
  * A Simple abstract class for Cassandra sink
  * Users need to implement extractKeyValue function to use this sink
  */
-public abstract class CassandraAbstractSink<K, V> extends SimpleSink<byte[]> {
+public abstract class CassandraAbstractSink<K, V> implements Sink<byte[]> {
 
     // ----- Runtime fields
     private Cluster cluster;
@@ -69,24 +69,22 @@ public abstract class CassandraAbstractSink<K, V> extends 
SimpleSink<byte[]> {
     }
 
     @Override
-    public CompletableFuture<Void> write(byte[] record) {
+    public void write(Record<byte[]> record) {
         KeyValue<K, V> keyValue = extractKeyValue(record);
         BoundStatement bound = statement.bind(keyValue.getKey(), 
keyValue.getValue());
         ResultSetFuture future = session.executeAsync(bound);
-        CompletableFuture<Void> completable = new CompletableFuture<Void>();
         Futures.addCallback(future,
                 new FutureCallback<ResultSet>() {
                     @Override
                     public void onSuccess(ResultSet result) {
-                        completable.complete(null);
+                        record.ack();
                     }
 
                     @Override
                     public void onFailure(Throwable t) {
-                        completable.completeExceptionally(t);
+                        record.fail();
                     }
                 });
-        return completable;
     }
 
     private void createClient(String roots) {
@@ -107,5 +105,5 @@ public abstract class CassandraAbstractSink<K, V> extends 
SimpleSink<byte[]> {
         session.execute("USE " + cassandraSinkConfig.getKeyspace());
     }
 
-    public abstract KeyValue<K, V> extractKeyValue(byte[] message);
+    public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> record);
 }
\ No newline at end of file
diff --git 
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
 
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
index 5c0cb16..c2e72b8 100644
--- 
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
+++ 
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.io.cassandra;
 
 import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
 
 /**
  * Cassandra sink that treats incoming messages on the input topic as Strings
@@ -27,7 +28,8 @@ import org.apache.pulsar.io.core.KeyValue;
  */
 public class CassandraStringSink extends CassandraAbstractSink<String, String> 
{
     @Override
-    public KeyValue<String, String> extractKeyValue(byte[] message) {
-        return new KeyValue<>(new String(message), new String(message));
+    public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
+        String key = record.getKey().orElseGet(() -> new 
String(record.getValue()));
+        return new KeyValue<>(key, new String(record.getValue()));
     }
 }
\ No newline at end of file
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
index 08c78bf..504d5f4 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Record.java
@@ -18,14 +18,74 @@
  */
 package org.apache.pulsar.io.core;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.pulsar.common.api.EncryptionContext;
+
 /**
- * Pulsar Connect's Record interface. Record encapsulates the
- * information about a record being read from a Source.
+ * Pulsar Connect's Record interface. Record encapsulates the information 
about a record being read from a Source.
  */
-public interface Record<T> extends RecordContext {
+public interface Record<T> {
+
+    /**
+     * Returns the key associated with the record, if it has one
+     */
+    Optional<String> getKey();
+
     /**
      * Retrieves the actual data of the record
+     *
      * @return The record data
      */
     T getValue();
+
+    /**
+     * Retrieves the partition information of the record, if any.
+     *
+     * @return The partition id the record belongs to, or empty if there is none
+     */
+    default Optional<String> getPartitionId() {
+        return Optional.empty();
+    }
+
+    /**
+     * Retrieves the sequence of the record from a source partition.
+     *
+     * @return Sequence Id associated with the record
+     */
+    default Optional<Long> getRecordSequence() {
+        return Optional.empty();
+    }
+
+    /**
+     * Retrieves encryption-context that is attached to record.
+     *
+     * @return {@link Optional}<{@link EncryptionContext}>
+     */
+    default public Optional<EncryptionContext> getEncryptionCtx() {
+        return Optional.empty();
+    }
+
+    /**
+     * Retrieves user-defined properties attached to record.
+     *
+     * @return Map of user-properties
+     */
+    default Map<String, String> getProperties() {
+        return Collections.emptyMap();
+    }
+
+    /**
+     * Acknowledge that this record is fully processed
+     */
+    default void ack() {
+    }
+
+    /**
+     * To indicate that this record has failed to be processed
+     */
+    default void fail() {
+    }
 }
diff --git 
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
deleted file mode 100644
index 5e12125..0000000
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/RecordContext.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.core;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.pulsar.common.api.EncryptionContext;
-
-/**
- * A source context that can be used by the runtime to interact with source.
- */
-public interface RecordContext {
-
-    /**
-     * Retrieves the partition information if any of the record.
-     * @return The partition id where the
-     */
-    default String getPartitionId() { return null; }
-
-    /**
-     * Retrieves the sequence of the record from a source partition.
-     * @return Sequence Id associated with the record
-     */
-    default long getRecordSequence() { return -1L; }
-    
-    /**
-     * Retrieves user-properties attached to record. 
-     * 
-     * @return Map of user-properties
-     */
-    default Map<String, String> getProperties() { return 
Collections.emptyMap();}
-    
-    /**
-     * Retrieves encryption-context that is attached to record. 
-     * 
-     * @return {@link Optional}<{@link EncryptionContext}>
-     */
-    default Optional<EncryptionContext> getEncryptionCtx() { return 
Optional.empty();}
-
-    /**
-     * Acknowledge that this record is fully processed
-     */
-    default void ack() {}
-
-    /**
-     * To indicate that this record has failed to be processed
-     */
-    default void fail() {}
-
-}
diff --git 
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
deleted file mode 100644
index 352a059..0000000
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.core;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * A simpler version of the Sink interface users can extend for use cases to
- * don't require fine grained delivery control
- */
-public abstract class SimpleSink<T> implements Sink<T> {
-
-    @Override
-    public void write(RecordContext inputRecordContext, T value) throws 
Exception {
-        write(value)
-                .thenAccept(ignored -> inputRecordContext.ack())
-                .exceptionally(cause -> {
-                    inputRecordContext.fail();
-                    return null;
-                });
-    }
-
-    /**
-     * Attempt to publish a type safe collection of messages
-     *
-     * @param value output value
-     * @return Completable future fo async publish request
-     */
-    public abstract CompletableFuture<Void> write(T value);
-}
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
index 31a044b..51d635a 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
@@ -23,7 +23,7 @@ import java.util.Map;
 /**
  * Generic sink interface users can implement to run Sink on top of Pulsar 
Functions
  */
-public interface Sink<T> extends AutoCloseable{
+public interface Sink<T> extends AutoCloseable {
     /**
      * Open connector with configuration
      *
@@ -32,12 +32,12 @@ public interface Sink<T> extends AutoCloseable{
      * @throws Exception IO type exceptions when opening a connector
      */
     void open(final Map<String, Object> config, SinkContext sinkContext) 
throws Exception;
-    
+
     /**
      * Write a message to Sink
-     * @param inputRecordContext Context of value
-     * @param value value to write to sink
+     * @param record record to write to sink; it carries the value together with
+     *               its key, properties and ack/fail callbacks from the source
      * @throws Exception
      */
-    void write(RecordContext inputRecordContext, T value) throws Exception;
+    void write(Record<T> record) throws Exception;
 }
diff --git 
a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java 
b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
index 4fb4a7d..bb1100c 100644
--- a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
@@ -38,7 +38,7 @@ public class SinkTest {
         }
 
         @Override
-        public void write(RecordContext inputRecordContext, String value) 
throws Exception {
+        public void write(Record<String> record) throws Exception {
 
         }
 
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index c597489..d6acae8 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -19,47 +19,45 @@
 
 package org.apache.pulsar.io.kafka;
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+
+import lombok.extern.slf4j.Slf4j;
+
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.SimpleSink;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 
 /**
  * A Simple abstract class for Kafka sink
  * Users need to implement extractKeyValue function to use this sink
  */
-public abstract class KafkaAbstractSink<K, V> extends SimpleSink<byte[]> {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(KafkaAbstractSink.class);
+@Slf4j
+public abstract class KafkaAbstractSink<K, V> implements Sink<byte[]> {
 
     private Producer<K, V> producer;
     private Properties props = new Properties();
     private KafkaSinkConfig kafkaSinkConfig;
 
     @Override
-    public CompletableFuture<Void> write(byte[] message) {
-        KeyValue<K, V> keyValue = extractKeyValue(message);
+    public void write(Record<byte[]> sourceRecord) {
+        KeyValue<K, V> keyValue = extractKeyValue(sourceRecord);
         ProducerRecord<K, V> record = new 
ProducerRecord<>(kafkaSinkConfig.getTopic(), keyValue.getKey(), 
keyValue.getValue());
-        LOG.debug("Record sending to kafka, record={}.", record);
-        Future f = producer.send(record);
-        return CompletableFuture.supplyAsync(() -> {
-            try {
-                f.get();
-                return null;
-            } catch (InterruptedException|ExecutionException e) {
-                throw new RuntimeException(e);
+        if (log.isDebugEnabled()) {
+            log.debug("Record sending to kafka, record={}.", record);
+        }
+
+        producer.send(record, (metadata, exception) -> {
+            if (exception == null) {
+                sourceRecord.ack();
+            } else {
+                sourceRecord.fail();
             }
         });
     }
@@ -68,7 +66,7 @@ public abstract class KafkaAbstractSink<K, V> extends 
SimpleSink<byte[]> {
     public void close() throws IOException {
         if (producer != null) {
             producer.close();
-            LOG.info("Kafka sink stopped.");
+            log.info("Kafka sink stopped.");
         }
     }
 
@@ -93,8 +91,8 @@ public abstract class KafkaAbstractSink<K, V> extends 
SimpleSink<byte[]> {
 
         producer = new KafkaProducer<>(props);
 
-        LOG.info("Kafka sink started.");
+        log.info("Kafka sink started.");
     }
 
-    public abstract KeyValue<K, V> extractKeyValue(byte[] message);
+    public abstract KeyValue<K, V> extractKeyValue(Record<byte[]> message);
 }
\ No newline at end of file
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index fe8a6a7..bbafc8e 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -44,7 +45,7 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaAbstractSource.class);
 
-    private Consumer<byte[], byte[]> consumer;
+    private Consumer<String, byte[]> consumer;
     private Properties props;
     private KafkaSourceConfig kafkaSourceConfig;
     Thread runnerThread;
@@ -98,12 +99,12 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
             consumer = new KafkaConsumer<>(props);
             consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<byte[], byte[]> consumerRecords;
+            ConsumerRecords<String, byte[]> consumerRecords;
             while(true){
                 consumerRecords = consumer.poll(1000);
                 CompletableFuture<?>[] futures = new 
CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<byte[], byte[]> consumerRecord : 
consumerRecords) {
+                for (ConsumerRecord<String, byte[]> consumerRecord : 
consumerRecords) {
                     LOG.debug("Record received from kafka, key: {}. value: 
{}", consumerRecord.key(), consumerRecord.value());
                     KafkaRecord<V> record = new KafkaRecord<>(consumerRecord, 
extractValue(consumerRecord));
                     consume(record);
@@ -125,27 +126,32 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
         runnerThread.start();
     }
 
-    public abstract V extractValue(ConsumerRecord<byte[], byte[]> record);
+    public abstract V extractValue(ConsumerRecord<String, byte[]> record);
 
     static private class KafkaRecord<V> implements Record<V> {
-        private final ConsumerRecord<byte[], byte[]> record;
+        private final ConsumerRecord<String, byte[]> record;
         private final V value;
         @Getter
-        private final CompletableFuture<Void> completableFuture = new 
CompletableFuture();
+        private final CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
 
-        public KafkaRecord(ConsumerRecord<byte[], byte[]> record,
+        public KafkaRecord(ConsumerRecord<String, byte[]> record,
                            V value) {
             this.record = record;
             this.value = value;
         }
         @Override
-        public String getPartitionId() {
-            return Integer.toString(record.partition());
+        public Optional<String> getPartitionId() {
+            return Optional.of(Integer.toString(record.partition()));
         }
 
         @Override
-        public long getRecordSequence() {
-            return record.offset();
+        public Optional<Long> getRecordSequence() {
+            return Optional.of(record.offset());
+        }
+
+        @Override
+        public Optional<String> getKey() {
+            return Optional.ofNullable(record.key());
         }
 
         @Override
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
index 775b5be..aab4474 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSink.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.io.kafka;
 
 import org.apache.pulsar.io.core.KeyValue;
+import org.apache.pulsar.io.core.Record;
 
 /**
  * Kafka sink that treats incoming messages on the input topic as Strings
@@ -27,7 +28,7 @@ import org.apache.pulsar.io.core.KeyValue;
  */
 public class KafkaStringSink extends KafkaAbstractSink<String, String> {
     @Override
-    public KeyValue<String, String> extractKeyValue(byte[] message) {
-        return new KeyValue<>(new String(message), new String(message));
+    public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
+        return new KeyValue<>(record.getKey().orElse(null), new 
String(record.getValue()));
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
index e31b75f..802d943 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaStringSource.java
@@ -27,7 +27,7 @@ import org.apache.kafka.clients.consumer.*;
  */
 public class KafkaStringSource extends KafkaAbstractSource<String> {
     @Override
-    public String extractValue(ConsumerRecord<byte[], byte[]> record) {
+    public String extractValue(ConsumerRecord<String, byte[]> record) {
         return new String(record.value());
     }
 }
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index dbe1e75..c3b6cdc 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -24,22 +24,6 @@ import static 
com.google.common.util.concurrent.Futures.addCallback;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
-import org.apache.pulsar.io.core.RecordContext;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
@@ -55,17 +39,32 @@ import com.google.gson.reflect.TypeToken;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
- * A Kinesis sink which can be configured by {@link KinesisSinkConfig}. 
- * <pre> 
+ * A Kinesis sink which can be configured by {@link KinesisSinkConfig}.
+ * <pre>
  * {@link KinesisSinkConfig} accepts
  * 1. <b>awsEndpoint:</b> kinesis end-point url can be found at : 
https://docs.aws.amazon.com/general/latest/gr/rande.html
  * 2. <b>awsRegion:</b> appropriate aws region eg: us-west-1, us-west-2
  * 3. <b>awsKinesisStreamName:</b> kinesis stream name
  * 4. <b>awsCredentialPluginName:</b> Fully-Qualified class name of 
implementation of {@link AwsCredentialProviderPlugin}.
  *    - It is a factory class which creates an {@link AWSCredentialsProvider} 
that will be used by {@link KinesisProducer}
- *    - If it is empty then {@link KinesisSink} creates default {@link 
AWSCredentialsProvider} 
- *      which accepts json-map of credentials in awsCredentialPluginParam 
+ *    - If it is empty then {@link KinesisSink} creates default {@link 
AWSCredentialsProvider}
+ *      which accepts json-map of credentials in awsCredentialPluginParam
  *      eg: awsCredentialPluginParam = 
{"accessKey":"my-access-key","secretKey":"my-secret-key"}
  * 5. <b>awsCredentialPluginParam:</b> json-parameters to initialize {@link 
AwsCredentialProviderPlugin}
  * 6. messageFormat: enum:["ONLY_RAW_PAYLOAD","FULL_MESSAGE_IN_JSON"]
@@ -76,9 +75,9 @@ import io.netty.util.Recycler.Handle;
  *   Example:
  *   
{"payloadBase64":"cGF5bG9hZA==","properties":{"prop1":"value"},"encryptionCtx":{"keysMapBase64":{"key1":"dGVzdDE=","key2":"dGVzdDI="},"keysMetadataMap":{"key1":{"ckms":"cmks-1","version":"v1"},"key2":{"ckms":"cmks-2","version":"v2"}},"metadata":{"ckms":"cmks-1","version":"v1"},"encParamBase64":"cGFyYW0=","algorithm":"algo","compressionType":"LZ4","uncompressedMessageSize":10,"batchSize":10}}
  * </pre>
- * 
- * 
- * 
+ *
+ *
+ *
  */
 public class KinesisSink implements Sink<byte[]> {
 
@@ -94,20 +93,18 @@ public class KinesisSink implements Sink<byte[]> {
     public static final String SECRET_KEY_NAME = "secretKey";
 
     @Override
-    public void write(RecordContext inputRecordContext, byte[] value) throws 
Exception {
-        String partitionedKey = 
StringUtils.isNotBlank(inputRecordContext.getPartitionId())
-                ? inputRecordContext.getPartitionId()
-                : defaultPartitionedKey;
+    public void write(Record<byte[]> record) throws Exception {
+        String partitionedKey = record.getKey().orElse(defaultPartitionedKey);
         partitionedKey = partitionedKey.length() > maxPartitionedKeyLength
                 ? partitionedKey.substring(0, maxPartitionedKeyLength - 1)
                 : partitionedKey; // partitionedKey Length must be at least 
one, and at most 256
         ListenableFuture<UserRecordResult> addRecordResult = 
kinesisProducer.addUserRecord(this.streamName,
                 partitionedKey,
-                createKinesisMessage(kinesisSinkConfig.getMessageFormat(), 
inputRecordContext, value));
+                createKinesisMessage(kinesisSinkConfig.getMessageFormat(), 
record));
         addCallback(addRecordResult,
-                ProducerSendCallback.create(this.streamName, 
inputRecordContext, System.nanoTime()), directExecutor());
+                ProducerSendCallback.create(this.streamName, record, 
System.nanoTime()), directExecutor());
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Published message to kinesis stream {} with size {}", 
streamName, value.length);
+            LOG.debug("Published message to kinesis stream {} with size {}", 
streamName, record.getValue().length);
         }
     }
 
@@ -151,13 +148,13 @@ public class KinesisSink implements Sink<byte[]> {
         if (isNotBlank(awsCredentialPluginName)) {
             return createCredentialProviderWithPlugin(awsCredentialPluginName, 
awsCredentialPluginParam);
         } else {
-            return defaultCredentialProvider(awsCredentialPluginParam);        
    
+            return defaultCredentialProvider(awsCredentialPluginParam);
         }
     }
 
     private static final class ProducerSendCallback implements 
FutureCallback<UserRecordResult> {
 
-        private RecordContext resultContext;
+        private Record<byte[]> resultContext;
         private String streamName;
         private long startTime = 0;
         private final Handle<ProducerSendCallback> recyclerHandle;
@@ -166,9 +163,9 @@ public class KinesisSink implements Sink<byte[]> {
             this.recyclerHandle = recyclerHandle;
         }
 
-        static ProducerSendCallback create(String streamName, RecordContext 
result, long startTime) {
+        static ProducerSendCallback create(String streamName, Record<byte[]> 
resultContext, long startTime) {
             ProducerSendCallback sendCallback = RECYCLER.get();
-            sendCallback.resultContext = result;
+            sendCallback.resultContext = resultContext;
             sendCallback.streamName = streamName;
             sendCallback.startTime = startTime;
             return sendCallback;
@@ -210,7 +207,7 @@ public class KinesisSink implements Sink<byte[]> {
     /**
      * Creates a instance of credential provider which can return {@link 
AWSCredentials} or {@link BasicAWSCredentials}
      * based on IAM user/roles.
-     * 
+     *
      * @param pluginFQClassName
      * @param param
      * @return
@@ -234,7 +231,7 @@ public class KinesisSink implements Sink<byte[]> {
     /**
      * It creates a default credential provider which takes accessKey and 
secretKey form configuration and creates
      * {@link AWSCredentials}
-     * 
+     *
      * @param awsCredentialPluginParam
      * @return
      */
@@ -274,15 +271,15 @@ public class KinesisSink implements Sink<byte[]> {
         };
     }
 
-    public static ByteBuffer createKinesisMessage(MessageFormat msgFormat, 
RecordContext recordCtx, byte[] data) {
+    public static ByteBuffer createKinesisMessage(MessageFormat msgFormat, 
Record<byte[]> record) {
         if (MessageFormat.FULL_MESSAGE_IN_JSON.equals(msgFormat)) {
-            return ByteBuffer.wrap(Utils.serializeRecordToJson(recordCtx, 
data).getBytes());
+            return 
ByteBuffer.wrap(Utils.serializeRecordToJson(record).getBytes());
         } else if (MessageFormat.FULL_MESSAGE_IN_FB.equals(msgFormat)) {
-            return Utils.serializeRecordToFlatBuffer(recordCtx, data);
+            return Utils.serializeRecordToFlatBuffer(record);
         } else {
             // send raw-message
-            return ByteBuffer.wrap(data);
+            return ByteBuffer.wrap(record.getValue());
         }
     }
-    
+
 }
\ No newline at end of file
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
index 469151e..a7382b8 100644
--- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
+++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/Utils.java
@@ -22,20 +22,21 @@ package org.apache.pulsar.io.kinesis;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.Base64.getEncoder;
 
+import com.google.gson.JsonObject;
+
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 
 import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.kinesis.fbs.EncryptionCtx;
 import org.apache.pulsar.io.kinesis.fbs.EncryptionKey;
 import org.apache.pulsar.io.kinesis.fbs.KeyValue;
 import org.apache.pulsar.io.kinesis.fbs.Message;
 
 import com.google.flatbuffers.FlatBufferBuilder;
-import com.google.gson.JsonObject;
 
 public class Utils {
 
@@ -49,25 +50,25 @@ public class Utils {
     private static final String UNCPRESSED_MSG_SIZE_FIELD = 
"uncompressedMessageSize";
     private static final String BATCH_SIZE_FIELD = "batchSize";
     private static final String ENCRYPTION_CTX_FIELD = "encryptionCtx";
-    
+
     private static final FlatBufferBuilder DEFAULT_FB_BUILDER = new 
FlatBufferBuilder(0);
 
     /**
      * Serialize record to flat-buffer. it's not a thread-safe method.
-     * 
-     * @param inputRecordContext
+     *
+     * @param record
      * @param data
      * @return
      */
-    public static ByteBuffer serializeRecordToFlatBuffer(RecordContext 
inputRecordContext, byte[] data) {
+    public static ByteBuffer serializeRecordToFlatBuffer(Record<byte[]> 
record) {
         DEFAULT_FB_BUILDER.clear();
-        return serializeRecordToFlatBuffer(DEFAULT_FB_BUILDER, 
inputRecordContext, data);
+        return serializeRecordToFlatBuffer(DEFAULT_FB_BUILDER, record);
     }
-    
-    public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder 
builder, RecordContext inputRecordContext, byte[] data) {
-        checkNotNull(inputRecordContext, "record-context can't be null");
-        Optional<EncryptionContext> encryptionCtx = 
inputRecordContext.getEncryptionCtx();
-        Map<String, String> properties = inputRecordContext.getProperties();
+
+    public static ByteBuffer serializeRecordToFlatBuffer(FlatBufferBuilder 
builder, Record<byte[]> record) {
+        checkNotNull(record, "record-context can't be null");
+        Optional<EncryptionContext> encryptionCtx = record.getEncryptionCtx();
+        Map<String, String> properties = record.getProperties();
 
         int encryptionCtxOffset = -1;
         int propertiesOffset = -1;
@@ -86,7 +87,7 @@ public class Utils {
             encryptionCtxOffset = createEncryptionCtxOffset(builder, 
encryptionCtx);
         }
 
-        int payloadOffset = Message.createPayloadVector(builder, data);
+        int payloadOffset = Message.createPayloadVector(builder, 
record.getValue());
         Message.startMessage(builder);
         Message.addPayload(builder, payloadOffset);
         if (encryptionCtxOffset != -1) {
@@ -98,11 +99,11 @@ public class Utils {
         int endMessage = Message.endMessage(builder);
         builder.finish(endMessage);
         ByteBuffer bb = builder.dataBuffer();
-        
+
         // to avoid copying of data, use same byte[] wrapped by ByteBuffer. 
But, ByteBuffer.array() returns entire array
         // so, it requires to read from offset:
         // builder.sizedByteArray()=>copies buffer: sizedByteArray(space, 
bb.capacity() - space)
-        int space = bb.capacity() - builder.offset(); 
+        int space = bb.capacity() - builder.offset();
         return ByteBuffer.wrap(bb.array(), space, bb.capacity() - space);
     }
 
@@ -133,7 +134,7 @@ public class Utils {
             EncryptionKey.addKey(builder, key);
             EncryptionKey.addValue(builder, value);
             if(metadataOffset!=-1) {
-                EncryptionKey.addMetadata(builder, metadataOffset);            
    
+                EncryptionKey.addMetadata(builder, metadataOffset);
             }
             keysOffsets[keyIndex++] = EncryptionKey.endEncryptionKey(builder);
         }
@@ -156,29 +157,31 @@ public class Utils {
         }
         return EncryptionCtx.createEncryptionCtx(builder, keysOffset, param, 
algo, compressionType,
                 ctx.getUncompressedMessageSize(), batchSize, 
ctx.getBatchSize().isPresent());
-    
+
     }
 
     /**
      * Serializes sink-record into json format. It encodes encryption-keys, 
encryption-param and payload in base64
      * format so, it can be sent in json.
-     * 
+     *
      * @param inputRecordContext
      * @param data
      * @return
      */
-    public static String serializeRecordToJson(RecordContext 
inputRecordContext, byte[] data) {
-        checkNotNull(inputRecordContext, "record-context can't be null");
+    public static String serializeRecordToJson(Record<byte[]> record) {
+        checkNotNull(record, "record can't be null");
+
         JsonObject result = new JsonObject();
-        result.addProperty(PAYLOAD_FIELD, getEncoder().encodeToString(data));
-        if (inputRecordContext.getProperties() != null) {
+        result.addProperty(PAYLOAD_FIELD, 
getEncoder().encodeToString(record.getValue()));
+        if (record.getProperties() != null) {
             JsonObject properties = new JsonObject();
-            inputRecordContext.getProperties().entrySet()
+            record.getProperties().entrySet()
                     .forEach(e -> properties.addProperty(e.getKey(), 
e.getValue()));
             result.add(PROPERTIES_FIELD, properties);
         }
-        if (inputRecordContext.getEncryptionCtx().isPresent()) {
-            EncryptionContext encryptionCtx = 
inputRecordContext.getEncryptionCtx().get();
+
+        if (record.getEncryptionCtx().isPresent()) {
+            EncryptionContext encryptionCtx = record.getEncryptionCtx().get();
             JsonObject encryptionCtxJson = new JsonObject();
             JsonObject keyBase64Map = new JsonObject();
             JsonObject keyMetadataMap = new JsonObject();
diff --git 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
index b9f5c8a..45aa0c7 100644
--- 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
+++ 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/UtilsTest.java
@@ -20,16 +20,22 @@ package org.apache.pulsar.io.kinesis;
 
 import static java.util.Base64.getDecoder;
 
+import com.google.gson.Gson;
+
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.Optional;
 
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
-import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.kinesis.fbs.KeyValue;
 import org.apache.pulsar.io.kinesis.fbs.Message;
 import org.testng.Assert;
@@ -37,12 +43,6 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import org.testng.collections.Maps;
 
-import com.google.gson.Gson;
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-
 /**
  * Unit test of {@link UtilsTest}.
  */
@@ -52,7 +52,7 @@ public class UtilsTest {
     public Object[][] encryptionProvider() {
         return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
     }
-    
+
     @Test
     public void testJsonSerialization() throws Exception {
 
@@ -75,9 +75,9 @@ public class UtilsTest {
         Map<String, String> metadata2 = Maps.newHashMap();
         metadata2.put("version", "v2");
         metadata2.put("ckms", "cmks-2");
-        RecordContext recordCtx = createRecordContext(algo, keyNames, 
keyValues, param.getBytes(), metadata1, metadata2,
+        Record<byte[]> recordCtx = createRecord(data, algo, keyNames, 
keyValues, param.getBytes(), metadata1, metadata2,
                 batchSize, compressionMsgSize, properties, true);
-        String json = Utils.serializeRecordToJson(recordCtx, data);
+        String json = Utils.serializeRecordToJson(recordCtx);
 
         // deserialize from json and assert
         KinesisMessageResponse kinesisJsonResponse = 
deSerializeRecordFromJson(json);
@@ -118,9 +118,9 @@ public class UtilsTest {
             Map<String, String> metadata2 = Maps.newHashMap();
             metadata2.put("version", "v2");
             metadata2.put("ckms", "cmks-2");
-            RecordContext recordCtx = createRecordContext(algo, keyNames, 
keyValues, param.getBytes(), metadata1,
+            Record<byte[]> record = createRecord(data, algo, keyNames, 
keyValues, param.getBytes(), metadata1,
                     metadata2, batchSize, compressionMsgSize, properties, 
isEncryption);
-            ByteBuffer flatBuffer = 
Utils.serializeRecordToFlatBuffer(recordCtx, data);
+            ByteBuffer flatBuffer = Utils.serializeRecordToFlatBuffer(record);
 
             Message kinesisJsonResponse = Message.getRootAsMessage(flatBuffer);
             byte[] fbPayloadBytes = new 
byte[kinesisJsonResponse.payloadLength()];
@@ -164,7 +164,7 @@ public class UtilsTest {
                 Assert.assertEquals(param.getBytes(), paramBytes);
                 Assert.assertEquals(algo, encryptionCtxDeser.algo());
             }
-            
+
             Map<String, String> fbproperties = Maps.newHashMap();
             for (int i = 0; i < kinesisJsonResponse.propertiesLength(); i++) {
                 KeyValue property = kinesisJsonResponse.properties(i);
@@ -175,7 +175,7 @@ public class UtilsTest {
         }
     }
 
-    private RecordContext createRecordContext(String algo, String[] keyNames, 
byte[][] keyValues, byte[] param,
+    private Record<byte[]> createRecord(byte[] data, String algo, String[] 
keyNames, byte[][] keyValues, byte[] param,
             Map<String, String> metadata1, Map<String, String> metadata2, int 
batchSize, int compressionMsgSize,
             Map<String, String> properties, boolean isEncryption) {
         EncryptionContext ctx = null;
@@ -198,14 +198,16 @@ public class UtilsTest {
             ctx.setKeys(keys);
             ctx.setParam(param);
         }
-        return new RecordContextImpl(properties, Optional.ofNullable(ctx)); 
+        return new RecordImpl(data, properties, Optional.ofNullable(ctx));
     }
 
-    class RecordContextImpl implements RecordContext {
+    class RecordImpl implements Record<byte[]> {
+        byte[] data;
         Map<String, String> properties;
         Optional<EncryptionContext> ectx;
 
-        public RecordContextImpl(Map<String, String> properties, 
Optional<EncryptionContext> ectx) {
+        public RecordImpl(byte[] data, Map<String, String> properties, 
Optional<EncryptionContext> ectx) {
+            this.data = data;
             this.properties = properties;
             this.ectx = ectx;
         }
@@ -217,6 +219,16 @@ public class UtilsTest {
         public Optional<EncryptionContext> getEncryptionCtx() {
             return ectx;
         }
+
+        @Override
+        public Optional<String> getKey() {
+            return Optional.empty();
+        }
+
+        @Override
+        public byte[] getValue() {
+            return data;
+        }
     }
 
     public static KinesisMessageResponse deSerializeRecordFromJson(String 
jsonRecord) {
diff --git 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index dab3fe9..c05828f 100644
--- 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++ 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -25,15 +25,19 @@ import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import com.rabbitmq.client.DefaultConsumer;
 import com.rabbitmq.client.Envelope;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
+import lombok.Data;
+
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.Map;
-
 /**
  * A simple connector to consume messages from a RabbitMQ queue
  */
@@ -82,20 +86,13 @@ public class RabbitMQSource extends PushSource<byte[]> {
 
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
-            source.consume(new RabbitMQRecord(body));
+            source.consume(new 
RabbitMQRecord(Optional.ofNullable(envelope.getRoutingKey()), body));
         }
     }
 
+    @Data
     static private class RabbitMQRecord implements Record<byte[]> {
-        private final byte[] data;
-
-        public RabbitMQRecord(byte[] data) {
-            this.data = data;
-        }
-
-        @Override
-        public byte[] getValue() {
-            return data;
-        }
+        private final Optional<String> key;
+        private final byte[] value;
     }
 }
\ No newline at end of file
diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 56c52e7..494ae48 100644
--- 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.Record;
@@ -166,6 +167,12 @@ public class TwitterFireHose extends PushSource<String> {
         }
 
         @Override
+        public Optional<String> getKey() {
+            // TODO: Could use user or tweet ID as key here
+            return Optional.empty();
+        }
+
+        @Override
         public String getValue() {
             return tweet;
         }

Reply via email to