Repository: apex-malhar
Updated Branches:
  refs/heads/master 23970382c -> 132012823


Kafka 0.9.0 output operators and unit tests.

1. Abstract Base class
2. Kafka Output operator
3. Exactly Once output operator
     Key in the Kafka message is used by the operator to track the tuples written by it (see the usage sketch below).
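
An illustrative usage sketch (not part of the commit below): wiring the simple KafkaSinglePortOutputOperator into an application. The broker address, topic name, and the upstream LineReader operator are placeholders.

  import java.util.Properties;

  import org.apache.hadoop.conf.Configuration;

  import com.datatorrent.api.DAG;
  import com.datatorrent.api.StreamingApplication;

  public class KafkaOutputApp implements StreamingApplication
  {
    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      // Hypothetical upstream operator emitting String tuples on an output port named "output".
      LineReader reader = dag.addOperator("reader", new LineReader());

      KafkaSinglePortOutputOperator<String, String> kafkaOut =
          dag.addOperator("kafkaOut", new KafkaSinglePortOutputOperator<String, String>());
      kafkaOut.setTopic("output-topic");                      // placeholder topic
      Properties props = new Properties();
      props.put("bootstrap.servers", "localhost:9092");       // placeholder broker list
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      kafkaOut.setProperties(props);

      dag.addStream("toKafka", reader.output, kafkaOut.inputPort);
    }
  }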


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/0444ced6
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/0444ced6
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/0444ced6

Branch: refs/heads/master
Commit: 0444ced6c857c1dbbbbb8821f25ad367c0b47012
Parents: 85566a3
Author: sandeshh <[email protected]>
Authored: Wed May 25 08:56:56 2016 -0700
Committer: sandeshh <[email protected]>
Committed: Tue Jul 12 16:24:52 2016 -0700

----------------------------------------------------------------------
 .../kafka/AbstractKafkaOutputOperator.java      | 122 ++++++
 ...afkaSinglePortExactlyOnceOutputOperator.java | 401 +++++++++++++++++++
 .../kafka/KafkaSinglePortOutputOperator.java    |  43 ++
 .../malhar/kafka/KafkaOutputOperatorTest.java   | 375 +++++++++++++++++
 kafka/src/test/resources/log4j.properties       |   5 +-
 5 files changed, 944 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0444ced6/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
new file mode 100644
index 0000000..56d9611
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaOutputOperator.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.util.Properties;
+import javax.validation.constraints.NotNull;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+/**
+ * This is the base implementation of a Kafka output operator (0.9.0), which writes data to the Kafka message bus.
+ *
+ * @displayName Abstract Kafka Output
+ * @category Messaging
+ * @tags output operator
+ *
+ */
[email protected]
+public abstract class AbstractKafkaOutputOperator<K, V> implements Operator
+{
+  private transient Producer<K, V> producer;
+  @NotNull
+  private String topic;
+  private Properties properties = new Properties();
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    producer = new KafkaProducer<K, V>(properties);
+  }
+
+  /**
+   * Implement Component Interface.
+   */
+  @Override
+  public void teardown()
+  {
+    producer.close();
+  }
+
+  /**
+   * Implement Operator Interface.
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+  }
+
+  /**
+   * Implement Operator Interface.
+   */
+  @Override
+  public void endWindow()
+  {
+  }
+
+  public Properties getProperties()
+  {
+    return properties;
+  }
+
+  /**
+   * Set the Kafka producer properties.
+   *
+   * @param properties Producer properties
+   */
+  public void setProperties(Properties properties)
+  {
+    this.properties.putAll(properties);
+  }
+
+  /**
+   * Set the Kafka producer property.
+   *
+   * @param key Producer Property name
+   * @param val Producer Property value
+   */
+  public void setProperty(Object key, Object val)
+  {
+    properties.put(key, val);
+  }
+
+  public String getTopic()
+  {
+    return topic;
+  }
+
+  /**
+   * Set the Kafka topic.
+   *
+   * @param topic Kafka topic to which the data is sent
+   */
+  public void setTopic(String topic)
+  {
+    this.topic = topic;
+  }
+
+  protected Producer<K, V> getProducer()
+  {
+    return producer;
+  }
+}
+
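
A concrete subclass only needs to add an input port that hands incoming tuples to the producer created in setup(). A minimal sketch follows; ByteArrayOutputOperator is a hypothetical name, and the single-port operators later in this commit follow the same pattern.

  import org.apache.kafka.clients.producer.ProducerRecord;
  import com.datatorrent.api.DefaultInputPort;

  public class ByteArrayOutputOperator extends AbstractKafkaOutputOperator<String, byte[]>
  {
    public final transient DefaultInputPort<byte[]> inputPort = new DefaultInputPort<byte[]>()
    {
      @Override
      public void process(byte[] tuple)
      {
        // Hand each incoming tuple to the Kafka producer; no message key is set here.
        getProducer().send(new ProducerRecord<String, byte[]>(getTopic(), tuple));
      }
    };
  }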

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0444ced6/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
new file mode 100644
index 0000000..09ae1cb
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortExactlyOnceOutputOperator.java
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+/**
+ * Kafka output operator with exactly once processing semantics.
+ *<br>
+ *
+ * <p>
+ * <b>Recovery handling</b>
+ * <li> Offsets of the Kafka partitions are stored in the WindowDataManager at endWindow.</li>
+ * <li> During recovery,
+ * <ul>
+ *    <li>The streaming window that was partially written before the crash is reconstructed (explained below).</li>
+ *    <li>Tuples from the completed streaming windows are skipped.</li>
+ *    <li>Tuples arriving for the partially written streaming window are skipped.
+ *       (No assumption is made about the order or uniqueness of the tuples.)</li>
+ *  </ul>
+ *  </li>
+ *</p>
+ *
+ * <p>
+ * <b>Partial Window Construction</b>
+ * <li> The operator uses the key of the Kafka message, so the key is not available to users of the operator.</li>
+ * <li> The key uniquely identifies the messages written by this particular operator instance,
+ *    which allows multiple writers to the same Kafka partitions. The key format is "APPLICATION_ID#OPERATOR_ID".</li>
+ * <li> During recovery, the Kafka partitions are read from the offsets stored at the last completed window up to the latest offsets.</li>
+ * <li> All the tuples written by this particular instance are kept in a Map.</li>
+ *</p>
+ *
+ * <p>
+ * <b>Limitations</b>
+ * <li> The key of the Kafka message is reserved for the operator's use.</li>
+ * <li> During recovery the operator needs to read tuples between two offsets; if there is a lot of data to read,
+ *    the operator may appear blocked to the STRAM, which can then kill it.</li>
+ *</p>
+ *
+ * @displayName Kafka Single Port Exactly Once Output(0.9.0)
+ * @category Messaging
+ * @tags output operator
+ *
+ */
[email protected]
+public class KafkaSinglePortExactlyOnceOutputOperator<T> extends AbstractKafkaOutputOperator<String, T>
+    implements Operator.CheckpointNotificationListener
+{
+  private transient String key;
+  private transient String appName;
+  private transient Integer operatorId;
+  private transient Long windowId;
+  private transient Map<T, Integer> partialWindowTuples = new HashMap<>();
+  private transient KafkaConsumer consumer;
+
+  private WindowDataManager windowDataManager = new FSWindowDataManager();
+  private final int KAFKA_CONNECT_ATTEMPT = 10;
+  private final String KEY_SEPARATOR = "#";
+
+  private final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  private final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+
+  public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T tuple)
+    {
+      sendTuple(tuple);
+    }
+  };
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    super.setup(context);
+
+    this.operatorId = context.getId();
+    this.windowDataManager.setup(context);
+    this.appName = context.getValue(Context.DAGContext.APPLICATION_NAME);
+    this.key = appName + KEY_SEPARATOR + operatorId;
+    this.consumer = kafkaConsumerInit();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    this.windowId = windowId;
+
+    if (windowId == windowDataManager.getLargestRecoveryWindow()) {
+      rebuildPartialWindow();
+    }
+  }
+
+  @Override
+  public void checkpointed(long windowId)
+  {
+  }
+
+  @Override
+  public void committed(long windowId)
+  {
+    try {
+      windowDataManager.deleteUpTo(operatorId, windowId);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void beforeCheckpoint(long windowId)
+  {
+  }
+
+  @Override
+  public void teardown()
+  {
+    consumer.close();
+    super.teardown();
+  }
+
+  @Override
+  public void endWindow()
+  {
+    if (!partialWindowTuples.isEmpty() && windowId > windowDataManager.getLargestRecoveryWindow()) {
+      throw new RuntimeException("Violates exactly-once semantics. Not all the tuples were received after the operator was reset.");
+    }
+
+    // Every tuple must be written before the offsets are stored in the window data manager.
+    getProducer().flush();
+
+    try {
+      this.windowDataManager.save(getPartitionsAndOffsets(true), operatorId, windowId);
+    } catch (IOException | InterruptedException | ExecutionException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public WindowDataManager getWindowDataManager()
+  {
+    return windowDataManager;
+  }
+
+  public void setWindowDataManager(WindowDataManager windowDataManager)
+  {
+    this.windowDataManager = windowDataManager;
+  }
+
+  private boolean doesKeyBelongsToThisInstance(int operatorId, String key)
+  {
+    String[] split = key.split(KEY_SEPARATOR);
+
+    if (split.length != 2) {
+      return false;
+    }
+
+    if ((Integer.parseInt(split[1]) == operatorId) && (split[0].equals(appName))) {
+      return true;
+    }
+
+    return false;
+  }
+
+  private boolean alreadyInKafka(T message)
+  {
+    if (windowId <= windowDataManager.getLargestRecoveryWindow()) {
+      return true;
+    }
+
+    if (partialWindowTuples.containsKey(message)) {
+      Integer val = partialWindowTuples.get(message);
+
+      if (val == 0) {
+        return false;
+      } else if (val == 1) {
+        partialWindowTuples.remove(message);
+      } else {
+        partialWindowTuples.put(message, val - 1);
+      }
+
+      return true;
+    }
+
+    return false;
+  }
+
+  private Map<Integer, Long> getPartitionsAndOffsets(boolean latest) throws ExecutionException, InterruptedException
+  {
+    List<PartitionInfo> partitionInfoList = consumer.partitionsFor(getTopic());
+    List<TopicPartition> topicPartitionList = new ArrayList<>();
+
+    for (PartitionInfo partitionInfo : partitionInfoList) {
+      topicPartitionList.add(new TopicPartition(getTopic(), partitionInfo.partition()));
+    }
+
+    Map<Integer, Long> partitionsAndOffsets = new HashMap<>();
+    consumer.assign(topicPartitionList);
+
+    for (PartitionInfo partitionInfo : partitionInfoList) {
+      try {
+        TopicPartition topicPartition = new TopicPartition(getTopic(), partitionInfo.partition());
+        if (latest) {
+          consumer.seekToEnd(topicPartition);
+        } else {
+          consumer.seekToBeginning(topicPartition);
+        }
+        partitionsAndOffsets.put(partitionInfo.partition(), consumer.position(topicPartition));
+      } catch (Exception ex) {
+        throw new RuntimeException(ex);
+      }
+    }
+
+    return partitionsAndOffsets;
+  }
+
+  private void rebuildPartialWindow()
+  {
+    logger.info("Rebuild the partial window after " + 
windowDataManager.getLargestRecoveryWindow());
+
+    Map<Integer, Long> storedOffsets;
+    Map<Integer, Long> currentOffsets;
+
+    try {
+      storedOffsets = (Map<Integer, Long>)this.windowDataManager.load(operatorId, windowId);
+      currentOffsets = getPartitionsAndOffsets(true);
+    } catch (IOException | ExecutionException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+
+    if (currentOffsets == null) {
+      logger.debug("No tuples found while building partial window " + 
windowDataManager.getLargestRecoveryWindow());
+      return;
+    }
+
+    if (storedOffsets == null) {
+      logger.debug("Stored offset not available, seeking to the beginning of the Kafka partition.");
+
+      try {
+        storedOffsets = getPartitionsAndOffsets(false);
+      } catch (ExecutionException | InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    List<TopicPartition> topicPartitions = new ArrayList<>();
+
+    for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
+      topicPartitions.add(new TopicPartition(getTopic(), entry.getKey()));
+    }
+
+    consumer.assign(topicPartitions);
+
+    for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
+      Long storedOffset = 0L;
+      Integer currentPartition = entry.getKey();
+      Long currentOffset = entry.getValue();
+
+      if (storedOffsets.containsKey(currentPartition)) {
+        storedOffset = storedOffsets.get(currentPartition);
+      }
+
+      if (storedOffset >= currentOffset) {
+        continue;
+      }
+
+      try {
+        consumer.seek(new TopicPartition(getTopic(), currentPartition), storedOffset);
+      } catch (Exception ex) {
+        logger.info("Rebuilding of the partial window is not complete, exactly 
once recovery is not possible.");
+        throw new RuntimeException(ex);
+      }
+
+      int kafkaAttempt = 0;
+
+      while (true) {
+        ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
+
+        if (consumerRecords.count() == 0) {
+          if (kafkaAttempt++ == KAFKA_CONNECT_ATTEMPT) {
+            break;
+          }
+        } else {
+          kafkaAttempt = 0;
+        }
+
+        boolean crossedBoundary = false;
+
+        for (ConsumerRecord consumerRecord : consumerRecords) {
+          if (!doesKeyBelongsToThisInstance(operatorId, (String)consumerRecord.key())) {
+            continue;
+          }
+
+          T value = (T)consumerRecord.value();
+
+          if (partialWindowTuples.containsKey(value)) {
+            Integer count = partialWindowTuples.get(value);
+            partialWindowTuples.put(value, count + 1);
+          } else {
+            partialWindowTuples.put(value, 1);
+          }
+
+          if (consumerRecord.offset() >= currentOffset) {
+            crossedBoundary = true;
+            break;
+          }
+        }
+
+        if (crossedBoundary) {
+          break;
+        }
+      }
+    }
+  }
+
+  private KafkaConsumer kafkaConsumerInit()
+  {
+    Properties props = new Properties();
+
+    props.put(BOOTSTRAP_SERVERS_CONFIG, getProperties().get(BOOTSTRAP_SERVERS_CONFIG));
+    props.put(KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
+
+    return new KafkaConsumer<>(props);
+  }
+
+  protected void sendTuple(T tuple)
+  {
+    if (alreadyInKafka(tuple)) {
+      return;
+    }
+
+    getProducer().send(new ProducerRecord<>(getTopic(), key, tuple), new Callback()
+    {
+      @Override
+      public void onCompletion(RecordMetadata metadata, Exception e)
+      {
+        if (e != null) {
+          logger.error("Writing to Kafka failed with an exception {}", e.getMessage());
+          throw new RuntimeException(e);
+        }
+      }
+    });
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(KafkaSinglePortExactlyOnceOutputOperator.class);
+}
+
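
Configuration note: the tracking key written by this operator is a String ("APPLICATION_ID#OPERATOR_ID"), and the internal recovery consumer reuses the bootstrap.servers entry from the producer properties. A rough configuration sketch inside populateDAG; the broker address and topic are placeholders, and the value serializer must match the tuple type (String in this sketch):

  Properties props = new Properties();
  props.put("bootstrap.servers", "localhost:9092");   // placeholder; also used by the recovery consumer
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  KafkaSinglePortExactlyOnceOutputOperator<String> kafkaOut =
      dag.addOperator("kafkaExactlyOnceOut", new KafkaSinglePortExactlyOnceOutputOperator<String>());
  kafkaOut.setTopic("output-topic");                   // placeholder topic
  kafkaOut.setProperties(props);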

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0444ced6/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
new file mode 100644
index 0000000..a08c7a2
--- /dev/null
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaSinglePortOutputOperator.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.kafka;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import com.datatorrent.api.DefaultInputPort;
+
+/**
+ * Kafka output operator with single input port (inputPort).
+ * It provides at-least-once processing guarantees.
+ */
[email protected]
+public class KafkaSinglePortOutputOperator<K, V> extends AbstractKafkaOutputOperator<K, V>
+{
+  /**
+   * This input port receives tuples that will be written out to Kafka.
+   */
+  public final transient DefaultInputPort<V> inputPort = new DefaultInputPort<V>()
+  {
+    @Override
+    public void process(V tuple)
+    {
+      getProducer().send(new ProducerRecord<K, V>(getTopic(), tuple));
+    }
+  };
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0444ced6/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
new file mode 100644
index 0000000..db27be0
--- /dev/null
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
@@ -0,0 +1,375 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.kafka;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Operator;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.stram.StramLocalCluster;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.apex.malhar.lib.wal.WindowDataManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+@Ignore
+public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
+{
+  String testName;
+  private static List<String> tupleCollection = new LinkedList<>();
+  private final String KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  private final String VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+  private final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+  private final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+
+  public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
+
+  @Before
+  public void before()
+  {
+    FileUtils.deleteQuietly(new File(APPLICATION_PATH));
+    testName = TEST_TOPIC + testCounter++;
+    createTopic(0, testName);
+    if (hasMultiCluster) {
+      createTopic(1, testName);
+    }
+  }
+
+  @Test
+  public void testExactlyOnceWithFailure() throws Exception
+  {
+    List<String> toKafka = GenerateList();
+
+    sendDataToKafka(true, toKafka, true, false);
+
+    List<String> fromKafka = ReadFromKafka();
+
+    Assert.assertTrue("With Failure", compare(fromKafka, toKafka));
+  }
+
+  @Test
+  public void testExactlyOnceWithNoFailure() throws Exception
+  {
+    List<String> toKafka = GenerateList();
+
+    sendDataToKafka(true, toKafka, false, false);
+
+    List<String> fromKafka = ReadFromKafka();
+
+    Assert.assertTrue("With No Failure", compare(fromKafka, toKafka));
+  }
+
+  @Test
+  public void testExactlyOnceWithDifferentTuplesAfterRecovery() throws Exception
+  {
+    List<String> toKafka = GenerateList();
+
+    try {
+      sendDataToKafka(true, toKafka, true, true);
+    } catch (RuntimeException ex) {
+
+      boolean expectedException = false;
+      if ( ex.getMessage().contains("Violates")) {
+        expectedException = true;
+      }
+
+      Assert.assertTrue("Different tuples after recovery", expectedException);
+      return;
+    }
+
+    Assert.assertTrue("Wrong tuples during replay, should throw exception", 
false);
+  }
+
+  @Test
+  public void testKafkaOutput() throws Exception
+  {
+    List<String> toKafka = GenerateList();
+
+    sendDataToKafka(false, toKafka, false, false);
+
+    List<String> fromKafka = ReadFromKafka();
+
+    Assert.assertTrue("No failure", compare(fromKafka, toKafka));
+  }
+
+  @Test
+  public void testKafkaOutputWithFailure() throws Exception
+  {
+    List<String> toKafka = GenerateList();
+
+    sendDataToKafka(false, toKafka, true, true);
+
+    List<String> fromKafka = ReadFromKafka();
+
+    Assert.assertTrue("No failure", fromKafka.size() > toKafka.size());
+  }
+
+  private void sendDataToKafka(boolean exactlyOnce, List<String> toKafka, boolean hasFailure, boolean differentTuplesAfterRecovery) throws InterruptedException
+  {
+    Properties props = new Properties();
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER);
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KEY_SERIALIZER);
+    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
+
+    Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(Context.DAGContext.APPLICATION_NAME, "MyKafkaApp");
+
+    OperatorContextTestHelper.TestIdOperatorContext operatorContext = new OperatorContextTestHelper.TestIdOperatorContext(2, attributeMap);
+
+    cleanUp(operatorContext);
+
+    Operator kafkaOutput;
+    DefaultInputPort<String> inputPort;
+
+    if (exactlyOnce) {
+      KafkaSinglePortExactlyOnceOutputOperator<String> kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext);
+      inputPort = kafkaOutputTemp.inputPort;
+      kafkaOutput = kafkaOutputTemp;
+    } else {
+      KafkaSinglePortOutputOperator<String, String> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext);
+      inputPort = kafkaOutputTemp.inputPort;
+      kafkaOutput = kafkaOutputTemp;
+    }
+
+    kafkaOutput.beginWindow(1);
+    inputPort.getSink().put(toKafka.get(0));
+    inputPort.getSink().put(toKafka.get(1));
+    inputPort.getSink().put(toKafka.get(2));
+    kafkaOutput.endWindow();
+    kafkaOutput.beginWindow(2);
+    inputPort.getSink().put(toKafka.get(3));
+    inputPort.getSink().put(toKafka.get(4));
+    inputPort.getSink().put(toKafka.get(5));
+    kafkaOutput.endWindow();
+    kafkaOutput.beginWindow(3);
+    inputPort.getSink().put(toKafka.get(6));
+    inputPort.getSink().put(toKafka.get(7));
+
+    if (hasFailure) {
+      if (exactlyOnce) {
+        KafkaSinglePortExactlyOnceOutputOperator<String> kafkaOutputTemp = ResetKafkaOutput(testName, props, operatorContext);
+        inputPort = kafkaOutputTemp.inputPort;
+        kafkaOutput = kafkaOutputTemp;
+      } else {
+        KafkaSinglePortOutputOperator<String, String> kafkaOutputTemp = ResetKafkaSimpleOutput(testName, props, operatorContext);
+        inputPort = kafkaOutputTemp.inputPort;
+        kafkaOutput = kafkaOutputTemp;
+      }
+
+      kafkaOutput.beginWindow(2);
+      inputPort.getSink().put(toKafka.get(3));
+      inputPort.getSink().put(toKafka.get(4));
+      inputPort.getSink().put(toKafka.get(5));
+      kafkaOutput.endWindow();
+      kafkaOutput.beginWindow(3);
+      inputPort.getSink().put(toKafka.get(6));
+
+      if (!differentTuplesAfterRecovery) {
+        inputPort.getSink().put(toKafka.get(7));
+      }
+    }
+
+    inputPort.getSink().put(toKafka.get(8));
+    inputPort.getSink().put(toKafka.get(9));
+    kafkaOutput.endWindow();
+    kafkaOutput.beginWindow(4);
+    inputPort.getSink().put(toKafka.get(10));
+    inputPort.getSink().put(toKafka.get(11));
+    kafkaOutput.endWindow();
+
+    cleanUp(operatorContext);
+  }
+
+  private KafkaSinglePortExactlyOnceOutputOperator<String> ResetKafkaOutput(String testName, Properties props, Context.OperatorContext operatorContext)
+  {
+    KafkaSinglePortExactlyOnceOutputOperator<String> kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>();
+    kafkaOutput.setTopic(testName);
+    kafkaOutput.setProperties(props);
+    kafkaOutput.setup(operatorContext);
+
+    return kafkaOutput;
+  }
+
+  private KafkaSinglePortOutputOperator<String, String> ResetKafkaSimpleOutput(String testName, Properties props, Context.OperatorContext operatorContext)
+  {
+    KafkaSinglePortOutputOperator<String, String> kafkaOutput = new KafkaSinglePortOutputOperator<>();
+    kafkaOutput.setTopic(testName);
+    kafkaOutput.setProperties(props);
+    kafkaOutput.setup(operatorContext);
+
+    return kafkaOutput;
+  }
+
+  private void cleanUp(Context.OperatorContext operatorContext)
+  {
+    WindowDataManager windowDataManager = new FSWindowDataManager();
+    windowDataManager.setup(operatorContext);
+    try {
+      windowDataManager.deleteUpTo(operatorContext.getId(), windowDataManager.getLargestRecoveryWindow());
+    } catch (IOException e) {
+      e.printStackTrace();
+    }
+  }
+
+  private boolean compare(List<String> fromKafka, List<String> toKafka)
+  {
+    if (fromKafka.size() != toKafka.size()) {
+      return false;
+    }
+
+    for (int i = 0; i < fromKafka.size(); ++i) {
+      if (!fromKafka.get(i).equals(toKafka.get(i))) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  private String getClusterConfig()
+  {
+    String l = "localhost:";
+    return l + TEST_KAFKA_BROKER_PORT[0][0] +
+        (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") +
+        (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") +
+        (hasMultiCluster && hasMultiPartition ? "," + l  + 
TEST_KAFKA_BROKER_PORT[1][1] : "");
+  }
+
+  private List<String> GenerateList()
+  {
+    List<String> strings = new ArrayList<>();
+
+    for (Integer i = 0; i < 12; ++i) {
+      strings.add(i.toString());
+    }
+
+    return strings;
+  }
+
+  public List<String> ReadFromKafka()
+  {
+    tupleCollection.clear();
+
+    // Create KafkaSinglePortStringInputOperator
+    Properties props = new Properties();
+    props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
+    props.put(KEY_DESERIALIZER_CLASS_CONFIG, KEY_DESERIALIZER);
+    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
+    props.put(GROUP_ID_CONFIG, "KafkaTest");
+
+
+    LocalMode lma = LocalMode.newInstance();
+    DAG dag = lma.getDAG();
+
+    // Create KafkaSinglePortStringInputOperator
+    KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", 
KafkaSinglePortInputOperator.class);
+    node.setConsumerProps(props);
+    node.setInitialPartitionCount(1);
+    // set topic
+    node.setTopics(testName);
+    node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
+    node.setClusters(getClusterConfig());
+    node.setStrategy("one_to_one");
+
+    // Create Test tuple collector
+    CollectorModule collector1 = dag.addOperator("collector", new 
CollectorModule());
+
+    // Connect ports
+    dag.addStream("Kafka message", node.outputPort, collector1.inputPort);
+
+
+    // Create local cluster
+    final LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(false);
+
+    lc.run(30000);
+
+    return tupleCollection;
+  }
+
+  public static class CollectorModule extends BaseOperator
+  {
+
+    public final transient CollectorInputPort inputPort = new CollectorInputPort(this);
+
+    long currentWindowId;
+    long operatorId;
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      operatorId = context.getId();
+    }
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      super.beginWindow(windowId);
+      currentWindowId = windowId;
+    }
+
+    @Override
+    public void endWindow()
+    {
+      super.endWindow();
+    }
+
+  }
+
+  public static class CollectorInputPort extends DefaultInputPort<byte[]>
+  {
+    CollectorModule ownerNode;
+
+    CollectorInputPort(CollectorModule node)
+    {
+      this.ownerNode = node;
+    }
+
+    @Override
+    public void process(byte[] bt)
+    {
+      String tuple = new String(bt);
+      tupleCollection.add(tuple);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/0444ced6/kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/kafka/src/test/resources/log4j.properties b/kafka/src/test/resources/log4j.properties
index c115950..910e44a 100644
--- a/kafka/src/test/resources/log4j.properties
+++ b/kafka/src/test/resources/log4j.properties
@@ -38,12 +38,13 @@ log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
 log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
 log4j.appender.SYSLOG.Facility=LOCAL1
 
-log4j.logger.org=INFO
+#log4j.logger.org=INFO
 
 #log4j.logger.org.apache.commons.beanutils=warn
 log4j.logger.com.datatorrent=INFO
 log4j.logger.org.apache.apex=INFO
 
-log4j.logger.org.apacke.kafka=WARN
+log4j.logger.org.apache.kafka=WARN
 log4j.logger.kafka.consumer=WARN
 log4j.logger.kafka=WARN
+log4j.logger.org.apache.zookeeper=WARN
