merlimat closed pull request #1792: Modifying sink interface to be generic
URL: https://github.com/apache/incubator-pulsar/pull/1792
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as GitHub does not otherwise show it once the request is merged):

diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 4aaed5bab3..5b5e943e84 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -56,10 +56,8 @@
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
-import org.apache.pulsar.functions.sink.DefaultRuntimeSink;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.sink.PulsarSinkConfig;
-import org.apache.pulsar.functions.sink.RuntimeSink;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
@@ -106,7 +104,7 @@
     private Record currentRecord;
 
     private Source source;
-    private RuntimeSink sink;
+    private Sink sink;
 
     public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 FunctionCacheManager fnCache,
@@ -524,10 +522,8 @@ public void setupOutput() throws Exception {
                     Thread.currentThread().getContextClassLoader());
         }
 
-        if (object instanceof RuntimeSink) {
-            this.sink = (RuntimeSink) object;
-        } else if (object instanceof Sink) {
-            this.sink = DefaultRuntimeSink.of((Sink) object);
+        if (object instanceof Sink) {
+            this.sink = (Sink) object;
         } else {
             throw new RuntimeException("Sink does not implement correct 
interface");
         }
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 61deeff9e4..4fccb54451 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -39,13 +39,14 @@
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.utils.FunctionConfig;
 import org.apache.pulsar.io.core.RecordContext;
+import org.apache.pulsar.io.core.Sink;
 
 import java.util.Base64;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 @Slf4j
-public class PulsarSink<T> implements RuntimeSink<T> {
+public class PulsarSink<T> implements Sink<T> {
 
     private PulsarClient client;
     private PulsarSinkConfig pulsarSinkConfig;
@@ -206,11 +207,6 @@ public void open(Map<String, Object> config) throws 
Exception {
         
this.pulsarSinkProcessor.initializeOutputProducer(this.pulsarSinkConfig.getTopic());
     }
 
-    @Override
-    public CompletableFuture<Void> write(T value) {
-        return null;
-    }
-
     @Override
     public void write(RecordContext recordContext, T value) throws Exception {
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
 
b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
deleted file mode 100644
index e9c8dc5b43..0000000000
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/RuntimeSink.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.sink;
-
-import org.apache.pulsar.io.core.RecordContext;
-import org.apache.pulsar.io.core.Sink;
-
-/**
- * This class extends connect sink.
- *
- * <p>Runtime should interact sink rather than interact directly to the public 
{@link Sink} interface.
- *
- * <p>There is a default implementation provided for wrapping up the user 
provided {@link Sink}. Pulsar sink
- * should be implemented using this interface to ensure supporting 
effective-once.
- */
-public interface RuntimeSink<T> extends Sink<T>{
-
-    /**
-     * Write the <tt>value</tt>value.
-     *
-     * <p>The implementation of this class is responsible for notifying the 
runtime whether the input record
-     * for generating this value is done with processing by {@link 
RecordContext#ack} and {@link RecordContext#fail}.
-     *
-     * @param inputRecordContext input record context
-     * @param value output value computed from the runtime.
-     */
-    default void write(RecordContext inputRecordContext, T value) throws 
Exception {
-        write(value)
-            .thenAccept(ignored -> inputRecordContext.ack())
-            .exceptionally(cause -> {
-                inputRecordContext.fail();
-                return null;
-            });
-    }
-}
diff --git 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
 
b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
deleted file mode 100644
index 018a968632..0000000000
--- 
a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/DefaultRuntimeSinkTest.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.functions.sink;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.io.core.RecordContext;
-import org.apache.pulsar.io.core.Sink;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-/**
- * Unit test {@link DefaultRuntimeSink}.
- */
-public class DefaultRuntimeSinkTest {
-
-    private Sink<String> mockSink;
-    private RuntimeSink<String> runtimeSink;
-
-    @BeforeMethod
-    public void setup() {
-        this.mockSink = mock(Sink.class);
-        this.runtimeSink = DefaultRuntimeSink.of(mockSink);
-    }
-
-    @Test
-    public void testOpen() throws Exception {
-        this.runtimeSink.open(Collections.emptyMap());
-
-        verify(mockSink, times(1)).open(any(Map.class));
-    }
-
-    @Test
-    public void testClose() throws Exception {
-        this.runtimeSink.close();
-
-        verify(mockSink, times(1)).close();
-    }
-
-    @Test
-    public void testWrite() throws Exception {
-        this.runtimeSink.write("test-record");
-        verify(mockSink, times(1)).write(eq("test-record"));
-    }
-
-    @Test
-    public void testWriteAck() throws Exception {
-        RecordContext context = mock(RecordContext.class);
-
-        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
-        writeFuture.complete(null);
-        when(mockSink.write(anyString())).thenReturn(writeFuture);
-
-        runtimeSink.write(context, "test-record");
-
-        verify(context, times(1)).ack();
-    }
-
-    @Test
-    public void testWriteFail() throws Exception {
-        RecordContext context = mock(RecordContext.class);
-
-        CompletableFuture<Void> writeFuture = new CompletableFuture<>();
-        writeFuture.completeExceptionally(new Exception("test-exception"));
-        when(mockSink.write(anyString())).thenReturn(writeFuture);
-
-        runtimeSink.write(context, "test-record");
-
-        verify(context, times(1)).fail();
-    }
-}
diff --git 
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
 
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
index 34df2aacae..f1390c12d5 100644
--- 
a/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
+++ 
b/pulsar-io/aerospike/src/main/java/org/apache/pulsar/io/aerospike/AerospikeSink.java
@@ -32,6 +32,7 @@
 import com.aerospike.client.policy.ClientPolicy;
 import com.aerospike.client.policy.WritePolicy;
 import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.io.core.SimpleSink;
 import org.apache.pulsar.io.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,7 +45,7 @@
 /**
  * Simple AeroSpike sink
  */
-public class AerospikeSink<K, V> implements Sink<KeyValue<K, V>> {
+public class AerospikeSink<K, V> extends SimpleSink<KeyValue<K, V>> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AerospikeSink.class);
 
diff --git 
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
 
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
index 14abc9bd64..9aa09e92a8 100644
--- 
a/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
+++ 
b/pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraSink.java
@@ -28,6 +28,7 @@
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.io.core.SimpleSink;
 import org.apache.pulsar.io.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,7 +40,7 @@
  * Simple Cassandra sink
  * Takes in a KeyValue and writes it to a predefined 
keyspace/columnfamily/columnname.
  */
-public class CassandraSink<K, V> implements Sink<KeyValue<K, V>> {
+public class CassandraSink<K, V> extends SimpleSink<KeyValue<K, V>> {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(CassandraSink.class);
 
diff --git 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
 b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
similarity index 53%
rename from 
pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
rename to pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
index 86cd5b53b5..2c29bc8850 100644
--- 
a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/DefaultRuntimeSink.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SimpleSink.java
@@ -16,38 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.sink;
+package org.apache.pulsar.io.core;
 
-import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import org.apache.pulsar.io.core.Sink;
 
 /**
- * The default implementation of runtime sink.
- *
- * @param <T>
+ * A simpler version of the Sink interface users can extend for use cases to
+ * don't require fine grained delivery control
  */
-public class DefaultRuntimeSink<T> implements RuntimeSink<T> {
-
-    public static <T> DefaultRuntimeSink<T> of(Sink<T> sink) {
-        return new DefaultRuntimeSink<>(sink);
-    }
-
-    private final Sink<T> sink;
+public abstract class SimpleSink<T> implements Sink<T> {
 
-    private DefaultRuntimeSink(Sink<T> sink) {
-        this.sink = sink;
-    }
-
-    /**
-     * Open connector with configuration
-     *
-     * @param config initialization config
-     * @throws Exception IO type exceptions when opening a connector
-     */
     @Override
-    public void open(final Map<String, Object> config) throws Exception {
-        sink.open(config);
+    public void write(RecordContext inputRecordContext, T value) throws 
Exception {
+        write(value)
+                .thenAccept(ignored -> inputRecordContext.ack())
+                .exceptionally(cause -> {
+                    inputRecordContext.fail();
+                    return null;
+                });
     }
 
     /**
@@ -56,13 +42,5 @@ public void open(final Map<String, Object> config) throws 
Exception {
      * @param value output value
      * @return Completable future fo async publish request
      */
-    @Override
-    public CompletableFuture<Void> write(T value) {
-        return sink.write(value);
-    }
-
-    @Override
-    public void close() throws Exception {
-        sink.close();
-    }
+    public abstract CompletableFuture<Void> write(T value);
 }
diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java 
b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
index 48a58e7146..0265e77ce8 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/Sink.java
@@ -19,18 +19,11 @@
 package org.apache.pulsar.io.core;
 
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 /**
- * Pulsar's Sink interface. Sink read data from
- * a Pulsar topic and write it to external sinks(kv store, database, 
filesystem ,etc)
- * The lifcycle of a Sink is to open it passing any config needed
- * by it to initialize(like open network connection, authenticate, etc).
- * On every message from the designated PulsarTopic, the write method is
- * invoked which writes the message to the external sink. One can use close
- * at the end of the session to do any cleanup
+ * Generic sink interface users can implement to run Sink on top of Pulsar 
Functions
  */
-public interface Sink<T> extends AutoCloseable {
+public interface Sink<T> extends AutoCloseable{
     /**
      * Open connector with configuration
      *
@@ -45,5 +38,13 @@
      * @param value output value
      * @return Completable future fo async publish request
      */
-    CompletableFuture<Void> write(T value);
+
+
+    /**
+     * Write a message to Sink
+     * @param inputRecordContext Context of value
+     * @param value value to write to sink
+     * @throws Exception
+     */
+    void write(RecordContext inputRecordContext, T value) throws Exception;
 }
diff --git 
a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java 
b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
index 13d65ab94d..08ca65274a 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSink.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.pulsar.common.util.KeyValue;
+import org.apache.pulsar.io.core.SimpleSink;
 import org.apache.pulsar.io.core.Sink;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@
 /**
  * Simple Kafka Sink to publish messages to a Kafka topic
  */
-public class KafkaSink<K, V> implements Sink<KeyValue<K, V>> {
+public class KafkaSink<K, V> extends SimpleSink<KeyValue<K, V>> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to