Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master 6aa1357a1 -> ab800233b (forced update)


MLHR-1925 #resolve #comment report stats of offsets in committed window only
MLHR-1928 #resolve #comment update offsets after emit


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/41caa952
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/41caa952
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/41caa952

Branch: refs/heads/master
Commit: 41caa952e4a6b2f534fbac5230e68ea0607421a8
Parents: f1fee8f
Author: Siyuan Hua <[email protected]>
Authored: Sun Dec 13 21:55:38 2015 -0800
Committer: Siyuan Hua <[email protected]>
Committed: Sun Dec 13 21:55:38 2015 -0800

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       | 53 ++++++++++++++++----
 .../contrib/kafka/OffsetManager.java            |  2 +-
 .../contrib/kafka/SimpleKafkaConsumer.java      |  4 +-
 .../contrib/kafka/KafkaInputOperatorTest.java   | 41 ++++++++++++---
 .../contrib/kafka/KafkaOperatorTestBase.java    |  2 +-
 .../contrib/kafka/OffsetManagerTest.java        | 20 +++++---
 6 files changed, 93 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
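
The core of the fix, before the per-file hunks: the operator now snapshots the consumed offsets at every endWindow(), keyed by window id, and reports the stats for a window only when the platform calls committed() for exactly that window (MLHR-1925). A minimal, self-contained sketch of the pattern, with deliberately simplified names (String in place of KafkaPartition, no operator plumbing), could look like this:

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Map;

    import org.apache.commons.lang3.tuple.Pair;

    class OffsetWindowHistory
    {
      // live offsets, updated as tuples are emitted
      private final Map<String, Long> offsets = new HashMap<>();
      // (windowId, offset snapshot) pairs, oldest first
      private final List<Pair<Long, Map<String, Long>>> history = new LinkedList<>();

      void endWindow(long windowId)
      {
        // copy the map so later emits cannot mutate the recorded snapshot
        history.add(Pair.of(windowId, new HashMap<>(offsets)));
      }

      // returns the snapshot for the committed window, pruning older entries
      Map<String, Long> committed(long windowId)
      {
        for (Iterator<Pair<Long, Map<String, Long>>> it = history.iterator(); it.hasNext(); ) {
          Pair<Long, Map<String, Long>> entry = it.next();
          if (entry.getLeft() < windowId) {
            it.remove();
          } else if (entry.getLeft() == windowId) {
            return entry.getRight();
          } else {
            break;
          }
        }
        return null;
      }
    }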


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/41caa952/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 952609f..ec50615 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -38,10 +38,8 @@ import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
 import com.google.common.base.Joiner;
 import com.google.common.base.Predicate;
-import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.common.collect.SetMultimap;
 import com.google.common.collect.Sets;
 
 import java.io.ByteArrayOutputStream;
@@ -74,6 +72,8 @@ import kafka.message.MessageAndOffset;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -145,7 +145,14 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   protected transient long currentWindowId;
   protected transient int operatorId;
   protected final transient Map<KafkaPartition, MutablePair<Long, Integer>> currentWindowRecoveryState;
-  protected transient Map<KafkaPartition, Long> offsetStats = new HashMap<KafkaPartition, Long>();
+  /**
+   * Offsets that are checkpointed for recovery
+   */
+  protected Map<KafkaPartition, Long> offsetStats = new HashMap<KafkaPartition, Long>();
+  /**
+   * offset history with window id
+   */
+  protected transient List<Pair<Long, Map<KafkaPartition, Long>>> offsetTrackHistory = new LinkedList<>();
   private transient OperatorContext context = null;
   // By default the partition policy is 1:1
   public PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
@@ -212,6 +219,10 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   {
     logger.debug("consumer {} topic {} cacheSize {}", consumer, consumer.getTopic(), consumer.getCacheSize());
     consumer.create();
+    // reset the offsets to the checkpointed ones
+    if (consumer instanceof SimpleKafkaConsumer && !offsetStats.isEmpty()) {
+      ((SimpleKafkaConsumer)consumer).resetOffset(offsetStats);
+    }
     this.context = context;
     operatorId = context.getId();
     if(consumer instanceof HighlevelKafkaConsumer && !(idempotentStorageManager instanceof IdempotentStorageManager.NoopIdempotentStorageManager)) {
@@ -301,12 +312,13 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   @Override
   public void endWindow()
   {
+    //TODO depends on APEX-78: only the history of windows that still need to be committed has to be kept
+    if (getConsumer() instanceof SimpleKafkaConsumer) {
+      Map<KafkaPartition, Long> carryOn = new HashMap<>(offsetStats);
+      offsetTrackHistory.add(Pair.of(currentWindowId, carryOn));
+    }
     if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) {
       try {
-        if((getConsumer() instanceof  SimpleKafkaConsumer)) {
-          SimpleKafkaConsumer cons = (SimpleKafkaConsumer) getConsumer();
-          context.setCounters(cons.getConsumerStats(offsetStats));
-        }
         idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
       }
       catch (IOException e) {
@@ -326,6 +338,23 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
   @Override
   public void committed(long windowId)
   {
+    if (getConsumer() instanceof SimpleKafkaConsumer) {
+      SimpleKafkaConsumer cons = (SimpleKafkaConsumer)getConsumer();
+      for (Iterator<Pair<Long, Map<KafkaPartition, Long>>> iter = offsetTrackHistory.iterator(); iter.hasNext(); ) {
+        Pair<Long, Map<KafkaPartition, Long>> item = iter.next();
+        if (item.getLeft() < windowId) {
+          iter.remove();
+          continue;
+        } else if (item.getLeft() == windowId) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("report offsets {} ", Joiner.on(';').withKeyValueSeparator("=").join(item.getRight()));
+          }
+          context.setCounters(cons.getConsumerStats(item.getRight()));
+        }
+        break;
+      }
+    }
+
     try {
       idempotentStorageManager.deleteUpTo(operatorId, windowId);
     }
@@ -365,9 +394,6 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     }
     for (int i = 0; i < count; i++) {
       KafkaConsumer.KafkaMessage message = consumer.pollMessage();
-      // Ignore the duplicate messages
-      if(offsetStats.containsKey(message.kafkaPart) && message.offSet <= offsetStats.get(message.kafkaPart))
-        continue;
       emitTuple(message.msg);
       offsetStats.put(message.kafkaPart, message.offSet);
       MutablePair<Long, Integer> offsetAndCount = currentWindowRecoveryState.get(message.kafkaPart);
@@ -586,7 +612,12 @@ public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implem
     Input lInput = new Input(bos.toByteArray());
     @SuppressWarnings("unchecked")
     Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new DefaultPartition<AbstractKafkaInputOperator<K>>(kryo.readObject(lInput, this.getClass()));
-    p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets);
+    if (p.getPartitionedInstance().getConsumer() instanceof SimpleKafkaConsumer) {
+      p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets);
+      if (initOffsets != null) {
+        p.getPartitionedInstance().offsetStats.putAll(initOffsets);
+      }
+    }
     newManagers.add(p.getPartitionedInstance().idempotentStorageManager);
     newManagers.add(p.getPartitionedInstance().idempotentStorageManager);
 
     PartitionInfo pif = new PartitionInfo();
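
Two things stand out in the hunks above. First, offsetStats is no longer transient, so the consumed offsets survive a checkpoint, and setup() seeds the consumer from them via resetOffset(); that is what makes the operator-side duplicate filter in the poll loop redundant, hence its removal. Second, the offset of a message is recorded only after the tuple has been emitted (MLHR-1928), so the checkpointed offsets can never run ahead of what was actually emitted. A sketch of that ordering, with a hypothetical Msg type and sink standing in for the Kafka classes:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Consumer;

    class EmitThenRecord
    {
      static class Msg
      {
        final String partition;
        final long offset;
        final String payload;

        Msg(String partition, long offset, String payload)
        {
          this.partition = partition;
          this.offset = offset;
          this.payload = payload;
        }
      }

      // checkpointed with the operator (the real field is the now non-transient offsetStats)
      final Map<String, Long> offsetStats = new HashMap<>();

      void emitTuple(Msg m, Consumer<String> sink)
      {
        sink.accept(m.payload);                 // emit first ...
        offsetStats.put(m.partition, m.offset); // ... then record the offset
        // a failure between the two replays the message on recovery instead of dropping it
      }
    }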

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/41caa952/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
index 660b3d7..5eb0575 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
@@ -30,7 +30,7 @@ import java.util.Map;
  */
 public interface OffsetManager
 {
-
+//
 
   /**
    * 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/41caa952/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
index 58ef95f..e10502b 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
@@ -305,13 +305,13 @@ public class SimpleKafkaConsumer extends KafkaConsumer
 
  // This map maintains mapping between kafka partition and it's leader broker in realtime monitored by a thread
  private transient final ConcurrentHashMap<KafkaPartition, Broker> partitionToBroker = new ConcurrentHashMap<KafkaPartition, Broker>();
-  
+
  /**
   * Track offset for each partition, so operator could start from the last serialized state Use ConcurrentHashMap to
   * avoid ConcurrentModificationException without blocking reads when updating in another thread(hashtable or
   * synchronizedmap)
   */
-  private final ConcurrentHashMap<KafkaPartition, Long> offsetTrack = new ConcurrentHashMap<KafkaPartition, Long>();
+  private final transient ConcurrentHashMap<KafkaPartition, Long> offsetTrack = new ConcurrentHashMap<KafkaPartition, Long>();
 
   private transient AtomicReference<Throwable> monitorException;
   private transient AtomicInteger monitorExceptionCount;
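
With this change the consumer's offsetTrack becomes runtime-only state; the copy that survives a restart is the operator's checkpointed offsetStats, pushed back in through resetOffset(). The javadoc's rationale for ConcurrentHashMap (one thread advances offsets while another reads them, without the blanket locking of Hashtable or a synchronizedMap) can be seen in a tiny standalone demo; the two-thread setup below is illustrative only, not the consumer's actual threading model:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class OffsetTrackDemo
    {
      public static void main(String[] args) throws InterruptedException
      {
        // partition id -> last consumed offset
        ConcurrentHashMap<Integer, Long> offsetTrack = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // writer: keeps advancing the offset, as the consumer thread would
        pool.submit(() -> {
          for (long o = 0; o < 100_000; o++) {
            offsetTrack.put(0, o);
          }
        });
        // reader: samples the offset concurrently, as a stats reporter would
        pool.submit(() -> {
          for (int i = 0; i < 10; i++) {
            System.out.println("seen offset: " + offsetTrack.getOrDefault(0, -1L));
          }
        });
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
      }
    }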

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/41caa952/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
index eeb9d20..76b3550 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
@@ -19,6 +19,7 @@
 package com.datatorrent.contrib.kafka;
 
 import com.datatorrent.api.Attribute;
+import com.datatorrent.api.StringCodec;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
@@ -27,6 +28,7 @@ import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Partitioner;
+import com.datatorrent.common.util.FSStorageAgent;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.io.IdempotentStorageManager;
 import com.datatorrent.lib.partitioner.StatelessPartitionerTest;
@@ -41,6 +43,8 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -54,6 +58,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
   static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
   static AtomicInteger tupleCount = new AtomicInteger();
   static CountDownLatch latch;
+  static boolean isSuicide = false;
+  static int suicideTrigger = 3000;
 
   /**
    * Test Operator to collect tuples from KafkaSingleInputStringOperator.
@@ -68,6 +74,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
   public static class CollectorInputPort<T> extends DefaultInputPort<T>
   {
 
+    private int k = 0;
+
     public CollectorInputPort(String id, Operator module)
     {
       super();
@@ -76,6 +84,11 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     @Override
     public void process(T tuple)
     {
+      if (isSuicide && k++ == suicideTrigger) {
+        // you can only kill yourself once
+        isSuicide = false;
+        throw new RuntimeException();
+      }
       if (tuple.equals(KafkaOperatorTestBase.END_TUPLE)) {
         if (latch != null) {
           latch.countDown();
@@ -84,12 +97,6 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
       }
       tupleCount.incrementAndGet();
     }
-
-    @Override
-    public void setConnected(boolean flag)
-    {
-      tupleCount.set(0);
-    }
   }
 
   /**
@@ -124,6 +131,13 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
     // Create KafkaSinglePortStringInputOperator
     KafkaSinglePortStringInputOperator node = dag.addOperator("Kafka message consumer", KafkaSinglePortStringInputOperator.class);
+    if(isSuicide) {
+      // make some extreme assumptions so the run fails fast if the wrong offsets are checkpointed
+      dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
+      dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent("target/ck", new Configuration()));
+      node.setMaxTuplesPerWindow(500);
+    }
+
     if(idempotent) {
       node.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
     }
@@ -131,6 +145,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
     node.setConsumer(consumer);
 
+    consumer.setCacheSize(5000);
+
     if (isValid) {
       node.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
     }
@@ -151,7 +167,8 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     Assert.assertTrue("TIMEOUT: 30s ", latch.await(300000, TimeUnit.MILLISECONDS));
 
     // Check results
-    Assert.assertEquals("Tuple count", totalCount, tupleCount.intValue());
+    Assert.assertTrue("Expected count >= " + totalCount + "; Actual count " + tupleCount.intValue(),
+      totalCount <= tupleCount.intValue());
     logger.debug(String.format("Number of emitted tuples: %d", tupleCount.intValue()));
 
     p.close();
@@ -182,6 +199,16 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
   }
 
   @Test
+  public void testKafkaInputOperator_SimpleSuicide() throws Exception
+  {
+    int totalCount = 10000;
+    KafkaConsumer k = new SimpleKafkaConsumer();
+    k.setInitialOffset("earliest");
+    isSuicide = true;
+    testKafkaInputOperator(1000, totalCount, k, true, false);
+  }
+
+  @Test
   public void testKafkaInputOperator_Simple_Idempotent() throws Exception
   {
     int totalCount = 10000;
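
The new SimpleSuicide test works by making the collector throw once after suicideTrigger tuples, which forces the platform to restart the operator from its last checkpoint; with CHECKPOINT_WINDOW_COUNT at 1 and the checkpointed offsets restored in setup(), consumption resumes where it left off. Because recovery may replay tuples from the window that was in flight, the assertion becomes count >= expected rather than strict equality. The fail-exactly-once trick, reduced to its core (names hypothetical, AtomicBoolean used instead of the test's plain static flag):

    import java.util.concurrent.atomic.AtomicBoolean;

    class FailOnceSink
    {
      // static so the flag survives the operator instance being redeployed,
      // mirroring the test's static isSuicide field
      static final AtomicBoolean failOnce = new AtomicBoolean(true);

      void process(String tuple)
      {
        if (failOnce.compareAndSet(true, false)) {
          throw new RuntimeException("simulated failure to force recovery");
        }
        // after the restart from the last checkpoint, already-processed tuples
        // may arrive again, which is why the test tolerates count >= expected
      }
    }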

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/41caa952/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
index c21b2e4..f4f5ef2 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
@@ -58,7 +58,7 @@ public class KafkaOperatorTestBase
   // multiple cluster
   private final ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
 
-  public String baseDir = "/tmp";
+  public String baseDir = "target";
 
   private final String zkBaseDir = "zookeeper-server-data";
   private final String kafkaBaseDir = "kafka-server-data";

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/41caa952/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
index 7b36ea8..04fe282 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Context;
 import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
@@ -64,6 +65,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
   static CountDownLatch latch;
   static final String OFFSET_FILE = ".offset";
   static long initialPos = 10l;
+  static Path baseFolder = new Path("target");
 
 
   public static class TestOffsetManager implements OffsetManager{
@@ -98,8 +100,8 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
       offsets.putAll(offsetsOfPartitions);
 
       try {
-        Path tmpFile = new Path(filename + ".tmp");
-        Path dataFile = new Path(filename);
+        Path tmpFile = new Path(baseFolder, filename + ".tmp");
+        Path dataFile = new Path(baseFolder, filename);
         FSDataOutputStream out = fs.create(tmpFile, true);
         for (Entry<KafkaPartition, Long> e : offsets.entrySet()) {
           out.writeBytes(e.getKey() +", " + e.getValue() + "\n");
@@ -142,7 +144,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
   /**
    * Test Operator to collect tuples from KafkaSingleInputStringOperator.
    *
-   * @param <T>
+   * @param
    */
   public static class CollectorModule extends BaseOperator
   {
@@ -226,7 +228,7 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
   private void cleanFile()
   {
     try {
-      FileSystem.get(new Configuration()).delete(new Path(TEST_TOPIC + OFFSET_FILE), true);
+      FileSystem.get(new Configuration()).delete(new Path(baseFolder, TEST_TOPIC + OFFSET_FILE), true);
     } catch (IOException e) {
 
     }
@@ -278,18 +280,22 @@ public class OffsetManagerTest extends KafkaOperatorTestBase
     // Connect ports
     dag.addStream("Kafka message", node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
 
+    dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
+
     // Create local cluster
     final LocalMode.Controller lc = lma.getController();
     lc.setHeartbeatMonitoringEnabled(true);
 
     lc.runAsync();
 
-    // Wait 30s for consumer finish consuming all the messages and offsets has been updated to 100
-    assertTrue("TIMEOUT: 30s, collected " + collectedTuples + " tuples", latch.await(30000, TimeUnit.MILLISECONDS));
 
 
+    boolean isNotTimeout = latch.await(30000, TimeUnit.MILLISECONDS);
+    // Wait 30s for the consumer to finish consuming all the messages and for the offsets to be updated to 100
+    assertTrue("TIMEOUT: 30s, collected " + collectedTuples.size() + " tuples", isNotTimeout);
+
     // Check results
-    assertEquals("Tuple count", expectedCount, collectedTuples.size());
+    assertEquals("Tuple count " + collectedTuples, expectedCount, collectedTuples.size());
     logger.debug(String.format("Number of emitted tuples: %d", collectedTuples.size()));
 
     p.close();
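
A side note on TestOffsetManager's file handling touched above: offsets are written to a ".tmp" file first so that a concurrent reader never observes a half-written offset file. Roughly, and assuming the temp file is swapped into place right after the write loop shown in the hunk (the swap itself is outside the quoted context):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AtomicOffsetWrite
    {
      public static void main(String[] args) throws Exception
      {
        FileSystem fs = FileSystem.get(new Configuration());
        Path tmpFile = new Path("target", "test_topic.offset.tmp");
        Path dataFile = new Path("target", "test_topic.offset");
        // write the complete new state to the temp file
        FSDataOutputStream out = fs.create(tmpFile, true);
        out.writeBytes("KafkaPartition-0, 100\n");
        out.close();
        // replace the visible file wholesale (hypothetical swap step)
        fs.delete(dataFile, true);
        fs.rename(tmpFile, dataFile);
      }
    }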
