This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fc8f4e6  [BEAM-10925] Test Java UDF on columns
     new 16f757c  Merge pull request #13947 from amaliujia/rw-more-testing
fc8f4e6 is described below

commit fc8f4e630763bac6227e52a567cfe63052b4422f
Author: amaliujia <[email protected]>
AuthorDate: Tue Feb 9 22:39:16 2021 -0800

    [BEAM-10925] Test Java UDF on columns
---
 .../extensions/sql/zetasql/ZetaSqlJavaUdfTest.java  | 21 +++++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git 
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
 
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
index 34b1d23..9d118be 100644
--- 
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
+++ 
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
@@ -95,6 +95,27 @@ public class ZetaSqlJavaUdfTest extends ZetaSqlTestBase {
   }
 
   @Test
+  public void testJavaUdfColumnReference() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(Key) FROM KeyValue;",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(singleField).addValues(15L).build(),
+            Row.withSchema(singleField).addValues(16L).build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
   public void testNestedJavaUdf() {
     String sql =
         String.format(

Reply via email to