Repository: apex-core Updated Branches: refs/heads/master 7ea7f6073 -> a469dfb22
APEXCORE-596 Setting the thread for all oio nodes in the oio group, refactoring tests Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/a469dfb2 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/a469dfb2 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/a469dfb2 Branch: refs/heads/master Commit: a469dfb229c0d42c07048d95d070703331e2c429 Parents: 7ea7f60 Author: francisf <[email protected]> Authored: Thu Jan 5 13:02:05 2017 +0530 Committer: francisf <[email protected]> Committed: Fri Jan 20 22:51:21 2017 +0530 ---------------------------------------------------------------------- .../stram/engine/StreamingContainer.java | 7 +++ .../stram/engine/StreamingContainerTest.java | 66 +++++++++++++++----- 2 files changed, 58 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/a469dfb2/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java index 78f3421..86c0402 100644 --- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java +++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java @@ -1460,6 +1460,13 @@ public class StreamingContainer extends YarnContainerMain } }; node.context.setThread(thread); + List<Integer> oioNodeIdList = oioGroups.get(ndi.id); + if (oioNodeIdList != null) { + for (Integer oioNodeId : oioNodeIdList) { + Node<?> oioNode = nodes.get(oioNodeId); + oioNode.context.setThread(thread); + } + } thread.start(); } http://git-wip-us.apache.org/repos/asf/apex-core/blob/a469dfb2/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java index 18aee4c..451972e 100644 --- a/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java +++ b/engine/src/test/java/com/datatorrent/stram/engine/StreamingContainerTest.java @@ -20,8 +20,9 @@ package com.datatorrent.stram.engine; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; import org.junit.Assert; import org.junit.Test; @@ -30,8 +31,13 @@ import org.slf4j.LoggerFactory; import com.datatorrent.api.Context; import com.datatorrent.api.Context.DAGContext; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DAG.Locality; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; import com.datatorrent.api.Operator.CheckpointListener; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.common.util.AsyncFSStorageAgent; import com.datatorrent.common.util.BaseOperator; import com.datatorrent.stram.StramLocalCluster; @@ -42,6 +48,9 @@ import com.datatorrent.stram.plan.logical.LogicalPlan; */ public class StreamingContainerTest { + private static final Logger logger = LoggerFactory.getLogger(StreamingContainerTest.class); + private static Set<String> committedWindowIds = Collections.synchronizedSet(new HashSet<String>()); + private static Set<String> checkpointedWindowIds = Collections.synchronizedSet(new HashSet<String>()); @Test public void testCommitted() throws IOException, ClassNotFoundException @@ -50,40 +59,68 @@ public class StreamingContainerTest String workingDir = new File("target/testCommitted").getAbsolutePath(); lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null)); lp.setAttribute(DAGContext.CHECKPOINT_WINDOW_COUNT, 1); - CommitAwareOperator operator = lp.addOperator("CommitAwareOperator", new CommitAwareOperator()); + String opName = "CommitAwareOperatorTestCommit"; + lp.addOperator(opName, new CommitAwareOperator()); - List<Long> myCommittedWindowIds = CommitAwareOperator.getCommittedWindowIdsContainer(); + StramLocalCluster lc = new StramLocalCluster(lp); + lc.run(5000); + + /* this is not foolproof but some insurance is better than nothing */ + Assert.assertTrue("No Committed Windows", committedWindowIds.contains(opName)); + } + + @Test + public void testOiOCommitted() throws IOException, ClassNotFoundException + { + LogicalPlan lp = new LogicalPlan(); + String workingDir = new File("target/testCommitted").getAbsolutePath(); + lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null)); + lp.setAttribute(DAGContext.CHECKPOINT_WINDOW_COUNT, 1); + String op1Name = "CommitAwareOperatorTestOioCommit1"; + String op2Name = "CommitAwareOperatorTestOioCommit2"; + CommitAwareOperator operator1 = lp.addOperator(op1Name, new CommitAwareOperator()); + CommitAwareOperator operator2 = lp.addOperator(op2Name, new CommitAwareOperator()); + lp.addStream("local", operator1.output, operator2.input).setLocality(Locality.THREAD_LOCAL); StramLocalCluster lc = new StramLocalCluster(lp); lc.run(5000); /* this is not foolproof but some insurance is better than nothing */ - Assert.assertSame("Concurrent Use detected", myCommittedWindowIds, CommitAwareOperator.committedWindowIds); - Assert.assertFalse("No Committed Windows", myCommittedWindowIds.isEmpty()); + Assert.assertTrue("No Committed Windows", committedWindowIds.contains(op1Name)); + Assert.assertTrue("No Committed Windows", committedWindowIds.contains(op2Name)); } private static class CommitAwareOperator extends BaseOperator implements CheckpointListener, InputOperator { - public static ArrayList<Long> committedWindowIds; - public static ArrayList<Long> checkpointedWindowIds = new ArrayList<Long>(); + private transient String name; + public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>(); - public static final synchronized List<Long> getCommittedWindowIdsContainer() + @InputPortFieldAnnotation(optional = true) + public final transient DefaultInputPort<String> input = new DefaultInputPort<String>() + { + @Override + public void process(String tuple) + { + } + }; + @Override + public void setup(OperatorContext context) { - return committedWindowIds = new ArrayList<Long>(); + this.name = context.getName(); } @Override public void checkpointed(long windowId) { - checkpointedWindowIds.add(windowId); - logger.debug("checkpointed {}", windowId); + checkpointedWindowIds.add(name); + logger.debug("checkpointed {} {}", name, windowId); } @Override public void committed(long windowId) { - committedWindowIds.add(windowId); - logger.debug("committed {}", windowId); + committedWindowIds.add(name); + logger.debug("committed {} {}", name, windowId); } @Override @@ -91,7 +128,6 @@ public class StreamingContainerTest { } - private static final Logger logger = LoggerFactory.getLogger(CommitAwareOperator.class); } }
