kennknowles commented on a change in pull request #16856:
URL: https://github.com/apache/beam/pull/16856#discussion_r807976012



##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/CountIf.java
##########
@@ -27,49 +30,41 @@
 public class CountIf {
   private CountIf() {}
 
-  public static CountIfFn combineFn() {
-    return new CountIf.CountIfFn();
+  public static Combine.CombineFn<Boolean, ?, Long> combineFn() {

Review comment:
       Technically this is a breaking change, but nothing that implements SQL is
intended for users. Is `sql/impl` marked `@Internal`? (This change still LGTM,
because it is not actually intended as a user API.)

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/CountIf.java
##########
@@ -27,49 +30,41 @@
 public class CountIf {
   private CountIf() {}
 
-  public static CountIfFn combineFn() {
-    return new CountIf.CountIfFn();
+  public static Combine.CombineFn<Boolean, ?, Long> combineFn() {
+    return new CountIfFn();
   }
 
-  public static class CountIfFn extends Combine.CombineFn<Boolean, CountIfFn.Accum, Long> {
+  public static class CountIfFn extends Combine.CombineFn<Boolean, long[], Long> {
+    private final Combine.CombineFn<Boolean, long[], Long> countFn =
+        (Combine.CombineFn<Boolean, long[], Long>) Count.<Boolean>combineFn();
 
-    public static class Accum implements Serializable {
-      boolean isExpressionFalse = true;
-      long countIfResult = 0L;
+    @Override
+    public long[] createAccumulator() {
+      return countFn.createAccumulator();
     }
 
     @Override
-    public Accum createAccumulator() {
-      return new Accum();
+    public long[] addInput(long[] accumulator, Boolean input) {
+      if (Boolean.TRUE.equals(input)) {
+        countFn.addInput(accumulator, input);
+      }
+      return accumulator;
     }
 
     @Override
-    public Accum addInput(Accum accum, Boolean input) {
-      if (input) {
-        accum.isExpressionFalse = false;

Review comment:
       I had never looked at this code before. Now that I see it: why was this
field ever needed? The `0L` result seems to carry all the useful information,
which makes me worry there is something tricky that I am not noticing...
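
A quick way to check the suspicion, as a Beam-free sketch (the class and method names are hypothetical; `OldAccum` mirrors the mutable `Accum` version removed in this diff). For any accumulator built by the old `createAccumulator`/`addInput`/`mergeAccumulators`, `isExpressionFalse` is `true` exactly when `countIfResult` is `0`, so the merge only ever skips zeros and `extractOutput` returns the count either way. The randomized check compares the old per-bundle-then-merge path with a plain count. One genuine difference it has to steer around: the old `if (input)` unboxes and throws `NullPointerException` on a `null` input, whereas the new `Boolean.TRUE.equals(input)` simply skips it, so the generated inputs contain no `null`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Beam-free sketch: the removed flag-plus-count accumulator versus a plain count. */
public class CountIfEquivalence {

  /** Mirrors the removed mutable CountIfFn.Accum: flag starts true, count starts at 0. */
  static final class OldAccum {
    boolean isExpressionFalse = true;
    long countIfResult = 0L;
  }

  static OldAccum oldAddInput(OldAccum accum, Boolean input) {
    if (input) { // unboxes, so a null input throws NullPointerException
      accum.isExpressionFalse = false;
      accum.countIfResult += 1;
    }
    return accum;
  }

  static OldAccum oldMergeAccumulators(Iterable<OldAccum> accums) {
    OldAccum merged = new OldAccum();
    for (OldAccum accum : accums) {
      if (!accum.isExpressionFalse) { // accumulators skipped here always have count 0
        merged.isExpressionFalse = false;
        merged.countIfResult += accum.countIfResult;
      }
    }
    return merged;
  }

  static long oldExtractOutput(OldAccum accum) {
    return accum.isExpressionFalse ? 0L : accum.countIfResult;
  }

  /** Old path: one accumulator per bundle, then merge, then extract. */
  static long oldCount(List<List<Boolean>> bundles) {
    List<OldAccum> partials = new ArrayList<>();
    for (List<Boolean> bundle : bundles) {
      OldAccum accum = new OldAccum();
      for (Boolean input : bundle) {
        accum = oldAddInput(accum, input);
      }
      partials.add(accum);
    }
    return oldExtractOutput(oldMergeAccumulators(partials));
  }

  /** New-style logic: one count, incremented only for Boolean.TRUE (null is skipped). */
  static long newCount(List<Boolean> inputs) {
    long count = 0L;
    for (Boolean input : inputs) {
      if (Boolean.TRUE.equals(input)) {
        count++;
      }
    }
    return count;
  }

  public static void main(String[] args) {
    Random random = new Random(42);
    for (int trial = 0; trial < 1000; trial++) {
      List<List<Boolean>> bundles = new ArrayList<>();
      List<Boolean> all = new ArrayList<>();
      int numBundles = random.nextInt(4); // may be 0: no bundles at all
      for (int b = 0; b < numBundles; b++) {
        List<Boolean> bundle = new ArrayList<>();
        int size = random.nextInt(5); // may be 0: an empty bundle
        for (int i = 0; i < size; i++) {
          bundle.add(random.nextBoolean()); // no nulls: the old code would throw
        }
        bundles.add(bundle);
        all.addAll(bundle);
      }
      if (oldCount(bundles) != newCount(all)) {
        throw new AssertionError("old and new disagree on " + bundles);
      }
    }
    System.out.println("old and new agree on all trials");
  }
}
```

Running `main` should print `old and new agree on all trials`.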

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/CountIf.java
##########
@@ -27,49 +30,41 @@
 public class CountIf {
   private CountIf() {}
 
-  public static CountIfFn combineFn() {
-    return new CountIf.CountIfFn();
+  public static Combine.CombineFn<Boolean, ?, Long> combineFn() {
+    return new CountIfFn();
   }
 
-  public static class CountIfFn extends Combine.CombineFn<Boolean, CountIfFn.Accum, Long> {
+  public static class CountIfFn extends Combine.CombineFn<Boolean, long[], Long> {
+    private final Combine.CombineFn<Boolean, long[], Long> countFn =
+        (Combine.CombineFn<Boolean, long[], Long>) Count.<Boolean>combineFn();

Review comment:
       This cast makes me think that `Count.combineFn()` worked too hard to 
hide the accumulator type. It should just expose it.
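
To make the point concrete, a Beam-free sketch (all names are hypothetical; `MiniCombineFn` is a simplified stand-in for `Combine.CombineFn` with merging omitted, not Beam's API). A factory that returns `MiniCombineFn<T, ?, Long>` forces a caller who wants to delegate to it into an unchecked cast, just like the `Count.<Boolean>combineFn()` cast above; a factory that exposes `long[]` in its signature lets the composed `CountIfFn` be written with no cast at all.

```java
/** Beam-free sketch: a wildcard accumulator type forces callers to cast. */
public class ExposedAccumulatorSketch {

  /** Hypothetical, simplified stand-in for Combine.CombineFn (merging omitted). */
  interface MiniCombineFn<InputT, AccumT, OutputT> {
    AccumT createAccumulator();

    AccumT addInput(AccumT accumulator, InputT input);

    OutputT extractOutput(AccumT accumulator);
  }

  /** Counts every input in a one-element long[] accumulator. */
  static final class CountFn<T> implements MiniCombineFn<T, long[], Long> {
    @Override
    public long[] createAccumulator() {
      return new long[1];
    }

    @Override
    public long[] addInput(long[] accumulator, T input) {
      accumulator[0]++;
      return accumulator;
    }

    @Override
    public Long extractOutput(long[] accumulator) {
      return accumulator[0];
    }
  }

  /** Hiding style: callers only see "?" for the accumulator type. */
  static <T> MiniCombineFn<T, ?, Long> hiddenCount() {
    return new CountFn<T>();
  }

  /** Exposing style: long[] is part of the signature. */
  static <T> MiniCombineFn<T, long[], Long> exposedCount() {
    return new CountFn<T>();
  }

  /** What delegating to the hiding factory requires: an unchecked cast. */
  @SuppressWarnings("unchecked")
  static MiniCombineFn<Boolean, long[], Long> viaCast() {
    return (MiniCombineFn<Boolean, long[], Long>) ExposedAccumulatorSketch.<Boolean>hiddenCount();
  }

  /** CountIf built on the exposing factory: counts only TRUE inputs, no cast needed. */
  static final class CountIfFn implements MiniCombineFn<Boolean, long[], Long> {
    private final MiniCombineFn<Boolean, long[], Long> countFn = exposedCount();

    @Override
    public long[] createAccumulator() {
      return countFn.createAccumulator();
    }

    @Override
    public long[] addInput(long[] accumulator, Boolean input) {
      return Boolean.TRUE.equals(input) ? countFn.addInput(accumulator, input) : accumulator;
    }

    @Override
    public Long extractOutput(long[] accumulator) {
      return countFn.extractOutput(accumulator);
    }
  }

  /** Feeds the inputs through fn using a single accumulator. */
  static long run(MiniCombineFn<Boolean, long[], Long> fn, Boolean... inputs) {
    long[] accumulator = fn.createAccumulator();
    for (Boolean input : inputs) {
      accumulator = fn.addInput(accumulator, input);
    }
    return fn.extractOutput(accumulator);
  }

  public static void main(String[] args) {
    System.out.println(run(new CountIfFn(), true, false, null, true)); // prints 2
    System.out.println(run(viaCast(), true, false, true)); // prints 3
  }
}
```

The `viaCast()` path also needs the explicit `<Boolean>` type argument, because a cast does not supply a target type for generic method inference; without it `T` would be inferred as `Object` and the cast would not compile.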

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/CountIf.java
##########
@@ -27,49 +30,41 @@
 public class CountIf {
   private CountIf() {}
 
-  public static CountIfFn combineFn() {
-    return new CountIf.CountIfFn();
+  public static Combine.CombineFn<Boolean, ?, Long> combineFn() {
+    return new CountIfFn();
   }
 
-  public static class CountIfFn extends Combine.CombineFn<Boolean, CountIfFn.Accum, Long> {
+  public static class CountIfFn extends Combine.CombineFn<Boolean, long[], Long> {
+    private final Combine.CombineFn<Boolean, long[], Long> countFn =
+        (Combine.CombineFn<Boolean, long[], Long>) Count.<Boolean>combineFn();
 
-    public static class Accum implements Serializable {
-      boolean isExpressionFalse = true;
-      long countIfResult = 0L;
+    @Override
+    public long[] createAccumulator() {
+      return countFn.createAccumulator();
     }
 
     @Override
-    public Accum createAccumulator() {
-      return new Accum();
+    public long[] addInput(long[] accumulator, Boolean input) {
+      if (Boolean.TRUE.equals(input)) {
+        countFn.addInput(accumulator, input);
+      }
+      return accumulator;
     }
 
     @Override
-    public Accum addInput(Accum accum, Boolean input) {
-      if (input) {
-        accum.isExpressionFalse = false;
-        accum.countIfResult += 1;
-      }
-      return accum;
+    public long[] mergeAccumulators(Iterable<long[]> accumulators) {
+      return countFn.mergeAccumulators(accumulators);
     }
 
     @Override
-    public Accum mergeAccumulators(Iterable<Accum> accums) {
-      CountIfFn.Accum merged = createAccumulator();
-      for (CountIfFn.Accum accum : accums) {
-        if (!accum.isExpressionFalse) {
-          merged.isExpressionFalse = false;
-          merged.countIfResult += accum.countIfResult;
-        }
-      }
-      return merged;
+    public Long extractOutput(long[] accumulator) {
+      return countFn.extractOutput(accumulator);
     }
 
     @Override
-    public Long extractOutput(Accum accum) {
-      if (!accum.isExpressionFalse) {
-        return accum.countIfResult;
-      }
-      return 0L;
+    public Coder<long[]> getAccumulatorCoder(CoderRegistry registry, Coder<Boolean> inputCoder)
+        throws CannotProvideCoderException {
+      return countFn.getAccumulatorCoder(registry, inputCoder);

Review comment:
       Technically, changing the accumulator coder here breaks in-place pipeline
update. But SQL pipelines cannot be relied on for that anyway, since the
optimizer, and therefore the plan it produces, might change. So I am noting that
I _explicitly_ consider it OK to break in-place update here.
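
A Beam-free illustration of why a coder change matters for update: accumulator bytes written under the old coder would have to be read back by the new one. Both encodings below are hypothetical, since the diff does not show how the old `Accum` was encoded: the "old" one writes the flag followed by a fixed 8-byte count, the "new" one writes a single variable-length count. In this sketch, decoding old bytes with the new decoder produces wrong counts rather than an error.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Beam-free sketch: old accumulator bytes read back with a changed coder. */
public class CoderChangeSketch {

  /** Hypothetical "old" encoding: a boolean flag byte, then a fixed 8-byte big-endian count. */
  static byte[] encodeOld(boolean isExpressionFalse, long countIfResult) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeBoolean(isExpressionFalse);
      out.writeLong(countIfResult);
    } catch (IOException e) {
      throw new UncheckedIOException(e); // not expected for an in-memory stream
    }
    return bytes.toByteArray();
  }

  /** Hypothetical "new" encoding: the count as a varint, 7 bits per byte, low bits first. */
  static byte[] encodeNew(long[] accumulator) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    long value = accumulator[0];
    while ((value & ~0x7FL) != 0) {
      out.write((int) ((value & 0x7F) | 0x80)); // high bit set: more bytes follow
      value >>>= 7;
    }
    out.write((int) value); // last byte: high bit clear
    return out.toByteArray();
  }

  /** Decodes the "new" encoding; stops at the first byte whose high bit is clear. */
  static long[] decodeNew(byte[] encoded) {
    long value = 0;
    int shift = 0;
    for (byte b : encoded) {
      value |= (long) (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return new long[] {value};
      }
      shift += 7;
    }
    throw new IllegalArgumentException("truncated varint");
  }

  public static void main(String[] args) {
    // A round trip through the new coder works.
    System.out.println(decodeNew(encodeNew(new long[] {300L}))[0]); // prints 300
    // Old bytes for count = 5: the leading flag byte (0) is read as the whole value.
    System.out.println(decodeNew(encodeOld(false, 5L))[0]); // prints 0
    // Old bytes for the empty accumulator (flag true, count 0) read as a count of 1.
    System.out.println(decodeNew(encodeOld(true, 0L))[0]); // prints 1
  }
}
```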

##########
File path: sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/agg/CountIf.java
##########
@@ -31,62 +30,41 @@
 public class CountIf {
   private CountIf() {}
 
-  public static CountIfFn combineFn() {
-    return new CountIf.CountIfFn();
+  public static Combine.CombineFn<Boolean, ?, Long> combineFn() {
+    return new CountIfFn();
   }
 
-  public static class CountIfFn extends Combine.CombineFn<Boolean, CountIfFn.Accum, Long> {
-
-    @AutoValue
-    public abstract static class Accum implements Serializable {
-      abstract boolean isExpressionFalse();
-
-      abstract long countIfResult();
-
-      static Accum empty() {
-        return of(true, 0L);
-      }
-
-      static Accum of(boolean isExpressionFalse, long countIfResult) {
-        return new AutoValue_CountIf_CountIfFn_Accum(isExpressionFalse, countIfResult);
-      }
-    }
+  public static class CountIfFn extends Combine.CombineFn<Boolean, long[], Long> {

Review comment:
       IMO using it via composition like this is preferable anyway. It is more
flexible: you can achieve all the same things without needing it to be public.
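
A Beam-free sketch of the composition approach (all names are hypothetical; `MiniCombineFn` is a simplified stand-in for `Combine.CombineFn`, not Beam's API). The counting implementation stays private behind a factory that returns `MiniCombineFn<T, ?, Long>`, and a generic `FilteredFn` wrapper adds CountIf-style filtering. This is one possible reading of the comment, not code from the PR: the PR's `CountIfFn` fixes the accumulator to `long[]` and casts, whereas here wildcard capture infers the hidden accumulator type when `filtered` and `run` are called, so no cast is needed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

/** Beam-free sketch: reusing a hidden combine fn purely through composition. */
public class CompositionSketch {

  /** Hypothetical, simplified stand-in for Combine.CombineFn. */
  interface MiniCombineFn<InputT, AccumT, OutputT> {
    AccumT createAccumulator();

    AccumT addInput(AccumT accumulator, InputT input);

    AccumT mergeAccumulators(Iterable<AccumT> accumulators);

    OutputT extractOutput(AccumT accumulator);
  }

  /** Plays the role of a library class whose implementation is not public. */
  static final class Counts {
    private Counts() {}

    /** Counts every input in a one-element long[]. */
    private static final class CountFn<T> implements MiniCombineFn<T, long[], Long> {
      @Override public long[] createAccumulator() { return new long[1]; }

      @Override public long[] addInput(long[] acc, T input) { acc[0]++; return acc; }

      @Override public long[] mergeAccumulators(Iterable<long[]> accs) {
        long[] merged = new long[1];
        for (long[] acc : accs) {
          merged[0] += acc[0];
        }
        return merged;
      }

      @Override public Long extractOutput(long[] acc) { return acc[0]; }
    }

    /** The only entry point; the accumulator type is hidden behind "?". */
    static <T> MiniCombineFn<T, ?, Long> combineFn() {
      return new CountFn<T>();
    }
  }

  /** Passes only inputs accepted by keep to the delegate; all other calls are forwarded. */
  static final class FilteredFn<InputT, AccumT, OutputT>
      implements MiniCombineFn<InputT, AccumT, OutputT> {
    private final MiniCombineFn<InputT, AccumT, OutputT> delegate;
    private final Predicate<? super InputT> keep;

    FilteredFn(MiniCombineFn<InputT, AccumT, OutputT> delegate, Predicate<? super InputT> keep) {
      this.delegate = delegate;
      this.keep = keep;
    }

    @Override public AccumT createAccumulator() { return delegate.createAccumulator(); }

    @Override public AccumT addInput(AccumT acc, InputT input) {
      return keep.test(input) ? delegate.addInput(acc, input) : acc;
    }

    @Override public AccumT mergeAccumulators(Iterable<AccumT> accs) {
      return delegate.mergeAccumulators(accs);
    }

    @Override public OutputT extractOutput(AccumT acc) { return delegate.extractOutput(acc); }
  }

  /** Called with a "?" accumulator type, AccumT is inferred by wildcard capture. */
  static <InputT, AccumT, OutputT> MiniCombineFn<InputT, AccumT, OutputT> filtered(
      MiniCombineFn<InputT, AccumT, OutputT> fn, Predicate<? super InputT> keep) {
    return new FilteredFn<>(fn, keep);
  }

  static <InputT, AccumT, OutputT> AccumT accumulate(
      MiniCombineFn<InputT, AccumT, OutputT> fn, List<InputT> bundle) {
    AccumT acc = fn.createAccumulator();
    for (InputT input : bundle) {
      acc = fn.addInput(acc, input);
    }
    return acc;
  }

  /** Accumulates two bundles separately, then merges and extracts. */
  static <InputT, AccumT, OutputT> OutputT run(
      MiniCombineFn<InputT, AccumT, OutputT> fn, List<InputT> first, List<InputT> second) {
    List<AccumT> partials = new ArrayList<>();
    partials.add(accumulate(fn, first));
    partials.add(accumulate(fn, second));
    return fn.extractOutput(fn.mergeAccumulators(partials));
  }

  public static void main(String[] args) {
    // CountIf-style fn assembled only from the public factory plus the wrapper.
    MiniCombineFn<Boolean, ?, Long> countIf =
        filtered(Counts.<Boolean>combineFn(), Boolean.TRUE::equals);
    System.out.println(run(countIf, Arrays.asList(true, false, null), Arrays.asList(true, true)));
  }
}
```

`main` prints `3`. For brevity everything is nested in one file, where Java does not enforce `private` between nested classes; with `Counts` as its own top-level class, `CountFn` would be inaccessible to the wrapper, and the composition would still work through `combineFn()` alone.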




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
