Updated comments

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/c2a589d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/c2a589d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/c2a589d5
Branch: refs/heads/master Commit: c2a589d52223a387846c45431ecd257de3eaf430 Parents: 032ddf0 Author: pwawrzyniak <[email protected]> Authored: Fri Mar 17 11:54:47 2017 +0100 Committer: nkourtellis <[email protected]> Committed: Fri Jul 21 21:12:18 2017 +0300 ---------------------------------------------------------------------- .../streams/kafka/KafkaEntranceProcessor.java | 27 +++++++++++++----- .../apache/samoa/streams/kafka/KafkaUtils.java | 29 ++++++++++++++++---- 2 files changed, 43 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/c2a589d5/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java index b1e8a7f..fe82212 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java @@ -16,38 +16,52 @@ package org.apache.samoa.streams.kafka; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.samoa.core.ContentEvent; import org.apache.samoa.core.EntranceProcessor; import org.apache.samoa.core.Processor; /** - * + * Entrance processor that reads incoming messages from <a href="https://kafka.apache.org/">Apache Kafka</a> * @author pwawrzyniak + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating */ public class KafkaEntranceProcessor implements EntranceProcessor { - transient private KafkaUtils kafkaUtils; - List<byte[]> buffer; + transient private final 
KafkaUtils kafkaUtils; + private List<byte[]> buffer; private final KafkaDeserializer deserializer; + private final String topic; + /** + * Class constructor + * @param props Properties of Kafka consumer + * @see <a href="https://kafka.apache.org/documentation/#newconsumerconfigs"> Apache Kafka consumer configuration</a> + * @param topic Topic from which the messages should be read + * @param timeout Timeout used when polling Kafka for new messages + * @param deserializer Instance of the implementation of {@link KafkaDeserializer} + */ public KafkaEntranceProcessor(Properties props, String topic, int timeout, KafkaDeserializer deserializer) { this.kafkaUtils = new KafkaUtils(props, null, timeout); this.deserializer = deserializer; + this.topic = topic; } - private KafkaEntranceProcessor(KafkaUtils kafkaUtils, KafkaDeserializer deserializer) { + private KafkaEntranceProcessor(KafkaUtils kafkaUtils, KafkaDeserializer deserializer, String topic) { this.kafkaUtils = kafkaUtils; this.deserializer = deserializer; + this.topic = topic; } @Override public void onCreate(int id) { this.buffer = new ArrayList<>(100); + this.kafkaUtils.initializeConsumer(Arrays.asList(this.topic)); } @Override @@ -82,7 +96,6 @@ public class KafkaEntranceProcessor implements EntranceProcessor { @Override public Processor newProcessor(Processor processor) { KafkaEntranceProcessor kep = (KafkaEntranceProcessor) processor; - return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), deserializer); + return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), kep.deserializer, kep.topic); } - } http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/c2a589d5/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java index d148878..c87b2f1 100644 
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java @@ -26,9 +26,11 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; /** - * Internal class responsible for Kafka Stream handling + * Internal class responsible for Kafka Stream handling (both consume and produce) * * @author pwawrzyniak + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating */ class KafkaUtils { @@ -38,12 +40,18 @@ class KafkaUtils { private KafkaProducer<String, byte[]> producer; // Properties of the consumer, as defined in Kafka documentation - private Properties consumerProperties; - private Properties producerProperties; + private final Properties consumerProperties; + private final Properties producerProperties; - // Batch size for Kafka Consumer + // Timeout for Kafka Consumer private int consumerTimeout; + /** + * Class constructor + * @param consumerProperties Properties of consumer + * @param producerProperties Properties of producer + * @param consumerTimeout Timeout for consumer poll requests + */ public KafkaUtils(Properties consumerProperties, Properties producerProperties, int consumerTimeout) { this.consumerProperties = consumerProperties; this.producerProperties = producerProperties; @@ -56,14 +64,23 @@ class KafkaUtils { this.consumerTimeout = kafkaUtils.consumerTimeout; } + /** + * Method used to initialize Kafka Consumer, i.e. 
instantiate it and subscribe to configured topic + * @param topics List of Kafka topics that consumer should subscribe to + */ public void initializeConsumer(Collection<String> topics) { - // lazy initialization + // lazy instantiation if (consumer == null) { - consumer = new KafkaConsumer<String, byte[]>(consumerProperties); + consumer = new KafkaConsumer<>(consumerProperties); } consumer.subscribe(topics); } + /** + * Method for reading new messages from Kafka topics + * @return Collection of read messages + * @throws Exception Exception is thrown when consumer was not initialized or is not subscribed to any topic. + */ public List<byte[]> getKafkaMessages() throws Exception { if (consumer != null) {
