This is an automated email from the ASF dual-hosted git repository. taegeonum pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-nemo.git
The following commit(s) were added to refs/heads/master by this push: new 3d46caf [NEMO-460] Setting coders in CombinePerKey transformation (#303) 3d46caf is described below commit 3d46caf02c84c739e448d8cc6220099fa8df58d7 Author: jaehwan0214 <60352009+jaehwan0...@users.noreply.github.com> AuthorDate: Sun Oct 18 14:05:34 2020 +0900 [NEMO-460] Setting coders in CombinePerKey transformation (#303) JIRA: [NEMO-460: Setting coders in CombinePerKey transformation](https://issues.apache.org/jira/projects/NEMO/issues/NEMO-460) **Major changes:** - Added the additional parameter "inputCoder" for GBKTransform constructor. - Fixed the input coder and the output coder for the partial combine transform and the final combine transform. **Minor changes to note:** - Fixed the main output TupleTags for the partial combine transform and the final combine transform. **Tests for the changes:** - Current tests suffice. **Other comments:** - This needs to be merged after merging #302 Closes #303 --- .../nemo/compiler/frontend/beam/PipelineTranslator.java | 14 ++++++++------ .../compiler/frontend/beam/transform/GBKTransform.java | 11 ++++++----- .../frontend/beam/transform/GBKTransformTest.java | 16 +++++++++------- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java index 21ded1c..cd9d7ad 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/PipelineTranslator.java @@ -406,10 +406,11 @@ final class PipelineTranslator { KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder), null, mainInput.getWindowingStrategy())); + final TupleTag<?> partialMainOutputTag = new TupleTag<>(); final GBKTransform partialCombineStreamTransform = - new GBKTransform( - getOutputCoders(pTransform), - new TupleTag<>(), + new GBKTransform(inputCoder, + Collections.singletonMap(partialMainOutputTag, KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder)), + partialMainOutputTag, mainInput.getWindowingStrategy(), ctx.getPipelineOptions(), partialSystemReduceFn, @@ -418,9 +419,9 @@ final class PipelineTranslator { true); final GBKTransform finalCombineStreamTransform = - new GBKTransform( + new GBKTransform(KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder), getOutputCoders(pTransform), - new TupleTag<>(), + Iterables.getOnlyElement(beamNode.getOutputs().keySet()), mainInput.getWindowingStrategy(), ctx.getPipelineOptions(), finalSystemReduceFn, @@ -556,7 +557,7 @@ final class PipelineTranslator { final AppliedPTransform<?, ?, ?> pTransform = beamNode.toAppliedPTransform(ctx.getPipeline()); final PCollection<?> mainInput = (PCollection<?>) Iterables.getOnlyElement(TransformInputs.nonAdditionalInputs(pTransform)); - final TupleTag mainOutputTag = new TupleTag<>(); + final TupleTag mainOutputTag = Iterables.getOnlyElement(beamNode.getOutputs().keySet()); if (isGlobalWindow(beamNode, ctx.getPipeline())) { // GroupByKey Transform when using a global windowing strategy. @@ -564,6 +565,7 @@ final class PipelineTranslator { } else { // GroupByKey Transform when using a non-global windowing strategy. return new GBKTransform<>( + (KvCoder) mainInput.getCoder(), getOutputCoders(pTransform), mainOutputTag, mainInput.getWindowingStrategy(), diff --git a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java index 9dd2e5a..1bf6cb8 100644 --- a/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java +++ b/compiler/frontend/beam/src/main/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransform.java @@ -58,7 +58,8 @@ public final class GBKTransform<K, InputT, OutputT> private transient OutputCollector originOc; private final boolean isPartialCombining; - public GBKTransform(final Map<TupleTag<?>, Coder<?>> outputCoders, + public GBKTransform(final Coder<KV<K, InputT>> inputCoder, + final Map<TupleTag<?>, Coder<?>> outputCoders, final TupleTag<KV<K, OutputT>> mainOutputTag, final WindowingStrategy<?, ?> windowingStrategy, final PipelineOptions options, @@ -67,7 +68,7 @@ public final class GBKTransform<K, InputT, OutputT> final DisplayData displayData, final boolean isPartialCombining) { super(null, - null, + inputCoder, outputCoders, mainOutputTag, Collections.emptyList(), /* no additional outputs */ @@ -278,7 +279,7 @@ public final class GBKTransform<K, InputT, OutputT> /** Emit output. If {@param output} is emitted on-time, save its timestamp in the output watermark map. */ @Override - public void emit(final WindowedValue<KV<K, OutputT>> output) { + public final void emit(final WindowedValue<KV<K, OutputT>> output) { // The watermark advances only in ON_TIME if (output.getPane().getTiming().equals(PaneInfo.Timing.ON_TIME)) { KV<K, OutputT> value = output.getValue(); @@ -296,13 +297,13 @@ public final class GBKTransform<K, InputT, OutputT> /** Emit watermark. */ @Override - public void emitWatermark(final Watermark watermark) { + public final void emitWatermark(final Watermark watermark) { oc.emitWatermark(watermark); } /** Emit output value to {@param dstVertexId}. */ @Override - public <T> void emit(final String dstVertexId, final T output) { + public final <T> void emit(final String dstVertexId, final T output) { oc.emit(dstVertexId, output); } } diff --git a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java index 45933b0..3c08c50 100644 --- a/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java +++ b/compiler/test/src/test/java/org/apache/nemo/compiler/frontend/beam/transform/GBKTransformTest.java @@ -18,6 +18,7 @@ */ package org.apache.nemo.compiler.frontend.beam.transform; +import com.google.common.collect.Iterables; import junit.framework.TestCase; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.sdk.coders.*; @@ -41,15 +42,12 @@ import java.util.*; import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.*; import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES; -import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.DISCARDING_FIRED_PANES; -import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; public class GBKTransformTest extends TestCase { private static final Logger LOG = LoggerFactory.getLogger(GBKTransformTest.class.getName()); private final static Coder STRING_CODER = StringUtf8Coder.of(); private final static Coder INTEGER_CODER = BigEndianIntegerCoder.of(); - private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null; private void checkOutput(final KV<String, Integer> expected, final KV<String, Integer> result) { // check key @@ -155,7 +153,8 @@ public class GBKTransformTest extends TestCase { final GBKTransform<String, Integer, Integer> combine_transform = new GBKTransform( - NULL_OUTPUT_CODERS, + KvCoder.of(STRING_CODER, INTEGER_CODER), + Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, INTEGER_CODER)), outputTag, WindowingStrategy.of(slidingWindows).withMode(ACCUMULATING_FIRED_PANES), PipelineOptionsFactory.as(NemoPipelineOptions.class), @@ -283,7 +282,8 @@ public class GBKTransformTest extends TestCase { final GBKTransform<String, Integer, Integer> combine_transform = new GBKTransform( - NULL_OUTPUT_CODERS, + KvCoder.of(STRING_CODER, INTEGER_CODER), + Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, INTEGER_CODER)), outputTag, WindowingStrategy.of(slidingWindows).withMode(ACCUMULATING_FIRED_PANES).withAllowedLateness(lateness), PipelineOptionsFactory.as(NemoPipelineOptions.class), @@ -377,7 +377,8 @@ public class GBKTransformTest extends TestCase { final GBKTransform<String, String, Iterable<String>> doFnTransform = new GBKTransform( - NULL_OUTPUT_CODERS, + KvCoder.of(STRING_CODER, STRING_CODER), + Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, IterableCoder.of(STRING_CODER))), outputTag, WindowingStrategy.of(slidingWindows), PipelineOptionsFactory.as(NemoPipelineOptions.class), @@ -562,7 +563,8 @@ public class GBKTransformTest extends TestCase { final GBKTransform<String, String, Iterable<String>> doFnTransform = new GBKTransform( - NULL_OUTPUT_CODERS, + KvCoder.of(STRING_CODER, STRING_CODER), + Collections.singletonMap(outputTag, KvCoder.of(STRING_CODER, IterableCoder.of(STRING_CODER))), outputTag, WindowingStrategy.of(window).withTrigger(trigger) .withMode(ACCUMULATING_FIRED_PANES)