KafkaDestinationProcessor implementation (sending msg to Kafka Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/0db63b94 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/0db63b94 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/0db63b94
Branch: refs/heads/master Commit: 0db63b941bfb1270bb810ac0c7984c86658c9818 Parents: d32cea1 Author: pwawrzyniak <[email protected]> Authored: Fri Mar 17 15:40:25 2017 +0100 Committer: nkourtellis <[email protected]> Committed: Fri Jul 21 21:12:18 2017 +0300 ---------------------------------------------------------------------- .../kafka/KafkaDestinationProcessor.java | 119 ++++++++++++------- .../apache/samoa/streams/kafka/KafkaUtils.java | 15 +++ 2 files changed, 92 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/0db63b94/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java index ed8f164..5632b6e 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaDestinationProcessor.java @@ -1,42 +1,77 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.samoa.streams.kafka; - -import org.apache.samoa.core.ContentEvent; -import org.apache.samoa.core.Processor; - -/** - * - * @author pwawrzyniak - */ -public class KafkaDestinationProcessor implements Processor { - - @Override - public boolean process(ContentEvent event) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - public void onCreate(int id) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - public Processor newProcessor(Processor processor) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - -} +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.Processor; + +/** + * Destination processor that writes data to Apache Kafka + * @author pwawrzyniak + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + */ +public class KafkaDestinationProcessor implements Processor { + + private final KafkaUtils kafkaUtils; + private final String topic; + private final KafkaSerializer serializer; + + /** + * Class constructor + * @param props Properties of Kafka Producer + * @see <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka Producer configuration</a> + * @param topic Topic this destination processor will write into + * @param serializer Implementation of KafkaSerializer that handles arriving data serialization + */ + public KafkaDestinationProcessor(Properties props, String topic, KafkaSerializer serializer) { + this.kafkaUtils = new KafkaUtils(null, props, 0); + this.topic = topic; + this.serializer = serializer; + } + + private KafkaDestinationProcessor(KafkaUtils kafkaUtils, String topic, KafkaSerializer serializer){ + this.kafkaUtils = kafkaUtils; + this.topic = topic; + this.serializer = serializer; + } + + @Override + public boolean process(ContentEvent event) { + try { + kafkaUtils.sendKafkaMessage(topic, serializer.serialize(event)); + } catch (Exception ex) { + Logger.getLogger(KafkaEntranceProcessor.class.getName()).log(Level.SEVERE, null, ex); + return false; + } + return true; + } + + @Override + public void onCreate(int id) { + kafkaUtils.initializeProducer(); + } + + @Override + public Processor newProcessor(Processor processor) { + KafkaDestinationProcessor kdp = (KafkaDestinationProcessor)processor; + return new KafkaDestinationProcessor(new KafkaUtils(kdp.kafkaUtils), kdp.topic, kdp.serializer); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/0db63b94/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java index c87b2f1..24783d4 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java @@ -24,6 +24,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; /** * Internal class responsible for Kafka Stream handling (both consume and produce) @@ -76,6 +77,13 @@ class KafkaUtils { consumer.subscribe(topics); } + public void initializeProducer(){ + // lazy instantiation + if(producer==null){ + producer = new KafkaProducer<>(producerProperties); + } + } + /** * Method for reading new messages from Kafka topics * @return Collection of read messages @@ -104,4 +112,11 @@ class KafkaUtils { } return ret; } + + public void sendKafkaMessage(String topic, byte[] message){ + if(producer!=null){ + producer.send(new ProducerRecord<String, byte[]>(topic, message)); + producer.flush(); + } + } }
