Repository: apex-malhar
Updated Branches:
  refs/heads/master 0c4b3fce2 -> 7b2d7e3d9


APEXMALHAR-2230 simplify the kafka input operator test


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/5909dfdc
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/5909dfdc
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/5909dfdc

Branch: refs/heads/master
Commit: 5909dfdc491fdca0cea7eca56fe72b8e1d32bcc3
Parents: 9f9da0e
Author: Siyuan Hua <hsy...@apache.org>
Authored: Thu Sep 15 08:31:34 2016 -0700
Committer: Siyuan Hua <hsy...@apache.org>
Committed: Thu Sep 15 08:31:34 2016 -0700

----------------------------------------------------------------------
 .../malhar/kafka/KafkaInputOperatorTest.java    | 32 ++------------------
 1 file changed, 3 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/5909dfdc/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java 
b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index 8440615..47a374b 100644
--- 
a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ 
b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -137,10 +137,6 @@ public class KafkaInputOperatorTest extends 
KafkaOperatorTestBase
   private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(KafkaInputOperatorTest.class);
   private static List<String> tupleCollection = new LinkedList<>();
 
-  /**
-   * whether countDown latch count all tuples or just END_TUPLE
-   */
-  private static final boolean countDownAll = false;
   private static final int scale = 2;
   private static final int totalCount = 10 * scale;
   private static final int failureTrigger = 3 * scale;
@@ -232,33 +228,11 @@ public class KafkaInputOperatorTest extends 
KafkaOperatorTestBase
       int tupleSize = windowTupleCollector.size();
       tupleCollection.addAll(windowTupleCollector);
       
-      int countDownTupleSize = countDownAll ? tupleSize : endTuples;
+      int countDownTupleSize = endTuples;
 
       if (latch != null) {
-        Assert.assertTrue("received END_TUPLES more than expected.", 
latch.getCount() >= countDownTupleSize);
-        while (countDownTupleSize > 0) {
+        while (countDownTupleSize-- > 0) {
             latch.countDown();
-            --countDownTupleSize;
-        }
-        if (latch.getCount() == 0) {
-          /**
-           * The time before countDown() and the shutdown() of the application
-           * will cause fatal error:
-           * "Catastrophic Error: Invalid State - the operator blocked 
forever!"
-           * as the activeQueues could be cleared but alive haven't changed 
yet.
-           * throw the ShutdownException to let the engine shutdown;
-           */
-          try {
-            throw new ShutdownException();
-            //lc.shutdown();
-          } finally {
-            /**
-             * interrupt the engine thread, let it wake from sleep and handle
-             * the shutdown at this time, all payload should be handled. so it
-             * should be ok to interrupt
-             */
-            monitorThread.interrupt();
-          }
         }
       }
     }
@@ -301,7 +275,7 @@ public class KafkaInputOperatorTest extends 
KafkaOperatorTestBase
   public void testInputOperator(boolean hasFailure, boolean idempotent) throws 
Exception
   {
     // each broker should get a END_TUPLE message
-    latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : 
totalBrokers);
+    latch = new CountDownLatch(totalBrokers);
 
     logger.info("Test Case: name: {}; totalBrokers: {}; hasFailure: {}; 
hasMultiCluster: {}; hasMultiPartition: {}, partition: {}", 
         testName, totalBrokers, hasFailure, hasMultiCluster, 
hasMultiPartition, partition); 

Reply via email to