sijie closed pull request #1636:  Rename Connect `Message` interface to `Record`
URL: https://github.com/apache/incubator-pulsar/pull/1636
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-admin-shaded-for-functions/pom.xml 
b/pulsar-client-admin-shaded-for-functions/pom.xml
index 5953b753dc..1e3833387f 100644
--- a/pulsar-client-admin-shaded-for-functions/pom.xml
+++ b/pulsar-client-admin-shaded-for-functions/pom.xml
@@ -61,6 +61,7 @@
                 <includes>
                   <include>org.apache.pulsar:pulsar-common</include>
                   <include>org.apache.bookkeeper:circe-checksum</include>
+                  <include>org.apache.pulsar:pulsar-connect-core</include>
                   <include>org.apache.pulsar:pulsar-client-original</include>
                   
<include>org.apache.pulsar:pulsar-client-admin-original</include>
                   <!-- client dependencies as below -->
@@ -106,6 +107,12 @@
                     <include>**</include>
                   </includes>
                 </filter>
+                <filter>
+                  <artifact>org.apache.pulsar:pulsar-connect-core</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
                 <filter>
                   <artifact>org.apache.pulsar:pulsar-client-original</artifact>
                   <includes>
diff --git a/pulsar-client-admin-shaded/pom.xml 
b/pulsar-client-admin-shaded/pom.xml
index fcd934961b..df5f867fa6 100644
--- a/pulsar-client-admin-shaded/pom.xml
+++ b/pulsar-client-admin-shaded/pom.xml
@@ -57,6 +57,7 @@
 
               <artifactSet>
                 <includes>
+                  <include>org.apache.pulsar:pulsar-connect-core</include>
                   <include>org.apache.pulsar:pulsar-client-original</include>
                   
<include>org.apache.pulsar:pulsar-client-admin-original</include>
                   <include>org.apache.commons:commons-lang3</include>
@@ -92,6 +93,12 @@
                     <include>**</include>
                   </includes>
                 </filter>
+                <filter>
+                  <artifact>org.apache.pulsar:pulsar-connect-core</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
                 <filter>
                   <artifact>org.apache.pulsar:pulsar-client-original</artifact>
                   <includes>
diff --git a/pulsar-client-shaded/pom.xml b/pulsar-client-shaded/pom.xml
index c67c5468c6..14ace60e0f 100644
--- a/pulsar-client-shaded/pom.xml
+++ b/pulsar-client-shaded/pom.xml
@@ -59,6 +59,7 @@
 
               <artifactSet>
                 <includes>
+                  <include>org.apache.pulsar:pulsar-connect-core</include>
                   <include>org.apache.pulsar:pulsar-client-original</include>
                   <include>org.apache.commons:commons-lang3</include>
                   <include>commons-codec:commons-codec</include>
@@ -93,6 +94,12 @@
                     <include>**</include>
                   </includes>
                 </filter>
+                <filter>
+                  <artifact>org.apache.pulsar:pulsar-connect-core</artifact>
+                  <includes>
+                    <include>**</include>
+                  </includes>
+                </filter>
                 <filter>
                   <artifact>org.apache.pulsar:pulsar-client-original</artifact>
                   <includes>
diff --git a/pulsar-client/pom.xml b/pulsar-client/pom.xml
index 267c5e7d74..f6b5b408c7 100644
--- a/pulsar-client/pom.xml
+++ b/pulsar-client/pom.xml
@@ -39,6 +39,12 @@
       <version>${project.parent.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-connect-core</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
index f9ba65caeb..2989d30bb5 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java
@@ -43,10 +43,9 @@
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 
-public class MessageImpl<T> implements Message<T> {
+public class MessageImpl<T> extends MessageRecordImpl<T, MessageId> {
 
     private MessageMetadata.Builder msgMetadataBuilder;
-    private MessageId messageId;
     private ClientCnx cnx;
     private ByteBuf payload;
     private Schema<T> schema;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
new file mode 100644
index 0000000000..5e8128ac3e
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageRecordImpl.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.connect.core.Record;
+
+/**
+ * Abstract class that implements message api and connect record api.
+ */
+public abstract class MessageRecordImpl<T, M extends MessageId> implements 
Message<T>, Record<T> {
+
+    protected M messageId;
+    protected java.util.function.Consumer<M> ackFunction;
+
+    public void setAckFunction(java.util.function.Consumer<M> ackFunction) {
+        this.ackFunction = ackFunction;
+    }
+
+    @Override
+    public String getPartitionId() {
+        if (null != messageId) {
+            if (messageId instanceof MessageIdImpl) {
+                return String.valueOf(((MessageIdImpl) 
messageId).getPartitionIndex());
+            } else {
+                return "";
+            }
+        }
+        return "";
+    }
+
+    @Override
+    public long getRecordSequence() {
+        return getSequenceId();
+    }
+
+    @Override
+    public void ack() {
+        if (null != ackFunction) {
+            ackFunction.accept((M) getMessageId());
+        }
+    }
+
+    @Override
+    public void fail() {
+        // no-op
+    }
+}
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
index 5ae452ee5c..18e2088a1c 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -23,17 +23,16 @@
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 
-public class TopicMessageImpl<T> implements Message<T> {
+public class TopicMessageImpl<T> extends MessageRecordImpl<T, 
TopicMessageIdImpl> {
 
     private final String topicName;
     private final Message<T> msg;
-    private final TopicMessageIdImpl msgId;
 
     TopicMessageImpl(String topicName,
                      Message<T> msg) {
         this.topicName = topicName;
         this.msg = msg;
-        this.msgId = new TopicMessageIdImpl(topicName, msg.getMessageId());
+        this.messageId = new TopicMessageIdImpl(topicName, msg.getMessageId());
     }
 
     /**
@@ -46,11 +45,11 @@ public String getTopicName() {
 
     @Override
     public MessageId getMessageId() {
-        return msgId;
+        return messageId;
     }
 
     public MessageId getInnerMessageId() {
-        return msgId.getInnerMessageId();
+        return messageId.getInnerMessageId();
     }
 
     @Override
diff --git 
a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
new file mode 100644
index 0000000000..7df373acf9
--- /dev/null
+++ 
b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageRecordImplTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertSame;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pulsar.client.api.MessageId;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class MessageRecordImplTest {
+
+    private MessageRecordImpl<byte[], MessageId> message;
+    private MessageId messageId;
+
+    @BeforeMethod
+    public void setup() {
+        this.messageId = mock(MessageId.class);
+        this.message = mock(MessageRecordImpl.class, CALLS_REAL_METHODS);
+        when(message.getMessageId()).thenReturn(messageId);
+    }
+
+    @Test
+    public void testAck() throws Exception {
+        final AtomicReference<MessageId> msgIdRef = new AtomicReference<>();
+        final CountDownLatch ackLatch = new CountDownLatch(1);
+
+        this.message.setAckFunction(msgId -> {
+            msgIdRef.set(msgId);
+            ackLatch.countDown();
+        });
+
+        this.message.ack();
+        ackLatch.await();
+
+        assertSame(messageId, msgIdRef.get());
+    }
+
+}
diff --git 
a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
 
b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
index 73faf053a7..daab7667fd 100644
--- 
a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
+++ 
b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSink.java
@@ -32,7 +32,6 @@
 import com.aerospike.client.policy.ClientPolicy;
 import com.aerospike.client.policy.WritePolicy;
 import org.apache.pulsar.common.util.KeyValue;
-import org.apache.pulsar.connect.core.Message;
 import org.apache.pulsar.connect.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +51,7 @@
     private EventLoop eventLoop;
 
     @Override
-    public void open(Map<String, String> config) throws Exception {
+    public void open(Map<String, Object> config) throws Exception {
         aerospikeSinkConfig = AerospikeSinkConfig.load(config);
         if (aerospikeSinkConfig.getSeedHosts() == null
                 || aerospikeSinkConfig.getKeyspace() == null
@@ -78,10 +77,10 @@ public void close() throws Exception {
     }
 
     @Override
-    public CompletableFuture<Void> write(Message<KeyValue<K, V>> tuple) {
+    public CompletableFuture<Void> write(KeyValue<K, V> record) {
         CompletableFuture<Void> future = new CompletableFuture<>();
-        Key key = new Key(aerospikeSinkConfig.getKeyspace(), 
aerospikeSinkConfig.getKeySet(), tuple.getData().getKey().toString());
-        Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), 
Value.getAsBlob(tuple.getData().getValue()));
+        Key key = new Key(aerospikeSinkConfig.getKeyspace(), 
aerospikeSinkConfig.getKeySet(), record.getKey().toString());
+        Bin bin = new Bin(aerospikeSinkConfig.getColumnName(), 
Value.getAsBlob(record.getValue()));
         AWriteListener listener = null;
         try {
             listener = queue.take();
diff --git 
a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java
 
b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java
index 72f530b7f5..ef02c80ef3 100644
--- 
a/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java
+++ 
b/pulsar-connect/aerospike/src/main/java/org/apache/pulsar/connect/aerospike/AerospikeSinkConfig.java
@@ -57,7 +57,7 @@ public static AerospikeSinkConfig load(String yamlFile) 
throws IOException {
         return mapper.readValue(new File(yamlFile), AerospikeSinkConfig.class);
     }
 
-    public static AerospikeSinkConfig load(Map<String, String> map) throws 
IOException {
+    public static AerospikeSinkConfig load(Map<String, Object> map) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
AerospikeSinkConfig.class);
     }
diff --git 
a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java
 
b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java
index a003d8f2f4..bc87ec6b55 100644
--- 
a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java
+++ 
b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSink.java
@@ -23,7 +23,6 @@
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import org.apache.pulsar.common.util.KeyValue;
-import org.apache.pulsar.connect.core.Message;
 import org.apache.pulsar.connect.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +45,7 @@
     private PreparedStatement statement;
 
     @Override
-    public void open(Map<String, String> config) throws Exception {
+    public void open(Map<String, Object> config) throws Exception {
         cassandraSinkConfig = CassandraSinkConfig.load(config);
         if (cassandraSinkConfig.getRoots() == null
                 || cassandraSinkConfig.getKeyspace() == null
@@ -67,8 +66,8 @@ public void close() throws Exception {
     }
 
     @Override
-    public CompletableFuture<Void> write(Message<KeyValue<K, V>> tuple) {
-        BoundStatement bound = statement.bind(tuple.getData().getKey(), 
tuple.getData().getValue());
+    public CompletableFuture<Void> write(KeyValue<K, V> record) {
+        BoundStatement bound = statement.bind(record.getKey(), 
record.getValue());
         ResultSetFuture future = session.executeAsync(bound);
         CompletableFuture<Void> completable = new CompletableFuture<Void>();
         Futures.addCallback(future,
diff --git 
a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSinkConfig.java
 
b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSinkConfig.java
index b21c95e32f..5bcfb52b76 100644
--- 
a/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSinkConfig.java
+++ 
b/pulsar-connect/cassandra/src/main/java/org/apache/pulsar/connect/cassandra/CassandraSinkConfig.java
@@ -50,7 +50,7 @@ public static CassandraSinkConfig load(String yamlFile) 
throws IOException {
         return mapper.readValue(new File(yamlFile), CassandraSinkConfig.class);
     }
 
-    public static CassandraSinkConfig load(Map<String, String> map) throws 
IOException {
+    public static CassandraSinkConfig load(Map<String, Object> map) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
CassandraSinkConfig.class);
     }
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
index 1da8f78b38..4e6f64bba3 100644
--- 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
+++ 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/PushSource.java
@@ -41,12 +41,12 @@
      * @param config initialization config
      * @throws Exception IO type exceptions when opening a connector
      */
-    void open(final Map<String, String> config) throws Exception;
+    void open(final Map<String, Object> config) throws Exception;
 
     /**
      * Attach a consumer function to this Source. This is invoked by the 
implementation
      * to pass messages whenever there is data to be pushed to Pulsar.
      * @param consumer
      */
-    void setConsumer(Function<Message<T>, CompletableFuture<Void>> consumer);
+    void setConsumer(Function<Record<T>, CompletableFuture<Void>> consumer);
 }
\ No newline at end of file
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Record.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Record.java
new file mode 100644
index 0000000000..c5137ea386
--- /dev/null
+++ 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Record.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.connect.core;
+
+/**
+ * Pulsar Connect's Record interface. Record encapsulates the
+ * information about a record being read from a Source.
+ */
+public interface Record<T> extends RecordContext {
+    /**
+     * Retrieves the actual data of the record
+     * @return The record data
+     */
+    T getValue();
+}
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Message.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/RecordContext.java
similarity index 60%
rename from 
pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Message.java
rename to 
pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/RecordContext.java
index 1fcdb25702..094ca8c3e7 100644
--- 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Message.java
+++ 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/RecordContext.java
@@ -19,35 +19,30 @@
 package org.apache.pulsar.connect.core;
 
 /**
- * Pulsar Connect's Message interface. Message encapsulates the
- * information about a message being read/written from/to a Source/Sink.
+ * A source context that can be used by the runtime to interact with source.
  */
-public interface Message<T> {
+public interface RecordContext {
+
     /**
-     * Retrieves the partition information if any of the message
+     * Retrieves the partition information if any of the record.
      * @return The partition id where the
      */
     default String getPartitionId() { return null; }
 
     /**
-     * Retrieves the sequence id of the message
-     * @return Sequence Id associated with the message
+     * Retrieves the sequence of the record from a source partition.
+     * @return Sequence Id associated with the record
      */
-    default Long getSequenceId() { return -1L; }
+    default long getRecordSequence() { return -1L; }
 
     /**
-     * Retrieves the actual data of the message
-     * @return The message data
+     * Acknowledge that this record is fully processed
      */
-    T getData();
+    default void ack() {}
 
     /**
-     * Acknowledge that this message is fully processed
+     * To indicate that this record has failed to be processed
      */
-    default void ack() {};
+    default void fail() {}
 
-    /**
-     * To indicate that this message has failed to be processed
-     */
-    default void fail() {};
 }
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
index f7d1b7b64a..ca569e794c 100644
--- a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
+++ b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Sink.java
@@ -37,13 +37,13 @@
      * @param config initialization config
      * @throws Exception IO type exceptions when opening a connector
      */
-    void open(final Map<String, String> config) throws Exception;
+    void open(final Map<String, Object> config) throws Exception;
 
     /**
      * Attempt to publish a type safe collection of messages
      *
-     * @param message Object to publish to the sink
+     * @param value output value
      * @return Completable future fo async publish request
      */
-    CompletableFuture<Void> write(final Message<T> message);
+    CompletableFuture<Void> write(T value);
 }
\ No newline at end of file
diff --git 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
index b63a4b737f..40f1820709 100644
--- 
a/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
+++ 
b/pulsar-connect/core/src/main/java/org/apache/pulsar/connect/core/Source.java
@@ -35,5 +35,5 @@
      * @return next message from source or null, if no new messages are 
available.
      * @throws Exception
      */
-    Message<T> read() throws Exception;
+    Record<T> read() throws Exception;
 }
diff --git 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
index 18919f4afd..fc8e2afddf 100644
--- 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
+++ 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSink.java
@@ -24,7 +24,6 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.pulsar.common.util.KeyValue;
-import org.apache.pulsar.connect.core.Message;
 import org.apache.pulsar.connect.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,9 +47,9 @@
     private KafkaSinkConfig kafkaSinkConfig;
 
     @Override
-    public CompletableFuture<Void> write(Message<KeyValue<K, V>> message) {
-        ProducerRecord<K, V> record = new 
ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getData().getKey(), 
message.getData().getValue());
-        LOG.debug("Message sending to kafka, record={}.", record);
+    public CompletableFuture<Void> write(KeyValue<K, V> message) {
+        ProducerRecord<K, V> record = new 
ProducerRecord<>(kafkaSinkConfig.getTopic(), message.getKey(), 
message.getValue());
+        LOG.debug("Record sending to kafka, record={}.", record);
         Future f = producer.send(record);
         return CompletableFuture.supplyAsync(() -> {
             try {
@@ -69,7 +68,7 @@ public void close() throws IOException {
     }
 
     @Override
-    public void open(Map<String, String> config) throws Exception {
+    public void open(Map<String, Object> config) throws Exception {
         kafkaSinkConfig = KafkaSinkConfig.load(config);
         if (kafkaSinkConfig.getTopic() == null
                 || kafkaSinkConfig.getBootstrapServers() == null
diff --git 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java
 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java
index 45aea78786..6da494ea97 100644
--- 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java
+++ 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSinkConfig.java
@@ -52,7 +52,7 @@ public static KafkaSinkConfig load(String yamlFile) throws 
IOException {
         return mapper.readValue(new File(yamlFile), KafkaSinkConfig.class);
     }
 
-    public static KafkaSinkConfig load(Map<String, String> map) throws 
IOException {
+    public static KafkaSinkConfig load(Map<String, Object> map) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
KafkaSinkConfig.class);
     }
diff --git 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java
 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java
index f9bb8c1b07..7520ee6ff7 100644
--- 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java
+++ 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSource.java
@@ -24,7 +24,7 @@
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.pulsar.connect.core.Message;
+import org.apache.pulsar.connect.core.Record;
 import org.apache.pulsar.connect.core.PushSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,10 +45,10 @@
     private KafkaSourceConfig kafkaSourceConfig;
     Thread runnerThread;
 
-    private java.util.function.Function<Message<V>, CompletableFuture<Void>> 
consumeFunction;
+    private java.util.function.Function<Record<V>, CompletableFuture<Void>> 
consumeFunction;
 
     @Override
-    public void open(Map<String, String> config) throws Exception {
+    public void open(Map<String, Object> config) throws Exception {
         kafkaSourceConfig = KafkaSourceConfig.load(config);
         if (kafkaSourceConfig.getTopic() == null
                 || kafkaSourceConfig.getBootstrapServers() == null
@@ -101,8 +101,8 @@ public void start() {
                 CompletableFuture<?>[] futures = new 
CompletableFuture<?>[records.count()];
                 int index = 0;
                 for (ConsumerRecord<String, V> record : records) {
-                    LOG.debug("Message received from kafka, key: {}. value: 
{}", record.key(), record.value());
-                    futures[index] = consumeFunction.apply(new 
KafkaMesssage<>(record));
+                    LOG.debug("Record received from kafka, key: {}. value: 
{}", record.key(), record.value());
+                    futures[index] = consumeFunction.apply(new 
KafkaRecord<>(record));
                     index++;
                 }
                 if (!kafkaSourceConfig.isAutoCommitEnabled()) {
@@ -121,14 +121,14 @@ public void start() {
     }
 
     @Override
-    public void setConsumer(java.util.function.Function<Message<V>, 
CompletableFuture<Void>> consumeFunction) {
+    public void setConsumer(java.util.function.Function<Record<V>, 
CompletableFuture<Void>> consumeFunction) {
         this.consumeFunction = consumeFunction;
     }
 
-    static private class KafkaMesssage<V> implements Message<V>  {
-        ConsumerRecord<String, V> record;
+    static private class KafkaRecord<V> implements Record<V> {
+        private final ConsumerRecord<String, V> record;
 
-        public KafkaMesssage(ConsumerRecord<String, V> record) {
+        public KafkaRecord(ConsumerRecord<String, V> record) {
             this.record = record;
 
         }
@@ -138,12 +138,12 @@ public String getPartitionId() {
         }
 
         @Override
-        public Long getSequenceId() {
+        public long getRecordSequence() {
             return record.offset();
         }
 
         @Override
-        public V getData() {
+        public V getValue() {
             return record.value();
         }
     }
diff --git 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java
 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java
index 77fd77b309..0d41b1b8ba 100644
--- 
a/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java
+++ 
b/pulsar-connect/kafka/src/main/java/org/apache/pulsar/connect/kafka/KafkaSourceConfig.java
@@ -54,7 +54,7 @@ public static KafkaSourceConfig load(String yamlFile) throws 
IOException {
         return mapper.readValue(new File(yamlFile), KafkaSourceConfig.class);
     }
 
-    public static KafkaSourceConfig load(Map<String, String> map) throws 
IOException {
+    public static KafkaSourceConfig load(Map<String, Object> map) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
KafkaSourceConfig.class);
     }
diff --git 
a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
index 1d7268e3f3..e76b03f8c0 100644
--- 
a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
+++ 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQConfig.java
@@ -48,7 +48,7 @@ public static RabbitMQConfig load(String yamlFile) throws 
IOException {
         return mapper.readValue(new File(yamlFile), RabbitMQConfig.class);
     }
 
-    public static RabbitMQConfig load(Map<String, String> map) throws 
IOException {
+    public static RabbitMQConfig load(Map<String, Object> map) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
RabbitMQConfig.class);
     }
diff --git 
a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
index 847791a7a9..c548f998cf 100644
--- 
a/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
+++ 
b/pulsar-connect/rabbitmq/src/main/java/org/apache/pulsar/connect/rabbitmq/RabbitMQSource.java
@@ -20,7 +20,7 @@
 package org.apache.pulsar.connect.rabbitmq;
 
 import com.rabbitmq.client.*;
-import org.apache.pulsar.connect.core.Message;
+import org.apache.pulsar.connect.core.Record;
 import org.apache.pulsar.connect.core.PushSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,18 +37,18 @@
 
     private static Logger logger = 
LoggerFactory.getLogger(RabbitMQSource.class);
 
-    private Function<Message<byte[]>, CompletableFuture<Void>> consumer;
+    private Function<Record<byte[]>, CompletableFuture<Void>> consumer;
     private Connection rabbitMQConnection;
     private Channel rabbitMQChannel;
     private RabbitMQConfig rabbitMQConfig;
 
     @Override
-    public void setConsumer(Function<Message<byte[]>, CompletableFuture<Void>> 
consumeFunction) {
+    public void setConsumer(Function<Record<byte[]>, CompletableFuture<Void>> 
consumeFunction) {
         this.consumer = consumeFunction;
     }
 
     @Override
-    public void open(Map<String, String> config) throws Exception {
+    public void open(Map<String, Object> config) throws Exception {
         rabbitMQConfig = RabbitMQConfig.load(config);
         if (rabbitMQConfig.getAmqUri() == null
                 || rabbitMQConfig.getQueueName() == null) {
@@ -75,28 +75,28 @@ public void close() throws Exception {
     }
 
     private class RabbitMQConsumer extends DefaultConsumer {
-        private Function<Message<byte[]>, CompletableFuture<Void>> 
consumeFunction;
+        private Function<Record<byte[]>, CompletableFuture<Void>> 
consumeFunction;
 
-        public RabbitMQConsumer(Function<Message<byte[]>, 
CompletableFuture<Void>> consumeFunction, Channel channel) {
+        public RabbitMQConsumer(Function<Record<byte[]>, 
CompletableFuture<Void>> consumeFunction, Channel channel) {
             super(channel);
             this.consumeFunction = consumeFunction;
         }
 
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
-            consumeFunction.apply(new RabbitMQMessage(body));
+            consumeFunction.apply(new RabbitMQRecord(body));
         }
     }
 
-    static private class RabbitMQMessage implements Message<byte[]> {
-        private byte[] data;
+    static private class RabbitMQRecord implements Record<byte[]> {
+        private final byte[] data;
 
-        public RabbitMQMessage(byte[] data) {
+        public RabbitMQRecord(byte[] data) {
             this.data = data;
         }
 
         @Override
-        public byte[] getData() {
+        public byte[] getValue() {
             return data;
         }
     }
diff --git 
a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
index 3f2316e315..1dcbb17104 100644
--- 
a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
+++ 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHose.java
@@ -26,7 +26,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
-import org.apache.pulsar.connect.core.Message;
+import org.apache.pulsar.connect.core.Record;
 import org.apache.pulsar.connect.core.PushSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,10 +52,10 @@
 
     // ----- Runtime fields
     private Object waitObject;
-    private Function<Message<String>, CompletableFuture<Void>> consumeFunction;
+    private Function<Record<String>, CompletableFuture<Void>> consumeFunction;
 
     @Override
-    public void open(Map<String, String> config) throws IOException {
+    public void open(Map<String, Object> config) throws IOException {
         TwitterFireHoseConfig hoseConfig = TwitterFireHoseConfig.load(config);
         if (hoseConfig.getConsumerKey() == null
                 || hoseConfig.getConsumerSecret() == null
@@ -68,7 +68,7 @@ public void open(Map<String, String> config) throws 
IOException {
     }
 
     @Override
-    public void setConsumer(Function<Message<String>, CompletableFuture<Void>> 
consumeFunction) {
+    public void setConsumer(Function<Record<String>, CompletableFuture<Void>> 
consumeFunction) {
         this.consumeFunction = consumeFunction;
     }
 
@@ -127,7 +127,7 @@ public boolean process() throws IOException, 
InterruptedException {
                             // We don't really care if the future succeeds or 
not.
                             // However might be in the future to count failures
                             // TODO:- Figure out the metrics story for 
connectors
-                            consumeFunction.apply(new TwitterMessage(line));
+                            consumeFunction.apply(new TwitterRecord(line));
                         } catch (Exception e) {
                             LOG.error("Exception thrown");
                         }
@@ -165,15 +165,15 @@ private void stopThread() {
         }
     }
 
-    static private class TwitterMessage implements Message<String> {
+    static private class TwitterRecord implements Record<String> {
         private String tweet;
 
-        public TwitterMessage(String tweet) {
+        public TwitterRecord(String tweet) {
             this.tweet = tweet;
         }
 
         @Override
-        public String getData() {
+        public String getValue() {
             return tweet;
         }
     }
diff --git 
a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java
 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java
index f0614bde78..57782e2e08 100644
--- 
a/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java
+++ 
b/pulsar-connect/twitter/src/main/java/org/apache/pulsar/connect/twitter/TwitterFireHoseConfig.java
@@ -56,7 +56,7 @@ public static TwitterFireHoseConfig load(String yamlFile) 
throws IOException {
         return mapper.readValue(new File(yamlFile), 
TwitterFireHoseConfig.class);
     }
 
-    public static TwitterFireHoseConfig load(Map<String, String> map) throws 
IOException {
+    public static TwitterFireHoseConfig load(Map<String, Object> map) throws 
IOException {
         ObjectMapper mapper = new ObjectMapper();
         return mapper.readValue(new ObjectMapper().writeValueAsString(map), 
TwitterFireHoseConfig.class);
     }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
new file mode 100644
index 0000000000..8e0d37aa3d
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.sink;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.connect.core.RecordContext;
+import org.apache.pulsar.connect.core.Sink;
+
+/**
+ * The default implementation of runtime sink.
+ *
+ * @param <T>
+ */
+public class DefaultRuntimeSink<T> implements RuntimeSink<T> {
+
+    public static <T> DefaultRuntimeSink<T> of(Sink<T> sink) {
+        return new DefaultRuntimeSink<>(sink);
+    }
+
+    private final Sink<T> sink;
+
+    private DefaultRuntimeSink(Sink<T> sink) {
+        this.sink = sink;
+    }
+
+    /**
+     * Open connector with configuration
+     *
+     * @param config initialization config
+     * @throws Exception IO type exceptions when opening a connector
+     */
+    @Override
+    public void open(final Map<String, Object> config) throws Exception {
+        sink.open(config);
+    }
+
+    /**
+     * Attempt to publish a type safe collection of messages
+     *
+     * @param value output value
+     * @return Completable future fo async publish request
+     */
+    @Override
+    public CompletableFuture<Void> write(T value) {
+        return sink.write(value);
+    }
+
+    @Override
+    public void close() throws Exception {
+        sink.close();
+    }
+}
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
new file mode 100644
index 0000000000..63a48ecb96
--- /dev/null
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.sink;
+
+import org.apache.pulsar.connect.core.RecordContext;
+import org.apache.pulsar.connect.core.Sink;
+
+/**
+ * This class extends connect sink.
+ *
+ * <p>Runtime should interact sink rather than interact directly to the public 
{@link Sink} interface.
+ *
+ * <p>There is a default implementation provided for wrapping up the user 
provided {@link Sink}. Pulsar sink
+ * should be implemented using this interface to ensure supporting 
effective-once.
+ */
+public interface RuntimeSink<T> extends Sink<T> {
+
+    /**
+     * Write the <tt>value</tt>value.
+     *
+     * <p>The implementation of this class is responsible for notifying the 
runtime whether the input record
+     * for generating this value is done with processing by {@link 
RecordContext#ack} and {@link RecordContext#fail}.
+     *
+     * @param inputRecordContext input record context
+     * @param value output value computed from the runtime.
+     */
+    default void write(RecordContext inputRecordContext, T value) {
+        write(value)
+            .thenAccept(ignored -> inputRecordContext.ack())
+            .exceptionally(cause -> {
+                inputRecordContext.fail();
+                return null;
+            });
+    }
+
+}
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
new file mode 100644
index 0000000000..7c58c30985
--- /dev/null
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.sink;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.pulsar.connect.core.RecordContext;
+import org.apache.pulsar.connect.core.Sink;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Unit test {@link DefaultRuntimeSink}.
+ */
+public class DefaultRuntimeSinkTest {
+
+    private Sink<String> mockSink;
+    private RuntimeSink<String> runtimeSink;
+
+    @BeforeMethod
+    public void setup() {
+        this.mockSink = mock(Sink.class);
+        this.runtimeSink = DefaultRuntimeSink.of(mockSink);
+    }
+
+    @Test
+    public void testOpen() throws Exception {
+        this.runtimeSink.open(Collections.emptyMap());
+
+        verify(mockSink, times(1)).open(any(Map.class));
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        this.runtimeSink.close();
+
+        verify(mockSink, times(1)).close();
+    }
+
+    @Test
+    public void testWrite() {
+        this.runtimeSink.write("test-record");
+        verify(mockSink, times(1)).write(eq("test-record"));
+    }
+
+    @Test
+    public void testWriteAck() {
+        RecordContext context = mock(RecordContext.class);
+
+        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+        writeFuture.complete(null);
+        when(mockSink.write(anyString())).thenReturn(writeFuture);
+
+        runtimeSink.write(context, "test-record");
+
+        verify(context, times(1)).ack();
+    }
+
+    @Test
+    public void testWriteFail() {
+        RecordContext context = mock(RecordContext.class);
+
+        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
+        writeFuture.completeExceptionally(new Exception("test-exception"));
+        when(mockSink.write(anyString())).thenReturn(writeFuture);
+
+        runtimeSink.write(context, "test-record");
+
+        verify(context, times(1)).fail();
+    }
+}
diff --git a/pulsar-functions/utils/pom.xml b/pulsar-functions/utils/pom.xml
index e4bde12609..94a2c509af 100644
--- a/pulsar-functions/utils/pom.xml
+++ b/pulsar-functions/utils/pom.xml
@@ -47,6 +47,10 @@
           <groupId>org.apache.pulsar</groupId>
           <artifactId>pulsar-functions-proto</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.pulsar</groupId>
+          <artifactId>pulsar-connect-core</artifactId>
+        </exclusion>
         <exclusion>
           <groupId>org.apache.pulsar</groupId>
           <artifactId>pulsar-client-original</artifactId>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to