ibzib commented on a change in pull request #14035:
URL: https://github.com/apache/beam/pull/14035#discussion_r589663990



##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperators.java
##########
@@ -92,6 +93,14 @@
           x -> createTypeFactory().createArrayType(x.getOperandType(0), -1),
           new UdafImpl<>(new ArrayAgg.ArrayAggArray()));
 
+  public static final SqlOperator ARRAY_CONCAT_AGG_FN =
+      createUdafOperator(
+          "array_concat_agg",
+          x ->
+              createTypeFactory()
+                  .createArrayType(createTypeFactory().createSqlType(SqlTypeName.BIGINT), -1),

Review comment:
       You need to get the array element type from `x`. With the element type hard-coded to BIGINT, this function will only work for integers.

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
##########
@@ -3843,4 +3847,27 @@ public void testArrayAggZetasql() {
 
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
+
+  @Test
+  public void testArrayConcatAggZetasql() {
+    String sql =
+        "WITH aggregate_example AS (SELECT [1,2] AS numbers  UNION ALL SELECT [3,4] AS numbers UNION ALL SELECT [5, 6] AS numbers) SELECT ARRAY_CONCAT_AGG(numbers) AS count_to_six_agg FROM aggregate_example";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    PAssert.thatSingleton(stream)
+        .satisfies(
+            row -> {
+              Collection<Object> output = row.getArray("count_to_six_agg");
+              HashSet<Object> outputSet = new HashSet<Object>(output);
+
+              HashSet<Object> expectedOutputSet = Sets.newHashSet(1L, 2L, 3L, 4L, 5L, 6L);
+
+              assertThat("array_field", expectedOutputSet.equals(outputSet));

Review comment:
       Use `containsInAnyOrder` instead of building and comparing sets. It's simpler.
http://hamcrest.org/JavaHamcrest/javadoc/1.3/org/hamcrest/Matchers.html#containsInAnyOrder(T...)
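
       Beyond brevity, there is a correctness angle to the suggestion above: a `HashSet` comparison collapses duplicates, whereas `containsInAnyOrder` requires a one-to-one match between items and expected values. The JDK-only sketch below illustrates the difference. The class and both helper methods are hypothetical names made up for this illustration; `sameElementsAnyOrder` only stands in for the property the Hamcrest matcher checks and is not Hamcrest itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;

class AnyOrderSketch {
  // Set-based check, in the style of the test under review:
  // duplicates in either list collapse before the comparison.
  static boolean equalAsSets(List<Long> actual, List<Long> expected) {
    return new HashSet<>(actual).equals(new HashSet<>(expected));
  }

  // Hypothetical stand-in for an any-order matcher: ignores order
  // but still requires every element to appear the same number of times.
  static boolean sameElementsAnyOrder(List<Long> actual, List<Long> expected) {
    List<Long> sortedActual = new ArrayList<>(actual);
    List<Long> sortedExpected = new ArrayList<>(expected);
    Collections.sort(sortedActual);
    Collections.sort(sortedExpected);
    return sortedActual.equals(sortedExpected);
  }

  public static void main(String[] args) {
    List<Long> expected = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L);
    // An output with a spurious extra element.
    List<Long> withDuplicate = Arrays.asList(1L, 1L, 2L, 3L, 4L, 5L, 6L);

    System.out.println(equalAsSets(withDuplicate, expected));
    System.out.println(sameElementsAnyOrder(withDuplicate, expected));
  }
}
```

       In the test itself, the Hamcrest matcher linked above would replace both sets; the helper here only exists to keep the sketch free of dependencies.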

##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
##########
@@ -3843,4 +3847,27 @@ public void testArrayAggZetasql() {
 
     pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
+
+  @Test
+  public void testArrayConcatAggZetasql() {
+    String sql =
+        "WITH aggregate_example AS (SELECT [1,2] AS numbers  UNION ALL SELECT [3,4] AS numbers UNION ALL SELECT [5, 6] AS numbers) SELECT ARRAY_CONCAT_AGG(numbers) AS count_to_six_agg FROM aggregate_example";

Review comment:
       Break this query string into multiple lines.
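
       One possible way to split the string, as a sketch: concatenate one fragment per clause, ending each fragment with a space so the tokens stay separated. The class name `MultiLineQuery` is hypothetical and exists only to make the snippet compile on its own; the whitespace inside the array literals is also normalized here, which is an editorial choice rather than something the query requires.

```java
class MultiLineQuery {
  // Same query as in the test, split at clause boundaries.
  // Each fragment except the first and last ends with a trailing space.
  static final String SQL =
      "WITH aggregate_example AS ("
          + "SELECT [1, 2] AS numbers "
          + "UNION ALL SELECT [3, 4] AS numbers "
          + "UNION ALL SELECT [5, 6] AS numbers) "
          + "SELECT ARRAY_CONCAT_AGG(numbers) AS count_to_six_agg "
          + "FROM aggregate_example";

  public static void main(String[] args) {
    // Print the assembled query to confirm it reads as a single statement.
    System.out.println(SQL);
  }
}
```

       Because the fragments are compile-time constants, the concatenation is folded by the compiler, so the split costs nothing at runtime.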

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperators.java
##########
@@ -266,7 +275,7 @@ public SqlSyntax getSyntax() {
     };
   }
 
-  private static RelDataType createSqlType(SqlTypeName typeName, boolean withNullability) {
+  public static RelDataType createSqlType(SqlTypeName typeName, boolean withNullability) {

Review comment:
       Why does this need to be public?



