Repository: incubator-apex-core
Updated Branches: refs/heads/master 7bbb4cfd2 -> 139a9cac6
APEXCORE-130 - Throwing A Runtime Exception In Setup Causes The Operator To Block. Moved thread assignment from node active() to StreamingContainer.

Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/139a9cac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/139a9cac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/139a9cac

Branch: refs/heads/master
Commit: 139a9cac6397948bb63a53ea80188f2ffd6e5da2
Parents: 7bbb4cf
Author: Vlad Rozov <[email protected]>
Authored: Mon Feb 29 15:16:35 2016 -0800
Committer: Vlad Rozov <[email protected]>
Committed: Mon Feb 29 15:27:16 2016 -0800

----------------------------------------------------------------------
 engine/pom.xml                                       |  2 +-
 .../main/java/com/datatorrent/stram/engine/Node.java |  1 -
 .../datatorrent/stram/engine/StreamingContainer.java | 13 ++++++++++---
 3 files changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/139a9cac/engine/pom.xml
----------------------------------------------------------------------
diff --git a/engine/pom.xml b/engine/pom.xml
index 277a770..fd7540d 100644
--- a/engine/pom.xml
+++ b/engine/pom.xml
@@ -149,7 +149,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-checkstyle-plugin</artifactId>
         <configuration>
-          <maxAllowedViolations>3184</maxAllowedViolations>
+          <maxAllowedViolations>3179</maxAllowedViolations>
           <logViolationsToConsole>${checkstyle.console}</logViolationsToConsole>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/139a9cac/engine/src/main/java/com/datatorrent/stram/engine/Node.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/Node.java b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
index 9eae7e9..ae1435c 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/Node.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/Node.java
@@ -631,7 +631,6 @@ public abstract class Node<OPERATOR extends Operator> implements Component<Opera
       CHECKPOINT_WINDOW_COUNT = 1;
     }
 
-    context.setThread(Thread.currentThread());
     activateSinks();
     if (operator instanceof Operator.ActivationListener) {
       ((Operator.ActivationListener<OperatorContext>) operator).activate(context);

http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/139a9cac/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
----------------------------------------------------------------------
diff --git a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
index 79d9037..f289ce3 100644
--- a/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
+++ b/engine/src/main/java/com/datatorrent/stram/engine/StreamingContainer.java
@@ -1351,7 +1351,13 @@ public class StreamingContainer extends YarnContainerMain
       }
 
       final Node<?> node = nodes.get(ndi.id);
-      new Thread(Integer.toString(ndi.id).concat("/").concat(ndi.name).concat(":").concat(node.getOperator().getClass().getSimpleName()))
+      final String name = new StringBuilder(Integer.toString(ndi.id))
+          .append('/')
+          .append(ndi.name)
+          .append(':')
+          .append(node.getOperator().getClass().getSimpleName())
+          .toString();
+      final Thread thread = new Thread(name)
       {
         @Override
         public void run()
@@ -1437,8 +1443,9 @@ public class StreamingContainer extends YarnContainerMain
             }
           }
         }
-
-      }.start();
+      };
+      node.context.setThread(thread);
+      thread.start();
     }
 
   /**
