This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 5e4f2bb Modifying sink interface to be generic (#1792) 5e4f2bb is described below commit 5e4f2bbbf791e216e2eb6c50088c1c5910f0ec20 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Thu May 17 10:25:24 2018 -0700 Modifying sink interface to be generic (#1792) --- .../functions/instance/JavaInstanceRunnable.java | 10 +-- .../apache/pulsar/functions/sink/PulsarSink.java | 8 +- .../apache/pulsar/functions/sink/RuntimeSink.java | 51 ------------ .../functions/sink/DefaultRuntimeSinkTest.java | 96 ---------------------- .../apache/pulsar/io/aerospike/AerospikeSink.java | 3 +- .../apache/pulsar/io/cassandra/CassandraSink.java | 3 +- .../java/org/apache/pulsar/io/core/SimpleSink.java | 46 +++-------- .../main/java/org/apache/pulsar/io/core/Sink.java | 21 ++--- .../java/org/apache/pulsar/io/kafka/KafkaSink.java | 3 +- 9 files changed, 34 insertions(+), 207 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java index 4aaed5b..5b5e943 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java @@ -56,10 +56,8 @@ import org.apache.pulsar.functions.api.Function; import org.apache.pulsar.functions.proto.InstanceCommunication; import org.apache.pulsar.functions.proto.Function.SourceSpec; import org.apache.pulsar.functions.proto.Function.SinkSpec; -import org.apache.pulsar.functions.sink.DefaultRuntimeSink; import org.apache.pulsar.functions.sink.PulsarSink; import org.apache.pulsar.functions.sink.PulsarSinkConfig; -import org.apache.pulsar.functions.sink.RuntimeSink; import org.apache.pulsar.functions.source.PulsarRecord; import org.apache.pulsar.functions.source.PulsarSource; import org.apache.pulsar.functions.source.PulsarSourceConfig; @@ -106,7 +104,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { private Record currentRecord; private Source source; - private RuntimeSink sink; + private Sink sink; public JavaInstanceRunnable(InstanceConfig instanceConfig, FunctionCacheManager fnCache, @@ -524,10 +522,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable { Thread.currentThread().getContextClassLoader()); } - if (object instanceof RuntimeSink) { - this.sink = (RuntimeSink) object; - } else if (object instanceof Sink) { - this.sink = DefaultRuntimeSink.of((Sink) object); + if (object instanceof Sink) { + this.sink = (Sink) object; } else { throw new RuntimeException("Sink does not implement correct interface"); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java index 61deeff..4fccb54 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java @@ -39,13 +39,14 @@ import org.apache.pulsar.functions.instance.producers.Producers; import org.apache.pulsar.functions.source.PulsarRecord; import org.apache.pulsar.functions.utils.FunctionConfig; import org.apache.pulsar.io.core.RecordContext; +import org.apache.pulsar.io.core.Sink; import java.util.Base64; import java.util.Map; import java.util.concurrent.CompletableFuture; @Slf4j -public class PulsarSink<T> implements RuntimeSink<T> { +public class PulsarSink<T> implements Sink<T> { private PulsarClient client; private PulsarSinkConfig pulsarSinkConfig; @@ -207,11 +208,6 @@ public class PulsarSink<T> implements RuntimeSink<T> { } @Override - public CompletableFuture<Void> write(T value) { - return null; - } - - @Override public void write(RecordContext recordContext, T value) throws Exception { byte[] output; diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java deleted file mode 100644 index e9c8dc5..0000000 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.sink; - -import org.apache.pulsar.io.core.RecordContext; -import org.apache.pulsar.io.core.Sink; - -/** - * This class extends connect sink. - * - * <p>Runtime should interact sink rather than interact directly to the public {@link Sink} interface. - * - * <p>There is a default implementation provided for wrapping up the user provided {@link Sink}. Pulsar sink - * should be implemented using this interface to ensure supporting effective-once. - */ -public interface RuntimeSink<T> extends Sink<T>{ - - /** - * Write the <tt>value</tt>value. - * - * <p>The implementation of this class is responsible for notifying the runtime whether the input record - * for generating this value is done with processing by {@link RecordContext#ack} and {@link RecordContext#fail}. - * - * @param inputRecordContext input record context - * @param value output value computed from the runtime. - */ - default void write(RecordContext inputRecordContext, T value) throws Exception { - write(value) - .thenAccept(ignored -> inputRecordContext.ack()) - .exceptionally(cause -> { - inputRecordContext.fail(); - return null; - }); - } -} diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java deleted file mode 100644 index 018a968..0000000 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pulsar.functions.sink; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import org.apache.pulsar.io.core.RecordContext; -import org.apache.pulsar.io.core.Sink; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -/** - * Unit test {@link DefaultRuntimeSink}. - */ -public class DefaultRuntimeSinkTest { - - private Sink<String> mockSink; - private RuntimeSink<String> runtimeSink; - - @BeforeMethod - public void setup() { - this.mockSink = mock(Sink.class); - this.runtimeSink = DefaultRuntimeSink.of(mockSink); - } - - @Test - public void testOpen() throws Exception { - this.runtimeSink.open(Collections.emptyMap()); - - verify(mockSink, times(1)).open(any(Map.class)); - } - - @Test - public void testClose() throws Exception { - this.runtimeSink.close(); - - verify(mockSink, times(1)).close(); - } - - @Test - public void testWrite() throws Exception { - this.runtimeSink.write("test-record"); - verify(mockSink, times(1)).write(eq("test-record")); - } - - @Test - public void testWriteAck() throws Exception { - RecordContext context = mock(RecordContext.class); - - CompletableFuture<Void> writeFuture = new CompletableFuture<>(); - writeFuture.complete(null); - when(mockSink.write(anyString())).thenReturn(writeFuture); - - runtimeSink.write(context, "test-record"); - - verify(context, times(1)).ack(); - } - - @Test - public void testWriteFail() throws Exception { - RecordContext context = mock(RecordContext.class); - - CompletableFuture<Void> writeFuture = new CompletableFuture<>(); - writeFuture.completeExceptionally(new Exception("test-exception")); - when(mockSink.write(anyString())).thenReturn(writeFuture); - - runtimeSink.write(context, "test-record"); - - verify(context, times(1)).fail(); - } -} diff --git a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java index 34df2aa..f1390c1 100644 --- a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java +++ b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java @@ -32,6 +32,7 @@ import com.aerospike.client.listener.WriteListener; import com.aerospike.client.policy.ClientPolicy; import com.aerospike.client.policy.WritePolicy; import org.apache.pulsar.common.util.KeyValue; +import org.apache.pulsar.io.core.SimpleSink; import org.apache.pulsar.io.core.Sink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +45,7 @@ import java.util.concurrent.LinkedBlockingDeque; /** * Simple AeroSpike sink */ -public class AerospikeSink<K, V> implements Sink<KeyValue<K, V>> { +public class AerospikeSink<K, V> extends SimpleSink<KeyValue<K, V>> { private static final Logger LOG = LoggerFactory.getLogger(AerospikeSink.class); diff --git a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java index 14abc9b..9aa09e9 100644 --- a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java +++ b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java @@ -28,6 +28,7 @@ import com.datastax.driver.core.Session; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import org.apache.pulsar.common.util.KeyValue; +import org.apache.pulsar.io.core.SimpleSink; import org.apache.pulsar.io.core.Sink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +40,7 @@ import java.util.concurrent.CompletableFuture; * Simple Cassandra sink * Takes in a KeyValue and writes it to a predefined keyspace/columnfamily/columnname. */ -public class CassandraSink<K, V> implements Sink<KeyValue<K, V>> { +public class CassandraSink<K, V> extends SimpleSink<KeyValue<K, V>> { private static final Logger LOG = LoggerFactory.getLogger(CassandraSink.class); diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java similarity index 53% rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java rename to pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java index 86cd5b5..2c29bc8 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java @@ -16,38 +16,24 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pulsar.functions.sink; +package org.apache.pulsar.io.core; -import java.util.Map; import java.util.concurrent.CompletableFuture; -import org.apache.pulsar.io.core.Sink; /** - * The default implementation of runtime sink. - * - * @param <T> + * A simpler version of the Sink interface users can extend for use cases to + * don't require fine grained delivery control */ -public class DefaultRuntimeSink<T> implements RuntimeSink<T> { - - public static <T> DefaultRuntimeSink<T> of(Sink<T> sink) { - return new DefaultRuntimeSink<>(sink); - } - - private final Sink<T> sink; +public abstract class SimpleSink<T> implements Sink<T> { - private DefaultRuntimeSink(Sink<T> sink) { - this.sink = sink; - } - - /** - * Open connector with configuration - * - * @param config initialization config - * @throws Exception IO type exceptions when opening a connector - */ @Override - public void open(final Map<String, Object> config) throws Exception { - sink.open(config); + public void write(RecordContext inputRecordContext, T value) throws Exception { + write(value) + .thenAccept(ignored -> inputRecordContext.ack()) + .exceptionally(cause -> { + inputRecordContext.fail(); + return null; + }); } /** @@ -56,13 +42,5 @@ public class DefaultRuntimeSink<T> implements RuntimeSink<T> { * @param value output value * @return Completable future fo async publish request */ - @Override - public CompletableFuture<Void> write(T value) { - return sink.write(value); - } - - @Override - public void close() throws Exception { - sink.close(); - } + public abstract CompletableFuture<Void> write(T value); } diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java index 48a58e7..0265e77 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java @@ -19,18 +19,11 @@ package org.apache.pulsar.io.core; import java.util.Map; -import java.util.concurrent.CompletableFuture; /** - * Pulsar's Sink interface. Sink read data from - * a Pulsar topic and write it to external sinks(kv store, database, filesystem ,etc) - * The lifcycle of a Sink is to open it passing any config needed - * by it to initialize(like open network connection, authenticate, etc). - * On every message from the designated PulsarTopic, the write method is - * invoked which writes the message to the external sink. One can use close - * at the end of the session to do any cleanup + * Generic sink interface users can implement to run Sink on top of Pulsar Functions */ -public interface Sink<T> extends AutoCloseable { +public interface Sink<T> extends AutoCloseable{ /** * Open connector with configuration * @@ -45,5 +38,13 @@ public interface Sink<T> extends AutoCloseable { * @param value output value * @return Completable future fo async publish request */ - CompletableFuture<Void> write(T value); + + + /** + * Write a message to Sink + * @param inputRecordContext Context of value + * @param value value to write to sink + * @throws Exception + */ + void write(RecordContext inputRecordContext, T value) throws Exception; } diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java index 13d65ab..08ca652 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.pulsar.common.util.KeyValue; +import org.apache.pulsar.io.core.SimpleSink; import org.apache.pulsar.io.core.Sink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +39,7 @@ import java.util.concurrent.Future; /** * Simple Kafka Sink to publish messages to a Kafka topic */ -public class KafkaSink<K, V> implements Sink<KeyValue<K, V>> { +public class KafkaSink<K, V> extends SimpleSink<KeyValue<K, V>> { private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class); -- To stop receiving notification emails like this one, please contact mme...@apache.org.