ibzib commented on a change in pull request #13817:
URL: https://github.com/apache/beam/pull/13817#discussion_r565759717



##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -453,4 +462,57 @@ public Long extractOutput(Accum accum) {
       return accum.bitAnd;
     }
   }
+
+  public static class LogicalAnd extends CombineFn<Boolean, LogicalAnd.Accum, 
Boolean> {

Review comment:
       Please add a javadoc comment for this class. You can just use the 
[ZetaSQL function 
specification](https://github.com/google/zetasql/blob/master/docs/functions-and-operators.md#logical_and):
   
   "Returns the logical AND of all non-NULL expressions. Returns NULL if there 
are zero input rows or expression evaluates to NULL for all rows."

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -453,4 +462,57 @@ public Long extractOutput(Accum accum) {
       return accum.bitAnd;
     }
   }
+
+  public static class LogicalAnd extends CombineFn<Boolean, LogicalAnd.Accum, 
Boolean> {
+    static class Accum {
+      // Initially, input is empty
+      boolean isEmpty = true;
+      // true if any null value is seen in the input, null values are to be 
ignored
+      boolean isNull = false;
+      // logical_and operation result
+      boolean logicalAnd = Boolean.parseBoolean(null);
+    }
+
+    @Override
+    public Accum createAccumulator() {
+      return new Accum();
+    }
+
+    @Override
+    public Accum addInput(Accum accum, Boolean input) {
+      if (input == null) {
+        accum.isNull = true;
+        accum.isEmpty = false;
+        return accum;
+      }
+      accum.isEmpty = false;
+      accum.logicalAnd = accum.logicalAnd && input;
+      return accum;
+    }
+
+    @Override
+    public Accum mergeAccumulators(Iterable<Accum> accums) {
+      LogicalAnd.Accum merged = createAccumulator();
+      for (LogicalAnd.Accum accum : accums) {
+        if (accum.isNull) {
+          // ignore null case
+          continue;

Review comment:
       `testLogicalAndWithNullAndNoNullInput` is failing because this logic is 
incorrect. This logic looks to be partially copied from `BitAnd`, which handles 
nulls completely differently. In `BitAnd`, we return null if _any_ values were 
null; in `LogicalAnd`, we want to return null only if _all_ values were null.
   
   To fix this, you will need to fix both `addInput` and `mergeAccumulators` to 
properly handle nulls.

##########
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
##########
@@ -4067,4 +4067,46 @@ public void testSimpleTableName() {
             Row.withSchema(singleField).addValues(15L).build());
     
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
+
+  @Test
+  public void testLogicalAndZetaSQL() {
+    String sql = "SELECT LOGICAL_AND(x) AS logical_and  FROM UNNEST([true, 
false, true]) AS x";

Review comment:
       You should also test [true, true] and [false, false]. (I'd also prefer 
if you cut down on code duplication by using a single [parameterized 
test](https://junit.org/junit4/javadoc/4.12/org/junit/runners/Parameterized.html)
 for all these cases.)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to