amaliujia commented on a change in pull request #13891:
URL: https://github.com/apache/beam/pull/13891#discussion_r569826561



##########
File path: 
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPlannerImpl.java
##########
@@ -104,14 +106,30 @@ public RelRoot rel(String sql, QueryParameters params) {
     ImmutableMap.Builder<List<String>, ResolvedCreateFunctionStmt> udfBuilder =
         ImmutableMap.builder();
     ImmutableMap.Builder<List<String>, ResolvedNode> udtvfBuilder = 
ImmutableMap.builder();
+    ImmutableMap.Builder<List<String>, 
UserFunctionDefinitions.JavaScalarFunction>
+        javaScalarFunctionBuilder = ImmutableMap.builder();
+    JavaUdfLoader javaUdfLoader = new JavaUdfLoader();
 
     ResolvedStatement statement;
     ParseResumeLocation parseResumeLocation = new ParseResumeLocation(sql);
     do {
       statement = analyzer.analyzeNextStatement(parseResumeLocation, options, 
catalog);
       if (statement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
         ResolvedCreateFunctionStmt createFunctionStmt = 
(ResolvedCreateFunctionStmt) statement;
-        udfBuilder.put(createFunctionStmt.getNamePath(), createFunctionStmt);
+        String functionGroup = 
SqlAnalyzer.getFunctionGroup(createFunctionStmt);
+        if (SqlAnalyzer.USER_DEFINED_FUNCTIONS.equals(functionGroup)) {

Review comment:
       Can you remind me whether `USER_DEFINED_FUNCTIONS` here refers to
SQL-native UDFs? If so, can you rename `USER_DEFINED_FUNCTIONS` to
`USER_DEFINED_SQL_FUNCTIONS`?

##########
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import static org.junit.Assert.fail;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for ZetaSQL UDFs written in Java.
+ *
+ * <p>System properties <code>beam.sql.udf.test.jarpath</code> and <code>
+ * beam.sql.udf.test.empty_jar_path</code> must be set.
+ */
+@RunWith(JUnit4.class)
+public class ZetaSqlJavaUdfTest extends ZetaSqlTestBase {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private final String jarPathProperty = "beam.sql.udf.test.jar_path";
+  private final String emptyJarPathProperty = 
"beam.sql.udf.test.empty_jar_path";
+
+  private final @Nullable String jarPath = System.getProperty(jarPathProperty);
+  private final @Nullable String emptyJarPath = 
System.getProperty(emptyJarPathProperty);
+
+  @Before
+  public void setUp() {
+    if (jarPath == null) {
+      fail(
+          String.format(
+              "System property %s must be set to run %s.",
+              jarPathProperty, ZetaSqlJavaUdfTest.class.getSimpleName()));
+    }
+    if (emptyJarPath == null) {
+      fail(
+          String.format(
+              "System property %s must be set to run %s.",
+              emptyJarPathProperty, ZetaSqlJavaUdfTest.class.getSimpleName()));
+    }
+    initialize();
+  }
+
+  @Test
+  public void testNullaryJavaUdf() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION helloWorld() RETURNS STRING LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT helloWorld();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addStringField("field1").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues("Hello 
world!").build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testNestedJavaUdf() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(increment(0));",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    
PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(2L).build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testUnexpectedNullArgumentThrowsRuntimeException() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("CalcFn failed to evaluate");

Review comment:
       Question: do you know where the exception is thrown?
   
   ```
     public static class IncrementFn extends ScalarFn {
       @ApplyMethod
       public Long increment(Long i) {
         return i + 1;
       }
     }
     ```
     The `increment` method does not seem to handle `NULL` at all: when `i` is null, `i + 1` unboxes a null `Long` and throws a `NullPointerException`.
     

##########
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import static org.junit.Assert.fail;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for ZetaSQL UDFs written in Java.
+ *
+ * <p>System properties <code>beam.sql.udf.test.jarpath</code> and <code>
+ * beam.sql.udf.test.empty_jar_path</code> must be set.
+ */
+@RunWith(JUnit4.class)
+public class ZetaSqlJavaUdfTest extends ZetaSqlTestBase {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private final String jarPathProperty = "beam.sql.udf.test.jar_path";
+  private final String emptyJarPathProperty = 
"beam.sql.udf.test.empty_jar_path";
+
+  private final @Nullable String jarPath = System.getProperty(jarPathProperty);
+  private final @Nullable String emptyJarPath = 
System.getProperty(emptyJarPathProperty);
+
+  @Before
+  public void setUp() {
+    if (jarPath == null) {
+      fail(
+          String.format(
+              "System property %s must be set to run %s.",
+              jarPathProperty, ZetaSqlJavaUdfTest.class.getSimpleName()));
+    }
+    if (emptyJarPath == null) {
+      fail(
+          String.format(
+              "System property %s must be set to run %s.",
+              emptyJarPathProperty, ZetaSqlJavaUdfTest.class.getSimpleName()));
+    }
+    initialize();
+  }
+
+  @Test
+  public void testNullaryJavaUdf() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION helloWorld() RETURNS STRING LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT helloWorld();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addStringField("field1").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues("Hello 
world!").build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testNestedJavaUdf() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(increment(0));",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    
PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(2L).build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testUnexpectedNullArgumentThrowsRuntimeException() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("CalcFn failed to evaluate");
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testExpectedNullArgument() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(s STRING) RETURNS BOOLEAN LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+
+    
PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * This is a loophole in type checking. The SQL function signature does not 
need to match the Java
+   * function signature; only the generated code is typechecked.
+   */
+  // TODO(BEAM-11171): fix this and adjust test accordingly.
+  @Test
+  public void testNullArgumentIsNotTypeChecked() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+
+    
PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testFunctionSignatureTypeMismatchFailsPipelineConstruction() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(i INT64) RETURNS BOOLEAN LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(0);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("Could not compile CalcFn");

Review comment:
       Sorry, I'm having a hard time understanding why this test case fails. Could you explain?

##########
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import static org.junit.Assert.fail;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for ZetaSQL UDFs written in Java.
+ *
+ * <p>System properties <code>beam.sql.udf.test.jarpath</code> and <code>
+ * beam.sql.udf.test.empty_jar_path</code> must be set.
+ */
+@RunWith(JUnit4.class)
+public class ZetaSqlJavaUdfTest extends ZetaSqlTestBase {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private final String jarPathProperty = "beam.sql.udf.test.jar_path";
+  private final String emptyJarPathProperty = 
"beam.sql.udf.test.empty_jar_path";
+
+  private final @Nullable String jarPath = System.getProperty(jarPathProperty);
+  private final @Nullable String emptyJarPath = 
System.getProperty(emptyJarPathProperty);
+
+  @Before
+  public void setUp() {
+    if (jarPath == null) {
+      fail(
+          String.format(
+              "System property %s must be set to run %s.",
+              jarPathProperty, ZetaSqlJavaUdfTest.class.getSimpleName()));
+    }
+    if (emptyJarPath == null) {
+      fail(
+          String.format(
+              "System property %s must be set to run %s.",
+              emptyJarPathProperty, ZetaSqlJavaUdfTest.class.getSimpleName()));
+    }
+    initialize();
+  }
+
+  @Test
+  public void testNullaryJavaUdf() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION helloWorld() RETURNS STRING LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT helloWorld();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addStringField("field1").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues("Hello 
world!").build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testNestedJavaUdf() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(increment(0));",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    
PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(2L).build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testUnexpectedNullArgumentThrowsRuntimeException() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("CalcFn failed to evaluate");
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testExpectedNullArgument() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(s STRING) RETURNS BOOLEAN LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+
+    
PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * This is a loophole in type checking. The SQL function signature does not 
need to match the Java
+   * function signature; only the generated code is typechecked.
+   */
+  // TODO(BEAM-11171): fix this and adjust test accordingly.

Review comment:
       +1, thanks for filing this JIRA.

##########
File path: 
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlJavaUdfTest.java
##########
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import static org.junit.Assert.fail;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for ZetaSQL UDFs written in Java.
+ *
+ * <p>System properties <code>beam.sql.udf.test.jarpath</code> and <code>
+ * beam.sql.udf.test.empty_jar_path</code> must be set.
+ */
+@RunWith(JUnit4.class)
+public class ZetaSqlJavaUdfTest extends ZetaSqlTestBase {
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  private final String jarPathProperty = "beam.sql.udf.test.jar_path";
+  private final String emptyJarPathProperty = 
"beam.sql.udf.test.empty_jar_path";
+
+  private final @Nullable String jarPath = System.getProperty(jarPathProperty);
+  private final @Nullable String emptyJarPath = 
System.getProperty(emptyJarPathProperty);
+
+  @Before
+  public void setUp() {
+    if (jarPath == null) {
+      fail(
+          String.format(
+              "System property %s must be set to run %s.",
+              jarPathProperty, ZetaSqlJavaUdfTest.class.getSimpleName()));
+    }
+    if (emptyJarPath == null) {
+      fail(
+          String.format(
+              "System property %s must be set to run %s.",
+              emptyJarPathProperty, ZetaSqlJavaUdfTest.class.getSimpleName()));
+    }
+    initialize();
+  }
+
+  @Test
+  public void testNullaryJavaUdf() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION helloWorld() RETURNS STRING LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT helloWorld();",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addStringField("field1").build();
+
+    PAssert.that(stream)
+        .containsInAnyOrder(Row.withSchema(singleField).addValues("Hello 
world!").build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testNestedJavaUdf() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(increment(0));",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addInt64Field("field1").build();
+
+    
PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(2L).build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testUnexpectedNullArgumentThrowsRuntimeException() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION increment(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT increment(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("CalcFn failed to evaluate");
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testExpectedNullArgument() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(s STRING) RETURNS BOOLEAN LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+
+    
PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * This is a loophole in type checking. The SQL function signature does not 
need to match the Java
+   * function signature; only the generated code is typechecked.
+   */
+  // TODO(BEAM-11171): fix this and adjust test accordingly.
+  @Test
+  public void testNullArgumentIsNotTypeChecked() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(i INT64) RETURNS INT64 LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(NULL);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    Schema singleField = Schema.builder().addBooleanField("field1").build();
+
+    
PAssert.that(stream).containsInAnyOrder(Row.withSchema(singleField).addValues(true).build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testFunctionSignatureTypeMismatchFailsPipelineConstruction() {
+    String sql =
+        String.format(
+            "CREATE FUNCTION isNull(i INT64) RETURNS BOOLEAN LANGUAGE java "
+                + "OPTIONS (path='%s'); "
+                + "SELECT isNull(0);",
+            jarPath);
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("Could not compile CalcFn");
+    BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+  }
+
+  @Test
+  public void testBinaryJavaUdf() {

Review comment:
       Can you link https://issues.apache.org/jira/browse/BEAM-11747 here?
   
   We will need to figure out how to better handle the mixed case. I think the
better approach is to reject such cases until we implement Calc splitting.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to