Repository: incubator-samoa
Updated Branches:
  refs/heads/master 26c219124 -> 17733b5e6


Changes in KafkaTask

Project: http://git-wip-us.apache.org/repos/asf/incubator-samoa/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samoa/commit/17733b5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samoa/tree/17733b5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samoa/diff/17733b5e

Branch: refs/heads/master
Commit: 17733b5e62b915ad398a9c29cc9df1c1f644f6fa
Parents: cd9319d
Author: pwawrzyniak <[email protected]>
Authored: Fri Jul 14 14:39:01 2017 +0200
Committer: nkourtellis <[email protected]>
Committed: Fri Jul 21 21:12:18 2017 +0300

----------------------------------------------------------------------
 .../apache/samoa/streams/kafka/KafkaTask.java   | 147 --------------
 .../java/org/apache/samoa/tasks/KafkaTask.java  | 199 +++++++++++++++++++
 2 files changed, 199 insertions(+), 147 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/17733b5e/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
----------------------------------------------------------------------
diff --git 
a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java 
b/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
deleted file mode 100644
index b3d638f..0000000
--- a/samoa-api/src/main/java/org/apache/samoa/streams/kafka/KafkaTask.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.samoa.streams.kafka;
-
-/*
- * #%L
- * SAMOA
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * 
- *      http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * #L%
- */
-
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Properties;
-
-import org.apache.samoa.tasks.Task;
-import org.apache.samoa.topology.ComponentFactory;
-import org.apache.samoa.topology.Stream;
-import org.apache.samoa.topology.Topology;
-import org.apache.samoa.topology.TopologyBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.github.javacliparser.Configurable;
-import com.github.javacliparser.IntOption;
-import com.github.javacliparser.StringOption;
-
-/**
- * Kafka task.
- * <p>
- * {@link #init()} assembles a topology made of one KafkaEntranceProcessor
- * (given the consumer properties, the input topic, the timeout and the
- * deserializer) whose stream is connected, as a shuffle stream, to a
- * KafkaDestinationProcessor (given the producer properties, the output topic
- * and the serializer). The destination processor is added with the
- * parallelism taken from {@code kafkaParallelismOption}.
- * <p>
- * All Kafka settings are supplied through the constructor; only the
- * parallelism and the topology name come from options.
- *
- * @author Jakub Jankowski
- * @version 0.5.0-incubating-SNAPSHOT
- * @since 0.5.0-incubating
- *
- */
-
-public class KafkaTask implements Task, Configurable {
-
-       private static final long serialVersionUID = 3984474041982397855L;
-       private static Logger logger = LoggerFactory.getLogger(KafkaTask.class);
-       
-       // TODO: should these be identical for the entrance and the destination processor?
-       Properties producerProps;
-       Properties consumerProps;
-       int timeout;
-       private final KafkaDeserializer deserializer;
-       private final KafkaSerializer serializer;       
-
-       private TopologyBuilder builder;
-       private Topology kafkaTopology;
-
-       public IntOption kafkaParallelismOption = new 
IntOption("parallelismOption", 'p',
-                       "Number of destination Processors", 1, 1, 
Integer.MAX_VALUE);
-
-       public StringOption evaluationNameOption = new 
StringOption("evaluationName", 'n', "Identifier of the evaluation",
-                       "KafkaTask" + new 
SimpleDateFormat("yyyyMMddHHmmss").format(new Date()));
-        
-    private final String inTopic;
-    private final String outTopic;
-
-       /**
-     * Class constructor. Stores every argument unchanged; nothing is
-     * validated or connected until {@link #init()} is called.
-     *
-     * @param producerProps Properties handed to the KafkaDestinationProcessor
-     * @see <a href="http://kafka.apache.org/documentation/#producerconfigs">Kafka
-     * Producer configuration</a>
-     * @param consumerProps Properties handed to the KafkaEntranceProcessor
-     * @see <a href="http://kafka.apache.org/documentation/#consumerconfigs">Kafka
-     * Consumer configuration</a>
-     * @param inTopic Topic name handed to the entrance processor
-     * @param outTopic Topic name handed to the destination processor
-     * @param timeout Timeout handed to the entrance processor (described by
-     * the original author as the timeout used when polling Kafka for new
-     * messages; the unit is not visible here)
-     * @param serializer Implementation of KafkaSerializer handed to the
-     * destination processor
-     * @param deserializer Implementation of KafkaDeserializer handed to the
-     * entrance processor
-     */
-       public KafkaTask(Properties producerProps, Properties consumerProps, 
String inTopic, String outTopic, int timeout, KafkaSerializer serializer, 
KafkaDeserializer deserializer) {
-               this.producerProps = producerProps;
-               this.consumerProps = consumerProps;
-               this.deserializer = deserializer;
-               this.serializer = serializer;
-               this.inTopic = inTopic;
-                this.outTopic = outTopic;
-               this.timeout = timeout;
-       }
-
-       @Override
-       public void init() {
-               logger.info("Invoking init");
-               if (builder == null) {
-                       builder = new TopologyBuilder();
-                       logger.info("Successfully instantiating 
TopologyBuilder");
-
-                       builder.initTopology(evaluationNameOption.getValue());
-                       logger.info("Successfully initializing SAMOA topology 
with name {}", evaluationNameOption.getValue());
-               }
-               
-               // create entrance processor (source of the topology)
-               KafkaEntranceProcessor sourceProcessor = new 
KafkaEntranceProcessor(consumerProps, inTopic, timeout, deserializer);
-               builder.addEntranceProcessor(sourceProcessor);
-               
-               // create the stream leaving the entrance processor
-               Stream stream = builder.createStream(sourceProcessor);
-               
-               // create destination processor and subscribe it to the stream (shuffle)
-               KafkaDestinationProcessor destProcessor = new 
KafkaDestinationProcessor(producerProps, outTopic, serializer);
-               builder.addProcessor(destProcessor, 
kafkaParallelismOption.getValue());
-               builder.connectInputShuffleStream(stream, destProcessor);
-               
-               // build topology and keep it for getTopology()
-               kafkaTopology = builder.build();
-           logger.info("Successfully built the topology");
-       }
-
-       @Override
-       public Topology getTopology() {
-               return kafkaTopology;
-       }
-
-       @Override
-       public void setFactory(ComponentFactory factory) {
-               logger.info("Invoking setFactory: "+factory.toString());
-               builder = new TopologyBuilder(factory);
-           logger.info("Successfully instantiating TopologyBuilder");
-
-           builder.initTopology(evaluationNameOption.getValue());
-           logger.info("Successfully initializing SAMOA topology with name 
{}", evaluationNameOption.getValue());
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-samoa/blob/17733b5e/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java
----------------------------------------------------------------------
diff --git a/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java 
b/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java
new file mode 100644
index 0000000..f0597a8
--- /dev/null
+++ b/samoa-api/src/main/java/org/apache/samoa/tasks/KafkaTask.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.samoa.tasks;
+
+/*
+ * #%L
+ * SAMOA
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+import com.github.javacliparser.ClassOption;
+import java.util.Properties;
+
+import org.apache.samoa.topology.ComponentFactory;
+import org.apache.samoa.topology.Stream;
+import org.apache.samoa.topology.Topology;
+import org.apache.samoa.topology.TopologyBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.javacliparser.Configurable;
+import com.github.javacliparser.IntOption;
+import com.github.javacliparser.StringOption;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import org.apache.samoa.streams.kafka.KafkaDeserializer;
+import org.apache.samoa.streams.kafka.KafkaDestinationProcessor;
+import org.apache.samoa.streams.kafka.KafkaEntranceProcessor;
+import org.apache.samoa.streams.kafka.KafkaSerializer;
+
+/**
+ * Kafka task
+ *
+ * @author Jakub Jankowski
+ * @version 0.5.0-incubating-SNAPSHOT
+ * @since 0.5.0-incubating
+ *
+ */
+public class KafkaTask implements Task, Configurable {
+
+  private static final long serialVersionUID = 3984474041982397855L;
+  private static Logger logger = LoggerFactory.getLogger(KafkaTask.class);
+
+  Properties producerProps;
+  Properties consumerProps;
+  int timeout;
+  private KafkaDeserializer deserializer;
+  private KafkaSerializer serializer;
+  private String inTopic;
+  private String outTopic;
+
+  private TopologyBuilder builder;
+  private Topology kafkaTopology;
+
+  public IntOption kafkaParallelismOption = new IntOption("parallelismOption", 
'p',
+          "Number of destination Processors", 1, 1, Integer.MAX_VALUE);
+
+  public IntOption timeoutOption = new IntOption("timeout", 't',
+          "Kafka consumer timeout", 1, 1, Integer.MAX_VALUE);
+
+  public StringOption inputBrokerOption = new StringOption("inputBroker", 'r', 
"Input brokers addresses",
+          "inputTopic");
+
+  public StringOption outputBrokerOption = new StringOption("outputBroker", 
's', "Output brokers name",
+          "inputTopic");
+
+  public StringOption inputTopicOption = new StringOption("inputTopic", 'i', 
"Input topic name",
+          "inputTopic");
+
+  public StringOption outputTopicOption = new StringOption("outputTopic", 'o', 
"Output topic name",
+          "outputTopic");
+
+  public ClassOption serializerOption = new ClassOption("serializer", 'w',
+          "Serializer class name",
+          KafkaSerializer.class, KafkaSerializer.class.getName());
+
+  public ClassOption deserializerOption = new ClassOption("deserializer", 'd',
+          "Deserializer class name",
+          KafkaDeserializer.class, KafkaDeserializer.class.getName());
+
+  public StringOption taskNameOption = new StringOption("taskName", 'n', 
"Identifier of the task",
+          "KafkaTask" + new SimpleDateFormat("yyyyMMddHHmmss").format(new 
Date()));
+
+  /**
+   * Class constructor (for tests purposes)
+   *
+   * @param producerProps Properties of Kafka Producer and Consumer
+   * @see <a 
href="http://kafka.apache.org/documentation/#producerconfigs";>Kafka
+   * Producer configuration</a>
+   * @param consumerProps Properties of Kafka Producer and Consumer
+   * @see <a 
href="http://kafka.apache.org/documentation/#consumerconfigs";>Kafka
+   * Consumer configuration</a>
+   * @param inTopic Topic to which destination processor will read from
+   * @param outTopic Topic to which destination processor will write into
+   * @param timeout Timeout used when polling Kafka for new messages
+   * @param serializer Implementation of KafkaSerializer that handles arriving
+   * data serialization
+   * @param deserializer Implementation of KafkaDeserializer that handles
+   * arriving data deserialization
+   */
+  public KafkaTask(Properties producerProps, Properties consumerProps, String 
inTopic, String outTopic, int timeout, KafkaSerializer serializer, 
KafkaDeserializer deserializer) {
+    this.producerProps = producerProps;
+    this.consumerProps = consumerProps;
+    this.deserializer = deserializer;
+    this.serializer = serializer;
+    this.inTopic = inTopic;
+    this.outTopic = outTopic;
+    this.timeout = timeout;
+  }
+
+  /**
+   * Class constructor
+   */
+  public KafkaTask() {
+
+  }
+
+  @Override
+  public void init() {
+    producerProps = new Properties();
+    producerProps.setProperty("bootstrap.servers", 
outputBrokerOption.getValue());
+
+    consumerProps = new Properties();
+    consumerProps.setProperty("bootstrap.servers", 
inputBrokerOption.getValue());
+
+    serializer = serializerOption.getValue();
+
+    deserializer = deserializerOption.getValue();
+
+    inTopic = inputTopicOption.getValue();
+    outTopic = outputTopicOption.getValue();
+
+    timeout = timeoutOption.getValue();
+
+    logger.info("Invoking init");
+    if (builder == null) {
+      builder = new TopologyBuilder();
+      logger.info("Successfully instantiating TopologyBuilder");
+
+      builder.initTopology(taskNameOption.getValue());
+      logger.info("Successfully initializing SAMOA topology with name {}", 
taskNameOption.getValue());
+    }
+
+    // create enterance processor
+    KafkaEntranceProcessor sourceProcessor = new 
KafkaEntranceProcessor(consumerProps, inTopic, timeout, deserializer);
+    builder.addEntranceProcessor(sourceProcessor);
+
+    // create stream
+    Stream stream = builder.createStream(sourceProcessor);
+
+    // create destination processor
+    KafkaDestinationProcessor destProcessor = new 
KafkaDestinationProcessor(producerProps, outTopic, serializer);
+    builder.addProcessor(destProcessor, kafkaParallelismOption.getValue());
+    builder.connectInputShuffleStream(stream, destProcessor);
+
+    // build topology
+    kafkaTopology = builder.build();
+    logger.info("Successfully built the topology");
+  }
+
+  @Override
+  public Topology getTopology() {
+    return kafkaTopology;
+  }
+
+  @Override
+  public void setFactory(ComponentFactory factory) {
+    logger.info("Invoking setFactory: " + factory.toString());
+    builder = new TopologyBuilder(factory);
+    logger.info("Successfully instantiating TopologyBuilder");
+
+    builder.initTopology(taskNameOption.getValue());
+    logger.info("Successfully initializing SAMOA topology with name {}", 
taskNameOption.getValue());
+
+  }
+
+}

Reply via email to