amaliujia commented on a change in pull request #12766:
URL: https://github.com/apache/beam/pull/12766#discussion_r487307937
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/AggregateScanConverter.java
##########
@@ -227,8 +224,35 @@ private AggregateCall convertAggCall(
ZetaSqlCalciteTranslationUtils.toCalciteType(
computedColumn.getColumn().getType(), nullable,
getCluster().getRexBuilder());
+ SqlAggFunction sqlAggFunction =
+ getSqlAggFunction(aggregateFunctionCall.getFunction().getName(), returnType);
+
String aggName = getTrait().resolveAlias(computedColumn.getColumn());
return AggregateCall.create(
sqlAggFunction, false, false, false, argList, -1, RelCollations.EMPTY,
returnType, aggName);
}
+
+ private static SqlAggFunction getSqlAggFunction(
+ String zetaSqlAggFunctionName, RelDataType returnType) {
+ // ZetaSQL specific aggregation functions, implemented with a user-defined CombineFn
+ if (ZETASQL_UDAF_OPERATORS.containsKey(zetaSqlAggFunctionName)) {
+ return ZETASQL_UDAF_OPERATORS.get(zetaSqlAggFunctionName);
+ }
+
+ if ("$count_star".equals(zetaSqlAggFunctionName)) {
+ zetaSqlAggFunctionName = "COUNT"; // $count_star and count both map to the same implementation
+ } else {
+ // BeamBuiltinAggregations.BUILTIN_AGGREGATOR_FACTORIES uses upper-case function names
+ zetaSqlAggFunctionName = zetaSqlAggFunctionName.toUpperCase();
+ }
+
+ // Beam builtin aggregation functions (available in both ZetaSQL and CalciteSQL), implemented in
+ // {@link org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations}
+ if (BeamBuiltinAggregations.BUILTIN_AGGREGATOR_FACTORIES.containsKey(zetaSqlAggFunctionName)) {
Review comment:
Is it necessary to convert `zetaSqlAggFunctionName` to upper case?
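The lookup order in `getSqlAggFunction` can be illustrated with a small, dependency-free sketch. It is not the Beam code: `AggNameResolution`, `ZETASQL_UDAF_NAMES`, `BUILTIN_NAMES`, and the `example_udaf` entry are hypothetical stand-ins for `ZETASQL_UDAF_OPERATORS` and `BeamBuiltinAggregations.BUILTIN_AGGREGATOR_FACTORIES`, strings replace the real `SqlAggFunction` objects, and the builtin set is an illustrative subset. It assumes, as the inline comment in the diff states, that the builtin map is keyed by upper-case names, and that ZetaSQL reports names in lower case (for example `sum`); under those assumptions the upper-casing is what lets the builtin lookup succeed.

```java
import java.util.Locale;
import java.util.Optional;
import java.util.Set;

/** Hypothetical, dependency-free model of the lookup order in getSqlAggFunction. */
final class AggNameResolution {
  // Stand-in for ZETASQL_UDAF_OPERATORS; the entry is made up for illustration.
  static final Set<String> ZETASQL_UDAF_NAMES = Set.of("example_udaf");

  // Stand-in for BUILTIN_AGGREGATOR_FACTORIES: an illustrative subset keyed by upper-case names.
  static final Set<String> BUILTIN_NAMES = Set.of("COUNT", "SUM", "MAX", "MIN");

  static Optional<String> resolve(String zetaSqlName) {
    // 1. ZetaSQL-specific UDAFs are matched on the name exactly as reported.
    if (ZETASQL_UDAF_NAMES.contains(zetaSqlName)) {
      return Optional.of("udaf:" + zetaSqlName);
    }
    // 2. COUNT(*) and COUNT(x) share one implementation; every other name is
    //    upper-cased so that it can match the builtin keys.
    String key =
        "$count_star".equals(zetaSqlName) ? "COUNT" : zetaSqlName.toUpperCase(Locale.ROOT);
    // 3. Builtin lookup; an unknown name yields Optional.empty().
    return BUILTIN_NAMES.contains(key) ? Optional.of("builtin:" + key) : Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(resolve("sum"));                // Optional[builtin:SUM]
    System.out.println(resolve("$count_star"));        // Optional[builtin:COUNT]
    System.out.println(resolve("example_udaf"));       // Optional[udaf:example_udaf]
    System.out.println(BUILTIN_NAMES.contains("sum")); // false: the raw name misses
  }
}
```

The sketch passes `Locale.ROOT`, a deliberate deviation from the diff: the no-argument `toUpperCase()` uses the JVM default locale, and under a Turkish locale `i` becomes the dotted capital `İ`, so a name such as `min` would no longer match `MIN`.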
##########
File path:
sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/SqlOperators.java
##########
@@ -155,6 +156,25 @@ public static SqlFunction createZetaSqlFunction(String name, SqlTypeName returnT
SqlFunctionCategory.USER_DEFINED_FUNCTION);
}
+ /**
+ * Create a dummy SqlAggFunction of type OTHER_FUNCTION from given function name and return type.
+ * These functions will be executed by Beam CombineFns implemented in {@link
+ * org.apache.beam.sdk.extensions.sql.impl.transform.BeamBuiltinAggregations}.
+ */
+ public static SqlAggFunction createZetaSqlAggFunction(String name, RelDataType returnType) {
+ return new SqlAggFunction(
+ name,
+ null, // sqlIdentifier
+ SqlKind.OTHER_FUNCTION,
+ x -> returnType,
Review comment:
I am not sure whether this is the right way to override the return type of an agg operator. Perhaps the best way to evaluate it is to support `AVG` in ZetaSQL: as I recall, the return type of `AVG` differs between ZetaSQL and Calcite when the input parameter type is `INT64`.
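The `AVG` concern can be made concrete with a dependency-free sketch. In Calcite, the fourth `SqlAggFunction` constructor argument is a `SqlReturnTypeInference`, whose `inferReturnType` receives a `SqlOperatorBinding`; the lambda `x -> returnType` ignores that binding, so the operator always reports the type ZetaSQL already resolved. Everything below is hypothetical: `AggReturnTypes`, `SqlType`, `fixed`, and `AVG_SAME_AS_ARG` are made-up names, `Function<List<SqlType>, SqlType>` stands in for `SqlReturnTypeInference`, and the "same type as the argument" rule is an assumed model of Calcite's default `AVG` typing for integer input. ZetaSQL's documentation lists `FLOAT64` as the result of `AVG` over `INT64`.

```java
import java.util.List;
import java.util.function.Function;

/** Hypothetical, dependency-free model of fixed vs. argument-derived aggregate return types. */
final class AggReturnTypes {
  enum SqlType { INT64, FLOAT64 }

  // Mirrors `x -> returnType`: the argument types are ignored and the type
  // already resolved by the ZetaSQL analyzer is returned unchanged.
  static Function<List<SqlType>, SqlType> fixed(SqlType resolvedByZetaSql) {
    return argTypes -> resolvedByZetaSql;
  }

  // Assumed model of an argument-derived rule: AVG returns its argument's type.
  static final Function<List<SqlType>, SqlType> AVG_SAME_AS_ARG = argTypes -> argTypes.get(0);

  public static void main(String[] args) {
    List<SqlType> int64Arg = List.of(SqlType.INT64);
    System.out.println(fixed(SqlType.FLOAT64).apply(int64Arg)); // FLOAT64
    System.out.println(AVG_SAME_AS_ARG.apply(int64Arg));        // INT64
  }
}
```

Under these assumptions the two rules disagree for an `INT64` argument, which is the case that adding `AVG` support would exercise.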
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]