[ 
https://issues.apache.org/jira/browse/BEAM-11389?focusedWorklogId=555403&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-555403
 ]

ASF GitHub Bot logged work on BEAM-11389:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Feb/21 13:28
            Start Date: 21/Feb/21 13:28
    Worklog Time Spent: 10m 
      Work Description: sonam-vend commented on a change in pull request #13817:
URL: https://github.com/apache/beam/pull/13817#discussion_r579807532



##########
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
##########
@@ -4114,4 +4114,71 @@ public void testCountIfZetaSQLDialect() {
 
     
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
+
+  @Test
+  public void testLogicalAndZetaSQL() {
+    String sql = "SELECT LOGICAL_AND(x) AS logical_and  FROM UNNEST([true, 
false, true]) AS x";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+    
PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(false).build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testLogicalAndZetaSQLWithAllTrueValues() {
+    String sql = "SELECT LOGICAL_AND(x) AS logical_and  FROM UNNEST([true, 
true, true]) AS x";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+    
PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testLogicalAndZetaSQLWithAllFalseValues() {
+    String sql = "SELECT LOGICAL_AND(x) AS logical_and  FROM UNNEST([false, 
false, false]) AS x";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+    
PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(false).build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testLogicalAndWithNullAndNoNullInput() throws Exception {
+    String sql = "SELECT LOGICAL_AND(x) AS logical_and FROM UNNEST([false, 
null, true]) AS x";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema schema = Schema.builder().addNullableField("bool_col", 
FieldType.BOOLEAN).build();
+    
PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(false).build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testLogicalAndWithAllNullInputs() {
+    String sql =
+        "SELECT LOGICAL_AND(x) AS logical_and FROM UNNEST([CAST(null AS bool), 
null, null]) AS x";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema schema = Schema.builder().addNullableField("bool_col", 
FieldType.BOOLEAN).build();
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(schema).addValues((Boolean) 
null).build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
 }

Review comment:
       @ibzib I too have added `testLogicalAndWithEmptyInput`. Hope we can 
merge this PR after [BEAM-9514]( 
https://issues.apache.org/jira/browse/BEAM-9514) is resolved.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 555403)
    Time Spent: 4.5h  (was: 4h 20m)

> Implement the LOGICAL_AND function for Beam SQL ZetaSQL dialect, as CombineFn.
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-11389
>                 URL: https://issues.apache.org/jira/browse/BEAM-11389
>             Project: Beam
>          Issue Type: Sub-task
>          Components: dsl-sql-zetasql
>            Reporter: Sonam Ramchand
>            Priority: P2
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to