benWize commented on a change in pull request #15174:
URL: https://github.com/apache/beam/pull/15174#discussion_r698874206



##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
##########
@@ -216,6 +216,8 @@ public RelWriter explainTerms(RelWriter pw) {
     private WindowFn<Row, IntervalWindow> windowFn;
     private int windowFieldIndex;
     private List<FieldAggregation> fieldAggregations;
+    private int groupSetCount;

Review comment:
       Yes, fixed!

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
##########
@@ -243,9 +247,70 @@ private Transform(
       if (windowFn != null) {
         windowedStream = assignTimestampsAndWindow(upstream);
       }
-
       validateWindowIsSupported(windowedStream);
+      return createCombiner(pinput, windowedStream);
+    }
+
+    private PCollection<Row> createCombiner(
+        PCollectionList<Row> pinput, PCollection<Row> windowedStream) {
+      // Check whether there are fields to group by
+      if (groupSetCount > 0) {
+        PTransform<PCollection<Row>, PCollection<Row>> combiner = 
createGroupCombiner();
+        boolean verifyRowValues =
+            
pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();
+        return windowedStream
+            .apply(combiner)
+            .apply(
+                "mergeRecord",
+                ParDo.of(
+                    mergeRecord(outputSchema, windowFieldIndex, ignoreValues, 
verifyRowValues)))
+            .setRowSchema(outputSchema);
+      }
+      PTransform<PCollection<Row>, PCollection<Row>> combiner = 
createGlobalCombiner();
+      return windowedStream.apply(combiner).setRowSchema(outputSchema);
+    }
 
+    private PTransform<PCollection<Row>, PCollection<Row>> 
createGlobalCombiner() {
+      org.apache.beam.sdk.schemas.transforms.Group.Global<Row> globally =
+          org.apache.beam.sdk.schemas.transforms.Group.globally();
+      org.apache.beam.sdk.schemas.transforms.Group.CombineFieldsGlobally<Row> 
combined = null;
+      for (FieldAggregation fieldAggregation : fieldAggregations) {
+        List<Integer> inputs = fieldAggregation.inputs;
+        CombineFn combineFn = fieldAggregation.combineFn;
+        if (inputs.size() > 1 || inputs.isEmpty()) {

Review comment:
       Changed

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
##########
@@ -243,9 +247,70 @@ private Transform(
       if (windowFn != null) {
         windowedStream = assignTimestampsAndWindow(upstream);
       }
-
       validateWindowIsSupported(windowedStream);
+      return createCombiner(pinput, windowedStream);
+    }
+
+    private PCollection<Row> createCombiner(
+        PCollectionList<Row> pinput, PCollection<Row> windowedStream) {
+      // Check whether there are fields to group by
+      if (groupSetCount > 0) {
+        PTransform<PCollection<Row>, PCollection<Row>> combiner = 
createGroupCombiner();
+        boolean verifyRowValues =
+            
pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();
+        return windowedStream
+            .apply(combiner)
+            .apply(
+                "mergeRecord",
+                ParDo.of(
+                    mergeRecord(outputSchema, windowFieldIndex, ignoreValues, 
verifyRowValues)))
+            .setRowSchema(outputSchema);
+      }
+      PTransform<PCollection<Row>, PCollection<Row>> combiner = 
createGlobalCombiner();
+      return windowedStream.apply(combiner).setRowSchema(outputSchema);
+    }
 
+    private PTransform<PCollection<Row>, PCollection<Row>> 
createGlobalCombiner() {

Review comment:
       I created a new interface so that the code can be reused by both the
`global` and `byFields` combiners.

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -56,12 +58,13 @@
       BUILTIN_AGGREGATOR_FACTORIES =
           ImmutableMap.<String, Function<Schema.FieldType, CombineFn<?, ?, 
?>>>builder()
               .put("ANY_VALUE", typeName -> Sample.anyValueCombineFn())
-              .put("COUNT", typeName -> Count.combineFn())
-              .put("MAX", BeamBuiltinAggregations::createMax)
-              .put("MIN", BeamBuiltinAggregations::createMin)
-              .put("SUM", BeamBuiltinAggregations::createSum)
-              .put("$SUM0", BeamBuiltinAggregations::createSum)
-              .put("AVG", BeamBuiltinAggregations::createAvg)
+              // Drop null elements for these aggregations BEAM-10379

Review comment:
       Comment changed

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -229,27 +232,93 @@ public T apply(T left, T right) {
     public Short apply(Short left, Short right) {
       return (short) (left + right);
     }
+
+    @Override
+    public @Nullable Short identity() {
+      return 0;
+    }
   }
 
   static class ByteSum extends Combine.BinaryCombineFn<Byte> {
     @Override
     public Byte apply(Byte left, Byte right) {
       return (byte) (left + right);
     }
+
+    @Override
+    public @Nullable Byte identity() {
+      return 0;
+    }
   }
 
   static class FloatSum extends Combine.BinaryCombineFn<Float> {
     @Override
     public Float apply(Float left, Float right) {
       return left + right;
     }
+
+    @Override
+    public @Nullable Float identity() {
+      return 0F;
+    }
+  }
+
+  static class LongSum extends Combine.BinaryCombineFn<Long> {
+    @Override
+    public Long apply(Long left, Long right) {
+      return Math.addExact(left, right);
+    }
+
+    @Override
+    public @Nullable Long identity() {
+      return 0L;
+    }
   }
 
   static class BigDecimalSum extends Combine.BinaryCombineFn<BigDecimal> {
     @Override
     public BigDecimal apply(BigDecimal left, BigDecimal right) {
       return left.add(right);
     }
+
+    @Override
+    public @Nullable BigDecimal identity() {
+      return BigDecimal.ZERO;
+    }
+  }
+
+  static class DropNullFn<InputT, AccumT, OutputT> extends CombineFn<InputT, 
AccumT, OutputT> {
+    CombineFn<InputT, AccumT, OutputT> combineFn;

Review comment:
       Yes, fixed

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -229,27 +232,93 @@ public T apply(T left, T right) {
     public Short apply(Short left, Short right) {
       return (short) (left + right);
     }
+
+    @Override
+    public @Nullable Short identity() {
+      return 0;
+    }
   }
 
   static class ByteSum extends Combine.BinaryCombineFn<Byte> {
     @Override
     public Byte apply(Byte left, Byte right) {
       return (byte) (left + right);
     }
+
+    @Override
+    public @Nullable Byte identity() {
+      return 0;
+    }
   }
 
   static class FloatSum extends Combine.BinaryCombineFn<Float> {
     @Override
     public Float apply(Float left, Float right) {
       return left + right;
     }
+
+    @Override
+    public @Nullable Float identity() {
+      return 0F;
+    }
+  }
+
+  static class LongSum extends Combine.BinaryCombineFn<Long> {
+    @Override
+    public Long apply(Long left, Long right) {
+      return Math.addExact(left, right);
+    }
+
+    @Override
+    public @Nullable Long identity() {
+      return 0L;
+    }
   }
 
   static class BigDecimalSum extends Combine.BinaryCombineFn<BigDecimal> {
     @Override
     public BigDecimal apply(BigDecimal left, BigDecimal right) {
       return left.add(right);
     }
+
+    @Override
+    public @Nullable BigDecimal identity() {
+      return BigDecimal.ZERO;
+    }
+  }
+
+  static class DropNullFn<InputT, AccumT, OutputT> extends CombineFn<InputT, 
AccumT, OutputT> {

Review comment:
       Yes, fixed

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -376,29 +445,45 @@ public BigDecimal toBigDecimal(BigDecimal record) {
     }
   }
 
-  static class BitOr<T extends Number> extends CombineFn<T, Long, Long> {
+  static class BitOr<T extends Number> extends CombineFn<T, BitOr.Accum, Long> 
{
+    static class Accum implements Serializable {
+      /** True if no inputs have been seen yet. */
+      boolean isEmpty = true;
+      /** The bitwise-or of the inputs seen so far. */
+      long bitOr = 0L;
+    }
+
     @Override
-    public Long createAccumulator() {
-      return 0L;
+    public Accum createAccumulator() {
+      return new Accum();
     }
 
     @Override
-    public Long addInput(Long accum, T input) {
-      return accum | input.longValue();
+    public Accum addInput(Accum accum, T input) {
+      accum.bitOr |= input.longValue();

Review comment:
       I added a validation similar to the one in BitAnd.

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/CovarianceFn.java
##########
@@ -124,6 +124,11 @@ private BigDecimal getCovariance(CovarianceAccumulator 
covariance) {
     BigDecimal adjustedCount =
         this.isSample ? covariance.count().subtract(BigDecimal.ONE) : 
covariance.count();
 
+    // Avoid ArithmeticException: Division is undefined when adjustedCount and 
covariance are 0

Review comment:
       Yes, I changed the validation and comment




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to