This is an automated email from the ASF dual-hosted git repository.
ibzib pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 02d603a [BEAM-12123] Proactively reject unsupported types in Java UDFs.
new 253bf38 Merge pull request #14464 from ibzib/BEAM-12123
02d603a is described below
commit 02d603acbfae43ef7025c7dab7e4a53d50f7e892
Author: Kyle Weaver <[email protected]>
AuthorDate: Wed Apr 7 11:09:55 2021 -0700
[BEAM-12123] Proactively reject unsupported types in Java UDFs.
---
.../extensions/sql/zetasql/BeamZetaSqlCatalog.java | 82 ++++++++++++++++-
.../sql/zetasql/BeamZetaSqlCatalogTest.java | 102 ++++++++++++++++++++-
2 files changed, 179 insertions(+), 5 deletions(-)
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
index a73a7ce..a2b9a8c 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalog.java
@@ -26,6 +26,7 @@ import com.google.zetasql.FunctionSignature;
import com.google.zetasql.SimpleCatalog;
import com.google.zetasql.TVFRelation;
import com.google.zetasql.TableValuedFunction;
+import com.google.zetasql.Type;
import com.google.zetasql.TypeFactory;
import com.google.zetasql.ZetaSQLBuiltinFunctionOptions;
import com.google.zetasql.ZetaSQLFunctions;
@@ -49,6 +50,7 @@ import org.apache.beam.sdk.extensions.sql.zetasql.translation.UserFunctionDefini
import
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.adapter.java.JavaTypeFactory;
import
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataType;
import
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.rel.type.RelDataTypeField;
+import
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.FunctionParameter;
import
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -134,6 +136,7 @@ public class BeamZetaSqlCatalog {
sqlScalarUdfs.put(createFunctionStmt.getNamePath(),
createFunctionStmt);
break;
case USER_DEFINED_JAVA_SCALAR_FUNCTIONS:
+ validateJavaUdf(createFunctionStmt);
String jarPath = getJarPath(createFunctionStmt);
ScalarFn scalarFn =
javaUdfLoader.loadScalarFunction(createFunctionStmt.getNamePath(),
jarPath);
@@ -156,6 +159,47 @@ public class BeamZetaSqlCatalog {
ImmutableList.of(createFunctionStmt.getSignature())));
}
+  /**
+   * Rejects a {@code CREATE FUNCTION ... LANGUAGE java} statement whose signature uses types that
+   * Java UDFs cannot handle. Every argument type and the return type are checked with {@link
+   * #validateJavaUdfZetaSqlType}.
+   *
+   * @param createFunctionStmt the resolved CREATE FUNCTION statement to validate
+   * @throws UnsupportedOperationException if an argument is templated (has no concrete type) or if
+   *     any argument or return type is not supported
+   * @throws IllegalArgumentException if the statement has no return type
+   */
+  void validateJavaUdf(ResolvedNodes.ResolvedCreateFunctionStmt createFunctionStmt) {
+    for (FunctionArgumentType argumentType :
+        createFunctionStmt.getSignature().getFunctionArgumentList()) {
+      Type type = argumentType.getType();
+      if (type == null) {
+        // A missing concrete type means the argument is templated, which Java UDFs cannot bind.
+        throw new UnsupportedOperationException(
+            "UDF templated argument types are not supported.");
+      }
+      validateJavaUdfZetaSqlType(type);
+    }
+    Type returnType = createFunctionStmt.getReturnType();
+    if (returnType == null) {
+      throw new IllegalArgumentException("UDF return type must not be null.");
+    }
+    validateJavaUdfZetaSqlType(returnType);
+  }
+
+  /**
+   * Throws {@link UnsupportedOperationException} if ZetaSQL type is not supported in Java UDF.
+   * Supported types are a subset of the types supported by {@link BeamJavaUdfCalcRule}.
+   *
+   * <p>Only {@code TYPE_INT64}, {@code TYPE_DOUBLE}, {@code TYPE_BOOL}, {@code TYPE_STRING} and
+   * {@code TYPE_BYTES} are accepted. Every other kind is rejected, and the exception message
+   * names the rejected kind.
+   *
+   * @param type the ZetaSQL type of a Java UDF argument or return value
+   * @throws UnsupportedOperationException if {@code type} is not one of the accepted kinds
+   */
+  void validateJavaUdfZetaSqlType(Type type) {
+    switch (type.getKind()) {
+      case TYPE_INT64:
+      case TYPE_DOUBLE:
+      case TYPE_BOOL:
+      case TYPE_STRING:
+      case TYPE_BYTES:
+        // These types are supported.
+        break;
+      case TYPE_NUMERIC:
+      case TYPE_DATE:
+      case TYPE_TIME:
+      case TYPE_DATETIME:
+      case TYPE_TIMESTAMP:
+      case TYPE_ARRAY:
+      case TYPE_STRUCT:
+        // Listed only to document known-unsupported kinds; they share the default rejection.
+      default:
+        throw new UnsupportedOperationException(
+            "ZetaSQL type not allowed in Java UDF: " + type.getKind().name());
+    }
+  }
+
void addTableValuedFunction(
ResolvedNodes.ResolvedCreateTableFunctionStmt createTableFunctionStmt) {
zetaSqlCatalog.addTableValuedFunction(
@@ -299,10 +343,12 @@ public class BeamZetaSqlCatalog {
functions) {
if (function instanceof ScalarFunctionImpl) {
ScalarFunctionImpl scalarFunction = (ScalarFunctionImpl) function;
+ // Validate types before converting from Calcite to ZetaSQL, since
the conversion may fail
+ // for unsupported types.
+ validateScalarFunctionImpl(scalarFunction);
List<String> path = Arrays.asList(functionName.split("\\."));
Method method = scalarFunction.method;
javaScalarUdfs.put(path,
UserFunctionDefinitions.JavaScalarFunction.create(method, ""));
-
FunctionArgumentType resultType =
new FunctionArgumentType(
ZetaSqlCalciteTranslationUtils.toZetaSqlType(
@@ -333,6 +379,40 @@ public class BeamZetaSqlCatalog {
}
}
+  /**
+   * Checks each parameter type of a Calcite {@link ScalarFunctionImpl}, then its return type,
+   * with {@link #validateJavaUdfCalciteType}.
+   *
+   * @throws UnsupportedOperationException if any of those types is not supported
+   */
+  private void validateScalarFunctionImpl(ScalarFunctionImpl scalarFunction) {
+    for (FunctionParameter param : scalarFunction.getParameters()) {
+      RelDataType paramType = param.getType(typeFactory);
+      validateJavaUdfCalciteType(paramType);
+    }
+    RelDataType returnType = scalarFunction.getReturnType(typeFactory);
+    validateJavaUdfCalciteType(returnType);
+  }
+
+  /**
+   * Throws {@link UnsupportedOperationException} if Calcite type is not supported in Java UDF.
+   * Supported types are a subset of the corresponding Calcite types supported by {@link
+   * BeamJavaUdfCalcRule}.
+   *
+   * <p>Only {@code BIGINT}, {@code DOUBLE}, {@code BOOLEAN}, {@code VARCHAR} and {@code VARBINARY}
+   * are accepted. Types that have no {@code SqlTypeName} (not SQL predefined types) are rejected
+   * too, rather than causing a {@link NullPointerException} in the switch.
+   *
+   * @param type the Calcite type of a UDF parameter or return value
+   * @throws UnsupportedOperationException if {@code type} is not one of the accepted types
+   */
+  private void validateJavaUdfCalciteType(RelDataType type) {
+    if (type.getSqlTypeName() == null) {
+      // getSqlTypeName() is documented to return null for non-predefined types; switching on a
+      // null enum would throw NullPointerException instead of the intended rejection.
+      throw new UnsupportedOperationException(
+          "Calcite type not allowed in ZetaSQL Java UDF: " + type.getFullTypeString());
+    }
+    switch (type.getSqlTypeName()) {
+      case BIGINT:
+      case DOUBLE:
+      case BOOLEAN:
+      case VARCHAR:
+      case VARBINARY:
+        // These types are supported.
+        break;
+      case DECIMAL:
+      case DATE:
+      case TIME:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+      case TIMESTAMP:
+      case ARRAY:
+      case ROW:
+        // Listed only to document known-unsupported types; they share the default rejection.
+      default:
+        throw new UnsupportedOperationException(
+            "Calcite type not allowed in ZetaSQL Java UDF: " + type.getSqlTypeName().getName());
+    }
+  }
+
private String getFunctionGroup(ResolvedNodes.ResolvedCreateFunctionStmt
createFunctionStmt) {
switch (createFunctionStmt.getLanguage().toUpperCase()) {
case "JAVA":
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java
index 2619ec2..2196046 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCatalogTest.java
@@ -21,7 +21,11 @@ import static org.apache.beam.sdk.extensions.sql.zetasql.BeamZetaSqlCatalog.USER
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import com.google.zetasql.Analyzer;
+import com.google.zetasql.AnalyzerOptions;
+import com.google.zetasql.resolvedast.ResolvedNodes;
import java.lang.reflect.Method;
+import java.util.List;
import org.apache.beam.sdk.extensions.sql.BeamSqlUdf;
import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
@@ -32,24 +36,38 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
import
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.schema.SchemaPlus;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+/** Tests for {@link BeamZetaSqlCatalog}. */
@RunWith(JUnit4.class)
public class BeamZetaSqlCatalogTest {
+  // Each test declares the exception type and message it expects before the call that throws.
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
/** Test UDF that returns its {@code Long} argument plus one. */
public static class IncrementFn implements BeamSqlUdf {
public Long eval(Long i) {
return i + 1;
}
}
+  /** Test UDF returning {@code List<Long>}, which the catalog rejects as an ARRAY result. */
+  public static class ReturnsArrayFn implements BeamSqlUdf {
+    public List<Long> eval() {
+      List<Long> result = ImmutableList.of(1L, 2L, 3L);
+      return result;
+    }
+  }
+
+  /** Test UDF taking {@code List<Long>}, which the catalog rejects as an ARRAY parameter. */
+  public static class TakesArrayFn implements BeamSqlUdf {
+    public Long eval(List<Long> values) {
+      // The argument is deliberately ignored; only the method signature matters to these tests.
+      return Long.valueOf(0L);
+    }
+  }
+
@Test
public void loadsUserDefinedFunctionsFromSchema() throws
NoSuchMethodException {
- JdbcConnection jdbcConnection =
- JdbcDriver.connect(
- new ReadOnlyTableProvider("empty_table_provider",
ImmutableMap.of()),
- PipelineOptionsFactory.create());
+ JdbcConnection jdbcConnection = createJdbcConnection();
SchemaPlus calciteSchema = jdbcConnection.getCurrentSchemaPlus();
Method method = IncrementFn.class.getMethod("eval", Long.class);
calciteSchema.add("increment", ScalarFunctionImpl.create(method));
@@ -69,4 +87,80 @@ public class BeamZetaSqlCatalogTest {
.javaScalarFunctions()
.get(ImmutableList.of("increment")));
}
+
+  /** Registering a Calcite scalar function that returns an ARRAY must fail at catalog creation. */
+  @Test
+  public void rejectsScalarFunctionImplWithUnsupportedReturnType() throws NoSuchMethodException {
+    JdbcConnection jdbcConnection = createJdbcConnection();
+    SchemaPlus calciteSchema = jdbcConnection.getCurrentSchemaPlus();
+    Method method = ReturnsArrayFn.class.getMethod("eval");
+    calciteSchema.add("return_array", ScalarFunctionImpl.create(method));
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("Calcite type not allowed in ZetaSQL Java UDF: ARRAY");
+    // Called only for its side effect: creating the catalog must reject the UDF.
+    BeamZetaSqlCatalog.create(
+        calciteSchema, jdbcConnection.getTypeFactory(), SqlAnalyzer.baseAnalyzerOptions());
+  }
+
+  /** Registering a Calcite scalar function that takes an ARRAY must fail at catalog creation. */
+  @Test
+  public void rejectsScalarFunctionImplWithUnsupportedParameterType() throws NoSuchMethodException {
+    JdbcConnection jdbcConnection = createJdbcConnection();
+    SchemaPlus calciteSchema = jdbcConnection.getCurrentSchemaPlus();
+    Method method = TakesArrayFn.class.getMethod("eval", List.class);
+    calciteSchema.add("take_array", ScalarFunctionImpl.create(method));
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("Calcite type not allowed in ZetaSQL Java UDF: ARRAY");
+    // Called only for its side effect: creating the catalog must reject the UDF.
+    BeamZetaSqlCatalog.create(
+        calciteSchema, jdbcConnection.getTypeFactory(), SqlAnalyzer.baseAnalyzerOptions());
+  }
+
+  /** A Java {@code CREATE FUNCTION} returning {@code ARRAY<INT64>} must be rejected. */
+  @Test
+  public void rejectsCreateFunctionStmtWithUnsupportedReturnType() {
+    JdbcConnection jdbcConnection = createJdbcConnection();
+    AnalyzerOptions analyzerOptions = SqlAnalyzer.baseAnalyzerOptions();
+    BeamZetaSqlCatalog beamCatalog =
+        BeamZetaSqlCatalog.create(
+            jdbcConnection.getCurrentSchemaPlus(),
+            jdbcConnection.getTypeFactory(),
+            analyzerOptions);
+
+    // The jar path is deliberately bogus: type validation runs before the jar would be loaded.
+    String sql =
+        "CREATE FUNCTION foo() RETURNS ARRAY<INT64> LANGUAGE java "
+            + "OPTIONS (path='/does/not/exist');";
+    ResolvedNodes.ResolvedStatement resolvedStatement =
+        Analyzer.analyzeStatement(sql, analyzerOptions, beamCatalog.getZetaSqlCatalog());
+    ResolvedNodes.ResolvedCreateFunctionStmt createFunctionStmt =
+        (ResolvedNodes.ResolvedCreateFunctionStmt) resolvedStatement;
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("ZetaSQL type not allowed in Java UDF: TYPE_ARRAY");
+    beamCatalog.addFunction(createFunctionStmt);
+  }
+
+  /** A Java {@code CREATE FUNCTION} taking an {@code ARRAY<INT64>} argument must be rejected. */
+  @Test
+  public void rejectsCreateFunctionStmtWithUnsupportedParameterType() {
+    JdbcConnection jdbcConnection = createJdbcConnection();
+    AnalyzerOptions analyzerOptions = SqlAnalyzer.baseAnalyzerOptions();
+    BeamZetaSqlCatalog beamCatalog =
+        BeamZetaSqlCatalog.create(
+            jdbcConnection.getCurrentSchemaPlus(),
+            jdbcConnection.getTypeFactory(),
+            analyzerOptions);
+
+    // The jar path is deliberately bogus: type validation runs before the jar would be loaded.
+    String sql =
+        "CREATE FUNCTION foo(a ARRAY<INT64>) RETURNS INT64 LANGUAGE java "
+            + "OPTIONS (path='/does/not/exist');";
+    ResolvedNodes.ResolvedStatement resolvedStatement =
+        Analyzer.analyzeStatement(sql, analyzerOptions, beamCatalog.getZetaSqlCatalog());
+    ResolvedNodes.ResolvedCreateFunctionStmt createFunctionStmt =
+        (ResolvedNodes.ResolvedCreateFunctionStmt) resolvedStatement;
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("ZetaSQL type not allowed in Java UDF: TYPE_ARRAY");
+    beamCatalog.addFunction(createFunctionStmt);
+  }
+
+  /** Returns a JDBC connection backed by an empty, read-only table provider and default options. */
+  private JdbcConnection createJdbcConnection() {
+    ReadOnlyTableProvider emptyProvider =
+        new ReadOnlyTableProvider("empty_table_provider", ImmutableMap.of());
+    return JdbcDriver.connect(emptyProvider, PipelineOptionsFactory.create());
+  }
}