ibzib commented on a change in pull request #12153:
URL: https://github.com/apache/beam/pull/12153#discussion_r448613591



##########
File path: sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLDialectSpecTest.java
##########
@@ -2841,6 +2842,129 @@ public void testSelectNullExceptAll() {
     
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
+  @Test
+  public void testMultipleSelectStatementsThrowsException() {
+    String sql = "SELECT 1; SELECT 2;";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage(
+        "Statement list must end in a SELECT statement, and cannot contain more than one SELECT statement.");
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testAlreadyDefinedUDFThrowsException() {
+    String sql = "CREATE FUNCTION foo() AS (0); CREATE FUNCTION foo() AS (1); SELECT foo();";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(ParseException.class);
+    thrown.expectMessage("Failed to define function foo");
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testCreateFunctionNoSelectThrowsException() {
+    String sql = "CREATE FUNCTION plusOne(x INT64) AS (x + 1);";
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage("Statement list must end in a SELECT statement, not CreateFunctionStmt");
+    zetaSQLQueryPlanner.convertToBeamRel(sql);
+  }
+
+  @Test
+  public void testNullaryUdf() {
+    String sql = "CREATE FUNCTION zero() AS (0); SELECT zero();";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addInt64Field("x").build()).addValue(0L).build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testQualifiedNameUdfUnqualifiedCall() {
+    String sql = "CREATE FUNCTION foo.bar.baz() AS (\"uwu\"); SELECT baz();";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addStringField("x").build()).addValue("uwu").build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  @Ignore("Qualified paths can't be resolved due to a bug in ZetaSQL.")

Review comment:
       I filed a public bug for this: https://github.com/google/zetasql/issues/42

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -107,6 +128,43 @@ ResolvedStatement analyze(String sql) {
     return Analyzer.analyzeStatement(sql, options, catalog);
   }
 
+  /**
+   * Accepts the ParseResumeLocation for the current position in the SQL string. Advances the
+   * ParseResumeLocation to the start of the next statement. Adds user-defined functions to the
+   * catalog for use in following statements. Returns the resolved AST.
+   */
+  ResolvedStatement analyzeNextStatement(
+      ParseResumeLocation parseResumeLocation, AnalyzerOptions options, SimpleCatalog catalog) {
+    ResolvedStatement resolvedStatement =
+        Analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
+    if (resolvedStatement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
+      ResolvedCreateFunctionStmt createFunctionStmt =
+          (ResolvedCreateFunctionStmt) resolvedStatement;
+      Function userFunction =
+          new Function(
+              createFunctionStmt.getNamePath(),
+              USER_DEFINED_FUNCTIONS,
+              // TODO(BEAM-9954) handle aggregate functions
+              // TODO(BEAM-9969) handle table functions
+              Mode.SCALAR,
+              com.google.common.collect.ImmutableList.of(createFunctionStmt.getSignature()));
+      try {
+        catalog.addFunction(userFunction);
+      } catch (IllegalArgumentException e) {
+        throw new ParseException(
+            String.format(
+                "Failed to define function %s", String.join(".", createFunctionStmt.getNamePath())),
+            e);
+      }
+      return resolvedStatement;

Review comment:
       done

##########
File path: sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -107,6 +128,43 @@ ResolvedStatement analyze(String sql) {
     return Analyzer.analyzeStatement(sql, options, catalog);
   }
 
+  /**
+   * Accepts the ParseResumeLocation for the current position in the SQL string. Advances the
+   * ParseResumeLocation to the start of the next statement. Adds user-defined functions to the
+   * catalog for use in following statements. Returns the resolved AST.
+   */
+  ResolvedStatement analyzeNextStatement(
+      ParseResumeLocation parseResumeLocation, AnalyzerOptions options, SimpleCatalog catalog) {
+    ResolvedStatement resolvedStatement =
+        Analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
+    if (resolvedStatement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
+      ResolvedCreateFunctionStmt createFunctionStmt =
+          (ResolvedCreateFunctionStmt) resolvedStatement;
+      Function userFunction =
+          new Function(
+              createFunctionStmt.getNamePath(),
+              USER_DEFINED_FUNCTIONS,
+              // TODO(BEAM-9954) handle aggregate functions
+              // TODO(BEAM-9969) handle table functions
+              Mode.SCALAR,
+              com.google.common.collect.ImmutableList.of(createFunctionStmt.getSignature()));
+      try {
+        catalog.addFunction(userFunction);
+      } catch (IllegalArgumentException e) {
+        throw new ParseException(

Review comment:
       see comment above




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to