Repository: incubator-samoa Updated Branches: refs/heads/master 26c219124 -> 17733b5e6
Changes in KafkaTask Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/17733b5e Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/17733b5e Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/17733b5e Branch: refs/heads/master Commit: 17733b5e62b915ad398a9c29cc9df1c1f644f6fa Parents: cd9319d Author: pwawrzyniak <[email protected]> Authored: Fri Jul 14 14:39:01 2017 +0200 Committer: nkourtellis <[email protected]> Committed: Fri Jul 21 21:12:18 2017 +0300 ---------------------------------------------------------------------- .../apache/samoa/streams/kafka/KafkaTask.java | 147 -------------- .../java/org/apache/samoa/tasks/KafkaTask.java | 199 +++++++++++++++++++ 2 files changed, 199 insertions(+), 147 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/17733b5e/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java deleted file mode 100644 index b3d638f..0000000 --- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.samoa.streams.kafka; - -/* - * #%L - * SAMOA - * %% - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * #L% - */ - -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.Properties; - -import org.apache.samoa.tasks.Task; -import org.apache.samoa.topology.ComponentFactory; -import org.apache.samoa.topology.Stream; -import org.apache.samoa.topology.Topology; -import org.apache.samoa.topology.TopologyBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.github.javacliparser.Configurable; -import com.github.javacliparser.IntOption; -import com.github.javacliparser.StringOption; - -/** - * Kafka task - * - * @author Jakub Jankowski - * @version 0.5.0-incubating-SNAPSHOT - * @since 0.5.0-incubating - * - */ - -public class KafkaTask implements Task, Configurable { - - private static final long serialVersionUID = 3984474041982397855L; - private static Logger logger = LoggerFactory.getLogger(KafkaTask.class); - - //czy identyczne dla enterance i destination? 
- Properties producerProps; - Properties consumerProps; - int timeout; - private final KafkaDeserializer deserializer; - private final KafkaSerializer serializer; - - private TopologyBuilder builder; - private Topology kafkaTopology; - - public IntOption kafkaParallelismOption = new IntOption("parallelismOption", 'p', - "Number of destination Processors", 1, 1, Integer.MAX_VALUE); - - public StringOption evaluationNameOption = new StringOption("evaluationName", 'n', "Identifier of the evaluation", - "KafkaTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); - - private final String inTopic; - private final String outTopic; - - /** - * Class constructor - * @param props Properties of Kafka Producer and Consumer - * @see <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka Producer configuration</a> - * @see <a href="http://kafka.apache.org/documentation/#consumerconfigs">Kafka Consumer configuration</a> - * @param topic Topic to which destination processor will write into - * @param timeout Timeout used when polling Kafka for new messages - * @param serializer Implementation of KafkaSerializer that handles arriving data serialization - * @param serializer Implementation of KafkaDeserializer that handles arriving data deserialization - */ - public KafkaTask(Properties producerProps, Properties consumerProps, String inTopic, String outTopic, int timeout, KafkaSerializer serializer, KafkaDeserializer deserializer) { - this.producerProps = producerProps; - this.consumerProps = consumerProps; - this.deserializer = deserializer; - this.serializer = serializer; - this.inTopic = inTopic; - this.outTopic = outTopic; - this.timeout = timeout; - } - - @Override - public void init() { - logger.info("Invoking init"); - if (builder == null) { - builder = new TopologyBuilder(); - logger.info("Successfully instantiating TopologyBuilder"); - - builder.initTopology(evaluationNameOption.getValue()); - logger.info("Successfully initializing SAMOA 
topology with name {}", evaluationNameOption.getValue()); - } - - // create enterance processor - KafkaEntranceProcessor sourceProcessor = new KafkaEntranceProcessor(consumerProps, inTopic, timeout, deserializer); - builder.addEntranceProcessor(sourceProcessor); - - // create stream - Stream stream = builder.createStream(sourceProcessor); - - // create destination processor - KafkaDestinationProcessor destProcessor = new KafkaDestinationProcessor(producerProps, outTopic, serializer); - builder.addProcessor(destProcessor, kafkaParallelismOption.getValue()); - builder.connectInputShuffleStream(stream, destProcessor); - - // build topology - kafkaTopology = builder.build(); - logger.info("Successfully built the topology"); - } - - @Override - public Topology getTopology() { - return kafkaTopology; - } - - @Override - public void setFactory(ComponentFactory factory) { - logger.info("Invoking setFactory: "+factory.toString()); - builder = new TopologyBuilder(factory); - logger.info("Successfully instantiating TopologyBuilder"); - - builder.initTopology(evaluationNameOption.getValue()); - logger.info("Successfully initializing SAMOA topology with name {}", evaluationNameOption.getValue()); - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/17733b5e/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java ---------------------------------------------------------------------- diff --git a/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java b/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java new file mode 100644 index 0000000..f0597a8 --- /dev/null +++ b/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java @@ -0,0 +1,199 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.samoa.tasks; + +/* + * #%L + * SAMOA + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * #L% + */ +import com.github.javacliparser.ClassOption; +import java.util.Properties; + +import org.apache.samoa.topology.ComponentFactory; +import org.apache.samoa.topology.Stream; +import org.apache.samoa.topology.Topology; +import org.apache.samoa.topology.TopologyBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.javacliparser.Configurable; +import com.github.javacliparser.IntOption; +import com.github.javacliparser.StringOption; +import java.text.SimpleDateFormat; +import java.util.Date; +import org.apache.samoa.streams.kafka.KafkaDeserializer; +import org.apache.samoa.streams.kafka.KafkaDestinationProcessor; +import org.apache.samoa.streams.kafka.KafkaEntranceProcessor; +import org.apache.samoa.streams.kafka.KafkaSerializer; + +/** + * Kafka task + * + * @author Jakub Jankowski + * @version 0.5.0-incubating-SNAPSHOT + * @since 0.5.0-incubating + * + */ +public class KafkaTask implements Task, Configurable { + + private static final long serialVersionUID = 3984474041982397855L; + private static Logger logger = LoggerFactory.getLogger(KafkaTask.class); + + Properties producerProps; + Properties consumerProps; + int timeout; + private KafkaDeserializer deserializer; + private KafkaSerializer serializer; + private String inTopic; + private String outTopic; + + private TopologyBuilder builder; + private Topology kafkaTopology; + + public IntOption kafkaParallelismOption = new IntOption("parallelismOption", 'p', + "Number of destination Processors", 1, 1, Integer.MAX_VALUE); + + public IntOption timeoutOption = new IntOption("timeout", 't', + "Kafka consumer timeout", 1, 1, Integer.MAX_VALUE); + + public StringOption inputBrokerOption = new StringOption("inputBroker", 'r', "Input brokers addresses", + "inputTopic"); + + public StringOption outputBrokerOption = new StringOption("outputBroker", 's', "Output brokers name", + "inputTopic"); + + public StringOption inputTopicOption = new StringOption("inputTopic", 'i', "Input 
topic name", + "inputTopic"); + + public StringOption outputTopicOption = new StringOption("outputTopic", 'o', "Output topic name", + "outputTopic"); + + public ClassOption serializerOption = new ClassOption("serializer", 'w', + "Serializer class name", + KafkaSerializer.class, KafkaSerializer.class.getName()); + + public ClassOption deserializerOption = new ClassOption("deserializer", 'd', + "Deserializer class name", + KafkaDeserializer.class, KafkaDeserializer.class.getName()); + + public StringOption taskNameOption = new StringOption("taskName", 'n', "Identifier of the task", + "KafkaTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date())); + + /** + * Class constructor (for testing purposes) + * + * @param producerProps Properties of the Kafka Producer + * @see <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka + * Producer configuration</a> + * @param consumerProps Properties of the Kafka Consumer + * @see <a href="http://kafka.apache.org/documentation/#consumerconfigs">Kafka + * Consumer configuration</a> + * @param inTopic Topic from which the entrance processor will read + * @param outTopic Topic to which the destination processor will write + * @param timeout Timeout used when polling Kafka for new messages + * @param serializer Implementation of KafkaSerializer that handles arriving + * data serialization + * @param deserializer Implementation of KafkaDeserializer that handles + * arriving data deserialization + */ + public KafkaTask(Properties producerProps, Properties consumerProps, String inTopic, String outTopic, int timeout, KafkaSerializer serializer, KafkaDeserializer deserializer) { + this.producerProps = producerProps; + this.consumerProps = consumerProps; + this.deserializer = deserializer; + this.serializer = serializer; + this.inTopic = inTopic; + this.outTopic = outTopic; + this.timeout = timeout; + } + + /** + * Class constructor + */ + public KafkaTask() { + + } + + @Override + public void 
init() { + producerProps = new Properties(); + producerProps.setProperty("bootstrap.servers", outputBrokerOption.getValue()); + + consumerProps = new Properties(); + consumerProps.setProperty("bootstrap.servers", inputBrokerOption.getValue()); + + serializer = serializerOption.getValue(); + + deserializer = deserializerOption.getValue(); + + inTopic = inputTopicOption.getValue(); + outTopic = outputTopicOption.getValue(); + + timeout = timeoutOption.getValue(); + + logger.info("Invoking init"); + if (builder == null) { + builder = new TopologyBuilder(); + logger.info("Successfully instantiating TopologyBuilder"); + + builder.initTopology(taskNameOption.getValue()); + logger.info("Successfully initializing SAMOA topology with name {}", taskNameOption.getValue()); + } + + // create entrance processor + KafkaEntranceProcessor sourceProcessor = new KafkaEntranceProcessor(consumerProps, inTopic, timeout, deserializer); + builder.addEntranceProcessor(sourceProcessor); + + // create stream + Stream stream = builder.createStream(sourceProcessor); + + // create destination processor + KafkaDestinationProcessor destProcessor = new KafkaDestinationProcessor(producerProps, outTopic, serializer); + builder.addProcessor(destProcessor, kafkaParallelismOption.getValue()); + builder.connectInputShuffleStream(stream, destProcessor); + + // build topology + kafkaTopology = builder.build(); + logger.info("Successfully built the topology"); + } + + @Override + public Topology getTopology() { + return kafkaTopology; + } + + @Override + public void setFactory(ComponentFactory factory) { + logger.info("Invoking setFactory: " + factory.toString()); + builder = new TopologyBuilder(factory); + logger.info("Successfully instantiating TopologyBuilder"); + + builder.initTopology(taskNameOption.getValue()); + logger.info("Successfully initializing SAMOA topology with name {}", taskNameOption.getValue()); + + } + +}
