ibzib commented on a change in pull request #15174:
URL: https://github.com/apache/beam/pull/15174#discussion_r701442197



##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
##########
@@ -216,6 +216,8 @@ public RelWriter explainTerms(RelWriter pw) {
     private WindowFn<Row, IntervalWindow> windowFn;
     private int windowFieldIndex;
     private List<FieldAggregation> fieldAggregations;
+    private final int groupSetCount;
+    private boolean ignoreValues;

Review comment:
       Nit: this can be `final` too.

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -163,6 +167,32 @@ static CombineFn createSum(Schema.FieldType fieldType) {
     }
   }
 
+  /**
+   * {@link CombineFn} for Sum0 where sum of null returns 0 based on {@link 
Sum} and {@link
+   * Combine.BinaryCombineFn}.
+   */
+  static CombineFn createSum0(Schema.FieldType fieldType) {
+    switch (fieldType.getTypeName()) {
+      case INT32:
+        return new IntegerSum0();
+      case INT16:
+        return new ShortSum0();
+      case BYTE:
+        return new ByteSum0();
+      case INT64:
+        return new LongSum0();
+      case FLOAT:
+        return new FloatSum0();
+      case DOUBLE:
+        return new DoubleSum0();
+      case DECIMAL:
+        return new BigDecimalSum0();
+      default:
+        throw new UnsupportedOperationException(
+            String.format("[%s] is not supported in SUM", fieldType));

Review comment:
       ```suggestion
               String.format("[%s] is not supported in SUM0", fieldType));
   ```

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/rel/BeamAggregationRel.java
##########
@@ -243,55 +247,72 @@ private Transform(
       if (windowFn != null) {
         windowedStream = assignTimestampsAndWindow(upstream);
       }
-
       validateWindowIsSupported(windowedStream);
+      // Check if have fields to be grouped
+      if (groupSetCount > 0) {
+        
org.apache.beam.sdk.schemas.transforms.Group.AggregateCombinerInterface<Row> 
byFields =
+            
org.apache.beam.sdk.schemas.transforms.Group.byFieldIds(keyFieldsIds);
+        PTransform<PCollection<Row>, PCollection<Row>> combiner = 
createCombiner(byFields);
+        boolean verifyRowValues =
+            
pinput.getPipeline().getOptions().as(BeamSqlPipelineOptions.class).getVerifyRowValues();
+        return windowedStream
+            .apply(combiner)
+            .apply(
+                "mergeRecord",
+                ParDo.of(
+                    mergeRecord(outputSchema, windowFieldIndex, ignoreValues, 
verifyRowValues)))
+            .setRowSchema(outputSchema);
+      }
+      
org.apache.beam.sdk.schemas.transforms.Group.AggregateCombinerInterface<Row> 
globally =
+          org.apache.beam.sdk.schemas.transforms.Group.globally();
+      PTransform<PCollection<Row>, PCollection<Row>> combiner = 
createCombiner(globally);
+      return windowedStream.apply(combiner).setRowSchema(outputSchema);
+    }
+
+    private PTransform<PCollection<Row>, PCollection<Row>> createCombiner(
+        
org.apache.beam.sdk.schemas.transforms.Group.AggregateCombinerInterface<Row>
+            initialCombiner) {
 
-      org.apache.beam.sdk.schemas.transforms.Group.ByFields<Row> byFields =
-          
org.apache.beam.sdk.schemas.transforms.Group.byFieldIds(keyFieldsIds);
-      org.apache.beam.sdk.schemas.transforms.Group.CombineFieldsByFields<Row> 
combined = null;
+      
org.apache.beam.sdk.schemas.transforms.Group.AggregateCombinerInterface<Row> 
combined = null;
       for (FieldAggregation fieldAggregation : fieldAggregations) {
         List<Integer> inputs = fieldAggregation.inputs;
         CombineFn combineFn = fieldAggregation.combineFn;
-        if (inputs.size() > 1 || inputs.isEmpty()) {
-          // In this path we extract a Row (an empty row if inputs.isEmpty).
+        if (inputs.size() == 1) {
+          // Combining over a single field, so extract just that field.
           combined =
               (combined == null)
-                  ? byFields.aggregateFieldsById(inputs, combineFn, 
fieldAggregation.outputField)
-                  : combined.aggregateFieldsById(inputs, combineFn, 
fieldAggregation.outputField);
+                  ? initialCombiner.aggregateField(
+                      inputs.get(0), combineFn, fieldAggregation.outputField)
+                  : combined.aggregateField(inputs.get(0), combineFn, 
fieldAggregation.outputField);
         } else {
-          // Combining over a single field, so extract just that field.
+          // In this path we extract a Row (an empty row if inputs.isEmpty).
           combined =
               (combined == null)
-                  ? byFields.aggregateField(inputs.get(0), combineFn, 
fieldAggregation.outputField)
-                  : combined.aggregateField(inputs.get(0), combineFn, 
fieldAggregation.outputField);
+                  ? initialCombiner.aggregateFieldsById(
+                      inputs, combineFn, fieldAggregation.outputField)
+                  : combined.aggregateFieldsById(inputs, combineFn, 
fieldAggregation.outputField);
         }
       }
 
-      PTransform<PCollection<Row>, PCollection<Row>> combiner = combined;
-      boolean ignoreValues = false;
+      PTransform<PCollection<Row>, PCollection<Row>> combiner =
+          (PTransform<PCollection<Row>, PCollection<Row>>) combined;

Review comment:
       It looks like the problem is that `CombineFieldsGlobally` and 
`CombineFieldsByFields` extend `PTransform<PCollection<InputT>, 
PCollection<Row>>`, whereas `Global` extends `PTransform<PCollection<InputT>, 
PCollection<Iterable<InputT>>>`. However, the only calls you make on 
`Global.globally()` are `aggregateField(...)` and `aggregateFieldsById(...)`, 
and both of them already return a `CombineFieldsGlobally` transform:
https://github.com/apache/beam/blob/3a7b8e757d0c220b5838ca6a0805ee463b24f85d/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Group.java#L171
   
   So can you try using `CombineFieldsGlobally` as the initial combiner instead 
of `Global`? Then `Global` doesn't have to extend `AggregateCombiner` at all.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to