KafkaDestinationProcessor implementation (sending msg to Kafka)

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/0db63b94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/0db63b94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/0db63b94

Branch: refs/heads/master
Commit: 0db63b941bfb1270bb810ac0c7984c86658c9818
Parents: d32cea1
Author: pwawrzyniak <[email protected]>
Authored: Fri Mar 17 15:40:25 2017 +0100
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 .../kafka/KafkaDestinationProcessor.java        | 119 ++++++++++++-------
 .../apache/samoa/streams/kafka/KafkaUtils.java  |  15 +++
 2 files changed, 92 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/0db63b94/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
index ed8f164..5632b6e 100644
--- 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
+++ 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java
@@ -1,42 +1,77 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samoa.streams.kafka;
-
-import org.apache.samoa.core.ContentEvent;
-import org.apache.samoa.core.Processor;
-
-/**
- *
- * @author pwawrzyniak
- */
-public class KafkaDestinationProcessor implements Processor {
-
-    @Override
-    public boolean process(ContentEvent event) {
-        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
-    }
-
-    @Override
-    public void onCreate(int id) {
-        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
-    }
-
-    @Override
-    public Processor newProcessor(Processor processor) {
-        throw new UnsupportedOperationException("Not supported yet."); //To 
change body of generated methods, choose Tools | Templates.
-    }
-    
-}
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.streams.kafka;
+
+import java.util.Properties;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.apache.samoa.core.ContentEvent;
+import org.apache.samoa.core.Processor;
+
+/**
+ * Destination processor that writes data to Apache Kafka.
+ * <p>
+ * Every event passed to {@link #process(ContentEvent)} is serialized with the
+ * configured {@code KafkaSerializer} and sent to the configured topic.
+ * {@link #onCreate(int)} must run before events are processed, because
+ * {@code KafkaUtils.sendKafkaMessage} does nothing while the producer has not
+ * been initialized.
+ *
+ * @author pwawrzyniak
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ */
+public class KafkaDestinationProcessor implements Processor {
+
+    // Static so the logger is shared and never becomes part of the instance state.
+    private static final Logger LOGGER = Logger.getLogger(KafkaDestinationProcessor.class.getName());
+
+    private final KafkaUtils kafkaUtils;
+    private final String topic;
+    private final KafkaSerializer serializer;
+
+    /**
+     * Class constructor
+     * @param props Properties of Kafka Producer
+     * @param topic Topic this destination processor will write into
+     * @param serializer Implementation of KafkaSerializer that handles arriving data serialization
+     * @see <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka Producer configuration</a>
+     */
+    public KafkaDestinationProcessor(Properties props, String topic, KafkaSerializer serializer) {
+        // NOTE(review): the first argument (null) is presumably the consumer configuration, which a
+        // producer-only processor does not need; the meaning of the trailing 0 is not visible here.
+        // Confirm both against the KafkaUtils constructor.
+        this.kafkaUtils = new KafkaUtils(null, props, 0);
+        this.topic = topic;
+        this.serializer = serializer;
+    }
+
+    /**
+     * Internal constructor used by {@link #newProcessor(Processor)} to build a processor
+     * around an already prepared KafkaUtils instance.
+     * @param kafkaUtils Kafka helper this processor will send messages through
+     * @param topic Topic this destination processor will write into
+     * @param serializer Implementation of KafkaSerializer that handles arriving data serialization
+     */
+    private KafkaDestinationProcessor(KafkaUtils kafkaUtils, String topic, KafkaSerializer serializer) {
+        this.kafkaUtils = kafkaUtils;
+        this.topic = topic;
+        this.serializer = serializer;
+    }
+
+    /**
+     * Serializes the event and sends it to the configured Kafka topic.
+     * @param event Event to be written to Kafka
+     * @return {@code true} when serialization and sending finished without an exception,
+     *         {@code false} when an exception was thrown (it is logged, not propagated)
+     */
+    @Override
+    public boolean process(ContentEvent event) {
+        try {
+            kafkaUtils.sendKafkaMessage(topic, serializer.serialize(event));
+        } catch (Exception ex) {
+            // Report under this class's own logger; the failure used to be attributed
+            // to KafkaEntranceProcessor, which made the source of the error misleading.
+            LOGGER.log(Level.SEVERE, "Failed to send event to Kafka topic " + topic, ex);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Initializes the Kafka producer used by this processor.
+     * @param id Processor id (not used by this implementation)
+     */
+    @Override
+    public void onCreate(int id) {
+        kafkaUtils.initializeProducer();
+    }
+
+    /**
+     * Creates a new destination processor based on the given one. The new instance reuses the
+     * topic and serializer of the given processor and gets its own KafkaUtils, built from the
+     * given processor's KafkaUtils.
+     * @param processor Processor to copy; must be a KafkaDestinationProcessor
+     * @return New KafkaDestinationProcessor instance
+     * @throws ClassCastException if the given processor is not a KafkaDestinationProcessor
+     */
+    @Override
+    public Processor newProcessor(Processor processor) {
+        KafkaDestinationProcessor kdp = (KafkaDestinationProcessor) processor;
+        return new KafkaDestinationProcessor(new KafkaUtils(kdp.kafkaUtils), kdp.topic, kdp.serializer);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/0db63b94/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
index c87b2f1..24783d4 100644
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
+++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 
 /**
  * Internal class responsible for Kafka Stream handling (both consume and 
produce)
@@ -76,6 +77,13 @@ class KafkaUtils {
         consumer.subscribe(topics);
     }
 
+    /**
+     * Method for creating the Kafka producer from the producer properties.
+     * The producer is created only on the first call; when it already exists
+     * the method returns without doing anything.
+     */
+    public void initializeProducer() {
+        if (producer != null) {
+            // already created - keep the existing instance
+            return;
+        }
+        producer = new KafkaProducer<>(producerProperties);
+    }
+    
     /**
      * Method for reading new messages from Kafka topics
      * @return Collection of read messages
@@ -104,4 +112,11 @@ class KafkaUtils {
         }
         return ret;
     }
+    
+    public void sendKafkaMessage(String topic, byte[] message){
+        if(producer!=null){
+            producer.send(new ProducerRecord<String, byte[]>(topic, message));
+            producer.flush();
+        }
+    }
 }

Reply via email to