amaliujia commented on a change in pull request #12153:
URL: https://github.com/apache/beam/pull/12153#discussion_r448540223
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -107,6 +128,43 @@ ResolvedStatement analyze(String sql) {
return Analyzer.analyzeStatement(sql, options, catalog);
}
+ /**
+ * Accepts the ParseResumeLocation for the current position in the SQL string. Advances the
+ * ParseResumeLocation to the start of the next statement. Adds user-defined functions to the
+ * catalog for use in following statements. Returns the resolved AST.
+ */
+ ResolvedStatement analyzeNextStatement(
+ ParseResumeLocation parseResumeLocation, AnalyzerOptions options, SimpleCatalog catalog) {
+ ResolvedStatement resolvedStatement =
+ Analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
+ if (resolvedStatement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
+ ResolvedCreateFunctionStmt createFunctionStmt =
+ (ResolvedCreateFunctionStmt) resolvedStatement;
+ Function userFunction =
+ new Function(
+ createFunctionStmt.getNamePath(),
+ USER_DEFINED_FUNCTIONS,
+ // TODO(BEAM-9954) handle aggregate functions
+ // TODO(BEAM-9969) handle table functions
+ Mode.SCALAR,
+ com.google.common.collect.ImmutableList.of(createFunctionStmt.getSignature()));
+ try {
+ catalog.addFunction(userFunction);
+ } catch (IllegalArgumentException e) {
+ throw new ParseException(
Review comment:
Why wrap this in a ParseException? It seems to me that rethrowing the `IllegalArgumentException` directly would give a clearer cause.
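A minimal sketch of the two options, using hypothetical stand-ins: a `Set<String>` in place of the catalog, `DefinitionException` in place of `ParseException`, and an `addFunction` helper that rejects a repeated name. It only illustrates that wrapping keeps the original exception reachable through `getCause()`, while rethrowing surfaces it directly; it does not model ZetaSQL's actual `SimpleCatalog` behavior.

```java
import java.util.HashSet;
import java.util.Set;

public class RethrowVsWrap {
  // Hypothetical stand-in for ParseException.
  static class DefinitionException extends RuntimeException {
    DefinitionException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  // Hypothetical stand-in for catalog.addFunction: rejects a name it has already seen.
  static void addFunction(Set<String> catalog, String name) {
    if (!catalog.add(name)) {
      throw new IllegalArgumentException("Function " + name + " already exists");
    }
  }

  // Option A (as in this change): wrap, adding the function name to the message.
  static void defineWrapped(Set<String> catalog, String name) {
    try {
      addFunction(catalog, name);
    } catch (IllegalArgumentException e) {
      throw new DefinitionException(String.format("Failed to define function %s", name), e);
    }
  }

  // Option B (as suggested): let the IllegalArgumentException propagate unchanged.
  static void defineDirect(Set<String> catalog, String name) {
    addFunction(catalog, name);
  }

  public static void main(String[] args) {
    Set<String> catalog = new HashSet<>();
    defineWrapped(catalog, "foo");
    try {
      defineWrapped(catalog, "foo");
    } catch (DefinitionException e) {
      // The original IllegalArgumentException is still available as the cause.
      System.out.println(e.getMessage() + " <- " + e.getCause().getMessage());
    }
    try {
      defineDirect(catalog, "foo");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```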
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPlannerImpl.java
##########
@@ -130,30 +135,57 @@ public RelRoot rel(SqlNode sqlNode) throws RelConversionException {
}
public RelRoot rel(String sql, QueryParameters params) {
- this.cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));
- this.expressionConverter = new ExpressionConverter(cluster, params);
+ RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));
QueryTrait trait = new QueryTrait();
- // Set up table providers that need to be pre-registered
- // TODO(https://issues.apache.org/jira/browse/BEAM-8817): share this logic between dialects
- List<List<String>> tables = Analyzer.extractTableNamesFromStatement(sql);
- TableResolution.registerTables(this.defaultSchemaPlus, tables);
-
- ResolvedStatement statement =
+ SqlAnalyzer analyzer =
SqlAnalyzer.getBuilder()
.withQueryParams(params)
.withQueryTrait(trait)
.withCalciteContext(config.getContext())
.withTopLevelSchema(defaultSchemaPlus)
.withTypeFactory((JavaTypeFactory) cluster.getTypeFactory())
- .analyze(sql);
+ .build();
+
+ AnalyzerOptions options = SqlAnalyzer.initAnalyzerOptions(params);
+
+ // Set up table providers that need to be pre-registered
+ List<List<String>> tables = analyzer.extractTableNames(sql, options);
+ TableResolution.registerTables(this.defaultSchemaPlus, tables);
+ SimpleCatalog catalog =
+ analyzer.createPopulatedCatalog(defaultSchemaPlus.getName(), options, tables);
+
+ ImmutableMap.Builder<String, ResolvedCreateFunctionStmt> udfBuilder = ImmutableMap.builder();
+
+ ResolvedStatement statement;
+ ParseResumeLocation parseResumeLocation = new ParseResumeLocation(sql);
+ do {
+ statement = analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
+ if (statement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
+ ResolvedCreateFunctionStmt createFunctionStmt = (ResolvedCreateFunctionStmt) statement;
+ // ResolvedCreateFunctionStmt does not include the full function name, so build it here.
Review comment:
Can you clarify what "full function name" means here?
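For reference, a sketch of the key this loop appears to build, mirroring the `String.format("%s:%s", ...)` call in the longer context of this hunk. It assumes `SqlAnalyzer.USER_DEFINED_FUNCTIONS` is the string `"user_defined_functions"`; that value is an assumption, since the constant is not shown in this diff.

```java
import java.util.Arrays;
import java.util.List;

public class UdfKey {
  // Assumed value of SqlAnalyzer.USER_DEFINED_FUNCTIONS (not confirmed by the diff).
  static final String USER_DEFINED_FUNCTIONS = "user_defined_functions";

  // Same shape as the PR: "<function group>:<name path joined with dots>".
  static String functionFullName(List<String> namePath) {
    return String.format("%s:%s", USER_DEFINED_FUNCTIONS, String.join(".", namePath));
  }

  public static void main(String[] args) {
    // For CREATE FUNCTION foo.bar.baz(), the name path is presumably [foo, bar, baz].
    System.out.println(functionFullName(Arrays.asList("foo", "bar", "baz")));
    // prints user_defined_functions:foo.bar.baz
  }
}
```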
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
##########
@@ -920,8 +946,23 @@ private RexNode convertResolvedFunctionCall(
}
for (ResolvedExpr expr : functionCall.getArgumentList()) {
- operands.add(convertRexNodeFromResolvedExpr(expr, columnList, fieldList));
+ operands.add(
+ convertRexNodeFromResolvedExpr(expr, columnList, fieldList, outerFunctionArguments));
+ }
+ } else if (funGroup.equals(USER_DEFINED_FUNCTIONS)) {
Review comment:
Do you think this code path will also help with Java UDFs?
##########
File path:
sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##########
@@ -2841,6 +2842,129 @@ public void testSelectNullExceptAll() {
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
+ @Test
+ public void testMultipleSelectStatementsThrowsException() {
+ String sql = "SELECT 1; SELECT 2;";
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ thrown.expect(UnsupportedOperationException.class);
+ thrown.expectMessage(
+ "Statement list must end in a SELECT statement, and cannot contain more than one SELECT statement.");
+ zetaSQLQueryPlanner.convertToBeamRel(sql);
+ }
+
+ @Test
+ public void testAlreadyDefinedUDFThrowsException() {
+ String sql = "CREATE FUNCTION foo() AS (0); CREATE FUNCTION foo() AS (1); SELECT foo();";
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ thrown.expect(ParseException.class);
+ thrown.expectMessage("Failed to define function foo");
+ zetaSQLQueryPlanner.convertToBeamRel(sql);
+ }
+
+ @Test
+ public void testCreateFunctionNoSelectThrowsException() {
+ String sql = "CREATE FUNCTION plusOne(x INT64) AS (x + 1);";
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ thrown.expect(UnsupportedOperationException.class);
+ thrown.expectMessage("Statement list must end in a SELECT statement, not CreateFunctionStmt");
+ zetaSQLQueryPlanner.convertToBeamRel(sql);
+ }
+
+ @Test
+ public void testNullaryUdf() {
+ String sql = "CREATE FUNCTION zero() AS (0); SELECT zero();";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(Schema.builder().addInt64Field("x").build()).addValue(0L).build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ public void testQualifiedNameUdfUnqualifiedCall() {
+ String sql = "CREATE FUNCTION foo.bar.baz() AS (\"uwu\"); SELECT baz();";
+
+ ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+ BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+ PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+ PAssert.that(stream)
+ .containsInAnyOrder(
+ Row.withSchema(Schema.builder().addStringField("x").build()).addValue("uwu").build());
+ pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+ }
+
+ @Test
+ @Ignore("Qualified paths can't be resolved due to a bug in ZetaSQL.")
Review comment:
Is there a link to this bug? If not, could you log a JIRA describing what fails?
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -107,6 +128,43 @@ ResolvedStatement analyze(String sql) {
return Analyzer.analyzeStatement(sql, options, catalog);
}
+ /**
+ * Accepts the ParseResumeLocation for the current position in the SQL string. Advances the
+ * ParseResumeLocation to the start of the next statement. Adds user-defined functions to the
+ * catalog for use in following statements. Returns the resolved AST.
+ */
+ ResolvedStatement analyzeNextStatement(
+ ParseResumeLocation parseResumeLocation, AnalyzerOptions options, SimpleCatalog catalog) {
+ ResolvedStatement resolvedStatement =
+ Analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
+ if (resolvedStatement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
+ ResolvedCreateFunctionStmt createFunctionStmt =
+ (ResolvedCreateFunctionStmt) resolvedStatement;
+ Function userFunction =
+ new Function(
+ createFunctionStmt.getNamePath(),
+ USER_DEFINED_FUNCTIONS,
+ // TODO(BEAM-9954) handle aggregate functions
+ // TODO(BEAM-9969) handle table functions
+ Mode.SCALAR,
+ com.google.common.collect.ImmutableList.of(createFunctionStmt.getSignature()));
+ try {
+ catalog.addFunction(userFunction);
+ } catch (IllegalArgumentException e) {
+ throw new ParseException(
+ String.format(
+ "Failed to define function %s", String.join(".", createFunctionStmt.getNamePath())),
+ e);
+ }
+ return resolvedStatement;
Review comment:
Lines 159 and 161 are duplicates, so they can be merged into one statement at the end of this function.
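A sketch of the suggested shape, with hypothetical stand-ins (`Stmt`, `Kind`, and a `Set<String>` catalog) in place of the ZetaSQL types: the CREATE FUNCTION branch only registers the function, and a single `return` at the end serves every statement kind.

```java
import java.util.HashSet;
import java.util.Set;

public class SingleReturnSketch {
  enum Kind { CREATE_FUNCTION, QUERY }

  // Hypothetical stand-in for ResolvedStatement: just a kind and a function name.
  static final class Stmt {
    final Kind kind;
    final String name;

    Stmt(Kind kind, String name) {
      this.kind = kind;
      this.name = name;
    }
  }

  // Only CREATE FUNCTION statements touch the catalog; every path
  // falls through to the one return at the end.
  static Stmt analyzeNextStatement(Stmt resolved, Set<String> catalog) {
    if (resolved.kind == Kind.CREATE_FUNCTION) {
      catalog.add(resolved.name);
    }
    return resolved;
  }

  public static void main(String[] args) {
    Set<String> catalog = new HashSet<>();
    analyzeNextStatement(new Stmt(Kind.CREATE_FUNCTION, "foo"), catalog);
    analyzeNextStatement(new Stmt(Kind.QUERY, null), catalog);
    System.out.println(catalog); // prints [foo]
  }
}
```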
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPlannerImpl.java
##########
@@ -130,30 +135,58 @@ public RelRoot rel(SqlNode sqlNode) throws RelConversionException {
}
public RelRoot rel(String sql, QueryParameters params) {
- this.cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));
- this.expressionConverter = new ExpressionConverter(cluster, params);
+ RelOptCluster cluster = RelOptCluster.create(planner, new RexBuilder(typeFactory));
QueryTrait trait = new QueryTrait();
- // Set up table providers that need to be pre-registered
- // TODO(https://issues.apache.org/jira/browse/BEAM-8817): share this logic between dialects
- List<List<String>> tables = Analyzer.extractTableNamesFromStatement(sql);
- TableResolution.registerTables(this.defaultSchemaPlus, tables);
-
- ResolvedStatement statement =
+ SqlAnalyzer analyzer =
SqlAnalyzer.getBuilder()
.withQueryParams(params)
.withQueryTrait(trait)
.withCalciteContext(config.getContext())
.withTopLevelSchema(defaultSchemaPlus)
.withTypeFactory((JavaTypeFactory) cluster.getTypeFactory())
- .analyze(sql);
+ .build();
+
+ AnalyzerOptions options = SqlAnalyzer.initAnalyzerOptions(params);
+
+ // Set up table providers that need to be pre-registered
+ List<List<String>> tables = analyzer.extractTableNames(sql, options);
+ TableResolution.registerTables(this.defaultSchemaPlus, tables);
+ SimpleCatalog catalog =
+ analyzer.createPopulatedCatalog(defaultSchemaPlus.getName(), options, tables);
+
+ ImmutableMap.Builder<String, ResolvedCreateFunctionStmt> udfBuilder = ImmutableMap.builder();
+
+ ResolvedStatement statement;
+ ParseResumeLocation parseResumeLocation = new ParseResumeLocation(sql);
+ do {
+ statement = analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
+ if (statement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
+ ResolvedCreateFunctionStmt createFunctionStmt = (ResolvedCreateFunctionStmt) statement;
+ // ResolvedCreateFunctionStmt does not include the full function name, so build it here.
+ String functionFullName =
+ String.format(
+ "%s:%s",
+ SqlAnalyzer.USER_DEFINED_FUNCTIONS,
+ String.join(".", createFunctionStmt.getNamePath()));
+ udfBuilder.put(functionFullName, createFunctionStmt);
+ } else if (statement.nodeKind() == RESOLVED_QUERY_STMT) {
+ if (!SqlAnalyzer.isEndOfInput(parseResumeLocation)) {
Review comment:
I can tell that line 185 and this line together verify that:
1. there is only one SELECT in a statement list, and
2. that SELECT statement is at the end of the list.
But from a readability perspective, neither check explicitly tests whether there are two or more SELECT statements in a list. I am afraid that people reading this code without context would not pick up the single-SELECT constraint, since it is only implied.
My suggestion is to validate only `cannot contain more than one SELECT statement` here and leave `Statement list must end in a SELECT statement` to line 185.
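A sketch of the suggested split, using a hypothetical `Kind` enum and a `List` in place of the `ParseResumeLocation` loop: the in-loop check counts SELECT statements and states the single-SELECT rule explicitly, and the post-loop check enforces that the list ends in a SELECT. The messages are illustrative and do not exactly match the PR's strings.

```java
import java.util.List;

public class StatementListValidation {
  enum Kind { CREATE_FUNCTION, QUERY }

  // Hypothetical stand-in for the do/while over analyzeNextStatement:
  // each element plays the role of one resolved statement.
  static void validate(List<Kind> statements) {
    int selectCount = 0;
    Kind last = null;
    for (Kind kind : statements) {
      // Check 1 (in the loop): reject a second SELECT as soon as it appears.
      if (kind == Kind.QUERY && ++selectCount > 1) {
        throw new UnsupportedOperationException(
            "Statement list cannot contain more than one SELECT statement.");
      }
      last = kind;
    }
    // Check 2 (after the loop, i.e. "line 185"): the final statement must be the SELECT.
    if (last != Kind.QUERY) {
      throw new UnsupportedOperationException(
          "Statement list must end in a SELECT statement, not " + last);
    }
  }
}
```

With this split, `SELECT 1; SELECT 2;` fails the first check, while `SELECT 1; CREATE FUNCTION f() AS (1);` and a lone `CREATE FUNCTION` fail the second.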
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -107,6 +128,43 @@ ResolvedStatement analyze(String sql) {
return Analyzer.analyzeStatement(sql, options, catalog);
}
+ /**
+ * Accepts the ParseResumeLocation for the current position in the SQL string. Advances the
+ * ParseResumeLocation to the start of the next statement. Adds user-defined functions to the
+ * catalog for use in following statements. Returns the resolved AST.
+ */
+ ResolvedStatement analyzeNextStatement(
+ ParseResumeLocation parseResumeLocation, AnalyzerOptions options, SimpleCatalog catalog) {
+ ResolvedStatement resolvedStatement =
+ Analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
+ if (resolvedStatement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
+ ResolvedCreateFunctionStmt createFunctionStmt =
+ (ResolvedCreateFunctionStmt) resolvedStatement;
+ Function userFunction =
+ new Function(
+ createFunctionStmt.getNamePath(),
+ USER_DEFINED_FUNCTIONS,
+ // TODO(BEAM-9954) handle aggregate functions
+ // TODO(BEAM-9969) handle table functions
+ Mode.SCALAR,
+ com.google.common.collect.ImmutableList.of(createFunctionStmt.getSignature()));
+ try {
+ catalog.addFunction(userFunction);
Review comment:
I wasn't aware that addFunction can throw an exception. Is there an example that would make this line fail, e.g. adding a function with a duplicate name?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.