[ https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143359&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143359 ]
ASF GitHub Bot logged work on BEAM-4461: ---------------------------------------- Author: ASF GitHub Bot Created on: 12/Sep/18 04:17 Start Date: 12/Sep/18 04:17 Worklog Time Spent: 10m Work Description: reuvenlax closed pull request #6318: [BEAM-4461] Some fixes to Combiners needed for Schema support. URL: https://github.com/apache/beam/pull/6318 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java index c4f7813fbdb..b7e68d2a896 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.base.Objects; +import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; @@ -32,6 +33,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; @@ -112,6 +114,16 @@ public static ComposeCombineFnBuilder compose() { return new ComposedCombineFn<DataT>().with(extractInputFn, combineFn, outputTag); } + /** Like {@link #with(SimpleFunction, CombineFn, TupleTag)} but with an explicit input coder. 
*/ + public <DataT, InputT, OutputT> ComposedCombineFn<DataT> with( + SimpleFunction<DataT, InputT> extractInputFn, + Coder combineInputCoder, + CombineFn<InputT, ?, OutputT> combineFn, + TupleTag<OutputT> outputTag) { + return new ComposedCombineFn<DataT>() + .with(extractInputFn, combineInputCoder, combineFn, outputTag); + } + /** * Returns a {@link ComposedCombineFnWithContext} that can take additional {@link * GlobalCombineFn GlobalCombineFns} and apply them as a single combine function. @@ -127,6 +139,16 @@ public static ComposeCombineFnBuilder compose() { return new ComposedCombineFnWithContext<DataT>() .with(extractInputFn, combineFnWithContext, outputTag); } + + /** Like {@link #with(SimpleFunction, CombineFnWithContext, TupleTag)} but with input coder. */ + public <DataT, InputT, OutputT> ComposedCombineFnWithContext<DataT> with( + SimpleFunction<DataT, InputT> extractInputFn, + Coder combineInputCoder, + CombineFnWithContext<InputT, ?, OutputT> combineFnWithContext, + TupleTag<OutputT> outputTag) { + return new ComposedCombineFnWithContext<DataT>() + .with(extractInputFn, combineInputCoder, combineFnWithContext, outputTag); + } } ///////////////////////////////////////////////////////////////////////////// @@ -212,12 +234,14 @@ public int hashCode() { public static class ComposedCombineFn<DataT> extends CombineFn<DataT, Object[], CoCombineResult> { private final List<CombineFn<Object, Object, Object>> combineFns; + private final List<Optional<Coder>> combineInputCoders; private final List<SerializableFunction<DataT, Object>> extractInputFns; private final List<TupleTag<?>> outputTags; private final int combineFnCount; private ComposedCombineFn() { this.extractInputFns = ImmutableList.of(); + this.combineInputCoders = ImmutableList.of(); this.combineFns = ImmutableList.of(); this.outputTags = ImmutableList.of(); this.combineFnCount = 0; @@ -225,11 +249,13 @@ private ComposedCombineFn() { private ComposedCombineFn( ImmutableList<SerializableFunction<DataT, 
?>> extractInputFns, + List<Optional<Coder>> combineInputCoders, ImmutableList<CombineFn<?, ?, ?>> combineFns, ImmutableList<TupleTag<?>> outputTags) { @SuppressWarnings({"unchecked", "rawtypes"}) List<SerializableFunction<DataT, Object>> castedExtractInputFns = (List) extractInputFns; this.extractInputFns = castedExtractInputFns; + this.combineInputCoders = combineInputCoders; @SuppressWarnings({"unchecked", "rawtypes"}) List<CombineFn<Object, Object, Object>> castedCombineFns = (List) combineFns; @@ -250,6 +276,10 @@ private ComposedCombineFn( .addAll(extractInputFns) .add(extractInputFn) .build(), + ImmutableList.<Optional<Coder>>builder() + .addAll(combineInputCoders) + .add(Optional.absent()) + .build(), ImmutableList.<CombineFn<?, ?, ?>>builder().addAll(combineFns).add(combineFn).build(), ImmutableList.<TupleTag<?>>builder().addAll(outputTags).add(outputTag).build()); } @@ -272,6 +302,59 @@ private ComposedCombineFn( .addAll(extractInputFns) .add(extractInputFn) .build(), + ImmutableList.<Optional<Coder>>builder() + .addAll(combineInputCoders) + .add(Optional.absent()) + .build(), + ImmutableList.<CombineFnWithContext<?, ?, ?>>builder() + .addAll(fnsWithContext) + .add(combineFn) + .build(), + ImmutableList.<TupleTag<?>>builder().addAll(outputTags).add(outputTag).build()); + } + + /** Returns a {@link ComposedCombineFn} with an additional {@link CombineFn}. 
*/ + public <InputT, OutputT> ComposedCombineFn<DataT> with( + SimpleFunction<DataT, InputT> extractInputFn, + Coder combineInputCoder, + CombineFn<InputT, ?, OutputT> combineFn, + TupleTag<OutputT> outputTag) { + checkUniqueness(outputTags, outputTag); + return new ComposedCombineFn<>( + ImmutableList.<SerializableFunction<DataT, ?>>builder() + .addAll(extractInputFns) + .add(extractInputFn) + .build(), + ImmutableList.<Optional<Coder>>builder() + .addAll(combineInputCoders) + .add(Optional.of(combineInputCoder)) + .build(), + ImmutableList.<CombineFn<?, ?, ?>>builder().addAll(combineFns).add(combineFn).build(), + ImmutableList.<TupleTag<?>>builder().addAll(outputTags).add(outputTag).build()); + } + + /** + * Returns a {@link ComposedCombineFnWithContext} with an additional {@link + * CombineFnWithContext}. + */ + public <InputT, OutputT> ComposedCombineFnWithContext<DataT> with( + SimpleFunction<DataT, InputT> extractInputFn, + Coder combineInputCoder, + CombineFnWithContext<InputT, ?, OutputT> combineFn, + TupleTag<OutputT> outputTag) { + checkUniqueness(outputTags, outputTag); + List<CombineFnWithContext<Object, Object, Object>> fnsWithContext = + combineFns.stream().map(CombineFnUtil::toFnWithContext).collect(Collectors.toList()); + + return new ComposedCombineFnWithContext<>( + ImmutableList.<SerializableFunction<DataT, ?>>builder() + .addAll(extractInputFns) + .add(extractInputFn) + .build(), + ImmutableList.<Optional<Coder>>builder() + .addAll(combineInputCoders) + .add(Optional.of(combineInputCoder)) + .build(), ImmutableList.<CombineFnWithContext<?, ?, ?>>builder() .addAll(fnsWithContext) .add(combineFn) @@ -336,7 +419,10 @@ public CoCombineResult extractOutput(Object[] accumulator) { throws CannotProvideCoderException { List<Coder<Object>> coders = Lists.newArrayList(); for (int i = 0; i < combineFnCount; ++i) { - Coder<Object> inputCoder = registry.getOutputCoder(extractInputFns.get(i), dataCoder); + Coder<Object> inputCoder = + 
combineInputCoders.get(i).isPresent() + ? combineInputCoders.get(i).get() + : registry.getOutputCoder(extractInputFns.get(i), dataCoder); coders.add(combineFns.get(i).getAccumulatorCoder(registry, inputCoder)); } return new ComposedAccumulatorCoder(coders); @@ -361,12 +447,14 @@ public void populateDisplayData(DisplayData.Builder builder) { extends CombineFnWithContext<DataT, Object[], CoCombineResult> { private final List<SerializableFunction<DataT, Object>> extractInputFns; + private final List<Optional<Coder>> combineInputCoders; private final List<CombineFnWithContext<Object, Object, Object>> combineFnWithContexts; private final List<TupleTag<?>> outputTags; private final int combineFnCount; private ComposedCombineFnWithContext() { this.extractInputFns = ImmutableList.of(); + this.combineInputCoders = ImmutableList.of(); this.combineFnWithContexts = ImmutableList.of(); this.outputTags = ImmutableList.of(); this.combineFnCount = 0; @@ -374,11 +462,13 @@ private ComposedCombineFnWithContext() { private ComposedCombineFnWithContext( ImmutableList<SerializableFunction<DataT, ?>> extractInputFns, + ImmutableList<Optional<Coder>> combineInputCoders, ImmutableList<CombineFnWithContext<?, ?, ?>> combineFnWithContexts, ImmutableList<TupleTag<?>> outputTags) { @SuppressWarnings({"unchecked", "rawtypes"}) List<SerializableFunction<DataT, Object>> castedExtractInputFns = (List) extractInputFns; this.extractInputFns = castedExtractInputFns; + this.combineInputCoders = combineInputCoders; @SuppressWarnings({"rawtypes", "unchecked"}) List<CombineFnWithContext<Object, Object, Object>> castedCombineFnWithContexts = @@ -402,6 +492,35 @@ private ComposedCombineFnWithContext( .addAll(extractInputFns) .add(extractInputFn) .build(), + ImmutableList.<Optional<Coder>>builder() + .addAll(combineInputCoders) + .add(Optional.absent()) + .build(), + ImmutableList.<CombineFnWithContext<?, ?, ?>>builder() + .addAll(combineFnWithContexts) + 
.add(CombineFnUtil.toFnWithContext(globalCombineFn)) + .build(), + ImmutableList.<TupleTag<?>>builder().addAll(outputTags).add(outputTag).build()); + } + + /** + * Returns a {@link ComposedCombineFnWithContext} with an additional {@link GlobalCombineFn}. + */ + public <InputT, OutputT> ComposedCombineFnWithContext<DataT> with( + SimpleFunction<DataT, InputT> extractInputFn, + Coder<InputT> combineInputCoder, + GlobalCombineFn<InputT, ?, OutputT> globalCombineFn, + TupleTag<OutputT> outputTag) { + checkUniqueness(outputTags, outputTag); + return new ComposedCombineFnWithContext<>( + ImmutableList.<SerializableFunction<DataT, ?>>builder() + .addAll(extractInputFns) + .add(extractInputFn) + .build(), + ImmutableList.<Optional<Coder>>builder() + .addAll(combineInputCoders) + .add(Optional.of(combineInputCoder)) + .build(), ImmutableList.<CombineFnWithContext<?, ?, ?>>builder() .addAll(combineFnWithContexts) .add(CombineFnUtil.toFnWithContext(globalCombineFn)) @@ -470,7 +589,10 @@ public CoCombineResult extractOutput(Object[] accumulator, Context c) { throws CannotProvideCoderException { List<Coder<Object>> coders = Lists.newArrayList(); for (int i = 0; i < combineFnCount; ++i) { - Coder<Object> inputCoder = registry.getOutputCoder(extractInputFns.get(i), dataCoder); + Coder<Object> inputCoder = + combineInputCoders.get(i).isPresent() + ? 
combineInputCoders.get(i).get() + : registry.getOutputCoder(extractInputFns.get(i), dataCoder); coders.add(combineFnWithContexts.get(i).getAccumulatorCoder(registry, inputCoder)); } return new ComposedAccumulatorCoder(coders); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java index 35bef7cd6a5..59e569e09a6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java @@ -168,7 +168,40 @@ private Top() { * {@code PCollection} of {@code KV}s and return the top values associated with each key. */ public static <T extends Comparable<T>> Combine.Globally<T, List<T>> largest(int count) { - return Combine.globally(new TopCombineFn<>(count, new Natural<T>())); + return Combine.globally(largestFn(count)); + } + + /** Returns a {@link TopCombineFn} that aggregates the largest count values. */ + public static <T extends Comparable<T>> TopCombineFn<T, Natural<T>> largestFn(int count) { + return new TopCombineFn<T, Natural<T>>(count, new Natural<T>()) {}; + } + /** Returns a {@link TopCombineFn} that aggregates the largest count long values. */ + public static TopCombineFn<Long, Natural<Long>> largestLongsFn(int count) { + return new TopCombineFn<Long, Natural<Long>>(count, new Natural<Long>()) {}; + } + /** Returns a {@link TopCombineFn} that aggregates the largest count int values. */ + public static TopCombineFn<Integer, Natural<Integer>> largestIntsFn(int count) { + return new TopCombineFn<Integer, Natural<Integer>>(count, new Natural<>()) {}; + } + /** Returns a {@link TopCombineFn} that aggregates the largest count double values. */ + public static TopCombineFn<Double, Natural<Double>> largestDoublesFn(int count) { + return new TopCombineFn<Double, Natural<Double>>(count, new Natural<>()) {}; + } + /** Returns a {@link TopCombineFn} that aggregates the smallest count values. 
*/ + public static <T extends Comparable<T>> TopCombineFn<T, Reversed<T>> smallestFn(int count) { + return new TopCombineFn<T, Reversed<T>>(count, new Reversed<>()) {}; + } + /** Returns a {@link TopCombineFn} that aggregates the smallest count long values. */ + public static TopCombineFn<Long, Reversed<Long>> smallestLongsFn(int count) { + return new TopCombineFn<Long, Reversed<Long>>(count, new Reversed<>()) {}; + } + /** Returns a {@link TopCombineFn} that aggregates the smallest count int values. */ + public static TopCombineFn<Integer, Reversed<Integer>> smallestIntsFn(int count) { + return new TopCombineFn<Integer, Reversed<Integer>>(count, new Reversed<>()) {}; + } + /** Returns a {@link TopCombineFn} that aggregates the smallest count double values. */ + public static TopCombineFn<Double, Reversed<Double>> smallestDoublesFn(int count) { + return new TopCombineFn<Double, Reversed<Double>>(count, new Reversed<>()) {}; } /** @@ -248,7 +281,7 @@ private Top() { */ public static <K, V extends Comparable<V>> PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>> smallestPerKey(int count) { - return Combine.perKey(new TopCombineFn<>(count, new Reversed<V>())); + return Combine.perKey(smallestFn(count)); } /** @@ -287,7 +320,7 @@ private Top() { * PCollection} and return the top elements. */ public static <K, V extends Comparable<V>> PerKey<K, V, List<V>> largestPerKey(int count) { - return Combine.perKey(new TopCombineFn<>(count, new Natural<V>())); + return Combine.perKey(largestFn(count)); } /** @deprecated use {@link Natural} instead */ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 143359) Time Spent: 6.5h (was: 6h 20m) > Create a library of useful transforms that use schemas > ------------------------------------------------------ > > Key: BEAM-4461 > URL: https://issues.apache.org/jira/browse/BEAM-4461 > Project: Beam > Issue Type: Sub-task > Components: sdk-java-core > Reporter: Reuven Lax > Assignee: Reuven Lax > Priority: Major > Time Spent: 6.5h > Remaining Estimate: 0h > > e.g. JoinBy(fields), Project, Filter, etc. -- This message was sent by Atlassian JIRA (v7.6.3#76005)