This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 0f84222 Implementing PushSource on top of Source (#1766) 0f84222 is described below commit 0f84222deff90acbb5bab99cc56e9e6f673ad7ef Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Mon May 14 12:02:13 2018 -0700 Implementing PushSource on top of Source (#1766) * Implementing PushSource on top of Source * adding missing license header * fix formatting --- .../java/org/apache/pulsar/io/core/PushSource.java | 48 ++++++++++++++++++---- .../org/apache/pulsar/io/kafka/KafkaSource.java | 39 +++++++++++------- .../apache/pulsar/io/rabbitmq/RabbitMQSource.java | 23 +++++------ .../apache/pulsar/io/twitter/TwitterFireHose.java | 13 +++--- 4 files changed, 81 insertions(+), 42 deletions(-) diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java index 8111488..011f8ab 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/PushSource.java @@ -19,34 +19,66 @@ package org.apache.pulsar.io.core; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Consumer; /** * Pulsar's Push Source interface. PushSource read data from * external sources(database changes, twitter firehose, etc) * and publish to a Pulsar topic. The reason its called Push is - * because PushSources get passed a consumption Function that they + * because PushSources get passed a consumer that they * invoke whenever they have data to be published to Pulsar. * The lifcycle of a PushSource is to open it passing any config needed * by it to initialize(like open network connection, authenticate, etc). - * A consumer Function is then to it which is invoked by the source whenever + * A consumer is then to it which is invoked by the source whenever * there is data to be published. Once all data has been read, one can use close * at the end of the session to do any cleanup */ -public interface PushSource<T> extends AutoCloseable { +public abstract class PushSource<T> implements Source<T> { + + private LinkedBlockingQueue<Record<T>> queue; + private static final int DEFAULT_QUEUE_LENGTH = 1000; + + public PushSource() { + this.queue = new LinkedBlockingQueue<>(this.getQueueLength()); + this.setConsumer(new Consumer<Record<T>>() { + @Override + public void accept(Record<T> record) { + try { + queue.put(record); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + }); + } + + @Override + public Record<T> read() throws Exception { + return queue.take(); + } + /** * Open connector with configuration * * @param config initialization config * @throws Exception IO type exceptions when opening a connector */ - void open(final Map<String, Object> config) throws Exception; + abstract public void open(Map<String, Object> config) throws Exception; /** * Attach a consumer function to this Source. This is invoked by the implementation * to pass messages whenever there is data to be pushed to Pulsar. * @param consumer */ - void setConsumer(Function<Record<T>, CompletableFuture<Void>> consumer); -} \ No newline at end of file + abstract public void setConsumer(Consumer<Record<T>> consumer); + + /** + * Get length of the queue that records are push onto + * Users can override this method to customize the queue length + * @return queue length + */ + public int getQueueLength() { + return DEFAULT_QUEUE_LENGTH; + } +} diff --git a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java index a157955..6618c1c 100644 --- a/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java +++ b/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaSource.java @@ -19,6 +19,7 @@ package org.apache.pulsar.io.kafka; +import lombok.Getter; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -38,7 +39,7 @@ import java.util.concurrent.ExecutionException; /** * Simple Kafka Source to transfer messages from a Kafka topic */ -public class KafkaSource<V> implements PushSource<V> { +public class KafkaSource<V> extends PushSource<V> { private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class); @@ -47,7 +48,7 @@ public class KafkaSource<V> implements PushSource<V> { private KafkaSourceConfig kafkaSourceConfig; Thread runnerThread; - private java.util.function.Function<Record<V>, CompletableFuture<Void>> consumeFunction; + private java.util.function.Consumer<Record<V>> consumeFunction; @Override public void open(Map<String, Object> config) throws Exception { @@ -77,6 +78,11 @@ public class KafkaSource<V> implements PushSource<V> { } @Override + public void setConsumer(java.util.function.Consumer<Record<V>> consumerFunction) { + this.consumeFunction = consumerFunction; + } + + @Override public void close() throws InterruptedException { LOG.info("Stopping kafka source"); if (runnerThread != null) { @@ -97,14 +103,16 @@ public class KafkaSource<V> implements PushSource<V> { consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList(kafkaSourceConfig.getTopic())); LOG.info("Kafka source started."); - ConsumerRecords<String, V> records; + ConsumerRecords<String, V> consumerRecords; while(true){ - records = consumer.poll(1000); - CompletableFuture<?>[] futures = new CompletableFuture<?>[records.count()]; + consumerRecords = consumer.poll(1000); + CompletableFuture<?>[] futures = new CompletableFuture<?>[consumerRecords.count()]; int index = 0; - for (ConsumerRecord<String, V> record : records) { - LOG.debug("Record received from kafka, key: {}. value: {}", record.key(), record.value()); - futures[index] = consumeFunction.apply(new KafkaRecord<>(record)); + for (ConsumerRecord<String, V> consumerRecord : consumerRecords) { + LOG.debug("Record received from kafka, key: {}. value: {}", consumerRecord.key(), consumerRecord.value()); + KafkaRecord<V> record = new KafkaRecord<>(consumerRecord); + consumeFunction.accept(record); + futures[index] = record.getCompletableFuture(); index++; } if (!kafkaSourceConfig.isAutoCommitEnabled()) { @@ -122,17 +130,13 @@ public class KafkaSource<V> implements PushSource<V> { runnerThread.start(); } - @Override - public void setConsumer(java.util.function.Function<Record<V>, CompletableFuture<Void>> consumeFunction) { - this.consumeFunction = consumeFunction; - } - static private class KafkaRecord<V> implements Record<V> { private final ConsumerRecord<String, V> record; + @Getter + private final CompletableFuture<Void> completableFuture = new CompletableFuture(); public KafkaRecord(ConsumerRecord<String, V> record) { this.record = record; - } @Override public String getPartitionId() { @@ -148,5 +152,10 @@ public class KafkaSource<V> implements PushSource<V> { public V getValue() { return record.value(); } + + @Override + public void ack() { + completableFuture.complete(null); + } } -} \ No newline at end of file +} diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java index 4b9ca98..967c005 100644 --- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java +++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java @@ -32,27 +32,21 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; +import java.util.function.Consumer; /** * A simple connector to consume messages from a RabbitMQ queue */ -public class RabbitMQSource implements PushSource<byte[]> { +public class RabbitMQSource extends PushSource<byte[]> { private static Logger logger = LoggerFactory.getLogger(RabbitMQSource.class); - private Function<Record<byte[]>, CompletableFuture<Void>> consumer; + private Consumer<Record<byte[]>> consumer; private Connection rabbitMQConnection; private Channel rabbitMQChannel; private RabbitMQConfig rabbitMQConfig; @Override - public void setConsumer(Function<Record<byte[]>, CompletableFuture<Void>> consumeFunction) { - this.consumer = consumeFunction; - } - - @Override public void open(Map<String, Object> config) throws Exception { rabbitMQConfig = RabbitMQConfig.load(config); if (rabbitMQConfig.getAmqUri() == null @@ -74,22 +68,27 @@ public class RabbitMQSource implements PushSource<byte[]> { } @Override + public void setConsumer(Consumer<Record<byte[]>> consumer) { + this.consumer = consumer; + } + + @Override public void close() throws Exception { rabbitMQChannel.close(); rabbitMQConnection.close(); } private class RabbitMQConsumer extends DefaultConsumer { - private Function<Record<byte[]>, CompletableFuture<Void>> consumeFunction; + private Consumer<Record<byte[]>> consumeFunction; - public RabbitMQConsumer(Function<Record<byte[]>, CompletableFuture<Void>> consumeFunction, Channel channel) { + public RabbitMQConsumer(Consumer<Record<byte[]>> consumeFunction, Channel channel) { super(channel); this.consumeFunction = consumeFunction; } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - consumeFunction.apply(new RabbitMQRecord(body)); + consumeFunction.accept(new RabbitMQRecord(body)); } } diff --git a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java index 05e2b08..8331e7d 100644 --- a/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java +++ b/pulsar-io/twitter/src/main/java/org/apache/pulsar/io/twitter/TwitterFireHose.java @@ -37,13 +37,12 @@ import java.io.IOException; import java.io.InputStream; import java.io.Serializable; import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.function.Function; +import java.util.function.Consumer; /** * Simple Push based Twitter FireHose Source */ -public class TwitterFireHose implements PushSource<String> { +public class TwitterFireHose extends PushSource<String> { private static final Logger LOG = LoggerFactory.getLogger(TwitterFireHose.class); @@ -51,7 +50,7 @@ public class TwitterFireHose implements PushSource<String> { // ----- Runtime fields private Object waitObject; - private Function<Record<String>, CompletableFuture<Void>> consumeFunction; + private Consumer<Record<String>> consumeFunction; @Override public void open(Map<String, Object> config) throws IOException { @@ -67,7 +66,7 @@ public class TwitterFireHose implements PushSource<String> { } @Override - public void setConsumer(Function<Record<String>, CompletableFuture<Void>> consumeFunction) { + public void setConsumer(Consumer<Record<String>> consumeFunction) { this.consumeFunction = consumeFunction; } @@ -123,10 +122,10 @@ public class TwitterFireHose implements PushSource<String> { public boolean process() throws IOException, InterruptedException { String line = reader.readLine(); try { - // We don't really care if the future succeeds or not. + // We don't really care if the record succeeds or not. // However might be in the future to count failures // TODO:- Figure out the metrics story for connectors - consumeFunction.apply(new TwitterRecord(line)); + consumeFunction.accept(new TwitterRecord(line)); } catch (Exception e) { LOG.error("Exception thrown"); } -- To stop receiving notification emails like this one, please contact si...@apache.org.