[BEAM-2824] Uses PipelineResult in TestJStormRunner.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6f40506a Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6f40506a Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6f40506a Branch: refs/heads/jstorm-runner Commit: 6f40506a979bdcac3d1125bbe809b092d497a2f6 Parents: df75d80 Author: Pei He <[email protected]> Authored: Wed Aug 30 14:50:20 2017 +0800 Committer: Pei He <[email protected]> Committed: Mon Sep 4 12:57:53 2017 +0800 ---------------------------------------------------------------------- .../beam/runners/jstorm/JStormRunnerResult.java | 6 ++ .../beam/runners/jstorm/TestJStormRunner.java | 100 ++++++++++++------- .../jstorm/translation/DoFnExecutor.java | 1 - 3 files changed, 68 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6f40506a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java index 782896e..3962ca2 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/JStormRunnerResult.java @@ -78,6 +78,7 @@ public abstract class JStormRunnerResult implements PipelineResult { private final LocalCluster localCluster; private final long localModeExecuteTimeSecs; + private boolean cancelled; LocalJStormPipelineResult( String topologyName, @@ -87,6 +88,7 @@ public abstract class JStormRunnerResult implements PipelineResult { super(topologyName, config); this.localCluster = checkNotNull(localCluster, "localCluster"); this.localModeExecuteTimeSecs = localModeExecuteTimeSecs; + this.cancelled = false; } @Override @@ -94,11 +96,15 @@ public abstract class JStormRunnerResult implements PipelineResult { localCluster.killTopology(getTopologyName()); localCluster.shutdown(); JStormUtils.sleepMs(1000); + cancelled = true; return State.CANCELLED; } @Override public State waitUntilFinish(Duration duration) { + if (cancelled) { + return State.CANCELLED; + } Sleeper sleeper = Sleeper.DEFAULT; BackOff backOff = FluentBackoff.DEFAULT.withMaxCumulativeBackoff(duration).backoff(); try { http://git-wip-us.apache.org/repos/asf/beam/blob/6f40506a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java index c9990e4..9d2e2f1 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/TestJStormRunner.java @@ -21,10 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.alibaba.jstorm.common.metric.AsmMetric; import com.alibaba.jstorm.metric.AsmMetricRegistry; -import com.alibaba.jstorm.metric.AsmWindow; import com.alibaba.jstorm.metric.JStormMetrics; -import com.alibaba.jstorm.metric.MetaType; -import com.alibaba.jstorm.metric.MetricType; import com.alibaba.jstorm.task.error.TaskReportErrorAndDie; import com.alibaba.jstorm.utils.JStormUtils; import com.google.common.base.Optional; @@ -34,8 +31,13 @@ import java.util.Iterator; import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineRunner; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsFilter; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.testing.PAssert; +import org.joda.time.Duration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,31 +82,32 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> { LOG.info("Running JStorm job {} with {} expected assertions.", result.getTopologyName(), numberOfAssertions); - - int maxTimeoutMs = - numberOfAssertions > 0 ? ASSERTION_WAITING_TIME_MS : RESULT_WAITING_TIME_MS; - for (int waitTime = 0; waitTime <= maxTimeoutMs; ) { - Optional<Boolean> success = numberOfAssertions > 0 - ? checkForPAssertSuccess(numberOfAssertions) : Optional.<Boolean>absent(); + if (numberOfAssertions == 0) { + result.waitUntilFinish(Duration.millis(RESULT_WAITING_TIME_MS)); Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); - if (success.isPresent() && success.get()) { - return result; - } else if (success.isPresent() && !success.get()) { - throw new AssertionError("Failed assertion checks."); - } else if (taskExceptionRec != null) { + if (taskExceptionRec != null) { LOG.info("Exception was found.", taskExceptionRec); throw new RuntimeException(taskExceptionRec.getCause()); - } else { - JStormUtils.sleepMs(RESULT_CHECK_INTERVAL_MS); - waitTime += RESULT_CHECK_INTERVAL_MS; } - } - - if (numberOfAssertions > 0) { + return result; + } else { + for (int waitTime = 0; waitTime <= ASSERTION_WAITING_TIME_MS;) { + Optional<Boolean> success = checkForPAssertSuccess(result.metrics(), numberOfAssertions); + Exception taskExceptionRec = TaskReportErrorAndDie.getExceptionRecord(); + if (success.isPresent() && success.get()) { + return result; + } else if (success.isPresent() && !success.get()) { + throw new AssertionError("Failed assertion checks."); + } else if (taskExceptionRec != null) { + LOG.info("Exception was found.", taskExceptionRec); + throw new RuntimeException(taskExceptionRec.getCause()); + } else { + JStormUtils.sleepMs(RESULT_CHECK_INTERVAL_MS); + waitTime += RESULT_CHECK_INTERVAL_MS; + } + } LOG.info("Assertion checks timed out."); throw new AssertionError("Assertion checks timed out."); - } else { - return result; } } finally { clearPAssertCount(); @@ -113,31 +116,52 @@ public class TestJStormRunner extends PipelineRunner<JStormRunnerResult> { } } - private Optional<Boolean> checkForPAssertSuccess(int expectedNumberOfAssertions) { - int successes = 0; - for (AsmMetric metric : - JStormMetrics.search(PAssert.SUCCESS_COUNTER, MetaType.TASK, MetricType.COUNTER)) { - successes += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue(); + private Optional<Boolean> checkForPAssertSuccess( + MetricResults metricResults, + int expectedNumberOfAssertions) { + Iterable<MetricResult<Long>> successCounterResults = metricResults + .queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.SUCCESS_COUNTER)) + .build()) + .counters(); + + long successes = 0; + for (MetricResult<Long> counter : successCounterResults) { + if (counter.attempted() > 0) { + successes++; + } } - int failures = 0; - for (AsmMetric metric : - JStormMetrics.search(PAssert.FAILURE_COUNTER, MetaType.TASK, MetricType.COUNTER)) { - failures += ((Long) metric.getValue(AsmWindow.M1_WINDOW)).intValue(); + + Iterable<MetricResult<Long>> failureCounterResults = metricResults + .queryMetrics(MetricsFilter.builder() + .addNameFilter(MetricNameFilter.named(PAssert.class, PAssert.FAILURE_COUNTER)) + .build()) + .counters(); + + long failures = 0; + for (MetricResult<Long> counter : failureCounterResults) { + if (counter.attempted() > 0) { + failures++; + } } if (failures > 0) { LOG.info("Found {} success, {} failures out of {} expected assertions.", - successes, failures, expectedNumberOfAssertions); + successes, failures, expectedNumberOfAssertions); return Optional.of(false); - } else if (successes >= expectedNumberOfAssertions) { + } else if (successes == expectedNumberOfAssertions) { LOG.info("Found {} success, {} failures out of {} expected assertions.", - successes, failures, expectedNumberOfAssertions); + successes, failures, expectedNumberOfAssertions); return Optional.of(true); + } else if (successes > expectedNumberOfAssertions) { + LOG.info("Found {} success, {} failures out of {} expected assertions.", + successes, failures, expectedNumberOfAssertions); + return Optional.of(false); + } else { + LOG.info("Found {} success, {} failures out of {} expected assertions.", + successes, failures, expectedNumberOfAssertions); + return Optional.absent(); } - - LOG.info("Found {} success, {} failures out of {} expected assertions.", - successes, failures, expectedNumberOfAssertions); - return Optional.absent(); } private void clearPAssertCount() { http://git-wip-us.apache.org/repos/asf/beam/blob/6f40506a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java index 1ceaf9e..5425b6c 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java @@ -21,7 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import com.alibaba.jstorm.cache.IKvStoreManager; -import com.alibaba.jstorm.metric.MetricClient; import com.google.common.collect.Iterables; import java.io.Serializable; import java.util.ArrayList;
