Repository: beam Updated Branches: refs/heads/jstorm-runner d24d2831d -> d2b285122
JStorm-runner: Performance improvement 1. remove some logs on critical path 2. register "TimestampedValue" in Kryo to reduce the serialized size of event value Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/43492000 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/43492000 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/43492000 Branch: refs/heads/jstorm-runner Commit: 43492000a49e81b6d9a2420148fb2df1735301b0 Parents: d24d283 Author: basti.lj <[email protected]> Authored: Fri Sep 8 12:19:49 2017 +0800 Committer: basti.lj <[email protected]> Committed: Fri Sep 8 12:19:49 2017 +0800 ---------------------------------------------------------------------- .../jstorm/serialization/BeamUtilsSerializer.java | 2 ++ .../runners/jstorm/translation/DoFnExecutor.java | 3 +-- .../runners/jstorm/translation/ExecutorsBolt.java | 1 - .../jstorm/translation/WindowAssignExecutor.java | 16 +++++++--------- 4 files changed, 10 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/43492000/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java index db1f037..8061a9f 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/serialization/BeamUtilsSerializer.java @@ -29,6 +29,7 @@ import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Instant; /** @@ -110,5 +111,6 @@ public class BeamUtilsSerializer { Lists.<BoundedWindow>newArrayList(w1), PaneInfo.NO_FIRING).getClass()); config.registerSerialization(WindowedValue.of(null, Instant.now(), Lists.<BoundedWindow>newArrayList(w1, w2), PaneInfo.NO_FIRING).getClass()); + config.registerSerialization(TimestampedValue.class); } } http://git-wip-us.apache.org/repos/asf/beam/blob/43492000/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java index 5425b6c..4b021a3 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/DoFnExecutor.java @@ -214,7 +214,6 @@ class DoFnExecutor<InputT, OutputT> implements Executor { } protected <T> void processMainInput(WindowedValue<T> elem) { - LOG.debug(String.format("Main input: tag=%s, elem=%s", mainInputTag, elem)); if (sideInputs.isEmpty()) { runner.processElement((WindowedValue<InputT>) elem); } else { @@ -236,7 +235,7 @@ class DoFnExecutor<InputT, OutputT> implements Executor { } protected void processSideInput(TupleTag tag, WindowedValue elem) { - LOG.debug(String.format("Side inputs: tag=%s, elem=%s.", tag, elem)); + LOG.debug("Side inputs: tag={}, elem={}.", tag, elem); PCollectionView<?> sideInputView = sideInputTagToView.get(tag); sideInputHandler.addSideInputValue(sideInputView, elem); http://git-wip-us.apache.org/repos/asf/beam/blob/43492000/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java index aca2ca4..1e9a4ff 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/ExecutorsBolt.java @@ -295,7 +295,6 @@ public class ExecutorsBolt extends AbstractComponent implements IRichBatchBolt { public <T> void processExecutorElem(TupleTag<T> inputTag, WindowedValue<T> elem) { if (elem != null) { - LOG.debug("ProcessExecutorElem: value={} from tag={}", elem.getValue(), inputTag); Executor executor = inputTagToExecutor.get(inputTag); if (executor != null) { executor.process(inputTag, elem); http://git-wip-us.apache.org/repos/asf/beam/blob/43492000/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java ---------------------------------------------------------------------- diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java index 832c95c..ffbfb1b 100644 --- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java +++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/WindowAssignExecutor.java @@ -17,8 +17,6 @@ */ package org.apache.beam.runners.jstorm.translation; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.common.collect.Iterables; import java.util.Collection; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -46,13 +44,13 @@ class WindowAssignExecutor<T, W extends BoundedWindow> implements Executor { JStormAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) { fn.super(); - checkArgument( - Iterables.size(value.getWindows()) == 1, - String.format( - "%s passed to window assignment must be in a single window, but it was in %s: %s", - WindowedValue.class.getSimpleName(), - Iterables.size(value.getWindows()), - value.getWindows())); + if (value.getWindows().size() != 1) { + throw new IllegalArgumentException(String.format( + "%s passed to window assignment must be in a single window, but it was in %s: %s", + WindowedValue.class.getSimpleName(), + Iterables.size(value.getWindows()), + value.getWindows())); + } this.value = value; }
