Repository: apex-malhar
Updated Branches:
  refs/heads/master 719cf952d -> 389a2d564


APEXMALHAR-2120 #resolve #comment solve problems of KafkaInputOperatorTest and AbstractKafkaInputOperator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/f2b7a856
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/f2b7a856
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/f2b7a856

Branch: refs/heads/master
Commit: f2b7a85677ab9ada29c95ff7821c43459c8468dd
Parents: d2f0586
Author: brightchen <[email protected]>
Authored: Tue Jun 14 16:30:17 2016 -0700
Committer: brightchen <[email protected]>
Committed: Fri Jun 24 09:56:48 2016 -0700

----------------------------------------------------------------------
 .../malhar/kafka/AbstractKafkaPartitioner.java  |  39 ++-
 .../apex/malhar/kafka/KafkaConsumerWrapper.java |  58 ++---
 .../malhar/kafka/KafkaInputOperatorTest.java    | 237 +++++++++++++------
 .../apex/malhar/kafka/KafkaTestProducer.java    |  54 +++--
 kafka/src/test/resources/log4j.properties       |  49 ++++
 5 files changed, 302 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f2b7a856/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
index 7bb8585..c6e47e9 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaPartitioner.java
@@ -45,6 +45,8 @@ import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.StatsListener;
 import com.datatorrent.lib.util.KryoCloneUtils;
 
+import kafka.common.AuthorizationException;
+
 /**
  * Abstract partitioner used to manage the partitions of kafka input operator.
  * It use a number of kafka consumers(one for each cluster) to get the latest partition metadata for topics that
@@ -87,27 +89,50 @@ public abstract class AbstractKafkaPartitioner implements Partitioner<AbstractKa
   @Override
   public Collection<Partition<AbstractKafkaInputOperator>> definePartitions(Collection<Partition<AbstractKafkaInputOperator>> collection, PartitioningContext partitioningContext)
   {
-
     initMetadataClients();
 
     Map<String, Map<String, List<PartitionInfo>>> metadata = new HashMap<>();
 
-
     for (int i = 0; i < clusters.length; i++) {
       metadata.put(clusters[i], new HashMap<String, List<PartitionInfo>>());
       for (String topic : topics) {
-        List<PartitionInfo> ptis = metadataRefreshClients.get(i).partitionsFor(topic);
-        if (logger.isDebugEnabled()) {
-          logger.debug("Partition metadata for topic {} : {}", topic, Joiner.on(';').join(ptis));
+        int tryTime = 3;
+        while (tryTime-- > 0) {
+          try {
+            List<PartitionInfo> ptis = metadataRefreshClients.get(i).partitionsFor(topic);
+            if (logger.isDebugEnabled()) {
+              logger.debug("Partition metadata for topic {} : {}", topic, Joiner.on(';').join(ptis));
+            }
+            metadata.get(clusters[i]).put(topic, ptis);
+            break;
+          } catch (AuthorizationException ae) {
+            logger.error("Kafka AuthorizationException.");
+            throw new RuntimeException("Kafka AuthorizationException.", ae);
+          } catch (Exception e) {
+            logger.warn("Got Exception when trying get partition info for topic {}.", topic, e);
+            try {
+              Thread.sleep(100);
+            } catch (Exception e1) {
+              //ignore
+            }
+          }
+        }
+        if (tryTime == 0) {
+          throw new RuntimeException("Get partition info completely failed. Please check the log file");
         }
-        metadata.get(clusters[i]).put(topic, ptis);
       }
       metadataRefreshClients.get(i).close();
     }
 
     metadataRefreshClients = null;
 
-    List<Set<AbstractKafkaPartitioner.PartitionMeta>> parts = assign(metadata);
+    List<Set<AbstractKafkaPartitioner.PartitionMeta>> parts = null;
+    try {
+      parts = assign(metadata);
+    } catch (Exception e) {
+      logger.error("assign() exception.", e);
+      e.printStackTrace();
+    }
 
 
     if (currentPartitions == parts || currentPartitions.equals(parts)) {

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f2b7a856/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
index adc9540..143a5bd 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,7 +75,7 @@ public class KafkaConsumerWrapper implements Closeable
 
   private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerWrapper.class);
 
-  private boolean isAlive = false;
+  private AtomicBoolean isAlive = new AtomicBoolean(false);
 
   private final Map<String, KafkaConsumer<byte[], byte[]>> consumers = new HashMap<>();
 
@@ -129,6 +130,12 @@ public class KafkaConsumerWrapper implements Closeable
         if (meta.getTopicPartition().equals(tp)) {
           kc.resume(tp);
         } else {
+          try {
+            kc.position(tp);
+          } catch (NoOffsetForPartitionException e) {
+            //the poll() method of a consumer will throw an exception if any subscribed partition has not been initialized with a position
+            handleNoOffsetForPartitionException(e, kc);
+          }
           kc.pause(tp);
         }
       }
@@ -188,7 +195,7 @@ public class KafkaConsumerWrapper implements Closeable
       try {
 
 
-        while (wrapper.isAlive) {
+        while (wrapper.isAlive.get()) {
           if (wrapper.waitForReplay) {
             Thread.sleep(100);
             continue;
@@ -207,19 +214,7 @@ public class KafkaConsumerWrapper implements Closeable
               wrapper.putMessage(Pair.of(cluster, record));
             }
           } catch (NoOffsetForPartitionException e) {
-            // if initialOffset is set to EARLIST or LATEST
-            // and the application is run as first time
-            // then there is no existing committed offset and this error will be caught
-            // we need to seek to either beginning or end of the partition
-            // based on the initial offset setting
-            AbstractKafkaInputOperator.InitialOffset io =
-                AbstractKafkaInputOperator.InitialOffset.valueOf(wrapper.ownerOperator.getInitialOffset());
-            if (io == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST
-                || io == AbstractKafkaInputOperator.InitialOffset.EARLIEST) {
-              consumer.seekToBeginning(e.partitions().toArray(new TopicPartition[0]));
-            } else {
-              consumer.seekToEnd(e.partitions().toArray(new TopicPartition[0]));
-            }
+            wrapper.handleNoOffsetForPartitionException(e, consumer);
           } catch (InterruptedException e) {
             throw new IllegalStateException("Consumer thread is interrupted unexpectedly", e);
           }
@@ -233,7 +228,24 @@ public class KafkaConsumerWrapper implements Closeable
       }
     }
   }
-
+  
+  protected void handleNoOffsetForPartitionException(NoOffsetForPartitionException e, KafkaConsumer<byte[], byte[]> consumer)
+  {
+    // if initialOffset is set to EARLIEST or LATEST
+    // and the application is run for the first time,
+    // then there is no existing committed offset and this error will be caught.
+    // we need to seek to either the beginning or the end of the partition
+    // based on the initial offset setting
+    AbstractKafkaInputOperator.InitialOffset io =
+        AbstractKafkaInputOperator.InitialOffset.valueOf(ownerOperator.getInitialOffset());
+    if (io == AbstractKafkaInputOperator.InitialOffset.APPLICATION_OR_EARLIEST
+        || io == AbstractKafkaInputOperator.InitialOffset.EARLIEST) {
+      consumer.seekToBeginning(e.partitions().toArray(new TopicPartition[0]));
+    } else {
+      consumer.seekToEnd(e.partitions().toArray(new TopicPartition[0]));
+    }
+  
+  }
 
   /**
    * This method is called in setup method of Abstract Kafka Input Operator
@@ -255,7 +267,7 @@ public class KafkaConsumerWrapper implements Closeable
   public void start(boolean waitForReplay)
   {
     this.waitForReplay = waitForReplay;
-    isAlive = true;
+    isAlive.set(true);
 
     // thread to consume the kafka data
     // create thread pool for consumer threads
@@ -330,11 +342,11 @@ public class KafkaConsumerWrapper implements Closeable
    */
   public void stop()
   {
+    isAlive.set(false);
     for (KafkaConsumer<byte[], byte[]> c : consumers.values()) {
       c.wakeup();
     }
     kafkaConsumerExecutor.shutdownNow();
-    isAlive = false;
     holdingBuffer.clear();
     IOUtils.closeQuietly(this);
   }
@@ -347,16 +359,6 @@ public class KafkaConsumerWrapper implements Closeable
     holdingBuffer.clear();
   }
 
-  public boolean isAlive()
-  {
-    return isAlive;
-  }
-
-  public void setAlive(boolean isAlive)
-  {
-    this.isAlive = isAlive;
-  }
-
   public Pair<String, ConsumerRecord<byte[], byte[]>> pollMessage()
   {
     return holdingBuffer.poll();
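
The wrapper change above makes isAlive an AtomicBoolean and sets it to false before calling wakeup(), so a consumer thread blocked in poll() sees the stop request as soon as it wakes. Below is a rough, self-contained sketch of that stop pattern, assuming a standalone ConsumerLoop class; the class, the 100 ms poll timeout and the record handling are illustrative and not part of the operator.

// Illustrative sketch; ConsumerLoop is hypothetical and not part of apex-malhar.
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ConsumerLoop implements Runnable
{
  private final AtomicBoolean isAlive = new AtomicBoolean(true);
  private final KafkaConsumer<byte[], byte[]> consumer;

  public ConsumerLoop(KafkaConsumer<byte[], byte[]> consumer, String topic)
  {
    this.consumer = consumer;
    consumer.subscribe(Collections.singletonList(topic));
  }

  @Override
  public void run()
  {
    try {
      while (isAlive.get()) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
        for (ConsumerRecord<byte[], byte[]> record : records) {
          // hand each record off to a buffer or downstream operator here
        }
      }
    } catch (WakeupException e) {
      // expected when stop() wakes a blocked poll(); rethrow only if no stop was requested
      if (isAlive.get()) {
        throw e;
      }
    } finally {
      consumer.close();
    }
  }

  // Flip the flag first, then wake the blocked poll(), mirroring the ordering fixed above.
  public void stop()
  {
    isAlive.set(false);
    consumer.wakeup();
  }
}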

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f2b7a856/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 72ecd57..8440615 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -34,14 +34,16 @@ import java.util.concurrent.TimeUnit;
 
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestWatcher;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
-import org.apache.commons.io.FileUtils;
+
+import com.google.common.collect.Lists;
 
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
@@ -55,7 +57,6 @@ import com.datatorrent.stram.StramLocalCluster;
  * A bunch of test to verify the input operator will be automatically partitioned per kafka partition This test is launching its
  * own Kafka cluster.
  */
-@Ignore
 @RunWith(Parameterized.class)
 public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 {
@@ -65,13 +66,36 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
   private String partition = null;
 
   private String testName = "";
-
+  
   public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
 
+  public class KafkaTestInfo extends TestWatcher
+  {
+    public org.junit.runner.Description desc;
+
+    public String getDir()
+    {
+      String methodName = desc.getMethodName();
+      String className = desc.getClassName();
+      return "target/" + className + "/" + methodName + "/" + testName;
+    }
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      this.desc = description;
+    }
+  }
+  
+  @Rule
+  public final KafkaTestInfo testInfo = new KafkaTestInfo();
+  
+  
   @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
   public static Collection<Object[]> testScenario()
   {
-    return Arrays.asList(new Object[][]{{true, false, "one_to_one"},// multi cluster with single partition
+    return Arrays.asList(new Object[][]{
+      {true, false, "one_to_one"},// multi cluster with single partition
       {true, false, "one_to_many"},
       {true, true, "one_to_one"},// multi cluster with multi partitions
       {true, true, "one_to_many"},
@@ -82,12 +106,17 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     });
   }
 
+
+
   @Before
   public void before()
   {
-    FileUtils.deleteQuietly(new File(APPLICATION_PATH));
-    tupleCollection.clear();
     testName = TEST_TOPIC + testCounter++;
+    logger.info("before() test case: {}", testName);
+    tupleCollection.clear();
+    //reset count for next new test case
+    k = 0;
+    
     createTopic(0, testName);
     if (hasMultiCluster) {
       createTopic(1, testName);
@@ -107,12 +136,24 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
   private static List<String> tupleCollection = new LinkedList<>();
-  private static Map<String, Set<String>> tupleCollectedInWindow = new HashMap<>();
+
+  /**
+   * whether countDown latch count all tuples or just END_TUPLE
+   */
+  private static final boolean countDownAll = false;
+  private static final int scale = 2;
+  private static final int totalCount = 10 * scale;
+  private static final int failureTrigger = 3 * scale;
+  private static final int tuplesPerWindow = 5 * scale;
+  private static final int waitTime = 60000 + 300 * scale;
+  
+  //This latch was used to count the END_TUPLE, but the order of tuples can't be guaranteed,
+  //so count valid tuples instead.
   private static CountDownLatch latch;
   private static boolean hasFailure = false;
-  private static int failureTrigger = 3000;
   private static int k = 0;
-
+  private static Thread monitorThread;
+  
   /**
    * Test Operator to collect tuples from KafkaSingleInputStringOperator.
    *
@@ -120,8 +161,14 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
    */
   public static class CollectorModule extends BaseOperator
   {
-
-    public final transient CollectorInputPort inputPort = new CollectorInputPort(this);
+    public final transient DefaultInputPort<byte[]> inputPort = new DefaultInputPort<byte[]>()
+    {
+      @Override
+      public void process(byte[] bt)
+      {
+        processTuple(bt);
+      }
+    };
 
     long currentWindowId;
 
@@ -129,8 +176,10 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
     boolean isIdempotentTest = false;
 
-    transient Set<String> windowTupleCollector = new HashSet<>();
-
+    transient List<String> windowTupleCollector = Lists.newArrayList();
+    private transient Map<String, List<String>> tupleCollectedInWindow = new HashMap<>();
+    private int endTuples = 0;
+    
     @Override
     public void setup(Context.OperatorContext context)
     {
@@ -143,59 +192,80 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     {
       super.beginWindow(windowId);
       currentWindowId = windowId;
+      windowTupleCollector.clear();
+      endTuples = 0;
     }
 
+    
+    public void processTuple(byte[] bt)
+    {
+      String tuple = new String(bt);
+      if (hasFailure && k++ == failureTrigger) {
+        //you can only kill yourself once
+        hasFailure = false;
+        throw new RuntimeException();
+      }
+      if (tuple.startsWith(KafkaOperatorTestBase.END_TUPLE)) {
+        endTuples++;
+      }
+      
+      windowTupleCollector.add(tuple);
+    }
+    
     @Override
     public void endWindow()
     {
       super.endWindow();
       if (isIdempotentTest) {
         String key = operatorId + "," + currentWindowId;
-        Set<String> msgsInWin = tupleCollectedInWindow.get(key);
-        if (msgsInWin!=null) {
+        List<String> msgsInWin = tupleCollectedInWindow.get(key);
+        if (msgsInWin != null) {
           Assert.assertEquals("replay messages should be exactly same as 
previous window", msgsInWin, windowTupleCollector);
         } else {
-          Set<String> newSet = new HashSet<>();
-          newSet.addAll(windowTupleCollector);
-          tupleCollectedInWindow.put(key, newSet);
+          List<String> newList = Lists.newArrayList();
+          newList.addAll(windowTupleCollector);
+          tupleCollectedInWindow.put(key, newList);
         }
       }
-      windowTupleCollector.clear();
-    }
-
-  }
-
-  public static class CollectorInputPort extends DefaultInputPort<byte[]>
-  {
-    CollectorModule ownerNode;
-
-    CollectorInputPort(CollectorModule node) {
-      this.ownerNode = node;
-    }
 
-    @Override
-    public void process(byte[] bt)
-    {
-      String tuple = new String(bt);
-      if (hasFailure && k++ == failureTrigger) {
-        //you can only kill yourself once
-        hasFailure = false;
-        throw new RuntimeException();
-      }
-      if (tuple.equals(KafkaOperatorTestBase.END_TUPLE)) {
-        if (latch != null) {
-          latch.countDown();
+      //discard the tuples of this window if an exception happened
+      int tupleSize = windowTupleCollector.size();
+      tupleCollection.addAll(windowTupleCollector);
+      
+      int countDownTupleSize = countDownAll ? tupleSize : endTuples;
+
+      if (latch != null) {
+        Assert.assertTrue("received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
+        while (countDownTupleSize > 0) {
+            latch.countDown();
+            --countDownTupleSize;
+        }
+        if (latch.getCount() == 0) {
+          /**
+           * The time between countDown() and the shutdown() of the application
+           * can cause the fatal error:
+           * "Catastrophic Error: Invalid State - the operator blocked forever!"
+           * as the activeQueues could be cleared but alive hasn't changed yet.
+           * Throw a ShutdownException to let the engine shut down.
+           */
+          try {
+            throw new ShutdownException();
+            //lc.shutdown();
+          } finally {
+            /**
+             * Interrupt the engine thread so it wakes from sleep and handles
+             * the shutdown. At this point all payload should have been handled,
+             * so it should be safe to interrupt.
+             */
+            monitorThread.interrupt();
+          }
         }
-        return;
-      }
-      tupleCollection.add(tuple);
-      if (ownerNode.isIdempotentTest) {
-        ownerNode.windowTupleCollector.add(tuple);
       }
     }
 
   }
 
+
   /**
   * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives
   * data from an outside test generator through Kafka message bus and feed that data into Malhar streaming platform.
@@ -230,11 +300,11 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
   public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception
   {
-
     // each broker should get a END_TUPLE message
-    latch = new CountDownLatch(totalBrokers);
+    latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers);
 
-    int totalCount = 10000;
+    logger.info("Test Case: name: {}; totalBrokers: {}; hasFailure: {}; 
hasMultiCluster: {}; hasMultiPartition: {}, partition: {}", 
+        testName, totalBrokers, hasFailure, hasMultiCluster, 
hasMultiPartition, partition); 
 
     // Start producer
     KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster);
@@ -242,19 +312,21 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     Thread t = new Thread(p);
     t.start();
 
+    int expectedReceiveCount = totalCount + totalBrokers;
+    
     // Create DAG for testing.
     LocalMode lma = LocalMode.newInstance();
     DAG dag = lma.getDAG();
 
     // Create KafkaSinglePortStringInputOperator
-    KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class);
+    KafkaSinglePortInputOperator node = dag.addOperator("Kafka input" + testName, KafkaSinglePortInputOperator.class);
     node.setInitialPartitionCount(1);
     // set topic
     node.setTopics(testName);
     node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
     node.setClusters(getClusterConfig());
     node.setStrategy(partition);
-    if(idempotent) {
+    if (idempotent) {
       node.setWindowDataManager(new FSWindowDataManager());
     }
 
@@ -264,49 +336,60 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     collector.isIdempotentTest = idempotent;
 
     // Connect ports
-    dag.addStream("Kafka message", node.outputPort, 
collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
-
+    dag.addStream("Kafka message" + testName, node.outputPort, 
collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
 
     if (hasFailure) {
       setupHasFailureTest(node, dag);
     }
 
     // Create local cluster
-    final LocalMode.Controller lc = lma.getController();
+    LocalMode.Controller lc = lma.getController();
     lc.setHeartbeatMonitoringEnabled(false);
 
-    lc.runAsync();
-
-    // Wait 30s for consumer finish consuming all the messages
-    boolean notTimeout = latch.await(40000, TimeUnit.MILLISECONDS);
-    Collections.sort(tupleCollection, new Comparator<String>()
-    {
-      @Override
-      public int compare(String o1, String o2)
-      {
-        return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
-      }
-    });
-    Assert.assertTrue("TIMEOUT: 40s Collected " + tupleCollection, notTimeout);
+    //let the Controller run inside another thread. It is almost the same as calling Controller.runAsync(),
+    //but Controller.runAsync() doesn't expose the thread that runs it, so we don't know when that thread will be terminated.
+    //create this thread and then call join() to make sure the Controller shuts down completely.
+    monitorThread = new Thread((StramLocalCluster)lc, "master");
+    monitorThread.start();
+
+    boolean notTimeout = true;
+    try {
+      // Wait 60s for the consumer to finish consuming all the messages
+      notTimeout = latch.await(waitTime, TimeUnit.MILLISECONDS);
+      lc.shutdown();
+
+      //wait until control thread finished.
+      monitorThread.join();
+    } catch (Exception e) {
+      logger.warn(e.getMessage());
+    }
+    
+    t.join();
+    
+    if (!notTimeout || expectedReceiveCount != tupleCollection.size()) {
+      logger.info("Number of received/expected tuples: {}/{}, testName: {}, 
tuples: \n{}", tupleCollection.size(),
+          expectedReceiveCount, testName, tupleCollection);
+    }
+    Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected 
data: " + tupleCollection, notTimeout);
 
     // Check results
-    Assert.assertTrue("Collected tuples " + tupleCollection + " Tuple count is 
not expected", totalCount <=+ tupleCollection.size());
-    logger.debug(String.format("Number of emitted tuples: %d", 
tupleCollection.size()));
-
-    t.join();
-    p.close();
-    lc.shutdown();
+    Assert.assertTrue( "testName: " + testName + "; Collected tuple size: " + 
tupleCollection.size() + "; Expected tuple size: " + expectedReceiveCount + "; 
data: \n" + tupleCollection, 
+        expectedReceiveCount == tupleCollection.size());
+    
+    logger.info("End of test case: {}", testName);
   }
 
+  
   private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag)
   {
     operator.setHoldingBufferSize(5000);
     dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
     //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(APPLICATION_PATH + "failureck", new Configuration()));
-    operator.setMaxTuplesPerWindow(500);
+    operator.setMaxTuplesPerWindow(tuplesPerWindow);
   }
 
-  private String getClusterConfig() {
+  private String getClusterConfig()
+  {
     String l = "localhost:";
     return l + TEST_KAFKA_BROKER_PORT[0][0] +
       (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") +
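
The KafkaTestInfo rule added near the top of this test is a JUnit TestWatcher that records the running test's Description so each test case can derive its own working directory. The sketch below shows the general usage pattern under that assumption; SampleTest and the way the directory is consumed are illustrative and not taken from the commit.

// Illustrative only; SampleTest is a hypothetical test class.
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;

public class SampleTest
{
  public static class TestDirInfo extends TestWatcher
  {
    private Description desc;

    @Override
    protected void starting(Description description)
    {
      // remember which test is currently running
      this.desc = description;
    }

    public String getDir()
    {
      // one directory per test class and method, e.g. target/SampleTest/writesToItsOwnDir
      return "target/" + desc.getClassName() + "/" + desc.getMethodName();
    }
  }

  @Rule
  public final TestDirInfo testInfo = new TestDirInfo();

  @Test
  public void writesToItsOwnDir()
  {
    // per-test scratch files would go under testInfo.getDir()
    System.out.println("test dir: " + testInfo.getDir());
  }
}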

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f2b7a856/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
index 36130ce..2f24a8a 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
@@ -28,8 +28,11 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.StringSerializer;
 
+import com.google.common.collect.Lists;
+
 /**
  * A kafka producer for testing
  */
@@ -102,31 +105,27 @@ public class KafkaTestProducer implements Runnable
     this(topic, hasPartition, false);
   }
 
+  private transient List<Future<RecordMetadata>> sendTasks = Lists.newArrayList();
+  
   private void generateMessages()
   {
     // Create dummy message
     int messageNo = 1;
     while (messageNo <= sendCount) {
-      String messageStr = "Message_" + messageNo;
+      String messageStr = "_" + messageNo++;
       int k = rand.nextInt(100);
-      producer.send(new ProducerRecord<>(topic, "" + k, "c1" + messageStr));
-      if(hasMultiCluster){
-        messageNo++;
-        producer1.send(new ProducerRecord<>(topic, "" + k, "c2" + messageStr));
+      sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + k, "c1" + messageStr)));
+      if(hasMultiCluster && messageNo <= sendCount){
+        messageStr = "_" + messageNo++;
+        sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + k, "c2" + messageStr)));
       }
-      messageNo++;
       // logger.debug(String.format("Producing %s", messageStr));
     }
     // produce the end tuple to let the test input operator know it's done produce messages
-    producer.send(new ProducerRecord<>(topic, "" + 0, KafkaOperatorTestBase.END_TUPLE));
-    if(hasMultiCluster) {
-      producer1.send(new ProducerRecord<>(topic, "" + 0, KafkaOperatorTestBase.END_TUPLE));
-    }
-    if(hasPartition){
-      // Send end_tuple to other partition if it exist
-      producer.send(new ProducerRecord<>(topic, "" + 1, KafkaOperatorTestBase.END_TUPLE));
-      if(hasMultiCluster) {
-        producer1.send(new ProducerRecord<>(topic, "" + 1, KafkaOperatorTestBase.END_TUPLE));
+    for (int i = 0; i < (hasPartition ? 2 : 1); ++i) {
+      sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE)));
+      if (hasMultiCluster) {
+        sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE)));
       }
     }
   }
@@ -138,23 +137,32 @@ public class KafkaTestProducer implements Runnable
       generateMessages();
     } else {
       for (String msg : messages) {
-        Future f = producer.send(new ProducerRecord<>(topic, "", msg));
-        try {
-          f.get(30, TimeUnit.SECONDS);
-        } catch (Exception e) {
-          throw new RuntimeException(e);
-        }
+        sendTasks.add(producer.send(new ProducerRecord<>(topic, "", msg)));
       }
     }
-    producer.close();
+    
+    producer.flush();
     if (producer1!=null) {
-      producer1.close();
+      producer1.flush();
     }
+    
+    try {
+      for (Future<RecordMetadata> task : sendTasks) {
+        task.get(30, TimeUnit.SECONDS);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    
+    close();
   }
 
   public void close()
   {
     producer.close();
+    if (producer1!=null) {
+      producer1.close();
+    }
   }
 
   public String getAckType()
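
The producer change above keeps the Future returned by every send(), flushes both producers, and then calls get() on each future so the test only continues once all records are acknowledged. Here is a compact, hedged sketch of that send-then-await pattern; the AwaitingSender class, topic name and 30-second timeout are assumptions for illustration.

// Illustrative sketch; AwaitingSender, "test-topic" and the timeout are hypothetical.
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class AwaitingSender
{
  public static void sendAll(Properties config, List<String> messages) throws Exception
  {
    List<Future<RecordMetadata>> pending = new ArrayList<>();
    try (KafkaProducer<String, String> producer = new KafkaProducer<>(config)) {
      for (String msg : messages) {
        // send() is asynchronous; keep the future instead of blocking per record
        pending.add(producer.send(new ProducerRecord<>("test-topic", msg)));
      }
      // push anything still sitting in the producer's batches
      producer.flush();
      for (Future<RecordMetadata> f : pending) {
        // surface any send failure before the caller moves on
        f.get(30, TimeUnit.SECONDS);
      }
    }
  }
}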

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/f2b7a856/kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/kafka/src/test/resources/log4j.properties b/kafka/src/test/resources/log4j.properties
new file mode 100644
index 0000000..c115950
--- /dev/null
+++ b/kafka/src/test/resources/log4j.properties
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+log4j.rootLogger=DEBUG,CONSOLE
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=INFO
+#log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=WARN
+
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.RFA.File=/tmp/app.log
+
+# to enable, add SYSLOG to rootLogger
+log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
+log4j.appender.SYSLOG.syslogHost=127.0.0.1
+log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
+log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
+log4j.appender.SYSLOG.Facility=LOCAL1
+
+log4j.logger.org=INFO
+
+#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.com.datatorrent=INFO
+log4j.logger.org.apache.apex=INFO
+
+log4j.logger.org.apache.kafka=WARN
+log4j.logger.kafka.consumer=WARN
+log4j.logger.kafka=WARN
