Code for KafkaEntranceProcessor (consuming messages from Kafka)

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/032ddf02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/032ddf02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/032ddf02

Branch: refs/heads/master
Commit: 032ddf029d27cc9961268d84b0b63bb5df4e4fd3
Parents: 4453f1f
Author: pwawrzyniak <[email protected]>
Authored: Fri Mar 17 11:05:14 2017 +0100
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 .../streams/kafka/KafkaEntranceProcessor.java   | 153 ++++++++++--------
 .../apache/samoa/streams/kafka/KafkaUtils.java  | 161 +++++++++++--------
 2 files changed, 178 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/032ddf02/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
index 228e81b..b1e8a7f 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java
@@ -1,65 +1,88 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samoa.streams.kafka;
-
-import java.util.Properties;
-import org.apache.samoa.core.ContentEvent;
-import org.apache.samoa.core.EntranceProcessor;
-import org.apache.samoa.core.Processor;
-
-/**
- *
- * @author pwawrzyniak
- */
-public class KafkaEntranceProcessor implements EntranceProcessor {
-
-    transient private KafkaUtils kafkaUtils;
-
-    public KafkaEntranceProcessor(Properties props, String topic, int 
batchSize) {
-        kafkaUtils = new KafkaUtils(props, null, batchSize);
-    }
-
-    @Override
-    public void onCreate(int id) {
-        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
-    }
-
-    @Override
-    public boolean isFinished() {
-        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
-    }
-
-    @Override
-    public boolean hasNext() {
-        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
-    }
-
-    @Override
-    public ContentEvent nextEvent() {
-        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
-    }
-
-    @Override
-    public boolean process(ContentEvent event) {
-        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
-    }
-
-    @Override
-    public Processor newProcessor(Processor processor) {
-        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
-    }
-
-}
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.EntranceProcessor;
+import org.apache.samoa.core.Processor;
+
+/**
+ * Entrance processor that polls raw byte[] messages through KafkaUtils and turns them into ContentEvents with a KafkaDeserializer.
+ * @author pwawrzyniak
+ */
+public class KafkaEntranceProcessor implements EntranceProcessor {
+
+    transient private KafkaUtils kafkaUtils; // Kafka access helper; marked transient (skipped by default Java serialization)
+    List<byte[]> buffer; // raw message payloads waiting to be handed out; allocated in onCreate()
+    private final KafkaDeserializer deserializer; // converts one byte[] payload into a ContentEvent
+
+    public KafkaEntranceProcessor(Properties props, String topic, int timeout, 
KafkaDeserializer deserializer) { // NOTE(review): 'topic' is not used anywhere in this body
+        this.kafkaUtils = new KafkaUtils(props, null, timeout); // props = consumer properties, null = producer properties, timeout = consumer timeout
+        this.deserializer = deserializer;
+    }
+
+    private KafkaEntranceProcessor(KafkaUtils kafkaUtils, KafkaDeserializer 
deserializer) { // used by newProcessor() to build an instance around an existing KafkaUtils
+        this.kafkaUtils = kafkaUtils;
+        this.deserializer = deserializer;
+    }
+
+    @Override
+    public void onCreate(int id) {
+        this.buffer = new ArrayList<>(100); // start with an empty buffer (initial capacity 100); id is not used
+    }
+
+    @Override
+    public boolean isFinished() {
+        return false; // always false: this processor never reports completion
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (buffer.isEmpty()) { // refill only once everything buffered has been handed out
+            try {
+                buffer.addAll(kafkaUtils.getKafkaMessages()); // NOTE(review): no initializeConsumer() call is visible in this class - confirm the consumer gets initialised
+            } catch (Exception ex) { // getKafkaMessages() throws when the consumer is missing or unsubscribed; logged, not rethrown
+                
Logger.getLogger(KafkaEntranceProcessor.class.getName()).log(Level.SEVERE, 
null, ex);
+            }
+        }
+        return buffer.size() > 0; // true only if at least one message is buffered after the refill attempt
+    }
+
+    @Override
+    public ContentEvent nextEvent() {
+        // Call hasNext() first: on an empty buffer remove(-1) throws. NOTE(review): takes the LAST element, i.e. reverse of buffering order - confirm intended.
+        return this.deserializer.deserialize(buffer.remove(buffer.size() - 1));
+
+    }
+
+    @Override
+    public boolean process(ContentEvent event) {
+        return false; // always false; the event argument is ignored
+    }
+
+    @Override
+    public Processor newProcessor(Processor processor) {
+        KafkaEntranceProcessor kep = (KafkaEntranceProcessor) processor; // ClassCastException if processor is not a KafkaEntranceProcessor
+        return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), 
deserializer); // KafkaUtils is copied from kep; the deserializer is this instance's field, not kep's
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/032ddf02/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
index c2fbaa8..d148878 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
@@ -1,71 +1,90 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samoa.streams.kafka;
-
-import java.util.Collection;
-import java.util.Properties;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-
-/**
- * Internal class responsible for Kafka Stream handling
- *
- * @author pwawrzyniak
- */
-class KafkaUtils {
-
-    // Consumer class for internal use to retrieve messages from Kafka
-    private KafkaConsumer<String, byte[]> consumer;
-
-    private KafkaProducer<String, byte[]> producer;
-
-    // Properties of the consumer, as defined in Kafka documentation
-    private Properties consumerProperties;
-    private Properties producerProperties;
-
-    // Batch size for Kafka Consumer    
-    private int consumerTimeout;
-
-    public KafkaUtils(Properties consumerProperties, Properties 
producerProperties, int consumerTimeout) {
-        this.consumerProperties = consumerProperties;
-        this.producerProperties = producerProperties;
-        this.consumerTimeout = consumerTimeout;
-    }
-
-    public void initializeConsumer(Collection<String> topics) {
-        // lazy initialization
-        if (consumer == null) {
-            consumer = new KafkaConsumer<String, byte[]>(consumerProperties);
-        }
-        consumer.subscribe(topics);
-    }
-
-    public ConsumerRecords<String, byte[]> getMessages() throws Exception {
-
-        if (consumer != null) {
-            if (!consumer.subscription().isEmpty()) {
-                return consumer.poll(consumerTimeout);
-            } else {
-                // TODO: do it more elegant way
-                throw new Exception("Consumer subscribed to no topics!");
-            }
-        } else {
-            // TODO: do more elegant way
-            throw new Exception("Consumer not initialised");
-        }
-    }
-}
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+
+/**
+ * Package-private helper that owns a lazily created KafkaConsumer and returns polled record values as byte[] lists.
+ * The producer field is never assigned and producerProperties is only stored and copied in the code visible here.
+ * @author pwawrzyniak
+ */
+class KafkaUtils {
+
+    // Consumer used to retrieve messages from Kafka; stays null until initializeConsumer() is called
+    private KafkaConsumer<String, byte[]> consumer;
+
+    private KafkaProducer<String, byte[]> producer; // NOTE(review): never assigned in the code visible here
+
+    // Client configuration for the consumer and the producer, as defined in the Kafka documentation
+    private Properties consumerProperties;
+    private Properties producerProperties;
+
+    // Timeout handed to consumer.poll(...) (not a batch size); milliseconds per the Kafka consumer API
+    private int consumerTimeout;
+
+    public KafkaUtils(Properties consumerProperties, Properties 
producerProperties, int consumerTimeout) { // stores the settings only; no Kafka client is created here
+        this.consumerProperties = consumerProperties;
+        this.producerProperties = producerProperties;
+        this.consumerTimeout = consumerTimeout;
+    }
+
+    KafkaUtils(KafkaUtils kafkaUtils) { // copy constructor: copies the settings only, so the copy's consumer starts out null
+        this.consumerProperties = kafkaUtils.consumerProperties;
+        this.producerProperties = kafkaUtils.producerProperties;
+        this.consumerTimeout = kafkaUtils.consumerTimeout;
+    }
+
+    public void initializeConsumer(Collection<String> topics) { // creates the consumer if needed, then subscribes it to topics
+        // lazy initialization: the consumer is built only on the first call
+        if (consumer == null) {
+            consumer = new KafkaConsumer<String, byte[]>(consumerProperties);
+        }
+        consumer.subscribe(topics); // runs on every call, including repeat calls
+    }
+
+    public List<byte[]> getKafkaMessages() throws Exception { // polls once and returns the record values; throws if it cannot poll
+
+        if (consumer != null) {
+            if (!consumer.subscription().isEmpty()) {
+                return getMessagesBytes(consumer.poll(consumerTimeout)); // consumerTimeout is the poll timeout
+            } else {
+                // TODO: do it in a more elegant way (a plain Exception is thrown for now)
+                throw new Exception("Consumer subscribed to no topics!");
+            }
+        } else {
+            // TODO: do it in a more elegant way (a plain Exception is thrown for now)
+            throw new Exception("Consumer not initialised");
+        }
+    }
+
+    private List<byte[]> getMessagesBytes(ConsumerRecords<String, byte[]> 
poll) { // copies each record's value into a new list, in the iteration order of poll
+        Iterator<ConsumerRecord<String, byte[]>> iterator = poll.iterator();
+        List<byte[]> ret = new ArrayList<>();
+        while(iterator.hasNext()){
+            ret.add(iterator.next().value());
+        }
+        return ret;
+    }
+}

Reply via email to