This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 423a9af  Add context to source and sink (#2098)
423a9af is described below

commit 423a9af9973c3df6275f2abd97957bd772e3da43
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Sat Jul 7 11:38:43 2018 -0700

    Add context to source and sink (#2098)
    
    * adding contexts to sink and source interface
    
    * cleaning up
    
    * adding unit tests
---
 .../pulsar/functions/instance/ContextImpl.java     |  4 +-
 .../pulsar/functions/instance/JavaInstance.java    | 21 +-------
 .../functions/instance/JavaInstanceRunnable.java   | 37 +++++++++----
 .../apache/pulsar/functions/sink/PulsarSink.java   |  3 +-
 .../pulsar/functions/source/PulsarSource.java      |  3 +-
 .../functions/instance/JavaInstanceTest.java       | 26 +++-------
 .../pulsar/functions/source/PulsarSourceTest.java  |  9 ++--
 .../rest/api/v2/FunctionApiV2ResourceTest.java     |  3 +-
 .../pulsar/io/aerospike/AerospikeAbstractSink.java |  3 +-
 .../pulsar/io/cassandra/CassandraAbstractSink.java |  3 +-
 .../java/org/apache/pulsar/io/core/PushSource.java |  3 +-
 .../java/org/apache/pulsar/io/core/SimpleSink.java |  1 +
 .../main/java/org/apache/pulsar/io/core/Sink.java  |  3 +-
 .../io/core/{Source.java => SinkContext.java}      | 21 ++------
 .../java/org/apache/pulsar/io/core/Source.java     |  3 +-
 .../io/core/{Source.java => SourceContext.java}    | 21 ++------
 .../java/org/apache/pulsar/io/core/SinkTest.java   | 60 ++++++++++++++++++++++
 .../java/org/apache/pulsar/io/core/SourceTest.java | 59 +++++++++++++++++++++
 .../apache/pulsar/io/kafka/KafkaAbstractSink.java  |  3 +-
 .../pulsar/io/kafka/KafkaAbstractSource.java       |  3 +-
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  |  3 +-
 .../apache/pulsar/io/rabbitmq/RabbitMQSource.java  |  4 +-
 .../apache/pulsar/io/twitter/TwitterFireHose.java  |  3 +-
 23 files changed, 198 insertions(+), 101 deletions(-)

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index e2887bf..0a1b965 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -36,6 +36,8 @@ import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
 import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 
 import java.io.IOException;
@@ -56,7 +58,7 @@ import static com.google.common.base.Preconditions.checkState;
 /**
  * This class implements the Context interface exposed to the user.
  */
-class ContextImpl implements Context {
+class ContextImpl implements Context, SinkContext, SourceContext {
     private InstanceConfig config;
     private Logger logger;
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 3bd563e..496b43f 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -21,16 +21,9 @@ package org.apache.pulsar.functions.instance;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
-import org.apache.pulsar.io.core.Source;
-
-import org.apache.pulsar.functions.source.PulsarSource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * This is the Java Instance. This is started by the runtimeSpawner using the 
JavaInstanceClient
@@ -45,19 +38,9 @@ public class JavaInstance implements AutoCloseable {
     private Function function;
     private java.util.function.Function javaUtilFunction;
 
-    public JavaInstance(InstanceConfig config, Object userClassObject,
-                 ClassLoader clsLoader,
-                 PulsarClient pulsarClient,
-                 Source source) {
-        // TODO: cache logger instances by functions?
-        Logger instanceLog = LoggerFactory.getLogger("function-" + 
config.getFunctionDetails().getName());
-
-        Consumer consumer = null;
-        if (source instanceof PulsarSource) {
-            consumer = ((PulsarSource) source).getInputConsumer();
-        }
+    public JavaInstance(ContextImpl contextImpl, Object userClassObject) {
 
-        this.context = new ContextImpl(config, instanceLog, pulsarClient, 
clsLoader, consumer);
+        this.context = contextImpl;
 
         // create the functions
         if (userClassObject instanceof Function) {
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 5485d72..5b206fb 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -47,6 +47,7 @@ import org.apache.logging.log4j.ThreadContext;
 import org.apache.logging.log4j.core.LoggerContext;
 import org.apache.logging.log4j.core.config.Configuration;
 import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -68,6 +69,8 @@ import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.Source;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A function container implemented using java thread.
@@ -121,7 +124,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     /**
      * NOTE: this method should be called in the instance thread, in order to 
make class loading work.
      */
-    JavaInstance setupJavaInstance() throws Exception {
+    JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception {
         // initialize the thread context
         ThreadContext.put("function", 
FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
         ThreadContext.put("functionname", 
instanceConfig.getFunctionDetails().getName());
@@ -143,13 +146,24 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         // start the state table
         setupStateTable();
         // start the output producer
-        setupOutput();
+        setupOutput(contextImpl);
         // start the input consumer
-        setupInput();
+        setupInput(contextImpl);
         // start any log topic handler
         setupLogHandler();
 
-        return new JavaInstance(instanceConfig, object, clsLoader, client, 
this.source);
+        return new JavaInstance(contextImpl, object);
+    }
+
+    ContextImpl setupContext() {
+        Consumer consumer = null;
+        if (source instanceof PulsarSource) {
+            consumer = ((PulsarSource) source).getInputConsumer();
+        }
+        Logger instanceLog = LoggerFactory.getLogger(
+                "function-" + instanceConfig.getFunctionDetails().getName());
+        return new ContextImpl(instanceConfig, instanceLog, client,
+                Thread.currentThread().getContextClassLoader(), consumer);
     }
 
     /**
@@ -158,7 +172,8 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
     @Override
     public void run() {
         try {
-            javaInstance = setupJavaInstance();
+            ContextImpl contextImpl = setupContext();
+            javaInstance = setupJavaInstance(contextImpl);
             if (null != stateTable) {
                 StateContextImpl stateContext = new 
StateContextImpl(stateTable);
                 javaInstance.getContext().setStateContext(stateContext);
@@ -438,7 +453,7 @@ public class JavaInstanceRunnable implements AutoCloseable, 
Runnable {
         config.getRootLogger().removeAppender(logAppender.getName());
     }
 
-    public void setupInput() throws Exception {
+    public void setupInput(ContextImpl contextImpl) throws Exception {
 
         SourceSpec sourceSpec = 
this.instanceConfig.getFunctionDetails().getSource();
         Object object;
@@ -484,14 +499,14 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
         this.source = (Source) object;
 
         if (sourceSpec.getConfigs().isEmpty()) {
-            this.source.open(new HashMap<>());
+            this.source.open(new HashMap<>(), contextImpl);
         } else {
             this.source.open(new Gson().fromJson(sourceSpec.getConfigs(),
-                    new TypeToken<Map<String, Object>>(){}.getType()));
+                    new TypeToken<Map<String, Object>>(){}.getType()), 
contextImpl);
         }
     }
 
-    public void setupOutput() throws Exception {
+    public void setupOutput(ContextImpl contextImpl) throws Exception {
 
         SinkSpec sinkSpec = this.instanceConfig.getFunctionDetails().getSink();
         Object object;
@@ -522,10 +537,10 @@ public class JavaInstanceRunnable implements 
AutoCloseable, Runnable {
             throw new RuntimeException("Sink does not implement correct 
interface");
         }
         if (sinkSpec.getConfigs().isEmpty()) {
-            this.sink.open(new HashMap<>());
+            this.sink.open(new HashMap<>(), contextImpl);
         } else {
             this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(),
-                    new TypeToken<Map<String, Object>>() {}.getType()));
+                    new TypeToken<Map<String, Object>>() {}.getType()), 
contextImpl);
         }
     }
 }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index a561096..adb551d 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -41,6 +41,7 @@ import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.io.core.RecordContext;
 import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
 
 import java.util.Base64;
 import java.util.Map;
@@ -187,7 +188,7 @@ public class PulsarSink<T> implements Sink<T> {
     }
 
     @Override
-    public void open(Map<String, Object> config) throws Exception {
+    public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
 
         // Setup Serialization/Deserialization
         setupSerDe();
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 54373ba..680d754 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.Record;
 import org.apache.pulsar.io.core.Source;
+import org.apache.pulsar.io.core.SourceContext;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -61,7 +62,7 @@ public class PulsarSource<T> implements Source<T> {
     }
 
     @Override
-    public void open(Map<String, Object> config) throws Exception {
+    public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
         // Setup Serialization/Deserialization
         setupSerDe();
 
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index 4c30160..f69172a 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -18,27 +18,15 @@
  */
 package org.apache.pulsar.functions.instance;
 
-import static org.mockito.Mockito.mock;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.functions.api.Function;
-import org.apache.pulsar.functions.api.utils.DefaultSerDe;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
-import org.apache.pulsar.functions.proto.Function.SinkSpec;
-import org.apache.pulsar.functions.source.PulsarSource;
 import org.testng.annotations.Test;
 
-public class JavaInstanceTest {
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 
-    private static InstanceConfig createInstanceConfig() {
-        FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder();
-        
functionDetailsBuilder.setSink(SinkSpec.newBuilder().setSerDeClassName(DefaultSerDe.class.getName()).build());
-        InstanceConfig instanceConfig = new InstanceConfig();
-        instanceConfig.setFunctionDetails(functionDetailsBuilder.build());
-        return instanceConfig;
-    }
+public class JavaInstanceTest {
 
     /**
      * Verify that be able to run lambda functions.
@@ -46,11 +34,9 @@ public class JavaInstanceTest {
      */
     @Test
     public void testLambda() {
-        InstanceConfig config = createInstanceConfig();
         JavaInstance instance = new JavaInstance(
-            config,
-            (Function<String, String>) (input, context) -> input + "-lambda",
-            null, null, mock(PulsarSource.class));
+            mock(ContextImpl.class),
+            (Function<String, String>) (input, context) -> input + "-lambda");
         String testString = "ABC123";
         JavaExecutionResult result = 
instance.handleMessage(MessageId.earliest, "random", testString);
         assertNotNull(result.getResult());
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index a7e3610..baad625 100644
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.api.utils.DefaultSerDe;
 import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.io.core.SourceContext;
 import org.testng.annotations.Test;
 
 import java.io.IOException;
@@ -121,7 +122,7 @@ public class PulsarSourceTest {
         PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
 
         try {
-            pulsarSource.open(new HashMap<>());
+            pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
             assertFalse(true);
         } catch (RuntimeException ex) {
             log.error("RuntimeException: {}", ex, ex);
@@ -145,7 +146,7 @@ public class PulsarSourceTest {
         pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
         PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
         try {
-            pulsarSource.open(new HashMap<>());
+            pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
             fail("Should fail constructing java instance if function type is 
inconsistent with serde type");
         } catch (RuntimeException ex) {
             log.error("RuntimeException: {}", ex, ex);
@@ -169,7 +170,7 @@ public class PulsarSourceTest {
         pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
         PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
 
-        pulsarSource.open(new HashMap<>());
+        pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
     }
 
     /**
@@ -184,7 +185,7 @@ public class PulsarSourceTest {
         pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap);
         PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), 
pulsarConfig);
 
-        pulsarSource.open(new HashMap<>());
+        pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
     }
 
     @Test
diff --git 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 599a30e..0bac0a6 100644
--- 
a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ 
b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -67,6 +67,7 @@ import 
org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.apache.pulsar.io.core.RecordContext;
 import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -105,7 +106,7 @@ public class FunctionApiV2ResourceTest {
         }
 
         @Override
-        public void open(Map config) throws Exception {
+        public void open(Map config, SinkContext sinkContext) throws Exception 
{
         }
 
         @Override
diff --git 
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
 
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
index 15fcc52..ba29616 100644
--- 
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
+++ 
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java
@@ -33,6 +33,7 @@ import com.aerospike.client.policy.ClientPolicy;
 import com.aerospike.client.policy.WritePolicy;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.SimpleSink;
+import org.apache.pulsar.io.core.SinkContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +58,7 @@ public abstract class AerospikeAbstractSink<K, V> extends 
SimpleSink<byte[]> {
     private EventLoop eventLoop;
 
     @Override
-    public void open(Map<String, Object> config) throws Exception {
+    public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
         aerospikeSinkConfig = AerospikeSinkConfig.load(config);
         if (aerospikeSinkConfig.getSeedHosts() == null
                 || aerospikeSinkConfig.getKeyspace() == null
diff --git 
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
 
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
index 8c330cd..c8a9ca5 100644
--- 
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
+++ 
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.SimpleSink;
+import org.apache.pulsar.io.core.SinkContext;
 
 /**
  * A Simple abstract class for Cassandra sink
@@ -47,7 +48,7 @@ public abstract class CassandraAbstractSink<K, V> extends 
SimpleSink<byte[]> {
     private PreparedStatement statement;
 
     @Override
-    public void open(Map<String, Object> config) throws Exception {
+    public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
         cassandraSinkConfig = CassandraSinkConfig.load(config);
         if (cassandraSinkConfig.getRoots() == null
                 || cassandraSinkConfig.getKeyspace() == null
diff --git 
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
index a33ad63..44d8162 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
@@ -51,9 +51,10 @@ public abstract class PushSource<T> implements Source<T> {
      * Open connector with configuration
      *
      * @param config initialization config
+     * @param sourceContext
      * @throws Exception IO type exceptions when opening a connector
      */
-    abstract public void open(Map<String, Object> config) throws Exception;
+    abstract public void open(Map<String, Object> config, SourceContext 
sourceContext) throws Exception;
 
     /**
      * Attach a consumer function to this Source. This is invoked by the 
implementation
diff --git 
a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
index 2c29bc8..352a059 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.io.core;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
index a84a47e..31a044b 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
@@ -28,9 +28,10 @@ public interface Sink<T> extends AutoCloseable{
      * Open connector with configuration
      *
      * @param config initialization config
+     * @param sinkContext
      * @throws Exception IO type exceptions when opening a connector
      */
-    void open(final Map<String, Object> config) throws Exception;
+    void open(final Map<String, Object> config, SinkContext sinkContext) 
throws Exception;
     
     /**
      * Write a message to Sink
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
similarity index 59%
copy from pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
copy to pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
index e9ef044..2d58e0c 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java
@@ -18,22 +18,11 @@
  */
 package org.apache.pulsar.io.core;
 
-import java.util.Map;
-
-public interface Source<T> extends AutoCloseable {
-    /**
-     * Open connector with configuration
-     *
-     * @param config initialization config
-     * @throws Exception IO type exceptions when opening a connector
-     */
-    void open(final Map<String, Object> config) throws Exception;
-
+public interface SinkContext {
     /**
-     * Reads the next message from source.
-     * If source does not have any new messages, this call should block.
-     * @return next message from source.  The return result should never be 
null
-     * @throws Exception
+     * Record a user defined metric
+     * @param metricName The name of the metric
+     * @param value The value of the metric
      */
-    Record<T> read() throws Exception;
+    void recordMetric(String metricName, double value);
 }
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
index e9ef044..e311761 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
@@ -25,9 +25,10 @@ public interface Source<T> extends AutoCloseable {
      * Open connector with configuration
      *
      * @param config initialization config
+     * @param sourceContext
      * @throws Exception IO type exceptions when opening a connector
      */
-    void open(final Map<String, Object> config) throws Exception;
+    void open(final Map<String, Object> config, SourceContext sourceContext) 
throws Exception;
 
     /**
      * Reads the next message from source.
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
similarity index 59%
copy from pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
copy to 
pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
index e9ef044..3ea707e 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java
@@ -18,22 +18,11 @@
  */
 package org.apache.pulsar.io.core;
 
-import java.util.Map;
-
-public interface Source<T> extends AutoCloseable {
-    /**
-     * Open connector with configuration
-     *
-     * @param config initialization config
-     * @throws Exception IO type exceptions when opening a connector
-     */
-    void open(final Map<String, Object> config) throws Exception;
-
+public interface SourceContext {
     /**
-     * Reads the next message from source.
-     * If source does not have any new messages, this call should block.
-     * @return next message from source.  The return result should never be 
null
-     * @throws Exception
+     * Record a user defined metric
+     * @param metricName The name of the metric
+     * @param value The value of the metric
      */
-    Record<T> read() throws Exception;
+    void recordMetric(String metricName, double value);
 }
diff --git 
a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java 
b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
new file mode 100644
index 0000000..4fb4a7d
--- /dev/null
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core;
+
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class SinkTest {
+
+    public static class TestSink implements Sink<String> {
+
+        @Override
+        public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
+            sinkContext.recordMetric("foo", 1);
+        }
+
+        @Override
+        public void write(RecordContext inputRecordContext, String value) 
throws Exception {
+
+        }
+
+        @Override
+        public void close() throws Exception {
+
+        }
+    }
+
+    @Test
+    public void testSinkContext() throws Exception {
+        SinkContext sinkContext = mock(SinkContext.class);
+
+        Sink testSink = spy(TestSink.class);
+        testSink.open(new HashMap<>(), sinkContext);
+
+        verify(sinkContext, times(1)).recordMetric("foo", 1);
+    }
+}
diff --git 
a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java 
b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java
new file mode 100644
index 0000000..18ad5c2
--- /dev/null
+++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.core;
+
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class SourceTest {
+    public static class TestSource implements Source<String> {
+
+        @Override
+        public void close() throws Exception {
+
+        }
+
+        @Override
+        public void open(Map<String, Object> config, SourceContext 
sourceContext) throws Exception {
+            sourceContext.recordMetric("foo", 1);
+        }
+
+        @Override
+        public Record<String> read() throws Exception {
+            return null;
+        }
+    }
+
+    @Test
+    public void testSinkContext() throws Exception {
+        SourceContext sourceContext = mock(SourceContext.class);
+
+        Source testSource = spy(TestSource.class);
+        testSource.open(new HashMap<>(), sourceContext);
+
+        verify(sourceContext, times(1)).recordMetric("foo", 1);
+    }
+}
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
index 1d6b8b2..c597489 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.pulsar.io.core.KeyValue;
 import org.apache.pulsar.io.core.SimpleSink;
+import org.apache.pulsar.io.core.SinkContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,7 +73,7 @@ public abstract class KafkaAbstractSink<K, V> extends 
SimpleSink<byte[]> {
     }
 
     @Override
-    public void open(Map<String, Object> config) throws Exception {
+    public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
         kafkaSinkConfig = KafkaSinkConfig.load(config);
         if (kafkaSinkConfig.getTopic() == null
                 || kafkaSinkConfig.getBootstrapServers() == null
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
index 05d90b3..fe8a6a7 100644
--- 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
+++ 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +50,7 @@ public abstract class KafkaAbstractSource<V> extends 
PushSource<V> {
     Thread runnerThread;
 
     @Override
-    public void open(Map<String, Object> config) throws Exception {
+    public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
         kafkaSourceConfig = KafkaSourceConfig.load(config);
         if (kafkaSourceConfig.getTopic() == null
                 || kafkaSourceConfig.getBootstrapServers() == null
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index eaa2b91..5387a66 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -35,6 +35,7 @@ import 
org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 import org.apache.pulsar.io.core.RecordContext;
 import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -120,7 +121,7 @@ public class KinesisSink implements Sink<byte[]> {
     }
 
     @Override
-    public void open(Map<String, Object> config) throws Exception {
+    public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
         kinesisSinkConfig = KinesisSinkConfig.load(config);
 
         checkArgument(isNotBlank(kinesisSinkConfig.getAwsKinesisStreamName()), 
"empty kinesis-stream name");
diff --git 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index e17ab5f..dab3fe9 100644
--- 
a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++ 
b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -27,12 +27,12 @@ import com.rabbitmq.client.DefaultConsumer;
 import com.rabbitmq.client.Envelope;
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.function.Consumer;
 
 /**
  * A simple connector to consume messages from a RabbitMQ queue
@@ -46,7 +46,7 @@ public class RabbitMQSource extends PushSource<byte[]> {
     private RabbitMQConfig rabbitMQConfig;
 
     @Override
-    public void open(Map<String, Object> config) throws Exception {
+    public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
         rabbitMQConfig = RabbitMQConfig.load(config);
         if (rabbitMQConfig.getAmqUri() == null
                 || rabbitMQConfig.getQueueName() == null) {
diff --git 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 5dc495e..56c52e7 100644
--- 
a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ 
b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -36,6 +36,7 @@ import java.util.Map;
 
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.Record;
+import org.apache.pulsar.io.core.SourceContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,7 +53,7 @@ public class TwitterFireHose extends PushSource<String> {
     private Object waitObject;
 
     @Override
-    public void open(Map<String, Object> config) throws IOException {
+    public void open(Map<String, Object> config, SourceContext sourceContext) 
throws IOException {
         TwitterFireHoseConfig hoseConfig = TwitterFireHoseConfig.load(config);
         if (hoseConfig.getConsumerKey() == null
                 || hoseConfig.getConsumerSecret() == null

Reply via email to