This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 423a9af Add context to source and sink (#2098) 423a9af is described below commit 423a9af9973c3df6275f2abd97957bd772e3da43 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Sat Jul 7 11:38:43 2018 -0700 Add context to source and sink (#2098) * adding contexts to sink and source interface * cleaning up * adding unit tests --- .../pulsar/functions/instance/ContextImpl.java | 4 +- .../pulsar/functions/instance/JavaInstance.java | 21 +------- .../functions/instance/JavaInstanceRunnable.java | 37 +++++++++---- .../apache/pulsar/functions/sink/PulsarSink.java | 3 +- .../pulsar/functions/source/PulsarSource.java | 3 +- .../functions/instance/JavaInstanceTest.java | 26 +++------- .../pulsar/functions/source/PulsarSourceTest.java | 9 ++-- .../rest/api/v2/FunctionApiV2ResourceTest.java | 3 +- .../pulsar/io/aerospike/AerospikeAbstractSink.java | 3 +- .../pulsar/io/cassandra/CassandraAbstractSink.java | 3 +- .../java/org/apache/pulsar/io/core/PushSource.java | 3 +- .../java/org/apache/pulsar/io/core/SimpleSink.java | 1 + .../main/java/org/apache/pulsar/io/core/Sink.java | 3 +- .../io/core/{Source.java => SinkContext.java} | 21 ++------ .../java/org/apache/pulsar/io/core/Source.java | 3 +- .../io/core/{Source.java => SourceContext.java} | 21 ++------ .../java/org/apache/pulsar/io/core/SinkTest.java | 60 ++++++++++++++++++++++ .../java/org/apache/pulsar/io/core/SourceTest.java | 59 +++++++++++++++++++++ .../apache/pulsar/io/kafka/KafkaAbstractSink.java | 3 +- .../pulsar/io/kafka/KafkaAbstractSource.java | 3 +- .../org/apache/pulsar/io/kinesis/KinesisSink.java | 3 +- .../apache/pulsar/io/rabbitmq/RabbitMQSource.java | 4 +- .../apache/pulsar/io/twitter/TwitterFireHose.java | 3 +- 23 files changed, 198 insertions(+), 101 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index e2887bf..0a1b965 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -36,6 +36,8 @@ import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.instance.state.StateContextImpl; import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData; import org.apache.pulsar.functions.utils.Reflections; +import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.io.core.SourceContext; import org.slf4j.Logger; import java.io.IOException; @@ -56,7 +58,7 @@ import static com.google.common.base.Preconditions.checkState; /** * This class implements the Context interface exposed to the user. */ -class ContextImpl implements Context { +class ContextImpl implements Context, SinkContext, SourceContext { private InstanceConfig config; private Logger logger; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java index 3bd563e..496b43f 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java @@ -21,16 +21,9 @@ package org.apache.pulsar.functions.instance; import lombok.AccessLevel; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; -import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; -import org.apache.pulsar.io.core.Source; - -import org.apache.pulsar.functions.source.PulsarSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * This is the Java Instance. This is started by the runtimeSpawner using the JavaInstanceClient @@ -45,19 +38,9 @@ public class JavaInstance implements AutoCloseable { private Function function; private java.util.function.Function javaUtilFunction; - public JavaInstance(InstanceConfig config, Object userClassObject, - ClassLoader clsLoader, - PulsarClient pulsarClient, - Source source) { - // TODO: cache logger instances by functions? - Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionDetails().getName()); - - Consumer consumer = null; - if (source instanceof PulsarSource) { - consumer = ((PulsarSource) source).getInputConsumer(); - } + public JavaInstance(ContextImpl contextImpl, Object userClassObject) { - this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, consumer); + this.context = contextImpl; // create the functions if (userClassObject instanceof Function) { diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 5485d72..5b206fb 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -47,6 +47,7 @@ import org.apache.logging.log4j.ThreadContext; import org.apache.logging.log4j.core.LoggerContext; import org.apache.logging.log4j.core.config.Configuration; import org.apache.logging.log4j.core.config.LoggerConfig; +import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.SubscriptionType; @@ -68,6 +69,8 @@ import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.io.core.Record; import org.apache.pulsar.io.core.Sink; import org.apache.pulsar.io.core.Source; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A function container implemented using java thread. @@ -121,7 +124,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { /** * NOTE: this method should be called in the instance thread, in order to make class loading work. */ - JavaInstance setupJavaInstance() throws Exception { + JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception { // initialize the thread context ThreadContext.put("function", FunctionDetailsUtils.getFullyQualifiedName(instanceConfig.getFunctionDetails())); ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName()); @@ -143,13 +146,24 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { // start the state table setupStateTable(); // start the output producer - setupOutput(); + setupOutput(contextImpl); // start the input consumer - setupInput(); + setupInput(contextImpl); // start any log topic handler setupLogHandler(); - return new JavaInstance(instanceConfig, object, clsLoader, client, this.source); + return new JavaInstance(contextImpl, object); + } + + ContextImpl setupContext() { + Consumer consumer = null; + if (source instanceof PulsarSource) { + consumer = ((PulsarSource) source).getInputConsumer(); + } + Logger instanceLog = LoggerFactory.getLogger( + "function-" + instanceConfig.getFunctionDetails().getName()); + return new ContextImpl(instanceConfig, instanceLog, client, + Thread.currentThread().getContextClassLoader(), consumer); } /** @@ -158,7 +172,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { @Override public void run() { try { - javaInstance = setupJavaInstance(); + ContextImpl contextImpl = setupContext(); + javaInstance = setupJavaInstance(contextImpl); if (null != stateTable) { StateContextImpl stateContext = new StateContextImpl(stateTable); javaInstance.getContext().setStateContext(stateContext); @@ -438,7 +453,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { config.getRootLogger().removeAppender(logAppender.getName()); } - public void setupInput() throws Exception { + public void setupInput(ContextImpl contextImpl) throws Exception { SourceSpec sourceSpec = this.instanceConfig.getFunctionDetails().getSource(); Object object; @@ -484,14 +499,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { this.source = (Source) object; if (sourceSpec.getConfigs().isEmpty()) { - this.source.open(new HashMap<>()); + this.source.open(new HashMap<>(), contextImpl); } else { this.source.open(new Gson().fromJson(sourceSpec.getConfigs(), - new TypeToken<Map<String, Object>>(){}.getType())); + new TypeToken<Map<String, Object>>(){}.getType()), contextImpl); } } - public void setupOutput() throws Exception { + public void setupOutput(ContextImpl contextImpl) throws Exception { SinkSpec sinkSpec = this.instanceConfig.getFunctionDetails().getSink(); Object object; @@ -522,10 +537,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { throw new RuntimeException("Sink does not implement correct interface"); } if (sinkSpec.getConfigs().isEmpty()) { - this.sink.open(new HashMap<>()); + this.sink.open(new HashMap<>(), contextImpl); } else { this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(), - new TypeToken<Map<String, Object>>() {}.getType())); + new TypeToken<Map<String, Object>>() {}.getType()), contextImpl); } } } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index a561096..adb551d 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -41,6 +41,7 @@ import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.io.core.RecordContext; import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; import java.util.Base64; import java.util.Map; @@ -187,7 +188,7 @@ public class PulsarSink<T> implements Sink<T> { } @Override - public void open(Map<String, Object> config) throws Exception { + public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { // Setup Serialization/Deserialization setupSerDe(); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java index 54373ba..680d754 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java @@ -38,6 +38,7 @@ import org.apache.pulsar.functions.utils.Reflections; import org.apache.pulsar.functions.utils.Utils; import org.apache.pulsar.io.core.Record; import org.apache.pulsar.io.core.Source; +import org.apache.pulsar.io.core.SourceContext; import java.util.ArrayList; import java.util.HashMap; @@ -61,7 +62,7 @@ public class PulsarSource<T> implements Source<T> { } @Override - public void open(Map<String, Object> config) throws Exception { + public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception { // Setup Serialization/Deserialization setupSerDe(); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java index 4c30160..f69172a 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java @@ -18,27 +18,15 @@ */ package org.apache.pulsar.functions.instance; -import static org.mockito.Mockito.mock; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertNotNull; - import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.functions.api.Function; -import org.apache.pulsar.functions.api.utils.DefaultSerDe; -import org.apache.pulsar.functions.proto.Function.FunctionDetails; -import org.apache.pulsar.functions.proto.Function.SinkSpec; -import org.apache.pulsar.functions.source.PulsarSource; import org.testng.annotations.Test; -public class JavaInstanceTest { +import static org.mockito.Mockito.mock; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; - private static InstanceConfig createInstanceConfig() { - FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); - functionDetailsBuilder.setSink(SinkSpec.newBuilder().setSerDeClassName(DefaultSerDe.class.getName()).build()); - InstanceConfig instanceConfig = new InstanceConfig(); - instanceConfig.setFunctionDetails(functionDetailsBuilder.build()); - return instanceConfig; - } +public class JavaInstanceTest { /** * Verify that be able to run lambda functions. @@ -46,11 +34,9 @@ public class JavaInstanceTest { */ @Test public void testLambda() { - InstanceConfig config = createInstanceConfig(); JavaInstance instance = new JavaInstance( - config, - (Function<String, String>) (input, context) -> input + "-lambda", - null, null, mock(PulsarSource.class)); + mock(ContextImpl.class), + (Function<String, String>) (input, context) -> input + "-lambda"); String testString = "ABC123"; JavaExecutionResult result = instance.handleMessage(MessageId.earliest, "random", testString); assertNotNull(result.getResult()); diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java index a7e3610..baad625 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java @@ -28,6 +28,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.functions.api.SerDe; import org.apache.pulsar.functions.api.utils.DefaultSerDe; import org.apache.pulsar.functions.utils.FunctionConfig; +import org.apache.pulsar.io.core.SourceContext; import org.testng.annotations.Test; import java.io.IOException; @@ -121,7 +122,7 @@ public class PulsarSourceTest { PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); try { - pulsarSource.open(new HashMap<>()); + pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); assertFalse(true); } catch (RuntimeException ex) { log.error("RuntimeException: {}", ex, ex); @@ -145,7 +146,7 @@ public class PulsarSourceTest { pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap); PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); try { - pulsarSource.open(new HashMap<>()); + pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); fail("Should fail constructing java instance if function type is inconsistent with serde type"); } catch (RuntimeException ex) { log.error("RuntimeException: {}", ex, ex); @@ -169,7 +170,7 @@ public class PulsarSourceTest { pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap); PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); - pulsarSource.open(new HashMap<>()); + pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); } /** @@ -184,7 +185,7 @@ public class PulsarSourceTest { pulsarConfig.setTopicSerdeClassNameMap(topicSerdeClassNameMap); PulsarSource pulsarSource = new PulsarSource(getPulsarClient(), pulsarConfig); - pulsarSource.open(new HashMap<>()); + pulsarSource.open(new HashMap<>(), mock(SourceContext.class)); } @Test diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java index 599a30e..0bac0a6 100644 --- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java +++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java @@ -67,6 +67,7 @@ import org.apache.pulsar.functions.worker.request.RequestResult; import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl; import org.apache.pulsar.io.core.RecordContext; import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; import org.glassfish.jersey.media.multipart.FormDataContentDisposition; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; @@ -105,7 +106,7 @@ public class FunctionApiV2ResourceTest { } @Override - public void open(Map config) throws Exception { + public void open(Map config, SinkContext sinkContext) throws Exception { } @Override diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java index 15fcc52..ba29616 100644 --- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java +++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeAbstractSink.java @@ -33,6 +33,7 @@ import com.aerospike.client.policy.ClientPolicy; import com.aerospike.client.policy.WritePolicy; import org.apache.pulsar.io.core.KeyValue; import org.apache.pulsar.io.core.SimpleSink; +import org.apache.pulsar.io.core.SinkContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +58,7 @@ public abstract class AerospikeAbstractSink<K, V> extends SimpleSink<byte[]> { private EventLoop eventLoop; @Override - public void open(Map<String, Object> config) throws Exception { + public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { aerospikeSinkConfig = AerospikeSinkConfig.load(config); if (aerospikeSinkConfig.getSeedHosts() == null || aerospikeSinkConfig.getKeyspace() == null diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java index 8c330cd..c8a9ca5 100644 --- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java +++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraAbstractSink.java @@ -33,6 +33,7 @@ import java.util.concurrent.CompletableFuture; import org.apache.pulsar.io.core.KeyValue; import org.apache.pulsar.io.core.SimpleSink; +import org.apache.pulsar.io.core.SinkContext; /** * A Simple abstract class for Cassandra sink @@ -47,7 +48,7 @@ public abstract class CassandraAbstractSink<K, V> extends SimpleSink<byte[]> { private PreparedStatement statement; @Override - public void open(Map<String, Object> config) throws Exception { + public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { cassandraSinkConfig = CassandraSinkConfig.load(config); if (cassandraSinkConfig.getRoots() == null || cassandraSinkConfig.getKeyspace() == null diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java index a33ad63..44d8162 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java @@ -51,9 +51,10 @@ public abstract class PushSource<T> implements Source<T> { * Open connector with configuration * * @param config initialization config + * @param sourceContext * @throws Exception IO type exceptions when opening a connector */ - abstract public void open(Map<String, Object> config) throws Exception; + abstract public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception; /** * Attach a consumer function to this Source. This is invoked by the implementation diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java index 2c29bc8..352a059 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.io.core; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java index a84a47e..31a044b 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java @@ -28,9 +28,10 @@ public interface Sink<T> extends AutoCloseable{ * Open connector with configuration * * @param config initialization config + * @param sinkContext * @throws Exception IO type exceptions when opening a connector */ - void open(final Map<String, Object> config) throws Exception; + void open(final Map<String, Object> config, SinkContext sinkContext) throws Exception; /** * Write a message to Sink diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java similarity index 59% copy from pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java copy to pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java index e9ef044..2d58e0c 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java @@ -18,22 +18,11 @@ */ package org.apache.pulsar.io.core; -import java.util.Map; - -public interface Source<T> extends AutoCloseable { - /** - * Open connector with configuration - * - * @param config initialization config - * @throws Exception IO type exceptions when opening a connector - */ - void open(final Map<String, Object> config) throws Exception; - +public interface SinkContext { /** - * Reads the next message from source. - * If source does not have any new messages, this call should block. - * @return next message from source. The return result should never be null - * @throws Exception + * Record a user defined metric + * @param metricName The name of the metric + * @param value The value of the metric */ - Record<T> read() throws Exception; + void recordMetric(String metricName, double value); } diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java index e9ef044..e311761 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java @@ -25,9 +25,10 @@ public interface Source<T> extends AutoCloseable { * Open connector with configuration * * @param config initialization config + * @param sourceContext * @throws Exception IO type exceptions when opening a connector */ - void open(final Map<String, Object> config) throws Exception; + void open(final Map<String, Object> config, SourceContext sourceContext) throws Exception; /** * Reads the next message from source. diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java similarity index 59% copy from pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java copy to pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java index e9ef044..3ea707e 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Source.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java @@ -18,22 +18,11 @@ */ package org.apache.pulsar.io.core; -import java.util.Map; - -public interface Source<T> extends AutoCloseable { - /** - * Open connector with configuration - * - * @param config initialization config - * @throws Exception IO type exceptions when opening a connector - */ - void open(final Map<String, Object> config) throws Exception; - +public interface SourceContext { /** - * Reads the next message from source. - * If source does not have any new messages, this call should block. - * @return next message from source. The return result should never be null - * @throws Exception + * Record a user defined metric + * @param metricName The name of the metric + * @param value The value of the metric */ - Record<T> read() throws Exception; + void recordMetric(String metricName, double value); } diff --git a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java new file mode 100644 index 0000000..4fb4a7d --- /dev/null +++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SinkTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.core; + +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class SinkTest { + + public static class TestSink implements Sink<String> { + + @Override + public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { + sinkContext.recordMetric("foo", 1); + } + + @Override + public void write(RecordContext inputRecordContext, String value) throws Exception { + + } + + @Override + public void close() throws Exception { + + } + } + + @Test + public void testSinkContext() throws Exception { + SinkContext sinkContext = mock(SinkContext.class); + + Sink testSink = spy(TestSink.class); + testSink.open(new HashMap<>(), sinkContext); + + verify(sinkContext, times(1)).recordMetric("foo", 1); + } +} diff --git a/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java new file mode 100644 index 0000000..18ad5c2 --- /dev/null +++ b/pulsar-io/core/src/test/java/org/apache/pulsar/io/core/SourceTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.core; + +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class SourceTest { + public static class TestSource implements Source<String> { + + @Override + public void close() throws Exception { + + } + + @Override + public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception { + sourceContext.recordMetric("foo", 1); + } + + @Override + public Record<String> read() throws Exception { + return null; + } + } + + @Test + public void testSinkContext() throws Exception { + SourceContext sourceContext = mock(SourceContext.class); + + Source testSource = spy(TestSource.class); + testSource.open(new HashMap<>(), sourceContext); + + verify(sourceContext, times(1)).recordMetric("foo", 1); + } +} diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java index 1d6b8b2..c597489 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java @@ -25,6 +25,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.pulsar.io.core.KeyValue; import org.apache.pulsar.io.core.SimpleSink; +import org.apache.pulsar.io.core.SinkContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +73,7 @@ public abstract class KafkaAbstractSink<K, V> extends SimpleSink<byte[]> { } @Override - public void open(Map<String, Object> config) throws Exception { + public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { kafkaSinkConfig = KafkaSinkConfig.load(config); if (kafkaSinkConfig.getTopic() == null || kafkaSinkConfig.getBootstrapServers() == null diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java index 05d90b3..fe8a6a7 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java @@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.pulsar.io.core.PushSource; import org.apache.pulsar.io.core.Record; +import org.apache.pulsar.io.core.SourceContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +50,7 @@ public abstract class KafkaAbstractSource<V> extends PushSource<V> { Thread runnerThread; @Override - public void open(Map<String, Object> config) throws Exception { + public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception { kafkaSourceConfig = KafkaSourceConfig.load(config); if (kafkaSourceConfig.getTopic() == null || kafkaSourceConfig.getBootstrapServers() == null diff --git a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java index eaa2b91..5387a66 100644 --- a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java +++ b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java @@ -35,6 +35,7 @@ import org.apache.commons.lang3.builder.ReflectionToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; import org.apache.pulsar.io.core.RecordContext; import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,7 +121,7 @@ public class KinesisSink implements Sink<byte[]> { } @Override - public void open(Map<String, Object> config) throws Exception { + public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { kinesisSinkConfig = KinesisSinkConfig.load(config); checkArgument(isNotBlank(kinesisSinkConfig.getAwsKinesisStreamName()), "empty kinesis-stream name"); diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java index e17ab5f..dab3fe9 100644 --- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java +++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java @@ -27,12 +27,12 @@ import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import org.apache.pulsar.io.core.PushSource; import org.apache.pulsar.io.core.Record; +import org.apache.pulsar.io.core.SourceContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Map; -import java.util.function.Consumer; /** * A simple connector to consume messages from a RabbitMQ queue @@ -46,7 +46,7 @@ public class RabbitMQSource extends PushSource<byte[]> { private RabbitMQConfig rabbitMQConfig; @Override - public void open(Map<String, Object> config) throws Exception { + public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception { rabbitMQConfig = RabbitMQConfig.load(config); if (rabbitMQConfig.getAmqUri() == null || rabbitMQConfig.getQueueName() == null) { diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java index 5dc495e..56c52e7 100644 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java @@ -36,6 +36,7 @@ import java.util.Map; import org.apache.pulsar.io.core.PushSource; import org.apache.pulsar.io.core.Record; +import org.apache.pulsar.io.core.SourceContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -52,7 +53,7 @@ public class TwitterFireHose extends PushSource<String> { private Object waitObject; @Override - public void open(Map<String, Object> config) throws IOException { + public void open(Map<String, Object> config, SourceContext sourceContext) throws IOException { TwitterFireHoseConfig hoseConfig = TwitterFireHoseConfig.load(config); if (hoseConfig.getConsumerKey() == null || hoseConfig.getConsumerSecret() == null