Fixed Combine display data
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9943fd7d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9943fd7d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9943fd7d Branch: refs/heads/gearpump-runner Commit: 9943fd7d47819d522cef248d23c8db8f42981ad3 Parents: f44fa2c Author: Ian Zhou <ianz...@google.com> Authored: Thu Aug 18 13:50:52 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Mon Sep 12 17:40:12 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/transforms/Combine.java | 53 ++++++++++++++++++-- .../apache/beam/sdk/transforms/CombineTest.java | 19 +++++++ 2 files changed, 68 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9943fd7d/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java index 26f0f66..d432e15 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.Context; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.RequiresContextInternal; import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.DisplayData.Builder; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; @@ -1815,7 +1816,14 @@ public class Combine { */ public PerKeyWithHotKeyFanout<K, InputT, OutputT> withHotKeyFanout(final int hotKeyFanout) { return new PerKeyWithHotKeyFanout<>(name, fn, fnDisplayData, - new SerializableFunction<K, Integer>() { + new SimpleFunction<K, Integer>() { + @Override + public void populateDisplayData(Builder builder) { + super.populateDisplayData(builder); + builder.addIfNotDefault(DisplayData.item("fanout", hotKeyFanout) + .withLabel("Key Fanout Size"), 0); + } + @Override public Integer apply(K unused) { return hotKeyFanout; @@ -1904,7 +1912,7 @@ public class Combine { new InputOrAccum.InputOrAccumCoder<InputT, AccumT>( inputCoder.getValueCoder(), accumCoder); - // A CombineFn's mergeAccumulator can be applied in a tree-like fashon. + // A CombineFn's mergeAccumulator can be applied in a tree-like fashion. // Here we shard the key using an integer nonce, combine on that partial // set of values, then drop the nonce and do a final combine of the // aggregates. We do this by splitting the original CombineFn into two, @@ -1944,6 +1952,16 @@ public class Combine { throws CannotProvideCoderException { return accumCoder; } + + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) + .withLabel("Fanout Function")); + if (hotKeyFanout instanceof HasDisplayData) { + ((HasDisplayData) hotKeyFanout).populateDisplayData(builder); + } + } }; postCombine = new KeyedCombineFn<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() { @@ -1988,6 +2006,15 @@ public class Combine { throws CannotProvideCoderException { return accumCoder; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) + .withLabel("Fanout Function")); + if (hotKeyFanout instanceof HasDisplayData) { + ((HasDisplayData) hotKeyFanout).populateDisplayData(builder); + } + } }; } else { final KeyedCombineFnWithContext<K, InputT, AccumT, OutputT> keyedFnWithContext = @@ -2028,6 +2055,15 @@ public class Combine { throws CannotProvideCoderException { return accumCoder; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) + .withLabel("Fanout Function")); + if (hotKeyFanout instanceof HasDisplayData) { + ((HasDisplayData) hotKeyFanout).populateDisplayData(builder); + } + } }; postCombine = new KeyedCombineFnWithContext<K, InputOrAccum<InputT, AccumT>, AccumT, OutputT>() { @@ -2073,6 +2109,15 @@ public class Combine { throws CannotProvideCoderException { return accumCoder; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + super.populateDisplayData(builder); + builder.add(DisplayData.item("fanoutFn", hotKeyFanout.getClass()) + .withLabel("Fanout Function")); + if (hotKeyFanout instanceof HasDisplayData) { + ((HasDisplayData) hotKeyFanout).populateDisplayData(builder); + } + } }; } @@ -2117,7 +2162,7 @@ public class Combine { .setCoder(KvCoder.of(KvCoder.of(inputCoder.getKeyCoder(), VarIntCoder.of()), inputCoder.getValueCoder())) .setWindowingStrategyInternal(preCombineStrategy) - .apply("PreCombineHot", Combine.perKey(hotPreCombine)) + .apply("PreCombineHot", Combine.perKey(hotPreCombine, fnDisplayData)) .apply("StripNonce", MapElements.via( new SimpleFunction<KV<KV<K, Integer>, AccumT>, KV<K, InputOrAccum<InputT, AccumT>>>() { @@ -2147,7 +2192,7 @@ public class Combine { // Combine the union of the pre-processed hot and cold key results. return PCollectionList.of(precombinedHot).and(preprocessedCold) .apply(Flatten.<KV<K, InputOrAccum<InputT, AccumT>>>pCollections()) - .apply("PostCombine", Combine.perKey(postCombine)); + .apply("PostCombine", Combine.perKey(postCombine, fnDisplayData)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9943fd7d/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java index 77a1d6b..be061af 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/CombineTest.java @@ -731,6 +731,25 @@ public class CombineTest implements Serializable { displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass()))); } + @Test + @Category(RunnableOnService.class) + public void testCombinePerKeyWithHotKeyFanoutPrimitiveDisplayData() { + int hotKeyFanout = 2; + DisplayDataEvaluator evaluator = DisplayDataEvaluator.create(); + + CombineTest.UniqueInts combineFn = new CombineTest.UniqueInts(); + PTransform<PCollection<KV<Integer, Integer>>, PCollection<KV<Integer, Set<Integer>>>> combine = + Combine.<Integer, Integer, Set<Integer>>perKey(combineFn).withHotKeyFanout(hotKeyFanout); + + Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(combine, + KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); + + assertThat("Combine.perKey.withHotKeyFanout should include the combineFn in its primitive " + + "transform", displayData, hasItem(hasDisplayItem("combineFn", combineFn.getClass()))); + assertThat("Combine.perKey.withHotKeyFanout(int) should include the fanout in its primitive " + + "transform", displayData, hasItem(hasDisplayItem("fanout", hotKeyFanout))); + } + //////////////////////////////////////////////////////////////////////////// // Test classes, for different kinds of combining fns.