iemejia commented on a change in pull request #16856:
URL: https://github.com/apache/beam/pull/16856#discussion_r808123618



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/CountIf.java
##########
@@ -27,49 +30,41 @@
 public class CountIf {
   private CountIf() {}
 
-  public static CountIfFn combineFn() {
-    return new CountIf.CountIfFn();
+  public static Combine.CombineFn<Boolean, ?, Long> combineFn() {
+    return new CountIfFn();
   }
 
-  public static class CountIfFn extends Combine.CombineFn<Boolean, CountIfFn.Accum, Long> {
+  public static class CountIfFn extends Combine.CombineFn<Boolean, long[], Long> {
+    private final Combine.CombineFn<Boolean, long[], Long> countFn =
+        (Combine.CombineFn<Boolean, long[], Long>) Count.<Boolean>combineFn();
 
-    public static class Accum implements Serializable {
-      boolean isExpressionFalse = true;
-      long countIfResult = 0L;
+    @Override
+    public long[] createAccumulator() {
+      return countFn.createAccumulator();
     }
 
     @Override
-    public Accum createAccumulator() {
-      return new Accum();
+    public long[] addInput(long[] accumulator, Boolean input) {
+      if (Boolean.TRUE.equals(input)) {
+        countFn.addInput(accumulator, input);
+      }
+      return accumulator;
     }
 
     @Override
-    public Accum addInput(Accum accum, Boolean input) {
-      if (input) {
-        accum.isExpressionFalse = false;

Review comment:
       Ryan and I arrived at the same conclusion, which is why I went down the
simplification road. I added the tests to try to find issues and fixed the
ones I found.
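
For illustration, here is a minimal stand-alone sketch of the simplified counting logic, assuming a single-slot `long[]` accumulator like the one in the diff above. The class `CountIfSketch` and its static methods are hypothetical and do not use the Beam API; the actual `CountIfFn` in the PR delegates to `Count.combineFn()` instead of incrementing the slot itself.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-alone sketch; not the Beam class from the PR. */
final class CountIfSketch {
  private CountIfSketch() {}

  /** One-slot accumulator holding the running count. */
  static long[] createAccumulator() {
    return new long[] {0L};
  }

  /** Counts only TRUE; Boolean.TRUE.equals(null) is false, so null inputs are skipped. */
  static long[] addInput(long[] accumulator, Boolean input) {
    if (Boolean.TRUE.equals(input)) {
      accumulator[0]++;
    }
    return accumulator;
  }

  /** Sums partial counts, as a combiner would across bundles. */
  static long[] mergeAccumulators(Iterable<long[]> accumulators) {
    long[] merged = createAccumulator();
    for (long[] accumulator : accumulators) {
      merged[0] += accumulator[0];
    }
    return merged;
  }

  static long extractOutput(long[] accumulator) {
    return accumulator[0];
  }

  /** Convenience helper (an assumption for this sketch): runs the full cycle on a list. */
  static long countIf(List<Boolean> inputs) {
    long[] accumulator = createAccumulator();
    for (Boolean input : inputs) {
      addInput(accumulator, input);
    }
    return extractOutput(accumulator);
  }

  public static void main(String[] args) {
    System.out.println(countIf(Arrays.asList(true, false, null, true)));
  }
}
```

Compared with the removed `if (input)` check, `Boolean.TRUE.equals(input)` does not unbox its argument, so a null input is ignored instead of throwing a `NullPointerException`.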




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

