This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f84222   Implementing PushSource on top of Source (#1766)
0f84222 is described below

commit 0f84222deff90acbb5bab99cc56e9e6f673ad7ef
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Mon May 14 12:02:13 2018 -0700

     Implementing PushSource on top of Source (#1766)
    
    * Implementing PushSource on top of Source
    
    * adding missing license header
    
    * fix formatting
---
 .../java/org/apache/pulsar/io/core/PushSource.java | 48 ++++++++++++++++++----
 .../org/apache/pulsar/io/kafka/KafkaSource.java    | 39 +++++++++++-------
 .../apache/pulsar/io/rabbitmq/RabbitMQSource.java  | 23 +++++------
 .../apache/pulsar/io/twitter/TwitterFireHose.java  | 13 +++---
 4 files changed, 81 insertions(+), 42 deletions(-)

diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
index 8111488..011f8ab 100644
--- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
+++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java
@@ -19,34 +19,66 @@
 package org.apache.pulsar.io.core;
 
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Consumer;
 
 /**
  * Pulsar's Push Source interface. PushSource read data from
  * external sources(database changes, twitter firehose, etc)
  * and publish to a Pulsar topic. The reason its called Push is
- * because PushSources get passed a consumption Function that they
+ * because PushSources get passed a consumer that they
  * invoke whenever they have data to be published to Pulsar.
  * The lifcycle of a PushSource is to open it passing any config needed
  * by it to initialize(like open network connection, authenticate, etc).
- * A consumer Function is then to it which is invoked by the source whenever
+ * A consumer  is then to it which is invoked by the source whenever
  * there is data to be published. Once all data has been read, one can use close
  * at the end of the session to do any cleanup
  */
-public interface PushSource<T> extends AutoCloseable {
+public abstract class PushSource<T> implements Source<T> {
+
+    private LinkedBlockingQueue<Record<T>> queue;
+    private static final int DEFAULT_QUEUE_LENGTH = 1000;
+
+    public PushSource() {
+        this.queue = new LinkedBlockingQueue<>(this.getQueueLength());
+        this.setConsumer(new Consumer<Record<T>>() {
+            @Override
+            public void accept(Record<T> record) {
+                try {
+                    queue.put(record);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        });
+    }
+
+    @Override
+    public Record<T> read() throws Exception {
+        return queue.take();
+    }
+
     /**
      * Open connector with configuration
      *
      * @param config initialization config
      * @throws Exception IO type exceptions when opening a connector
      */
-    void open(final Map<String, Object> config) throws Exception;
+    abstract public void open(Map<String, Object> config) throws Exception;
 
     /**
      * Attach a consumer function to this Source. This is invoked by the implementation
      * to pass messages whenever there is data to be pushed to Pulsar.
      * @param consumer
      */
-    void setConsumer(Function<Record<T>, CompletableFuture<Void>> consumer);
-}
\ No newline at end of file
+    abstract public void setConsumer(Consumer<Record<T>> consumer);
+
+    /**
+     * Get length of the queue that records are push onto
+     * Users can override this method to customize the queue length
+     * @return queue length
+     */
+    public int getQueueLength() {
+        return DEFAULT_QUEUE_LENGTH;
+    }
+}
diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java
index a157955..6618c1c 100644
--- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java
+++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.io.kafka;
 
+import lombok.Getter;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -38,7 +39,7 @@ import java.util.concurrent.ExecutionException;
 /**
  * Simple Kafka Source to transfer messages from a Kafka topic
  */
-public class KafkaSource<V> implements PushSource<V> {
+public class KafkaSource<V> extends PushSource<V> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
 
@@ -47,7 +48,7 @@ public class KafkaSource<V> implements PushSource<V> {
     private KafkaSourceConfig kafkaSourceConfig;
     Thread runnerThread;
 
-    private java.util.function.Function<Record<V>, CompletableFuture<Void>> consumeFunction;
+    private java.util.function.Consumer<Record<V>> consumeFunction;
 
     @Override
     public void open(Map<String, Object> config) throws Exception {
@@ -77,6 +78,11 @@ public class KafkaSource<V> implements PushSource<V> {
     }
 
     @Override
+    public void setConsumer(java.util.function.Consumer<Record<V>> consumerFunction) {
+        this.consumeFunction = consumerFunction;
+    }
+
+    @Override
     public void close() throws InterruptedException {
         LOG.info("Stopping kafka source");
         if (runnerThread != null) {
@@ -97,14 +103,16 @@ public class KafkaSource<V> implements PushSource<V> {
             consumer = new KafkaConsumer<>(props);
             consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic()));
             LOG.info("Kafka source started.");
-            ConsumerRecords<String, V> records;
+            ConsumerRecords<String, V> consumerRecords;
             while(true){
-                records = consumer.poll(1000);
-                CompletableFuture<?>[] futures = new CompletableFuture<?>[records.count()];
+                consumerRecords = consumer.poll(1000);
+                CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()];
                 int index = 0;
-                for (ConsumerRecord<String, V> record : records) {
-                    LOG.debug("Record received from kafka, key: {}. value: {}", record.key(), record.value());
-                    futures[index] = consumeFunction.apply(new KafkaRecord<>(record));
+                for (ConsumerRecord<String, V> consumerRecord : consumerRecords) {
+                    LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value());
+                    KafkaRecord<V> record = new KafkaRecord<>(consumerRecord);
+                    consumeFunction.accept(record);
+                    futures[index] = record.getCompletableFuture();
                     index++;
                 }
                 if (!kafkaSourceConfig.isAutoCommitEnabled()) {
@@ -122,17 +130,13 @@ public class KafkaSource<V> implements PushSource<V> {
         runnerThread.start();
     }
 
-    @Override
-    public void setConsumer(java.util.function.Function<Record<V>, CompletableFuture<Void>> consumeFunction) {
-        this.consumeFunction = consumeFunction;
-    }
-
     static private class KafkaRecord<V> implements Record<V> {
         private final ConsumerRecord<String, V> record;
+        @Getter
+        private final CompletableFuture<Void> completableFuture = new CompletableFuture();
 
         public KafkaRecord(ConsumerRecord<String, V> record) {
             this.record = record;
-
         }
         @Override
         public String getPartitionId() {
@@ -148,5 +152,10 @@ public class KafkaSource<V> implements PushSource<V> {
         public V getValue() {
             return record.value();
         }
+
+        @Override
+        public void ack() {
+            completableFuture.complete(null);
+        }
     }
-}
\ No newline at end of file
+}
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index 4b9ca98..967c005 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -32,27 +32,21 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
+import java.util.function.Consumer;
 
 /**
  * A simple connector to consume messages from a RabbitMQ queue
  */
-public class RabbitMQSource implements PushSource<byte[]> {
+public class RabbitMQSource extends PushSource<byte[]> {
 
     private static Logger logger = LoggerFactory.getLogger(RabbitMQSource.class);
 
-    private Function<Record<byte[]>, CompletableFuture<Void>> consumer;
+    private Consumer<Record<byte[]>> consumer;
     private Connection rabbitMQConnection;
     private Channel rabbitMQChannel;
     private RabbitMQConfig rabbitMQConfig;
 
     @Override
-    public void setConsumer(Function<Record<byte[]>, CompletableFuture<Void>> consumeFunction) {
-        this.consumer = consumeFunction;
-    }
-
-    @Override
     public void open(Map<String, Object> config) throws Exception {
         rabbitMQConfig = RabbitMQConfig.load(config);
         if (rabbitMQConfig.getAmqUri() == null
@@ -74,22 +68,27 @@ public class RabbitMQSource implements PushSource<byte[]> {
     }
 
     @Override
+    public void setConsumer(Consumer<Record<byte[]>> consumer) {
+        this.consumer = consumer;
+    }
+
+    @Override
     public void close() throws Exception {
         rabbitMQChannel.close();
         rabbitMQConnection.close();
     }
 
     private class RabbitMQConsumer extends DefaultConsumer {
-        private Function<Record<byte[]>, CompletableFuture<Void>> consumeFunction;
+        private Consumer<Record<byte[]>> consumeFunction;
 
-        public RabbitMQConsumer(Function<Record<byte[]>, CompletableFuture<Void>> consumeFunction, Channel channel) {
+        public RabbitMQConsumer(Consumer<Record<byte[]>> consumeFunction, Channel channel) {
             super(channel);
             this.consumeFunction = consumeFunction;
         }
 
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
-            consumeFunction.apply(new RabbitMQRecord(body));
+            consumeFunction.accept(new RabbitMQRecord(body));
         }
     }
 
diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
index 05e2b08..8331e7d 100644
--- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
+++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java
@@ -37,13 +37,12 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
+import java.util.function.Consumer;
 
 /**
  * Simple Push based Twitter FireHose Source
  */
-public class TwitterFireHose implements PushSource<String> {
+public class TwitterFireHose extends PushSource<String> {
 
     private static final Logger LOG = LoggerFactory.getLogger(TwitterFireHose.class);
 
@@ -51,7 +50,7 @@ public class TwitterFireHose implements PushSource<String> {
 
     // ----- Runtime fields
     private Object waitObject;
-    private Function<Record<String>, CompletableFuture<Void>> consumeFunction;
+    private Consumer<Record<String>> consumeFunction;
 
     @Override
     public void open(Map<String, Object> config) throws IOException {
@@ -67,7 +66,7 @@ public class TwitterFireHose implements PushSource<String> {
     }
 
     @Override
-    public void setConsumer(Function<Record<String>, CompletableFuture<Void>> consumeFunction) {
+    public void setConsumer(Consumer<Record<String>> consumeFunction) {
         this.consumeFunction = consumeFunction;
     }
 
@@ -123,10 +122,10 @@ public class TwitterFireHose implements PushSource<String> {
                     public boolean process() throws IOException, InterruptedException {
                         String line = reader.readLine();
                         try {
-                            // We don't really care if the future succeeds or not.
+                            // We don't really care if the record succeeds or not.
                             // However might be in the future to count failures
                             // TODO:- Figure out the metrics story for connectors
-                            consumeFunction.apply(new TwitterRecord(line));
+                            consumeFunction.accept(new TwitterRecord(line));
                         } catch (Exception e) {
                             LOG.error("Exception thrown");
                         }

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to