Repository: incubator-apex-core Updated Branches: refs/heads/devel-3 c30dd952c -> 38a6f4e3b
- APEX-83 Fixed race condition for storing the applicationWindowCount and checkpointWindowCount Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/2c4254f9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/2c4254f9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/2c4254f9 Branch: refs/heads/devel-3 Commit: 2c4254f95eb40aa18b3daa09c382e2158909b2e3 Parents: c30dd95 Author: Timothy Farkas <[email protected]> Authored: Sun Nov 15 20:52:27 2015 -0800 Committer: Timothy Farkas <[email protected]> Committed: Tue Dec 1 00:39:52 2015 -0800 ---------------------------------------------------------------------- .../java/com/datatorrent/stram/engine/Node.java | 27 ++- .../stram/engine/GenericNodeTest.java | 211 ++++++++++++++++++- 2 files changed, 229 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/2c4254f9/engine/src/main/java/com/datatorrent/stram/engine/Node.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java index c66df12..5c6b86f 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java @@ -124,7 +124,7 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera private final List<Field> metricFields; private final Map<String, Method> metricMethods; private ExecutorService executorService; - private Queue<Pair<FutureTask<Stats.CheckpointStats>, Long>> taskQueue; + private Queue<Pair<FutureTask<Stats.CheckpointStats>, CheckpointWindowInfo>> taskQueue; protected Stats.CheckpointStats checkpointStats; public Node(OPERATOR operator, OperatorContext context) @@ -132,7 +132,7 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera this.operator = operator; this.context = context; executorService = Executors.newSingleThreadExecutor(); - taskQueue = new LinkedList<Pair<FutureTask<Stats.CheckpointStats>, Long>>(); + taskQueue = new LinkedList<Pair<FutureTask<Stats.CheckpointStats>, CheckpointWindowInfo>>(); outputs = new HashMap<String, Sink<Object>>(); @@ -441,14 +441,16 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera checkpoint = null; } else { - Pair<FutureTask<Stats.CheckpointStats>, Long> pair = taskQueue.peek(); + Pair<FutureTask<Stats.CheckpointStats>, CheckpointWindowInfo> pair = taskQueue.peek(); if (pair != null && pair.getFirst().isDone()) { taskQueue.poll(); try { + CheckpointWindowInfo checkpointWindowInfo = pair.getSecond(); stats.checkpointStats = pair.getFirst().get(); - stats.checkpoint = new Checkpoint(pair.getSecond(), applicationWindowCount, checkpointWindowCount); + stats.checkpoint = new Checkpoint(checkpointWindowInfo.windowId, checkpointWindowInfo.applicationWindowCount, + checkpointWindowInfo.checkpointWindowCount); if (operator instanceof Operator.CheckpointListener) { - ((Operator.CheckpointListener) operator).checkpointed(pair.getSecond()); + ((Operator.CheckpointListener) operator).checkpointed(checkpointWindowInfo.windowId); } } catch (Exception ex) { throw DTThrowable.wrapIfChecked(ex); @@ -494,13 +496,17 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera AsyncFSStorageAgent asyncFSStorageAgent = (AsyncFSStorageAgent) ba; if (!asyncFSStorageAgent.isSyncCheckpoint()) { if(PROCESSING_MODE != ProcessingMode.EXACTLY_ONCE) { + CheckpointWindowInfo checkpointWindowInfo = new CheckpointWindowInfo(); + checkpointWindowInfo.windowId = windowId; + checkpointWindowInfo.applicationWindowCount = applicationWindowCount; + checkpointWindowInfo.checkpointWindowCount = checkpointWindowCount; CheckpointHandler checkpointHandler = new CheckpointHandler(); checkpointHandler.agent = asyncFSStorageAgent; checkpointHandler.operatorId = id; checkpointHandler.windowId = windowId; checkpointHandler.stats = checkpointStats; - FutureTask<Stats.CheckpointStats> futureTask = new FutureTask<Stats.CheckpointStats>(checkpointHandler); - taskQueue.add(new Pair<FutureTask<Stats.CheckpointStats>, Long>(futureTask, windowId)); + FutureTask<Stats.CheckpointStats> futureTask = new FutureTask<>(checkpointHandler); + taskQueue.add(new Pair<FutureTask<Stats.CheckpointStats>, CheckpointWindowInfo>(futureTask, checkpointWindowInfo)); executorService.submit(futureTask); checkpoint = null; checkpointStats = null; @@ -657,5 +663,12 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera } } + private class CheckpointWindowInfo + { + public int applicationWindowCount; + public int checkpointWindowCount; + public long windowId; + } + private static final Logger logger = LoggerFactory.getLogger(Node.class); } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/2c4254f9/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java index f2c23b2..b8bff8a 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/GenericNodeTest.java @@ -18,23 +18,42 @@ */ package com.datatorrent.stram.engine; +import java.io.File; +import java.io.IOException; import java.util.ArrayList; +import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; + +import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import com.datatorrent.api.*; +import com.datatorrent.api.Attribute.AttributeMap; import com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap; +import com.datatorrent.api.Context; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.CheckpointListener; import com.datatorrent.api.Operator.ProcessingMode; +import com.datatorrent.api.Sink; +import com.datatorrent.api.Stats.OperatorStats; import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.bufferserver.packet.MessageType; +import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.ScheduledThreadPoolExecutor; +import com.datatorrent.stram.api.Checkpoint; import com.datatorrent.stram.tuple.EndStreamTuple; import com.datatorrent.stram.tuple.EndWindowTuple; import com.datatorrent.stram.tuple.Tuple; @@ -44,6 +63,104 @@ import com.datatorrent.stram.tuple.Tuple; */ public class GenericNodeTest { + @Rule + public FSTestWatcher testMeta = new FSTestWatcher(); + + public static class FSTestWatcher extends TestWatcher + { + private String dir; + + public String getDir() + { + return dir; + } + + @Override + protected void starting(org.junit.runner.Description description) + { + dir = "target/" + description.getClassName() + "/" + description.getMethodName(); + } + + @Override + protected void finished(org.junit.runner.Description description) + { + super.finished(description); + FileUtils.deleteQuietly(new File(dir)); + } + } + + public static class DelayAsyncFSStorageAgent extends AsyncFSStorageAgent + { + private static final long serialVersionUID = 201511301205L; + + public DelayAsyncFSStorageAgent(String localBasePath, String path, Configuration conf) + { + super(localBasePath, path, conf); + } + + private long delayMS = 2000L; + + public DelayAsyncFSStorageAgent(String path, Configuration conf) + { + super(path, conf); + } + + @Override + public void save(final Object object, final int operatorId, final long windowId) throws IOException + { + LOG.info("Saving"); + //Do nothing + } + + @Override + public void copyToHDFS(int operatorId, long windowId) throws IOException + { + try { + Thread.sleep(delayMS); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + + /** + * @return the delayMS + */ + public long getDelayMS() + { + return delayMS; + } + + /** + * @param delayMS the delayMS to set + */ + public void setDelayMS(long delayMS) + { + this.delayMS = delayMS; + } + } + + public static class TestStatsOperatorContext extends OperatorContext + { + private static final long serialVersionUID = 201511301206L; + + public volatile List<Checkpoint> checkpoints = Lists.newArrayList(); + + public TestStatsOperatorContext(int id, AttributeMap attributes, Context parentContext) + { + super(id, attributes, parentContext); + } + + @Override + public void report(OperatorStats stats, long windowId) + { + super.report(stats, windowId); + + if (stats.checkpoint != null) { + checkpoints.add((Checkpoint)stats.checkpoint); + } + } + } + public static class GenericOperator implements Operator { long beginWindowId; @@ -391,7 +508,6 @@ public class GenericNodeTest in.run(); in.deactivate(); } - }; t.start(); @@ -411,4 +527,95 @@ public class GenericNodeTest Assert.assertFalse(gco.checkpointTwice); Assert.assertTrue("Timed out", (endTime - startTime) < 5000); } + + /** + * This tests to make sure that the race condition reported in APEX-83 is fixed. + */ + @Test + public void testCheckpointApplicationWindowCountAtleastOnce() throws Exception + { + testCheckpointApplicationWindowCount(ProcessingMode.AT_LEAST_ONCE); + } + + /** + * This tests to make sure that the race condition reported in APEX-83 is fixed. + */ + @Test + public void testCheckpointApplicationWindowCountAtMostOnce() throws Exception + { + testCheckpointApplicationWindowCount(ProcessingMode.AT_MOST_ONCE); + } + + private void testCheckpointApplicationWindowCount(ProcessingMode processingMode) throws Exception + { + final long timeoutMillis = 10000L; + final long sleepTime = 25L; + + WindowGenerator windowGenerator = new WindowGenerator(new ScheduledThreadPoolExecutor(1, "WindowGenerator"), 1024); + windowGenerator.setResetWindow(0L); + windowGenerator.setFirstWindow(1448909287863L); + windowGenerator.setWindowWidth(100); + windowGenerator.setCheckpointCount(1, 0); + + GenericOperator go = new GenericOperator(); + + DefaultAttributeMap dam = new DefaultAttributeMap(); + dam.put(OperatorContext.APPLICATION_WINDOW_COUNT, 5); + dam.put(OperatorContext.CHECKPOINT_WINDOW_COUNT, 5); + dam.put(OperatorContext.PROCESSING_MODE, processingMode); + + DelayAsyncFSStorageAgent storageAgent = new DelayAsyncFSStorageAgent(testMeta.getDir(), new Configuration()); + storageAgent.setDelayMS(200L); + + dam.put(OperatorContext.STORAGE_AGENT, storageAgent); + + TestStatsOperatorContext operatorContext = new TestStatsOperatorContext(0, dam, null); + final GenericNode gn = new GenericNode(go, operatorContext); + gn.setId(1); + + TestSink testSink = new TestSink(); + + gn.connectInputPort("ip1", windowGenerator.acquireReservoir(String.valueOf(gn.id), 1024)); + gn.connectOutputPort("output", testSink); + + windowGenerator.activate(null); + + Thread t = new Thread() + { + @Override + public void run() + { + gn.activate(); + gn.run(); + gn.deactivate(); + } + }; + + t.start(); + + long startTime = System.currentTimeMillis(); + long endTime = 0; + + while (operatorContext.checkpoints.size() < 8 && ((endTime = System.currentTimeMillis()) - startTime) < timeoutMillis) { + Thread.sleep(sleepTime); + } + + gn.shutdown(); + t.join(); + + windowGenerator.deactivate(); + + Assert.assertTrue(!operatorContext.checkpoints.isEmpty()); + + for (int index = 0; index < operatorContext.checkpoints.size(); index++) { + if (operatorContext.checkpoints.get(index) == null) { + continue; + } + + Assert.assertEquals(0, operatorContext.checkpoints.get(index).applicationWindowCount); + Assert.assertEquals(0, operatorContext.checkpoints.get(index).checkpointWindowCount); + } + } + + private static final Logger LOG = LoggerFactory.getLogger(GenericNodeTest.class); }
