Repository: apex-core Updated Branches: refs/heads/master e92741474 -> 899f4cb0a
APEXCORE-720 Update cloned LogicalPlan in Context before discovery of plugins Project: http://git-wip-us.apache.org/repos/asf/apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-core/commit/899f4cb0 Tree: http://git-wip-us.apache.org/repos/asf/apex-core/tree/899f4cb0 Diff: http://git-wip-us.apache.org/repos/asf/apex-core/diff/899f4cb0 Branch: refs/heads/master Commit: 899f4cb0a01d799c8a3f6905ee98429bbde2bc82 Parents: e927414 Author: Chinmay Kolhatkar <[email protected]> Authored: Fri May 12 16:22:24 2017 +0530 Committer: Chinmay Kolhatkar <[email protected]> Committed: Tue May 16 16:58:24 2017 +0530 ---------------------------------------------------------------------- .../apex/engine/plugin/AbstractApexPluginDispatcher.java | 10 ++++------ .../java/org/apache/apex/engine/plugin/DebugPlugin.java | 10 ++++++++++ .../java/org/apache/apex/engine/plugin/PluginTests.java | 5 ++++- 3 files changed, 18 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-core/blob/899f4cb0/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java ---------------------------------------------------------------------- diff --git a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java index 5e468f5..a4aca46 100644 --- a/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java +++ b/engine/src/main/java/org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.java @@ -170,12 +170,10 @@ public abstract class AbstractApexPluginDispatcher extends AbstractService imple @Override public void dispatch(Event event) { - if (!plugins.isEmpty()) { - if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) { - clonedDAG = SerializationUtils.clone(((DAGChangeEvent)event).dag); - } else if (event instanceof DAGExecutionEvent) { - dispatchExecutionEvent((DAGExecutionEvent)event); - } + if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) { + clonedDAG = SerializationUtils.clone(((DAGChangeEvent)event).dag); + } else if (!plugins.isEmpty() && (event instanceof DAGExecutionEvent)) { + dispatchExecutionEvent((DAGExecutionEvent)event); } } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/899f4cb0/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java index 833d69f..654a4ce 100644 --- a/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java +++ b/engine/src/test/java/org/apache/apex/engine/plugin/DebugPlugin.java @@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory; import org.apache.apex.engine.api.plugin.DAGExecutionEvent; import org.apache.apex.engine.api.plugin.DAGExecutionPlugin; +import com.datatorrent.api.DAG; + import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.COMMIT_EVENT; import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.HEARTBEAT_EVENT; import static org.apache.apex.engine.api.plugin.DAGExecutionEvent.Type.STRAM_EVENT; @@ -39,10 +41,13 @@ public class DebugPlugin implements DAGExecutionPlugin<DAGExecutionPlugin.Contex private int heartbeatCount = 0; private int commitCount = 0; CountDownLatch latch = new CountDownLatch(3); + private Context context; @Override public void setup(DAGExecutionPlugin.Context context) { + this.context = context; + context.register(STRAM_EVENT, new EventHandler<DAGExecutionEvent.StramExecutionEvent>() { @Override @@ -102,4 +107,9 @@ public class DebugPlugin implements DAGExecutionPlugin<DAGExecutionPlugin.Contex { latch.await(timeout, TimeUnit.SECONDS); } + + public DAG getLogicalPlan() + { + return context.getDAG(); + } } http://git-wip-us.apache.org/repos/asf/apex-core/blob/899f4cb0/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java ---------------------------------------------------------------------- diff --git a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java index 140dc65..34589b0 100644 --- a/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java +++ b/engine/src/test/java/org/apache/apex/engine/plugin/PluginTests.java @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import com.datatorrent.api.Attribute; import com.datatorrent.stram.api.StramEvent; import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol; +import com.datatorrent.stram.plan.logical.LogicalPlan; import com.datatorrent.stram.support.StramTestSupport; public class PluginTests @@ -93,12 +94,14 @@ public class PluginTests })); pluginManager.dispatch(new DAGExecutionEvent.CommitExecutionEvent(1234)); pluginManager.dispatch(new DAGExecutionEvent.HeartbeatExecutionEvent(new StreamingContainerUmbilicalProtocol.ContainerHeartbeat())); + LogicalPlan plan = new LogicalPlan(); + pluginManager.dispatch(new ApexPluginDispatcher.DAGChangeEvent(plan)); debugPlugin.waitForEventDelivery(10); pluginManager.stop(); Assert.assertEquals(1, debugPlugin.getEventCount()); Assert.assertEquals(1, debugPlugin.getHeartbeatCount()); Assert.assertEquals(1, debugPlugin.getCommitCount()); + Assert.assertEquals(plan, debugPlugin.getLogicalPlan()); } - }
