Repository: incubator-apex-core
Updated Branches:
  refs/heads/devel-3 c30dd952c -> 38a6f4e3b


- APEX-83 Fixed a race condition in recording the applicationWindowCount and checkpointWindowCount of asynchronous checkpoints


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/2c4254f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/2c4254f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/2c4254f9

Branch: refs/heads/devel-3
Commit: 2c4254f95eb40aa18b3daa09c382e2158909b2e3
Parents: c30dd95
Author: Timothy Farkas <[email protected]>
Authored: Sun Nov 15 20:52:27 2015 -0800
Committer: Timothy Farkas <[email protected]>
Committed: Tue Dec 1 00:39:52 2015 -0800

----------------------------------------------------------------------
 .../java/com/datatorrent/stram/engine/Node.java |  27 ++-
 .../stram/engine/GenericNodeTest.java           | 211 ++++++++++++++++++-
 2 files changed, 229 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/2c4254f9/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java 
b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index c66df12..5c6b86f 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -124,7 +124,7 @@ public abstract class Node<OPERATOR extends Operator> 
implements Component<Opera
   private final List<Field> metricFields;
   private final Map<String, Method> metricMethods;
   private ExecutorService executorService;
-  private Queue<Pair<FutureTask<Stats.CheckpointStats>, Long>> taskQueue;
+  private Queue<Pair<FutureTask<Stats.CheckpointStats>, CheckpointWindowInfo>> 
taskQueue;
   protected Stats.CheckpointStats checkpointStats;
 
   public Node(OPERATOR operator, OperatorContext context)
@@ -132,7 +132,7 @@ public abstract class Node<OPERATOR extends Operator> 
implements Component<Opera
     this.operator = operator;
     this.context = context;
     executorService = Executors.newSingleThreadExecutor();
-    taskQueue = new LinkedList<Pair<FutureTask<Stats.CheckpointStats>, 
Long>>();
+    taskQueue = new LinkedList<Pair<FutureTask<Stats.CheckpointStats>, 
CheckpointWindowInfo>>();
 
     outputs = new HashMap<String, Sink<Object>>();
 
@@ -441,14 +441,16 @@ public abstract class Node<OPERATOR extends Operator> 
implements Component<Opera
       checkpoint = null;
     }
     else {
-      Pair<FutureTask<Stats.CheckpointStats>, Long> pair = taskQueue.peek();
+      Pair<FutureTask<Stats.CheckpointStats>, CheckpointWindowInfo> pair = 
taskQueue.peek();
       if (pair != null && pair.getFirst().isDone()) {
         taskQueue.poll();
         try {
+          CheckpointWindowInfo checkpointWindowInfo = pair.getSecond();
           stats.checkpointStats = pair.getFirst().get();
-          stats.checkpoint = new Checkpoint(pair.getSecond(), 
applicationWindowCount, checkpointWindowCount);
+          stats.checkpoint = new Checkpoint(checkpointWindowInfo.windowId, 
checkpointWindowInfo.applicationWindowCount,
+              checkpointWindowInfo.checkpointWindowCount);
           if (operator instanceof Operator.CheckpointListener) {
-            ((Operator.CheckpointListener) 
operator).checkpointed(pair.getSecond());
+            ((Operator.CheckpointListener) 
operator).checkpointed(checkpointWindowInfo.windowId);
           }
         } catch (Exception ex) {
           throw DTThrowable.wrapIfChecked(ex);
@@ -494,13 +496,17 @@ public abstract class Node<OPERATOR extends Operator> 
implements Component<Opera
             AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent) ba;
             if (!asyncFSStorageAgent.isSyncCheckpoint()) {
               if(PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) {
+                CheckpointWindowInfo checkpointWindowInfo = new 
CheckpointWindowInfo();
+                checkpointWindowInfo.windowId = windowId;
+                checkpointWindowInfo.applicationWindowCount = 
applicationWindowCount;
+                checkpointWindowInfo.checkpointWindowCount = 
checkpointWindowCount;
                 CheckpointHandler checkpointHandler = new CheckpointHandler();
                 checkpointHandler.agent = asyncFSStorageAgent;
                 checkpointHandler.operatorId = id;
                 checkpointHandler.windowId = windowId;
                 checkpointHandler.stats = checkpointStats;
-                FutureTask<Stats.CheckpointStats> futureTask = new 
FutureTask<Stats.CheckpointStats>(checkpointHandler);
-                taskQueue.add(new Pair<FutureTask<Stats.CheckpointStats>, 
Long>(futureTask, windowId));
+                FutureTask<Stats.CheckpointStats> futureTask = new 
FutureTask<>(checkpointHandler);
+                taskQueue.add(new Pair<FutureTask<Stats.CheckpointStats>, 
CheckpointWindowInfo>(futureTask, checkpointWindowInfo));
                 executorService.submit(futureTask);
                 checkpoint = null;
                 checkpointStats = null;
@@ -657,5 +663,12 @@ public abstract class Node<OPERATOR extends Operator> 
implements Component<Opera
     }
   }
 
+  /**
+   * Snapshot of the window counters taken at the moment an asynchronous checkpoint is queued, so
+   * that the {@code Checkpoint} reported once the copy completes reflects the values at checkpoint
+   * time rather than whatever the live counters hold by then (APEX-83).
+   * <p>
+   * Declared {@code static} because it never touches the enclosing {@code Node}; a non-static
+   * inner class would pin a hidden reference to the node in every queued entry.
+   */
+  private static class CheckpointWindowInfo
+  {
+    /** Value of {@code applicationWindowCount} when the checkpoint was queued. */
+    public int applicationWindowCount;
+    /** Value of {@code checkpointWindowCount} when the checkpoint was queued. */
+    public int checkpointWindowCount;
+    /** Id of the window at which the checkpoint was taken. */
+    public long windowId;
+  }
+
   private static final Logger logger = LoggerFactory.getLogger(Node.class);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/2c4254f9/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
----------------------------------------------------------------------
diff --git 
a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java 
b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
index f2c23b2..b8bff8a 100644
--- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
+++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java
@@ -18,23 +18,42 @@
  */
 package com.datatorrent.stram.engine;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
-import com.datatorrent.api.*;
+import com.datatorrent.api.Attribute.AttributeMap;
 import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.CheckpointListener;
 import com.datatorrent.api.Operator.ProcessingMode;
+import com.datatorrent.api.Sink;
+import com.datatorrent.api.Stats.OperatorStats;
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.bufferserver.packet.MessageType;
+import com.datatorrent.common.util.AsyncFSStorageAgent;
 import com.datatorrent.common.util.ScheduledThreadPoolExecutor;
+import com.datatorrent.stram.api.Checkpoint;
 import com.datatorrent.stram.tuple.EndStreamTuple;
 import com.datatorrent.stram.tuple.EndWindowTuple;
 import com.datatorrent.stram.tuple.Tuple;
@@ -44,6 +63,104 @@ import com.datatorrent.stram.tuple.Tuple;
  */
 public class GenericNodeTest
 {
+  /** Gives each test its own scratch directory and removes it when the test finishes. */
+  @Rule
+  public FSTestWatcher testMeta = new FSTestWatcher();
+
+  /**
+   * JUnit watcher that derives a per-test working directory under {@code target/} from the
+   * running test's class and method names, and deletes it quietly once the test has finished.
+   */
+  public static class FSTestWatcher extends TestWatcher
+  {
+    /** Relative path of the current test's directory; {@code null} until the test starts. */
+    private String testDir;
+
+    @Override
+    protected void starting(org.junit.runner.Description description)
+    {
+      String className = description.getClassName();
+      String methodName = description.getMethodName();
+      testDir = "target/" + className + "/" + methodName;
+    }
+
+    @Override
+    protected void finished(org.junit.runner.Description description)
+    {
+      super.finished(description);
+      File scratch = new File(testDir);
+      FileUtils.deleteQuietly(scratch);
+    }
+
+    /**
+     * @return the directory assigned to the running test, or {@code null} before it has started
+     */
+    public String getDir()
+    {
+      return testDir;
+    }
+  }
+
+  /**
+   * {@link AsyncFSStorageAgent} for tests: {@code save} persists nothing and {@code copyToHDFS}
+   * just sleeps for a configurable delay, keeping asynchronous checkpoints in flight long enough to
+   * widen the window in which the APEX-83 race could occur.
+   */
+  public static class DelayAsyncFSStorageAgent extends AsyncFSStorageAgent
+  {
+    private static final long serialVersionUID = 201511301205L;
+
+    /** How long {@link #copyToHDFS(int, long)} blocks, in milliseconds. */
+    private long delayMS = 2000L;
+
+    public DelayAsyncFSStorageAgent(String localBasePath, String path, Configuration conf)
+    {
+      super(localBasePath, path, conf);
+    }
+
+    public DelayAsyncFSStorageAgent(String path, Configuration conf)
+    {
+      super(path, conf);
+    }
+
+    /** Logs the call and intentionally persists nothing. */
+    @Override
+    public void save(final Object object, final int operatorId, final long windowId) throws IOException
+    {
+      LOG.info("Saving");
+    }
+
+    /**
+     * Simulates a slow copy by sleeping for {@link #getDelayMS()} milliseconds.
+     *
+     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is
+     *         interrupted; the thread's interrupt status is restored before throwing
+     */
+    @Override
+    public void copyToHDFS(int operatorId, long windowId) throws IOException
+    {
+      try {
+        Thread.sleep(delayMS);
+      } catch (InterruptedException ex) {
+        // Restore the flag so whoever runs this copy can still observe the interrupt.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(ex);
+      }
+    }
+
+    /**
+     * @return the delay applied by {@link #copyToHDFS(int, long)}, in milliseconds
+     */
+    public long getDelayMS()
+    {
+      return delayMS;
+    }
+
+    /**
+     * @param delayMS the delay applied by {@link #copyToHDFS(int, long)}, in milliseconds
+     */
+    public void setDelayMS(long delayMS)
+    {
+      this.delayMS = delayMS;
+    }
+  }
+
+  /**
+   * {@link OperatorContext} that, in addition to the normal reporting, records every non-null
+   * {@link Checkpoint} passed to {@link #report(OperatorStats, long)}.
+   */
+  public static class TestStatsOperatorContext extends OperatorContext
+  {
+    private static final long serialVersionUID = 201511301206L;
+
+    /**
+     * Checkpoints reported so far. {@code report} is presumably called from the node's run loop
+     * (a separate thread in the tests) while the test thread polls this list, so the list itself
+     * must be thread-safe: a {@code volatile} reference to a plain {@code ArrayList} would only
+     * publish the reference, not its contents.
+     */
+    public final List<Checkpoint> checkpoints = new CopyOnWriteArrayList<>();
+
+    public TestStatsOperatorContext(int id, AttributeMap attributes, Context parentContext)
+    {
+      super(id, attributes, parentContext);
+    }
+
+    @Override
+    public void report(OperatorStats stats, long windowId)
+    {
+      super.report(stats, windowId);
+
+      if (stats.checkpoint != null) {
+        checkpoints.add((Checkpoint)stats.checkpoint);
+      }
+    }
+  }
+
   public static class GenericOperator implements Operator
   {
     long beginWindowId;
@@ -391,7 +508,6 @@ public class GenericNodeTest
         in.run();
         in.deactivate();
       }
-
     };
 
     t.start();
@@ -411,4 +527,95 @@ public class GenericNodeTest
     Assert.assertFalse(gco.checkpointTwice);
     Assert.assertTrue("Timed out", (endTime - startTime) < 5000);
   }
+
+  /**
+   * APEX-83 regression check with the operator running in
+   * {@link ProcessingMode#AT_LEAST_ONCE} mode.
+   */
+  @Test
+  public void testCheckpointApplicationWindowCountAtleastOnce() throws Exception
+  {
+    final ProcessingMode mode = ProcessingMode.AT_LEAST_ONCE;
+    testCheckpointApplicationWindowCount(mode);
+  }
+
+  /**
+   * APEX-83 regression check with the operator running in
+   * {@link ProcessingMode#AT_MOST_ONCE} mode.
+   */
+  @Test
+  public void testCheckpointApplicationWindowCountAtMostOnce() throws Exception
+  {
+    final ProcessingMode mode = ProcessingMode.AT_MOST_ONCE;
+    testCheckpointApplicationWindowCount(mode);
+  }
+
+  /**
+   * Shared body of the APEX-83 regression tests. Runs a {@link GenericNode} driven by a
+   * {@link WindowGenerator}, checkpointing asynchronously through a
+   * {@link DelayAsyncFSStorageAgent} whose copy step sleeps 200 ms (window width is 100). It waits
+   * until enough checkpoints have been reported or a timeout expires, then verifies that every
+   * reported checkpoint carries zero application and checkpoint window counts.
+   *
+   * @param processingMode processing mode configured on the operator under test
+   */
+  private void testCheckpointApplicationWindowCount(ProcessingMode processingMode) throws Exception
+  {
+    final long timeoutMillis = 10000L;
+    final long sleepTime = 25L;
+    // Stop the node once this many checkpoints have been reported (or the timeout expires).
+    final int targetCheckpoints = 8;
+
+    WindowGenerator windowGenerator = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024);
+    windowGenerator.setResetWindow(0L);
+    windowGenerator.setFirstWindow(1448909287863L);
+    windowGenerator.setWindowWidth(100);
+    windowGenerator.setCheckpointCount(1, 0);
+
+    GenericOperator go = new GenericOperator();
+
+    DefaultAttributeMap dam = new DefaultAttributeMap();
+    dam.put(OperatorContext.APPLICATION_WINDOW_COUNT, 5);
+    dam.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 5);
+    dam.put(OperatorContext.PROCESSING_MODE, processingMode);
+
+    // A copy that outlasts a window keeps each asynchronous checkpoint in flight while the node's
+    // window counters keep advancing.
+    DelayAsyncFSStorageAgent storageAgent = new DelayAsyncFSStorageAgent(testMeta.getDir(), new Configuration());
+    storageAgent.setDelayMS(200L);
+
+    dam.put(OperatorContext.STORAGE_AGENT, storageAgent);
+
+    TestStatsOperatorContext operatorContext = new TestStatsOperatorContext(0, dam, null);
+    final GenericNode gn = new GenericNode(go, operatorContext);
+    gn.setId(1);
+
+    TestSink testSink = new TestSink();
+
+    gn.connectInputPort("ip1", windowGenerator.acquireReservoir(String.valueOf(gn.id), 1024));
+    gn.connectOutputPort("output", testSink);
+
+    windowGenerator.activate(null);
+
+    Thread t = new Thread()
+    {
+      @Override
+      public void run()
+      {
+        gn.activate();
+        gn.run();
+        gn.deactivate();
+      }
+    };
+
+    t.start();
+
+    long startTime = System.currentTimeMillis();
+    while (operatorContext.checkpoints.size() < targetCheckpoints
+        && System.currentTimeMillis() - startTime < timeoutMillis) {
+      Thread.sleep(sleepTime);
+    }
+
+    gn.shutdown();
+    // Bounded join so a node that fails to stop fails the test instead of hanging the build.
+    t.join(timeoutMillis);
+
+    windowGenerator.deactivate();
+
+    Assert.assertFalse("Node thread did not terminate after shutdown", t.isAlive());
+    Assert.assertFalse("No checkpoints were reported", operatorContext.checkpoints.isEmpty());
+
+    // report() only records non-null checkpoints, so no null check is needed here.
+    for (Checkpoint checkpoint : operatorContext.checkpoints) {
+      Assert.assertEquals("applicationWindowCount", 0, checkpoint.applicationWindowCount);
+      Assert.assertEquals("checkpointWindowCount", 0, checkpoint.checkpointWindowCount);
+    }
+  }
+
+  /** Logger for this test class; also used by the nested {@code DelayAsyncFSStorageAgent}. */
+  private static final Logger LOG = 
LoggerFactory.getLogger(GenericNodeTest.class);
 }

Reply via email to