[ 
https://issues.apache.org/jira/browse/BEAM-11389?focusedWorklogId=555404&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555404
 ]

ASF GitHub Bot logged work on BEAM-11389:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Feb/21 13:31
            Start Date: 21/Feb/21 13:31
    Worklog Time Spent: 10m 
      Work Description: sonam-vend commented on a change in pull request #13817:
URL: https://github.com/apache/beam/pull/13817#discussion_r579807894



##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/transform/BeamBuiltinAggregations.java
##########
@@ -495,4 +504,81 @@ public Long extractOutput(Long accumulator) {
       return accumulator;
     }
   }
+  /**
+   * Logical_And function implementation
+   *
+   * <p>Returns the logical AND of all non-NULL expressions. Returns NULL if 
there are zero input
+   * rows or expression evaluates to NULL for all rows.
+   */
+  public static class LogicalAnd extends CombineFn<Boolean, LogicalAnd.Accum, 
Boolean> {
+    static class Accum {
+      /** Initially, input is empty. */
+      boolean isEmpty = true;
+      /** true if any null value is seen in the input, null values are to be 
ignored. */
+      boolean isNull = false;
+      /** logical_and operation result. */
+      boolean logicalAnd = true;
+    }
+
+    @Override
+    public Accum createAccumulator() {
+      return new Accum();
+    }
+
+    @Override
+    public Accum addInput(Accum accum, Boolean input) {
+      /** when accum is empty and it sees null, it remains null */
+      if (accum.isEmpty && input == null) {
+        accum.isNull = true;
+        return accum;
+      }
+      /** when accum is null and it sees null, it remains null */
+      if (accum.isNull && input == null) {
+        accum.isNull = true;
+        return accum;
+      }
+      /** when accum is neither null and nor empty, it remains unchanged */
+      if (!accum.isNull && !accum.isEmpty && input == null) {
+        return accum;
+      }
+      /** when accum sees non-null value, accum becomes non-empty, non-null */
+      accum.isEmpty = false;
+      accum.isNull = false;
+      accum.logicalAnd = (accum.logicalAnd && input);
+      return accum;
+    }
+
+    @Override
+    public Accum mergeAccumulators(Iterable<Accum> accums) {
+      LogicalAnd.Accum merged = createAccumulator();
+
+      /** merged accum has isNull=true when all accums have isNull=true */
+      if (StreamSupport.stream(accums.spliterator(), false).allMatch(a -> 
a.isNull)) {

Review comment:
       @ibzib Thanks for this wonderful neat code.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 555404)
    Time Spent: 4h 40m  (was: 4.5h)

> Implement the LOGICAL_AND function for Beam SQL ZetaSQL dialect, as CombineFn.
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-11389
>                 URL: https://issues.apache.org/jira/browse/BEAM-11389
>             Project: Beam
>          Issue Type: Sub-task
>          Components: dsl-sql-zetasql
>            Reporter: Sonam Ramchand
>            Priority: P2
>          Time Spent: 4h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to