[ 
https://issues.apache.org/jira/browse/BEAM-4461?focusedWorklogId=143359&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-143359
 ]

ASF GitHub Bot logged work on BEAM-4461:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Sep/18 04:17
            Start Date: 12/Sep/18 04:17
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #6318: [BEAM-4461] Some fixes to Combiners needed for Schema support.
URL: https://github.com/apache/beam/pull/6318
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
index c4f7813fbdb..b7e68d2a896 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineFns.java
@@ -20,6 +20,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.base.Objects;
+import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
@@ -32,6 +33,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -112,6 +114,16 @@ public static ComposeCombineFnBuilder compose() {
       return new ComposedCombineFn<DataT>().with(extractInputFn, combineFn, 
outputTag);
     }
 
+    /** Like {@link #with(SimpleFunction, CombineFn, TupleTag)} but with an 
explicit input coder. */
+    public <DataT, InputT, OutputT> ComposedCombineFn<DataT> with(
+        SimpleFunction<DataT, InputT> extractInputFn,
+        Coder combineInputCoder,
+        CombineFn<InputT, ?, OutputT> combineFn,
+        TupleTag<OutputT> outputTag) {
+      return new ComposedCombineFn<DataT>()
+          .with(extractInputFn, combineInputCoder, combineFn, outputTag);
+    }
+
     /**
      * Returns a {@link ComposedCombineFnWithContext} that can take additional 
{@link
      * GlobalCombineFn GlobalCombineFns} and apply them as a single combine 
function.
@@ -127,6 +139,16 @@ public static ComposeCombineFnBuilder compose() {
       return new ComposedCombineFnWithContext<DataT>()
           .with(extractInputFn, combineFnWithContext, outputTag);
     }
+
+    /** Like {@link #with(SimpleFunction, CombineFnWithContext, TupleTag)} but 
with input coder. */
+    public <DataT, InputT, OutputT> ComposedCombineFnWithContext<DataT> with(
+        SimpleFunction<DataT, InputT> extractInputFn,
+        Coder combineInputCoder,
+        CombineFnWithContext<InputT, ?, OutputT> combineFnWithContext,
+        TupleTag<OutputT> outputTag) {
+      return new ComposedCombineFnWithContext<DataT>()
+          .with(extractInputFn, combineInputCoder, combineFnWithContext, 
outputTag);
+    }
   }
 
   /////////////////////////////////////////////////////////////////////////////
@@ -212,12 +234,14 @@ public int hashCode() {
   public static class ComposedCombineFn<DataT> extends CombineFn<DataT, 
Object[], CoCombineResult> {
 
     private final List<CombineFn<Object, Object, Object>> combineFns;
+    private final List<Optional<Coder>> combineInputCoders;
     private final List<SerializableFunction<DataT, Object>> extractInputFns;
     private final List<TupleTag<?>> outputTags;
     private final int combineFnCount;
 
     private ComposedCombineFn() {
       this.extractInputFns = ImmutableList.of();
+      this.combineInputCoders = ImmutableList.of();
       this.combineFns = ImmutableList.of();
       this.outputTags = ImmutableList.of();
       this.combineFnCount = 0;
@@ -225,11 +249,13 @@ private ComposedCombineFn() {
 
     private ComposedCombineFn(
         ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
+        List<Optional<Coder>> combineInputCoders,
         ImmutableList<CombineFn<?, ?, ?>> combineFns,
         ImmutableList<TupleTag<?>> outputTags) {
       @SuppressWarnings({"unchecked", "rawtypes"})
       List<SerializableFunction<DataT, Object>> castedExtractInputFns = (List) 
extractInputFns;
       this.extractInputFns = castedExtractInputFns;
+      this.combineInputCoders = combineInputCoders;
 
       @SuppressWarnings({"unchecked", "rawtypes"})
       List<CombineFn<Object, Object, Object>> castedCombineFns = (List) 
combineFns;
@@ -250,6 +276,10 @@ private ComposedCombineFn(
               .addAll(extractInputFns)
               .add(extractInputFn)
               .build(),
+          ImmutableList.<Optional<Coder>>builder()
+              .addAll(combineInputCoders)
+              .add(Optional.absent())
+              .build(),
           ImmutableList.<CombineFn<?, ?, 
?>>builder().addAll(combineFns).add(combineFn).build(),
           
ImmutableList.<TupleTag<?>>builder().addAll(outputTags).add(outputTag).build());
     }
@@ -272,6 +302,59 @@ private ComposedCombineFn(
               .addAll(extractInputFns)
               .add(extractInputFn)
               .build(),
+          ImmutableList.<Optional<Coder>>builder()
+              .addAll(combineInputCoders)
+              .add(Optional.absent())
+              .build(),
+          ImmutableList.<CombineFnWithContext<?, ?, ?>>builder()
+              .addAll(fnsWithContext)
+              .add(combineFn)
+              .build(),
+          
ImmutableList.<TupleTag<?>>builder().addAll(outputTags).add(outputTag).build());
+    }
+
+    /** Returns a {@link ComposedCombineFn} with an additional {@link 
CombineFn}. */
+    public <InputT, OutputT> ComposedCombineFn<DataT> with(
+        SimpleFunction<DataT, InputT> extractInputFn,
+        Coder combineInputCoder,
+        CombineFn<InputT, ?, OutputT> combineFn,
+        TupleTag<OutputT> outputTag) {
+      checkUniqueness(outputTags, outputTag);
+      return new ComposedCombineFn<>(
+          ImmutableList.<SerializableFunction<DataT, ?>>builder()
+              .addAll(extractInputFns)
+              .add(extractInputFn)
+              .build(),
+          ImmutableList.<Optional<Coder>>builder()
+              .addAll(combineInputCoders)
+              .add(Optional.of(combineInputCoder))
+              .build(),
+          ImmutableList.<CombineFn<?, ?, 
?>>builder().addAll(combineFns).add(combineFn).build(),
+          
ImmutableList.<TupleTag<?>>builder().addAll(outputTags).add(outputTag).build());
+    }
+
+    /**
+     * Returns a {@link ComposedCombineFnWithContext} with an additional {@link
+     * CombineFnWithContext}.
+     */
+    public <InputT, OutputT> ComposedCombineFnWithContext<DataT> with(
+        SimpleFunction<DataT, InputT> extractInputFn,
+        Coder combineInputCoder,
+        CombineFnWithContext<InputT, ?, OutputT> combineFn,
+        TupleTag<OutputT> outputTag) {
+      checkUniqueness(outputTags, outputTag);
+      List<CombineFnWithContext<Object, Object, Object>> fnsWithContext =
+          
combineFns.stream().map(CombineFnUtil::toFnWithContext).collect(Collectors.toList());
+
+      return new ComposedCombineFnWithContext<>(
+          ImmutableList.<SerializableFunction<DataT, ?>>builder()
+              .addAll(extractInputFns)
+              .add(extractInputFn)
+              .build(),
+          ImmutableList.<Optional<Coder>>builder()
+              .addAll(combineInputCoders)
+              .add(Optional.of(combineInputCoder))
+              .build(),
           ImmutableList.<CombineFnWithContext<?, ?, ?>>builder()
               .addAll(fnsWithContext)
               .add(combineFn)
@@ -336,7 +419,10 @@ public CoCombineResult extractOutput(Object[] accumulator) 
{
         throws CannotProvideCoderException {
       List<Coder<Object>> coders = Lists.newArrayList();
       for (int i = 0; i < combineFnCount; ++i) {
-        Coder<Object> inputCoder = 
registry.getOutputCoder(extractInputFns.get(i), dataCoder);
+        Coder<Object> inputCoder =
+            combineInputCoders.get(i).isPresent()
+                ? combineInputCoders.get(i).get()
+                : registry.getOutputCoder(extractInputFns.get(i), dataCoder);
         coders.add(combineFns.get(i).getAccumulatorCoder(registry, 
inputCoder));
       }
       return new ComposedAccumulatorCoder(coders);
@@ -361,12 +447,14 @@ public void populateDisplayData(DisplayData.Builder 
builder) {
       extends CombineFnWithContext<DataT, Object[], CoCombineResult> {
 
     private final List<SerializableFunction<DataT, Object>> extractInputFns;
+    private final List<Optional<Coder>> combineInputCoders;
     private final List<CombineFnWithContext<Object, Object, Object>> 
combineFnWithContexts;
     private final List<TupleTag<?>> outputTags;
     private final int combineFnCount;
 
     private ComposedCombineFnWithContext() {
       this.extractInputFns = ImmutableList.of();
+      this.combineInputCoders = ImmutableList.of();
       this.combineFnWithContexts = ImmutableList.of();
       this.outputTags = ImmutableList.of();
       this.combineFnCount = 0;
@@ -374,11 +462,13 @@ private ComposedCombineFnWithContext() {
 
     private ComposedCombineFnWithContext(
         ImmutableList<SerializableFunction<DataT, ?>> extractInputFns,
+        ImmutableList<Optional<Coder>> combineInputCoders,
         ImmutableList<CombineFnWithContext<?, ?, ?>> combineFnWithContexts,
         ImmutableList<TupleTag<?>> outputTags) {
       @SuppressWarnings({"unchecked", "rawtypes"})
       List<SerializableFunction<DataT, Object>> castedExtractInputFns = (List) 
extractInputFns;
       this.extractInputFns = castedExtractInputFns;
+      this.combineInputCoders = combineInputCoders;
 
       @SuppressWarnings({"rawtypes", "unchecked"})
       List<CombineFnWithContext<Object, Object, Object>> 
castedCombineFnWithContexts =
@@ -402,6 +492,35 @@ private ComposedCombineFnWithContext(
               .addAll(extractInputFns)
               .add(extractInputFn)
               .build(),
+          ImmutableList.<Optional<Coder>>builder()
+              .addAll(combineInputCoders)
+              .add(Optional.absent())
+              .build(),
+          ImmutableList.<CombineFnWithContext<?, ?, ?>>builder()
+              .addAll(combineFnWithContexts)
+              .add(CombineFnUtil.toFnWithContext(globalCombineFn))
+              .build(),
+          
ImmutableList.<TupleTag<?>>builder().addAll(outputTags).add(outputTag).build());
+    }
+
+    /**
+     * Returns a {@link ComposedCombineFnWithContext} with an additional 
{@link GlobalCombineFn}.
+     */
+    public <InputT, OutputT> ComposedCombineFnWithContext<DataT> with(
+        SimpleFunction<DataT, InputT> extractInputFn,
+        Coder<InputT> combineInputCoder,
+        GlobalCombineFn<InputT, ?, OutputT> globalCombineFn,
+        TupleTag<OutputT> outputTag) {
+      checkUniqueness(outputTags, outputTag);
+      return new ComposedCombineFnWithContext<>(
+          ImmutableList.<SerializableFunction<DataT, ?>>builder()
+              .addAll(extractInputFns)
+              .add(extractInputFn)
+              .build(),
+          ImmutableList.<Optional<Coder>>builder()
+              .addAll(combineInputCoders)
+              .add(Optional.of(combineInputCoder))
+              .build(),
           ImmutableList.<CombineFnWithContext<?, ?, ?>>builder()
               .addAll(combineFnWithContexts)
               .add(CombineFnUtil.toFnWithContext(globalCombineFn))
@@ -470,7 +589,10 @@ public CoCombineResult extractOutput(Object[] accumulator, 
Context c) {
         throws CannotProvideCoderException {
       List<Coder<Object>> coders = Lists.newArrayList();
       for (int i = 0; i < combineFnCount; ++i) {
-        Coder<Object> inputCoder = 
registry.getOutputCoder(extractInputFns.get(i), dataCoder);
+        Coder<Object> inputCoder =
+            combineInputCoders.get(i).isPresent()
+                ? combineInputCoders.get(i).get()
+                : registry.getOutputCoder(extractInputFns.get(i), dataCoder);
         coders.add(combineFnWithContexts.get(i).getAccumulatorCoder(registry, 
inputCoder));
       }
       return new ComposedAccumulatorCoder(coders);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
index 35bef7cd6a5..59e569e09a6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Top.java
@@ -168,7 +168,40 @@ private Top() {
    * {@code PCollection} of {@code KV}s and return the top values associated 
with each key.
    */
   public static <T extends Comparable<T>> Combine.Globally<T, List<T>> 
largest(int count) {
-    return Combine.globally(new TopCombineFn<>(count, new Natural<T>()));
+    return Combine.globally(largestFn(count));
+  }
+
+  /** Returns a {@link TopCombineFn} that aggregates the largest count values. 
*/
+  public static <T extends Comparable<T>> TopCombineFn<T, Natural<T>> 
largestFn(int count) {
+    return new TopCombineFn<T, Natural<T>>(count, new Natural<T>()) {};
+  }
+  /** Returns a {@link TopCombineFn} that aggregates the largest count long 
values. */
+  public static TopCombineFn<Long, Natural<Long>> largestLongsFn(int count) {
+    return new TopCombineFn<Long, Natural<Long>>(count, new Natural<Long>()) 
{};
+  }
+  /** Returns a {@link TopCombineFn} that aggregates the largest count int 
values. */
+  public static TopCombineFn<Integer, Natural<Integer>> largestIntsFn(int 
count) {
+    return new TopCombineFn<Integer, Natural<Integer>>(count, new Natural<>()) 
{};
+  }
+  /** Returns a {@link TopCombineFn} that aggregates the largest count double 
values. */
+  public static TopCombineFn<Double, Natural<Double>> largestDoublesFn(int 
count) {
+    return new TopCombineFn<Double, Natural<Double>>(count, new Natural<>()) 
{};
+  }
+  /** Returns a {@link TopCombineFn} that aggregates the smallest count 
values. */
+  public static <T extends Comparable<T>> TopCombineFn<T, Reversed<T>> 
smallestFn(int count) {
+    return new TopCombineFn<T, Reversed<T>>(count, new Reversed<>()) {};
+  }
+  /** Returns a {@link TopCombineFn} that aggregates the smallest count long 
values. */
+  public static TopCombineFn<Long, Reversed<Long>> smallestLongsFn(int count) {
+    return new TopCombineFn<Long, Reversed<Long>>(count, new Reversed<>()) {};
+  }
+  /** Returns a {@link TopCombineFn} that aggregates the smallest count int 
values. */
+  public static TopCombineFn<Integer, Reversed<Integer>> smallestIntsFn(int 
count) {
+    return new TopCombineFn<Integer, Reversed<Integer>>(count, new 
Reversed<>()) {};
+  }
+  /** Returns a {@link TopCombineFn} that aggregates the smallest count double 
values. */
+  public static TopCombineFn<Double, Reversed<Double>> smallestDoublesFn(int 
count) {
+    return new TopCombineFn<Double, Reversed<Double>>(count, new Reversed<>()) 
{};
   }
 
   /**
@@ -248,7 +281,7 @@ private Top() {
    */
   public static <K, V extends Comparable<V>>
       PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>> 
smallestPerKey(int count) {
-    return Combine.perKey(new TopCombineFn<>(count, new Reversed<V>()));
+    return Combine.perKey(smallestFn(count));
   }
 
   /**
@@ -287,7 +320,7 @@ private Top() {
    * PCollection} and return the top elements.
    */
   public static <K, V extends Comparable<V>> PerKey<K, V, List<V>> 
largestPerKey(int count) {
-    return Combine.perKey(new TopCombineFn<>(count, new Natural<V>()));
+    return Combine.perKey(largestFn(count));
   }
 
   /** @deprecated use {@link Natural} instead */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 143359)
    Time Spent: 6.5h  (was: 6h 20m)

> Create a library of useful transforms that use schemas
> ------------------------------------------------------
>
>                 Key: BEAM-4461
>                 URL: https://issues.apache.org/jira/browse/BEAM-4461
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-core
>            Reporter: Reuven Lax
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> e.g. JoinBy(fields). Project, Filter, etc.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to