Code for KafkaEntranceProcessor (consuming messages from Kafka) Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/032ddf02 Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/032ddf02 Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/032ddf02
Branch: refs/heads/master Commit: 032ddf029d27cc9961268d84b0b63bb5df4e4fd3 Parents: 4453f1f Author: pwawrzyniak <[email protected]> Authored: Fri Mar 17 11:05:14 2017 +0100 Committer: nkourtellis <[email protected]> Committed: Fri Jul 21 21:12:18 2017 +0300 ---------------------------------------------------------------------- .../streams/kafka/KafkaEntranceProcessor.java | 153 ++++++++++-------- .../apache/samoa/streams/kafka/KafkaUtils.java | 161 +++++++++++-------- 2 files changed, 178 insertions(+), 136 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/032ddf02/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java index 228e81b..b1e8a7f 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaEntranceProcessor.java @@ -1,65 +1,88 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.samoa.streams.kafka; - -import java.util.Properties; -import org.apache.samoa.core.ContentEvent; -import org.apache.samoa.core.EntranceProcessor; -import org.apache.samoa.core.Processor; - -/** - * - * @author pwawrzyniak - */ -public class KafkaEntranceProcessor implements EntranceProcessor { - - transient private KafkaUtils kafkaUtils; - - public KafkaEntranceProcessor(Properties props, String topic, int batchSize) { - kafkaUtils = new KafkaUtils(props, null, batchSize); - } - - @Override - public void onCreate(int id) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - public boolean isFinished() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - public boolean hasNext() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - public ContentEvent nextEvent() { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - public boolean process(ContentEvent event) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - - @Override - public Processor newProcessor(Processor processor) { - throw new UnsupportedOperationException("Not supported yet."); //To change body of generated methods, choose Tools | Templates. - } - -} +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.samoa.core.ContentEvent; +import org.apache.samoa.core.EntranceProcessor; +import org.apache.samoa.core.Processor; + +/** + * + * @author pwawrzyniak + */ +public class KafkaEntranceProcessor implements EntranceProcessor { + + transient private KafkaUtils kafkaUtils; + List<byte[]> buffer; + private final KafkaDeserializer deserializer; + + public KafkaEntranceProcessor(Properties props, String topic, int timeout, KafkaDeserializer deserializer) { + this.kafkaUtils = new KafkaUtils(props, null, timeout); + this.deserializer = deserializer; + } + + private KafkaEntranceProcessor(KafkaUtils kafkaUtils, KafkaDeserializer deserializer) { + this.kafkaUtils = kafkaUtils; + this.deserializer = deserializer; + } + + @Override + public void onCreate(int id) { + this.buffer = new ArrayList<>(100); + } + + @Override + public boolean isFinished() { + return false; + } + + @Override + public boolean hasNext() { + if (buffer.isEmpty()) { + try { + buffer.addAll(kafkaUtils.getKafkaMessages()); + } catch (Exception ex) { + Logger.getLogger(KafkaEntranceProcessor.class.getName()).log(Level.SEVERE, null, ex); + } + } + return buffer.size() > 0; + } + + @Override + public ContentEvent nextEvent() { + // assume this will never be called when buffer is empty! + return this.deserializer.deserialize(buffer.remove(buffer.size() - 1)); + + } + + @Override + public boolean process(ContentEvent event) { + return false; + } + + @Override + public Processor newProcessor(Processor processor) { + KafkaEntranceProcessor kep = (KafkaEntranceProcessor) processor; + return new KafkaEntranceProcessor(new KafkaUtils(kep.kafkaUtils), deserializer); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/032ddf02/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java index c2fbaa8..d148878 100644 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java +++ b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaUtils.java @@ -1,71 +1,90 @@ -/* - * Copyright 2017 The Apache Software Foundation. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.samoa.streams.kafka; - -import java.util.Collection; -import java.util.Properties; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.KafkaConsumer; -import org.apache.kafka.clients.producer.KafkaProducer; - -/** - * Internal class responsible for Kafka Stream handling - * - * @author pwawrzyniak - */ -class KafkaUtils { - - // Consumer class for internal use to retrieve messages from Kafka - private KafkaConsumer<String, byte[]> consumer; - - private KafkaProducer<String, byte[]> producer; - - // Properties of the consumer, as defined in Kafka documentation - private Properties consumerProperties; - private Properties producerProperties; - - // Batch size for Kafka Consumer - private int consumerTimeout; - - public KafkaUtils(Properties consumerProperties, Properties producerProperties, int consumerTimeout) { - this.consumerProperties = consumerProperties; - this.producerProperties = producerProperties; - this.consumerTimeout = consumerTimeout; - } - - public void initializeConsumer(Collection<String> topics) { - // lazy initialization - if (consumer == null) { - consumer = new KafkaConsumer<String, byte[]>(consumerProperties); - } - consumer.subscribe(topics); - } - - public ConsumerRecords<String, byte[]> getMessages() throws Exception { - - if (consumer != null) { - if (!consumer.subscription().isEmpty()) { - return consumer.poll(consumerTimeout); - } else { - // TODO: do it more elegant way - throw new Exception("Consumer subscribed to no topics!"); - } - } else { - // TODO: do more elegant way - throw new Exception("Consumer not initialised"); - } - } -} +/* + * Copyright 2017 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.streams.kafka; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; + +/** + * Internal class responsible for Kafka Stream handling + * + * @author pwawrzyniak + */ +class KafkaUtils { + + // Consumer class for internal use to retrieve messages from Kafka + private KafkaConsumer<String, byte[]> consumer; + + private KafkaProducer<String, byte[]> producer; + + // Properties of the consumer, as defined in Kafka documentation + private Properties consumerProperties; + private Properties producerProperties; + + // Batch size for Kafka Consumer + private int consumerTimeout; + + public KafkaUtils(Properties consumerProperties, Properties producerProperties, int consumerTimeout) { + this.consumerProperties = consumerProperties; + this.producerProperties = producerProperties; + this.consumerTimeout = consumerTimeout; + } + + KafkaUtils(KafkaUtils kafkaUtils) { + this.consumerProperties = kafkaUtils.consumerProperties; + this.producerProperties = kafkaUtils.producerProperties; + this.consumerTimeout = kafkaUtils.consumerTimeout; + } + + public void initializeConsumer(Collection<String> topics) { + // lazy initialization + if (consumer == null) { + consumer = new KafkaConsumer<String, byte[]>(consumerProperties); + } + consumer.subscribe(topics); + } + + public List<byte[]> getKafkaMessages() throws Exception { + + if (consumer != null) { + if (!consumer.subscription().isEmpty()) { + return getMessagesBytes(consumer.poll(consumerTimeout)); + } else { + // TODO: do it more elegant way + throw new Exception("Consumer subscribed to no topics!"); + } + } else { + // TODO: do more elegant way + throw new Exception("Consumer not initialised"); + } + } + + private List<byte[]> getMessagesBytes(ConsumerRecords<String, byte[]> poll) { + Iterator<ConsumerRecord<String, byte[]>> iterator = poll.iterator(); + List<byte[]> ret = new ArrayList<>(); + while(iterator.hasNext()){ + ret.add(iterator.next().value()); + } + return ret; + } +}
