apilloud commented on a change in pull request #13891:
URL: https://github.com/apache/beam/pull/13891#discussion_r569811118
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -127,6 +144,53 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
return tables.build();
}
+ /** Returns the fully qualified name of the function defined in the statement. */
+ static String getFunctionQualifiedName(ResolvedCreateFunctionStmt createFunctionStmt) {
Review comment:
This looks unused.
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -127,6 +144,53 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
return tables.build();
}
+ /** Returns the fully qualified name of the function defined in the statement. */
+ static String getFunctionQualifiedName(ResolvedCreateFunctionStmt createFunctionStmt) {
+ return String.format(
+ "%s:%s",
+ getFunctionGroup(createFunctionStmt), String.join(".", createFunctionStmt.getNamePath()));
+ }
+
+ static String getFunctionGroup(ResolvedCreateFunctionStmt createFunctionStmt) {
+ switch (createFunctionStmt.getLanguage().toUpperCase()) {
+ case "JAVA":
+ if (createFunctionStmt.getIsAggregate()) {
+ throw new UnsupportedOperationException(
+ "Java SQL aggregate functions are not supported (BEAM-10925).");
+ }
+ return USER_DEFINED_JAVA_SCALAR_FUNCTIONS;
+ case "SQL":
+ if (createFunctionStmt.getIsAggregate()) {
+ throw new UnsupportedOperationException(
+ "Native SQL aggregate functions are not supported (BEAM-9954).");
+ }
+ return USER_DEFINED_FUNCTIONS;
+ case "PY":
+ case "PYTHON":
+ case "JS":
+ case "JAVASCRIPT":
+ throw new UnsupportedOperationException(
+ String.format(
+ "Function %s uses unsupported language %s.",
+ String.join(".", createFunctionStmt.getNamePath()),
+ createFunctionStmt.getLanguage()));
+ default:
+ throw new IllegalArgumentException(
+ String.format(
+ "Function %s uses unrecognized language %s.",
+ String.join(".", createFunctionStmt.getNamePath()),
+ createFunctionStmt.getLanguage()));
+ }
+ }
+
+ private Function createFunction(ResolvedCreateFunctionStmt createFunctionStmt) {
Review comment:
It seems like you bypassed a few abstraction layers here. Probably
`ResolvedCreateFunctionStmt` should add a udf to the `TableProvider` (or an
equivalent for UDFs). For an example, see CREATE EXTERNAL TABLE in the calcite
dialect:
https://github.com/apache/beam/blob/3bb232fb098700de408f574585dfe74bbaff7230/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java#L136
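A minimal sketch of the layering suggested above, assuming a hypothetical `UdfProvider` interface as the UDF counterpart of `TableProvider`. `UdfDefinition`, `UdfProvider`, `InMemoryUdfProvider`, and `executeCreateFunction` are illustrative names, not Beam APIs. The point is only that handling CREATE FUNCTION reduces to handing a definition to a provider, the way the linked CREATE EXTERNAL TABLE example hands a table to its provider.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch: CREATE FUNCTION registers a definition with a provider. Not Beam APIs. */
public class UdfProviderSketch {

  /** Simplified stand-in for what a ResolvedCreateFunctionStmt carries. */
  static final class UdfDefinition {
    final List<String> namePath;
    final String language;
    final String body;

    UdfDefinition(List<String> namePath, String language, String body) {
      this.namePath = namePath;
      this.language = language;
      this.body = body;
    }
  }

  /** Hypothetical UDF counterpart of TableProvider. */
  interface UdfProvider {
    void createUdf(UdfDefinition definition);

    Optional<UdfDefinition> getUdf(List<String> namePath);
  }

  /** In-memory provider; a real one could persist definitions elsewhere. */
  static final class InMemoryUdfProvider implements UdfProvider {
    private final Map<List<String>, UdfDefinition> udfs = new HashMap<>();

    @Override
    public void createUdf(UdfDefinition definition) {
      // Sketch choice: reject a redefinition instead of silently overwriting it.
      if (udfs.putIfAbsent(definition.namePath, definition) != null) {
        throw new IllegalStateException(
            "Function already exists: " + String.join(".", definition.namePath));
      }
    }

    @Override
    public Optional<UdfDefinition> getUdf(List<String> namePath) {
      return Optional.ofNullable(udfs.get(namePath));
    }
  }

  /** The planner's only job on CREATE FUNCTION: delegate to the provider. */
  static void executeCreateFunction(UdfProvider provider, UdfDefinition definition) {
    provider.createUdf(definition);
  }

  public static void main(String[] args) {
    UdfProvider provider = new InMemoryUdfProvider();
    executeCreateFunction(provider, new UdfDefinition(List.of("my_fn"), "SQL", "x + 1"));
    System.out.println(provider.getUdf(List.of("my_fn")).isPresent()); // prints "true"
  }
}
```

Rejecting a duplicate name is a choice made only for this sketch.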
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -127,6 +144,53 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
return tables.build();
}
+ /** Returns the fully qualified name of the function defined in the statement. */
+ static String getFunctionQualifiedName(ResolvedCreateFunctionStmt createFunctionStmt) {
+ return String.format(
+ "%s:%s",
+ getFunctionGroup(createFunctionStmt), String.join(".", createFunctionStmt.getNamePath()));
+ }
+
+ static String getFunctionGroup(ResolvedCreateFunctionStmt createFunctionStmt) {
+ switch (createFunctionStmt.getLanguage().toUpperCase()) {
+ case "JAVA":
+ if (createFunctionStmt.getIsAggregate()) {
+ throw new UnsupportedOperationException(
+ "Java SQL aggregate functions are not supported (BEAM-10925).");
+ }
+ return USER_DEFINED_JAVA_SCALAR_FUNCTIONS;
+ case "SQL":
+ if (createFunctionStmt.getIsAggregate()) {
+ throw new UnsupportedOperationException(
+ "Native SQL aggregate functions are not supported (BEAM-9954).");
+ }
+ return USER_DEFINED_FUNCTIONS;
+ case "PY":
Review comment:
I'm curious where these came from. Is there another engine that recognizes
these language names (`PY`, `PYTHON`, `JS`, `JAVASCRIPT`)?
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -115,6 +116,22 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
>= parseResumeLocation.getInput().getBytes(UTF_8).length;
}
+ static String getOptionStringValue(
+ ResolvedCreateFunctionStmt createFunctionStmt, String optionName) {
+ for (ResolvedNodes.ResolvedOption option : createFunctionStmt.getOptionList()) {
+ if (option.getName().equals(optionName)) {
+ if (option.getValue().getType().getKind() != TypeKind.TYPE_STRING) {
Review comment:
`getValue` can return null here. I didn't check the other two, but you
can find a copy of the generated `ResolvedNodes.java` in internal code search.
https://github.com/google/zetasql/blob/862a192a6da487757e860166a9666120b16773f5/java/com/google/zetasql/resolvedast/ResolvedNodes.java.template#L295
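A sketch of a null-safe option lookup. It uses simplified stand-ins (`Option`, `OptionValue`, `Kind`) rather than the real ZetaSQL `ResolvedOption` and expression classes, whose exact accessors should be checked against the generated `ResolvedNodes.java`. Returning null for an absent option and throwing for a null or non-STRING value are assumptions made for illustration.

```java
import java.util.List;

/** Hypothetical sketch; Option and OptionValue are stand-ins, not ZetaSQL classes. */
public class OptionLookupSketch {

  enum Kind { STRING, INT64 }

  static final class OptionValue {
    final Kind kind;
    final Object literal;

    OptionValue(Kind kind, Object literal) {
      this.kind = kind;
      this.literal = literal;
    }
  }

  static final class Option {
    final String name;
    final OptionValue value; // may be null, mirroring the review note about getValue()

    Option(String name, OptionValue value) {
      this.name = name;
      this.value = value;
    }
  }

  /** Returns the string value of the named option, or null if no option has that name. */
  static String getOptionStringValue(List<Option> options, String optionName) {
    for (Option option : options) {
      if (!optionName.equals(option.name)) {
        continue;
      }
      // Check for a missing value before dereferencing it.
      if (option.value == null) {
        throw new IllegalArgumentException("Option " + optionName + " has no value.");
      }
      if (option.value.kind != Kind.STRING) {
        throw new IllegalArgumentException(
            "Option " + optionName + " should be a STRING, but is " + option.value.kind + ".");
      }
      return (String) option.value.literal;
    }
    return null;
  }

  public static void main(String[] args) {
    List<Option> options = List.of(
        new Option("path", new OptionValue(Kind.STRING, "/tmp/udf.jar")),
        new Option("empty", null));
    System.out.println(getOptionStringValue(options, "path")); // prints "/tmp/udf.jar"
  }
}
```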
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -115,6 +116,22 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
>= parseResumeLocation.getInput().getBytes(UTF_8).length;
}
+ static String getOptionStringValue(
+ ResolvedCreateFunctionStmt createFunctionStmt, String optionName) {
+ for (ResolvedNodes.ResolvedOption option : createFunctionStmt.getOptionList()) {
+ if (option.getName().equals(optionName)) {
Review comment:
How about `optionName.equals(option.getName())`? That avoids a potential
`NullPointerException` if `getName` returns null.
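Here is the difference in receiver order, illustrated with plain `String`s. The class and method names are hypothetical:

```java
import java.util.Objects;

/** Hypothetical demo: why the non-null name should be the receiver of equals(). */
public class EqualsOrderSketch {

  /** Safe: the receiver is the caller's non-null name, so a null actual name is just "no match". */
  static boolean matchesConstantFirst(String optionName, String actualName) {
    return optionName.equals(actualName);
  }

  /** Unsafe: throws NullPointerException when actualName is null. */
  static boolean matchesValueFirst(String optionName, String actualName) {
    return actualName.equals(optionName);
  }

  /** Alternative when either side may be null. */
  static boolean matchesEitherNull(String optionName, String actualName) {
    return Objects.equals(optionName, actualName);
  }

  public static void main(String[] args) {
    System.out.println(matchesConstantFirst("some_option", null)); // prints "false"
    System.out.println(matchesEitherNull("some_option", null)); // prints "false"
    try {
      matchesValueFirst("some_option", null);
    } catch (NullPointerException e) {
      System.out.println("NullPointerException"); // this branch runs
    }
  }
}
```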
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SqlAnalyzer.java
##########
@@ -115,6 +116,22 @@ static boolean isEndOfInput(ParseResumeLocation parseResumeLocation) {
>= parseResumeLocation.getInput().getBytes(UTF_8).length;
}
+ static String getOptionStringValue(
Review comment:
nit: This method appears to be used exactly once in another file. It
should go right next to the method that calls it.
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPlannerImpl.java
##########
@@ -104,14 +106,30 @@ public RelRoot rel(String sql, QueryParameters params) {
ImmutableMap.Builder<List<String>, ResolvedCreateFunctionStmt> udfBuilder =
ImmutableMap.builder();
ImmutableMap.Builder<List<String>, ResolvedNode> udtvfBuilder =
ImmutableMap.builder();
+ ImmutableMap.Builder<List<String>, UserFunctionDefinitions.JavaScalarFunction>
+ javaScalarFunctionBuilder = ImmutableMap.builder();
+ JavaUdfLoader javaUdfLoader = new JavaUdfLoader();
ResolvedStatement statement;
ParseResumeLocation parseResumeLocation = new ParseResumeLocation(sql);
do {
statement = analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
if (statement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
ResolvedCreateFunctionStmt createFunctionStmt =
(ResolvedCreateFunctionStmt) statement;
- udfBuilder.put(createFunctionStmt.getNamePath(), createFunctionStmt);
+ String functionGroup = SqlAnalyzer.getFunctionGroup(createFunctionStmt);
+ if (SqlAnalyzer.USER_DEFINED_FUNCTIONS.equals(functionGroup)) {
Review comment:
nit: switch/case reads better than an if/else-if chain for this pattern, as long
as the string can't be null (switching on a null `String` throws a
`NullPointerException`).
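This sketch shows switch-based dispatch. It assumes the group names are compile-time `String` constants, which `case` labels require. The constant values and the `Target` enum are placeholders that stand in for `SqlAnalyzer`'s constants and the builder calls in the diff.

```java
import java.util.Objects;

/** Hypothetical sketch of dispatching on the function group with a switch. */
public class FunctionGroupSwitchSketch {

  // Placeholder values; the real constants in SqlAnalyzer may differ.
  static final String USER_DEFINED_FUNCTIONS = "user_defined_functions";
  static final String USER_DEFINED_JAVA_SCALAR_FUNCTIONS = "user_defined_java_scalar_functions";

  /** Stands in for "which builder receives this function". */
  enum Target { SQL_UDF_MAP, JAVA_SCALAR_FUNCTION_MAP }

  static Target route(String functionGroup) {
    // A switch on a null String would throw a bare NullPointerException; this adds a message.
    Objects.requireNonNull(functionGroup, "functionGroup must not be null");
    switch (functionGroup) {
      case USER_DEFINED_FUNCTIONS:
        return Target.SQL_UDF_MAP;
      case USER_DEFINED_JAVA_SCALAR_FUNCTIONS:
        return Target.JAVA_SCALAR_FUNCTION_MAP;
      default:
        throw new IllegalArgumentException("Unrecognized function group: " + functionGroup);
    }
  }

  public static void main(String[] args) {
    System.out.println(route(USER_DEFINED_JAVA_SCALAR_FUNCTIONS)); // prints "JAVA_SCALAR_FUNCTION_MAP"
  }
}
```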
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSQLPlannerImpl.java
##########
@@ -104,14 +106,30 @@ public RelRoot rel(String sql, QueryParameters params) {
ImmutableMap.Builder<List<String>, ResolvedCreateFunctionStmt> udfBuilder =
ImmutableMap.builder();
ImmutableMap.Builder<List<String>, ResolvedNode> udtvfBuilder =
ImmutableMap.builder();
+ ImmutableMap.Builder<List<String>, UserFunctionDefinitions.JavaScalarFunction>
+ javaScalarFunctionBuilder = ImmutableMap.builder();
+ JavaUdfLoader javaUdfLoader = new JavaUdfLoader();
ResolvedStatement statement;
ParseResumeLocation parseResumeLocation = new ParseResumeLocation(sql);
do {
statement = analyzer.analyzeNextStatement(parseResumeLocation, options, catalog);
if (statement.nodeKind() == RESOLVED_CREATE_FUNCTION_STMT) {
ResolvedCreateFunctionStmt createFunctionStmt =
(ResolvedCreateFunctionStmt) statement;
- udfBuilder.put(createFunctionStmt.getNamePath(), createFunctionStmt);
+ String functionGroup = SqlAnalyzer.getFunctionGroup(createFunctionStmt);
+ if (SqlAnalyzer.USER_DEFINED_FUNCTIONS.equals(functionGroup)) {
+ udfBuilder.put(createFunctionStmt.getNamePath(), createFunctionStmt);
+ } else if (SqlAnalyzer.USER_DEFINED_JAVA_SCALAR_FUNCTIONS.equals(functionGroup)) {
+ String jarPath = getJarPath(createFunctionStmt);
+ ScalarFn scalarFn =
+ javaUdfLoader.loadScalarFunction(createFunctionStmt.getNamePath(), jarPath);
Review comment:
Again on bypassing layers: it seems like all of this should live behind a
`TableProvider`-like interface and be built through a `buildBeamSqlUDF` method
called from `BeamCalciteSchema` (for the table example, see the line just above
the one linked here):
https://github.com/apache/beam/blob/68d6c8e6243b1d8f392840273f886276e2a8baff/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java#L122
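This minimal sketch shows the lookup path described above. All names are hypothetical; `buildBeamSqlUDF` follows the method name proposed in the comment and is not an existing Beam API. A schema asks a provider for a stored definition and has the provider build an executable function. The toy provider treats the body as an integer offset purely so the example runs. A real provider would dispatch on the function's language.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongUnaryOperator;

/** Hypothetical sketch: schema lookup delegates UDF construction to a provider. */
public class UdfSchemaSketch {

  /** Stand-in for a stored CREATE FUNCTION definition. */
  static final class UdfDefinition {
    final String language;
    final String body;

    UdfDefinition(String language, String body) {
      this.language = language;
      this.body = body;
    }
  }

  /** Hypothetical provider interface. */
  interface UdfProvider {
    UdfDefinition getUdf(List<String> namePath);

    LongUnaryOperator buildBeamSqlUDF(UdfDefinition definition);
  }

  /** Toy provider: language "OFFSET" with body "1" builds x -> x + 1. */
  static final class OffsetUdfProvider implements UdfProvider {
    private final Map<List<String>, UdfDefinition> definitions = new HashMap<>();

    void register(List<String> namePath, UdfDefinition definition) {
      definitions.put(namePath, definition);
    }

    @Override
    public UdfDefinition getUdf(List<String> namePath) {
      return definitions.get(namePath);
    }

    @Override
    public LongUnaryOperator buildBeamSqlUDF(UdfDefinition definition) {
      if (!"OFFSET".equals(definition.language)) {
        throw new UnsupportedOperationException("Unsupported language " + definition.language);
      }
      long offset = Long.parseLong(definition.body);
      return x -> x + offset;
    }
  }

  /** Schema-side lookup: find the definition, then let the provider build it. */
  static final class Schema {
    private final UdfProvider provider;

    Schema(UdfProvider provider) {
      this.provider = provider;
    }

    /** Returns the built UDF, or null if no function with this name was defined. */
    LongUnaryOperator getFunction(List<String> namePath) {
      UdfDefinition definition = provider.getUdf(namePath);
      return definition == null ? null : provider.buildBeamSqlUDF(definition);
    }
  }

  public static void main(String[] args) {
    OffsetUdfProvider provider = new OffsetUdfProvider();
    provider.register(List.of("add_one"), new UdfDefinition("OFFSET", "1"));
    Schema schema = new Schema(provider);
    System.out.println(schema.getFunction(List.of("add_one")).applyAsLong(41)); // prints "42"
  }
}
```

Keeping construction inside the provider means the planner never needs to know how a Java or SQL UDF is loaded.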
----------------------------------------------------------------
This is an automated message from the Apache Git Service.