This is an automated email from the ASF dual-hosted git repository.

ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 02d603a  [BEAM-12123] Proactively reject unsupported types in Java UDFs.
     new 253bf38  Merge pull request #14464 from ibzib/BEAM-12123
02d603a is described below

commit 02d603acbfae43ef7025c7dab7e4a53d50f7e892
Author: Kyle Weaver <[email protected]>
AuthorDate: Wed Apr 7 11:09:55 2021 -0700

    [BEAM-12123] Proactively reject unsupported types in Java UDFs.
---
 .../extensions/sql/zetasql/BeamZetaSqlCatalog.java |  82 ++++++++++++++++-
 .../sql/zetasql/BeamZetaSqlCatalogTest.java        | 102 ++++++++++++++++++++-
 2 files changed, 179 insertions(+), 5 deletions(-)

diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
index a73a7ce..a2b9a8c 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
@@ -26,6 +26,7 @@ import com.google.zetasql.FunctionSignature;
 import com.google.zetasql.SimpleCatalog;
 import com.google.zetasql.TVFRelation;
 import com.google.zetasql.TableValuedFunction;
+import com.google.zetasql.Type;
 import com.google.zetasql.TypeFactory;
 import com.google.zetasql.ZetaSQLBuiltinFunctionOptions;
 import com.google.zetasql.ZetaSQLFunctions;
@@ -49,6 +50,7 @@ import 
org.apache.beam.sdk.extensions.sql.zetasql.translation.UserFunctionDefini
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.adapter.java.JavaTypeFactory;
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeField;
+import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.FunctionParameter;
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 
@@ -134,6 +136,7 @@ public class BeamZetaSqlCatalog {
         sqlScalarUdfs.put(createFunctionStmt.getNamePath(), 
createFunctionStmt);
         break;
       case USER_DEFINED_JAVA_SCALAR_FUNCTIONS:
+        validateJavaUdf(createFunctionStmt);
         String jarPath = getJarPath(createFunctionStmt);
         ScalarFn scalarFn =
             javaUdfLoader.loadScalarFunction(createFunctionStmt.getNamePath(), 
jarPath);
@@ -156,6 +159,47 @@ public class BeamZetaSqlCatalog {
             ImmutableList.of(createFunctionStmt.getSignature())));
   }
 
+  /**
+   * Checks that a {@code CREATE FUNCTION ... LANGUAGE java} statement only declares argument and
+   * return types that a Java UDF can handle.
+   *
+   * @param createFunctionStmt the resolved statement whose signature and return type are checked
+   * @throws UnsupportedOperationException if an argument has no concrete type (templated argument)
+   *     or if any declared type is not supported
+   * @throws IllegalArgumentException if the statement has no return type
+   */
+  void validateJavaUdf(ResolvedNodes.ResolvedCreateFunctionStmt createFunctionStmt) {
+    for (FunctionArgumentType arg : createFunctionStmt.getSignature().getFunctionArgumentList()) {
+      Type argType = arg.getType();
+      if (argType == null) {
+        // No concrete type on the argument: it is treated as templated, which is rejected.
+        throw new UnsupportedOperationException(
+            "UDF templated argument types are not supported.");
+      }
+      validateJavaUdfZetaSqlType(argType);
+    }
+
+    Type returnType = createFunctionStmt.getReturnType();
+    if (returnType == null) {
+      throw new IllegalArgumentException("UDF return type must not be null.");
+    }
+    validateJavaUdfZetaSqlType(returnType);
+  }
+
+  /**
+   * Rejects ZetaSQL types that a Java UDF cannot accept or return.
+   *
+   * <p>Only INT64, DOUBLE, BOOL, STRING and BYTES are accepted. This is a subset of the types
+   * supported by {@link BeamJavaUdfCalcRule}.
+   *
+   * @param type the ZetaSQL type to check
+   * @throws UnsupportedOperationException if the kind of {@code type} is not accepted
+   */
+  void validateJavaUdfZetaSqlType(Type type) {
+    switch (type.getKind()) {
+      case TYPE_INT64:
+      case TYPE_DOUBLE:
+      case TYPE_BOOL:
+      case TYPE_STRING:
+      case TYPE_BYTES:
+        // Accepted kinds.
+        return;
+      default:
+        // Anything else is rejected below, notably NUMERIC, DATE, TIME, DATETIME, TIMESTAMP,
+        // ARRAY and STRUCT.
+        break;
+    }
+    throw new UnsupportedOperationException(
+        "ZetaSQL type not allowed in Java UDF: " + type.getKind().name());
+  }
+
   void addTableValuedFunction(
       ResolvedNodes.ResolvedCreateTableFunctionStmt createTableFunctionStmt) {
     zetaSqlCatalog.addTableValuedFunction(
@@ -299,10 +343,12 @@ public class BeamZetaSqlCatalog {
           functions) {
         if (function instanceof ScalarFunctionImpl) {
           ScalarFunctionImpl scalarFunction = (ScalarFunctionImpl) function;
+          // Validate types before converting from Calcite to ZetaSQL, since 
the conversion may fail
+          // for unsupported types.
+          validateScalarFunctionImpl(scalarFunction);
           List<String> path = Arrays.asList(functionName.split("\\."));
           Method method = scalarFunction.method;
           javaScalarUdfs.put(path, 
UserFunctionDefinitions.JavaScalarFunction.create(method, ""));
-
           FunctionArgumentType resultType =
               new FunctionArgumentType(
                   ZetaSqlCalciteTranslationUtils.toZetaSqlType(
@@ -333,6 +379,40 @@ public class BeamZetaSqlCatalog {
     }
   }
 
+  /**
+   * Checks each parameter type and then the return type of a Calcite {@link ScalarFunctionImpl},
+   * as derived through {@code typeFactory}, against the types allowed in ZetaSQL Java UDFs.
+   *
+   * @param scalarFunction the Calcite function to check
+   * @throws UnsupportedOperationException from the first parameter or return type that is not
+   *     allowed
+   */
+  private void validateScalarFunctionImpl(ScalarFunctionImpl scalarFunction) {
+    scalarFunction
+        .getParameters()
+        .forEach(parameter -> validateJavaUdfCalciteType(parameter.getType(typeFactory)));
+    RelDataType returnType = scalarFunction.getReturnType(typeFactory);
+    validateJavaUdfCalciteType(returnType);
+  }
+
+  /**
+   * Rejects Calcite types that a ZetaSQL Java UDF cannot accept or return.
+   *
+   * <p>Only BIGINT, DOUBLE, BOOLEAN, VARCHAR and VARBINARY are accepted. These are a subset of the
+   * Calcite types corresponding to those supported by {@link BeamJavaUdfCalcRule}.
+   *
+   * @param type the Calcite type to check
+   * @throws UnsupportedOperationException if the SQL type name of {@code type} is not accepted
+   */
+  private void validateJavaUdfCalciteType(RelDataType type) {
+    switch (type.getSqlTypeName()) {
+      case BIGINT:
+      case DOUBLE:
+      case BOOLEAN:
+      case VARCHAR:
+      case VARBINARY:
+        // Accepted type names.
+        return;
+      default:
+        // Anything else is rejected below, notably DECIMAL, DATE, TIME, TIMESTAMP,
+        // TIMESTAMP_WITH_LOCAL_TIME_ZONE, ARRAY and ROW.
+        break;
+    }
+    throw new UnsupportedOperationException(
+        "Calcite type not allowed in ZetaSQL Java UDF: " + type.getSqlTypeName().getName());
+  }
+
   private String getFunctionGroup(ResolvedNodes.ResolvedCreateFunctionStmt 
createFunctionStmt) {
     switch (createFunctionStmt.getLanguage().toUpperCase()) {
       case "JAVA":
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java
index 2619ec2..2196046 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java
@@ -21,7 +21,11 @@ import static 
org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCatalog.USER
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
+import com.google.zetasql.Analyzer;
+import com.google.zetasql.AnalyzerOptions;
+import com.google.zetasql.resolvedast.ResolvedNodes;
 import java.lang.reflect.Method;
+import java.util.List;
 import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
 import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
@@ -32,24 +36,38 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import 
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
+/** Tests for {@link BeamZetaSqlCatalog}. */
 @RunWith(JUnit4.class)
 public class BeamZetaSqlCatalogTest {
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  /** Fixture UDF whose {@code eval} returns its {@code Long} argument plus one. */
   public static class IncrementFn implements BeamSqlUdf {
     public Long eval(Long i) {
       return i + 1;
     }
   }
 
+  /**
+   * Fixture UDF whose {@code List<Long>} return type the catalog is expected to reject (the tests
+   * expect it to surface as a Calcite ARRAY).
+   */
+  public static class ReturnsArrayFn implements BeamSqlUdf {
+    public List<Long> eval() {
+      return ImmutableList.<Long>builder().add(1L).add(2L).add(3L).build();
+    }
+  }
+
+  /**
+   * Fixture UDF whose {@code List<Long>} parameter type the catalog is expected to reject (the
+   * tests expect it to surface as a Calcite ARRAY).
+   */
+  public static class TakesArrayFn implements BeamSqlUdf {
+    public Long eval(List<Long> ls) {
+      // The argument is deliberately ignored; only the method signature matters to the tests.
+      return Long.valueOf(0L);
+    }
+  }
+
   @Test
   public void loadsUserDefinedFunctionsFromSchema() throws 
NoSuchMethodException {
-    JdbcConnection jdbcConnection =
-        JdbcDriver.connect(
-            new ReadOnlyTableProvider("empty_table_provider", 
ImmutableMap.of()),
-            PipelineOptionsFactory.create());
+    JdbcConnection jdbcConnection = createJdbcConnection();
     SchemaPlus calciteSchema = jdbcConnection.getCurrentSchemaPlus();
     Method method = IncrementFn.class.getMethod("eval", Long.class);
     calciteSchema.add("increment", ScalarFunctionImpl.create(method));
@@ -69,4 +87,80 @@ public class BeamZetaSqlCatalogTest {
             .javaScalarFunctions()
             .get(ImmutableList.of("increment")));
   }
+
+  /** A schema function returning {@code List<Long>} must be rejected when the catalog is built. */
+  @Test
+  public void rejectsScalarFunctionImplWithUnsupportedReturnType() throws NoSuchMethodException {
+    JdbcConnection jdbcConnection = createJdbcConnection();
+    SchemaPlus calciteSchema = jdbcConnection.getCurrentSchemaPlus();
+    Method method = ReturnsArrayFn.class.getMethod("eval");
+    calciteSchema.add("return_array", ScalarFunctionImpl.create(method));
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("Calcite type not allowed in ZetaSQL Java UDF: ARRAY");
+    // create() is expected to throw; its result is intentionally discarded.
+    BeamZetaSqlCatalog.create(
+        calciteSchema, jdbcConnection.getTypeFactory(), SqlAnalyzer.baseAnalyzerOptions());
+  }
+
+  /** A schema function taking {@code List<Long>} must be rejected when the catalog is built. */
+  @Test
+  public void rejectsScalarFunctionImplWithUnsupportedParameterType() throws NoSuchMethodException {
+    JdbcConnection jdbcConnection = createJdbcConnection();
+    SchemaPlus calciteSchema = jdbcConnection.getCurrentSchemaPlus();
+    Method method = TakesArrayFn.class.getMethod("eval", List.class);
+    calciteSchema.add("take_array", ScalarFunctionImpl.create(method));
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("Calcite type not allowed in ZetaSQL Java UDF: ARRAY");
+    // create() is expected to throw; its result is intentionally discarded.
+    BeamZetaSqlCatalog.create(
+        calciteSchema, jdbcConnection.getTypeFactory(), SqlAnalyzer.baseAnalyzerOptions());
+  }
+
+  @Test
+  public void rejectsCreateFunctionStmtWithUnsupportedReturnType() {
+    JdbcConnection jdbcConnection = createJdbcConnection();
+    AnalyzerOptions analyzerOptions = SqlAnalyzer.baseAnalyzerOptions();
+    BeamZetaSqlCatalog beamCatalog =
+        BeamZetaSqlCatalog.create(
+            jdbcConnection.getCurrentSchemaPlus(),
+            jdbcConnection.getTypeFactory(),
+            analyzerOptions);
+
+    String sql =
+        "CREATE FUNCTION foo() RETURNS ARRAY<INT64> LANGUAGE java OPTIONS 
(path='/does/not/exist');";
+    ResolvedNodes.ResolvedStatement resolvedStatement =
+        Analyzer.analyzeStatement(sql, analyzerOptions, 
beamCatalog.getZetaSqlCatalog());
+    ResolvedNodes.ResolvedCreateFunctionStmt createFunctionStmt =
+        (ResolvedNodes.ResolvedCreateFunctionStmt) resolvedStatement;
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("ZetaSQL type not allowed in Java UDF: TYPE_ARRAY");
+    beamCatalog.addFunction(createFunctionStmt);
+  }
+
+  @Test
+  public void rejectsCreateFunctionStmtWithUnsupportedParameterType() {
+    JdbcConnection jdbcConnection = createJdbcConnection();
+    AnalyzerOptions analyzerOptions = SqlAnalyzer.baseAnalyzerOptions();
+    BeamZetaSqlCatalog beamCatalog =
+        BeamZetaSqlCatalog.create(
+            jdbcConnection.getCurrentSchemaPlus(),
+            jdbcConnection.getTypeFactory(),
+            analyzerOptions);
+
+    String sql =
+        "CREATE FUNCTION foo(a ARRAY<INT64>) RETURNS INT64 LANGUAGE java 
OPTIONS (path='/does/not/exist');";
+    ResolvedNodes.ResolvedStatement resolvedStatement =
+        Analyzer.analyzeStatement(sql, analyzerOptions, 
beamCatalog.getZetaSqlCatalog());
+    ResolvedNodes.ResolvedCreateFunctionStmt createFunctionStmt =
+        (ResolvedNodes.ResolvedCreateFunctionStmt) resolvedStatement;
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("ZetaSQL type not allowed in Java UDF: TYPE_ARRAY");
+    beamCatalog.addFunction(createFunctionStmt);
+  }
+
+  /** Connects to a JDBC driver whose only table provider is an empty read-only one. */
+  private JdbcConnection createJdbcConnection() {
+    ReadOnlyTableProvider emptyTables =
+        new ReadOnlyTableProvider("empty_table_provider", ImmutableMap.of());
+    return JdbcDriver.connect(emptyTables, PipelineOptionsFactory.create());
+  }
 }

Reply via email to