Updated comments

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/c2a589d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/c2a589d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/c2a589d5

Branch: refs/heads/master
Commit: c2a589d52223a387846c45431ecd257de3eaf430
Parents: 032ddf0
Author: pwawrzyniak <[email protected]>
Authored: Fri Mar 17 11:54:47 2017 +0100
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 .../streams/kafka/KafkaEntranceProcessor.java   | 27 +++++++++++++-----
 .../apache/samoa/streams/kafka/KafkaUtils.java  | 29 ++++++++++++++++----
 2 files changed, 43 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/c2a589d5/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
index b1e8a7f..fe82212 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
@@ -16,38 +16,52 @@
 package org.apache.samoa.streams.kafka;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.samoa.core.ContentEvent;
 import org.apache.samoa.core.EntranceProcessor;
 import org.apache.samoa.core.Processor;
 
 /**
- *
+ * Entrance processor that reads incoming messages from <a 
href="https://kafka.apache.org/";>Apache Kafka</a>
  * @author pwawrzyniak
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
  */
 public class KafkaEntranceProcessor implements EntranceProcessor {
 
-    transient private KafkaUtils kafkaUtils;
-    List<byte[]> buffer;
+    transient private final KafkaUtils kafkaUtils;
+    private List<byte[]> buffer;
     private final KafkaDeserializer deserializer;
+    private final String topic;
 
+    /**
+     * Class constructor
+     * @param props Properties of Kafka consumer
+     * @see  <a 
href="https://kafka.apache.org/documentation/#newconsumerconfigs";> Apache Kafka 
consumer configuration</a>
+     * @param topic Topic from which the messages should be read
+     * @param timeout Timeout used when polling Kafka for new messages
+     * @param deserializer Instance of the implementation of {@link 
KafkaDeserializer}
+     */
     public KafkaEntranceProcessor(Properties props, String topic, int timeout, 
KafkaDeserializer deserializer) {
         this.kafkaUtils = new KafkaUtils(props, null, timeout);
         this.deserializer = deserializer;
+        this.topic = topic;
     }
 
-    private KafkaEntranceProcessor(KafkaUtils kafkaUtils, KafkaDeserializer 
deserializer) {
+    private KafkaEntranceProcessor(KafkaUtils kafkaUtils, KafkaDeserializer 
deserializer, String topic) {
         this.kafkaUtils = kafkaUtils;
         this.deserializer = deserializer;
+        this.topic = topic;
     }
 
     @Override
     public void onCreate(int id) {
         this.buffer = new ArrayList<>(100);
+        this.kafkaUtils.initializeConsumer(Arrays.asList(this.topic));
     }
 
     @Override
@@ -82,7 +96,6 @@ public class KafkaEntranceProcessor implements 
EntranceProcessor {
     @Override
     public Processor newProcessor(Processor processor) {
         KafkaEntranceProcessor kep = (KafkaEntranceProcessor) processor;
-        return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), 
deserializer);
+        return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), 
kep.deserializer, kep.topic);
     }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/c2a589d5/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
index d148878..c87b2f1 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
@@ -26,9 +26,11 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 
 /**
- * Internal class responsible for Kafka Stream handling
+ * Internal class responsible for Kafka Stream handling (both consume and 
produce)
  *
  * @author pwawrzyniak
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
  */
 class KafkaUtils {
 
@@ -38,12 +40,18 @@ class KafkaUtils {
     private KafkaProducer<String, byte[]> producer;
 
     // Properties of the consumer, as defined in Kafka documentation
-    private Properties consumerProperties;
-    private Properties producerProperties;
+    private final Properties consumerProperties;
+    private final Properties producerProperties;
 
-    // Batch size for Kafka Consumer    
+    // Timeout for Kafka Consumer    
     private int consumerTimeout;
 
+    /**
+     * Class constructor
+     * @param consumerProperties Properties of consumer
+     * @param producerProperties Properties of producer
+     * @param consumerTimeout Timeout for consumer poll requests
+     */
     public KafkaUtils(Properties consumerProperties, Properties 
producerProperties, int consumerTimeout) {
         this.consumerProperties = consumerProperties;
         this.producerProperties = producerProperties;
@@ -56,14 +64,23 @@ class KafkaUtils {
         this.consumerTimeout = kafkaUtils.consumerTimeout;
     }
 
+    /**
+     * Method used to initialize Kafka Consumer, i.e. instantiate it and 
subscribe to configured topic
+     * @param topics List of Kafka topics that consumer should subscribe to
+     */
     public void initializeConsumer(Collection<String> topics) {
-        // lazy initialization
+        // lazy instantiation
         if (consumer == null) {
-            consumer = new KafkaConsumer<String, byte[]>(consumerProperties);
+            consumer = new KafkaConsumer<>(consumerProperties);
         }
         consumer.subscribe(topics);
     }
 
+    /**
+     * Method for reading new messages from Kafka topics
+     * @return Collection of read messages
+     * @throws Exception Exception is thrown when consumer was not initialized 
or is not subscribed to any topic.
+     */
     public List<byte[]> getKafkaMessages() throws Exception {
 
         if (consumer != null) {

Reply via email to