This is an automated email from the ASF dual-hosted git repository.
robinyqiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 287067e Update SupportedZetaSqlBuiltinFunctions and support math
functions
new e9dcd54 Merge pull request #12643 from robinyqiu/math
287067e is described below
commit 287067eeed7b7f48ef25ff5ddcd408ee572861c0
Author: Yueyang Qiu <[email protected]>
AuthorDate: Tue Oct 27 00:33:05 2020 -0700
Update SupportedZetaSqlBuiltinFunctions and support math functions
---
.../provider/bigquery/BeamBigQuerySqlDialect.java | 31 +-
.../zetasql/SupportedZetaSqlBuiltinFunctions.java | 118 ++-
.../zetasql/ZetaSqlCalciteTranslationUtils.java | 27 +-
.../sql/zetasql/ZetaSqlDialectSpecTest.java | 399 --------
.../sql/zetasql/ZetaSqlMathFunctionsTest.java | 1032 ++++++++++++++++++++
.../extensions/sql/zetasql/ZetaSqlTypesUtils.java | 5 +
6 files changed, 1147 insertions(+), 465 deletions(-)
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
index c32ad7c..561eba6 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
@@ -88,16 +88,17 @@ public class BeamBigQuerySqlDialect extends
BigQuerySqlDialect {
.put("$extract_time", "TIME")
.put("$extract_datetime", "DATETIME")
.build();
- public static final String DOUBLE_POSITIVE_INF_FUNCTION =
"double_positive_inf";
- public static final String DOUBLE_NEGATIVE_INF_FUNCTION =
"double_negative_inf";
- public static final String DOUBLE_NAN_FUNCTION = "double_nan";
- private static final Map<String, String> DOUBLE_FUNCTIONS =
+ public static final String DOUBLE_POSITIVE_INF_WRAPPER =
"double_positive_inf";
+ public static final String DOUBLE_NEGATIVE_INF_WRAPPER =
"double_negative_inf";
+ public static final String DOUBLE_NAN_WRAPPER = "double_nan";
+ // ZetaSQL has no literal representation of NaN and infinity, so we need to
CAST from strings
+ private static final Map<String, String> DOUBLE_LITERAL_WRAPPERS =
ImmutableMap.<String, String>builder()
- .put(DOUBLE_POSITIVE_INF_FUNCTION, "CAST('+inf' AS FLOAT64)")
- .put(DOUBLE_NEGATIVE_INF_FUNCTION, "CAST('-inf' AS FLOAT64)")
- .put(DOUBLE_NAN_FUNCTION, "CAST('NaN' AS FLOAT64)")
+ .put(DOUBLE_POSITIVE_INF_WRAPPER, "CAST('+inf' AS FLOAT64)")
+ .put(DOUBLE_NEGATIVE_INF_WRAPPER, "CAST('-inf' AS FLOAT64)")
+ .put(DOUBLE_NAN_WRAPPER, "CAST('NaN' AS FLOAT64)")
.build();
- public static final String NUMERIC_LITERAL_FUNCTION = "numeric_literal";
+ public static final String NUMERIC_LITERAL_WRAPPER = "numeric_literal";
public BeamBigQuerySqlDialect(Context context) {
super(context);
@@ -167,12 +168,12 @@ public class BeamBigQuerySqlDialect extends
BigQuerySqlDialect {
break;
case OTHER_FUNCTION:
String funName = call.getOperator().getName();
- if (DOUBLE_FUNCTIONS.containsKey(funName)) {
+ if (DOUBLE_LITERAL_WRAPPERS.containsKey(funName)) {
// self-designed function dealing with the unparsing of ZetaSQL
DOUBLE positive
// infinity, negative infinity and NaN
- unparseDoubleWrapperFunction(writer, funName);
+ unparseDoubleLiteralWrapperFunction(writer, funName);
break;
- } else if (NUMERIC_LITERAL_FUNCTION.equals(funName)) {
+ } else if (NUMERIC_LITERAL_WRAPPER.equals(funName)) {
// self-designed function dealing with the unparsing of ZetaSQL
NUMERIC literal
unparseNumericLiteralWrapperFunction(writer, call, leftPrec,
rightPrec);
break;
@@ -254,12 +255,8 @@ public class BeamBigQuerySqlDialect extends
BigQuerySqlDialect {
writer.endFunCall(trimFrame);
}
- /**
- * As there is no direct ZetaSQL literal representation of NaN or infinity,
we cast String "+inf",
- * "-inf" and "NaN" to FLOAT64 representing positive infinity, negative
infinity and NaN.
- */
- private void unparseDoubleWrapperFunction(SqlWriter writer, String funName) {
- writer.literal(DOUBLE_FUNCTIONS.get(funName));
+ private void unparseDoubleLiteralWrapperFunction(SqlWriter writer, String
funName) {
+ writer.literal(DOUBLE_LITERAL_WRAPPERS.get(funName));
}
private void unparseNumericLiteralWrapperFunction(
diff --git
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
index 8555e01..d0e88ec 100644
---
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
+++
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
@@ -35,6 +35,8 @@ class SupportedZetaSqlBuiltinFunctions {
FunctionSignatureId.FN_ADD_INT64, // $add
FunctionSignatureId.FN_ADD_NUMERIC, // $add
// FunctionSignatureId.FN_ADD_BIGNUMERIC, // $add
+ // FunctionSignatureId.FN_ADD_DATE_INT64, // $add
+ // FunctionSignatureId.FN_ADD_INT64_DATE, // $add
FunctionSignatureId.FN_AND, // $and
FunctionSignatureId.FN_CASE_NO_VALUE, // $case_no_value
FunctionSignatureId.FN_CASE_WITH_VALUE, // $case_with_value
@@ -65,6 +67,7 @@ class SupportedZetaSqlBuiltinFunctions {
FunctionSignatureId.FN_SUBTRACT_INT64, // $subtract
FunctionSignatureId.FN_SUBTRACT_NUMERIC, // $subtract
// FunctionSignatureId.FN_SUBTRACT_BIGNUMERIC, // $subtract
+ // FunctionSignatureId.FN_SUBTRACT_DATE_INT64, // $subtract
FunctionSignatureId.FN_UNARY_MINUS_INT64, // $unary_minus
FunctionSignatureId.FN_UNARY_MINUS_DOUBLE, // $unary_minus
FunctionSignatureId.FN_UNARY_MINUS_NUMERIC, // $unary_minus
@@ -144,6 +147,7 @@ class SupportedZetaSqlBuiltinFunctions {
// semantically identical to FN_LENGTH_BYTES
FunctionSignatureId.FN_CHAR_LENGTH_STRING, // char_length(string) ->
int64
// semantically identical to FN_LENGTH_STRING
+ // FunctionSignatureId.FN_FORMAT_STRING, // format(string, ...) ->
string
// FunctionSignatureId.FN_SPLIT_STRING, // split(string, string) ->
array of string
// FunctionSignatureId.FN_SPLIT_BYTES, // split(bytes, bytes) ->
array of bytes
//
FunctionSignatureId.FN_REGEXP_CONTAINS_STRING,//regexp_contains(string, string)
-> bool
@@ -187,6 +191,8 @@ class SupportedZetaSqlBuiltinFunctions {
// FunctionSignatureId.FN_TRANSLATE_STRING, // translate(string,
string, string) -> string
// FunctionSignatureId.FN_TRANSLATE_BYTES, // soundex(bytes, bytes,
bytes) -> bytes
// FunctionSignatureId.FN_INITCAP_STRING, // initcap(string[,
string]) -> string
+ // FunctionSignatureId.FN_UNICODE_STRING, // unicode(string) -> int64
+ // FunctionSignatureId.FN_CHR_STRING, // chr(int64) -> string
// Control flow functions
FunctionSignatureId.FN_IF, // if
@@ -235,18 +241,28 @@ class SupportedZetaSqlBuiltinFunctions {
// FunctionSignatureId.FN_UNIX_MICROS_FROM_TIMESTAMP,
FunctionSignatureId.FN_DATE_FROM_TIMESTAMP, // date
FunctionSignatureId.FN_DATE_FROM_DATETIME, // date
+ // FunctionSignatureId.FN_DATE_FROM_DATE, // date
+ // FunctionSignatureId.FN_DATE_FROM_STRING, // date
FunctionSignatureId.FN_DATE_FROM_YEAR_MONTH_DAY, // date
FunctionSignatureId.FN_TIMESTAMP_FROM_STRING, // timestamp
FunctionSignatureId.FN_TIMESTAMP_FROM_DATE, // timestamp
FunctionSignatureId.FN_TIMESTAMP_FROM_DATETIME, // timestamp
+ // FunctionSignatureId.FN_TIMESTAMP_FROM_TIMESTAMP, // timestamp
FunctionSignatureId.FN_TIME_FROM_HOUR_MINUTE_SECOND, // time
FunctionSignatureId.FN_TIME_FROM_TIMESTAMP, // time
FunctionSignatureId.FN_TIME_FROM_DATETIME, // time
+ // FunctionSignatureId.FN_TIME_FROM_TIME, // time
+ // FunctionSignatureId.FN_TIME_FROM_STRING, // time
FunctionSignatureId.FN_DATETIME_FROM_DATE_AND_TIME, // datetime
FunctionSignatureId.FN_DATETIME_FROM_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND, //
datetime
FunctionSignatureId.FN_DATETIME_FROM_TIMESTAMP, // datetime
FunctionSignatureId.FN_DATETIME_FROM_DATE, // datetime
+ // FunctionSignatureId.FN_DATETIME_FROM_DATETIME, // datetime
+ // FunctionSignatureId.FN_DATETIME_FROM_STRING, // datetime
+ // FunctionSignatureId.FN_STRING_FROM_DATE, // string
FunctionSignatureId.FN_STRING_FROM_TIMESTAMP, // string
+ // FunctionSignatureId.FN_STRING_FROM_DATETIME, // string
+ // FunctionSignatureId.FN_STRING_FROM_TIME, // string
// Signatures for extracting date parts, taking a date/timestamp
// and the target date part as arguments.
@@ -277,29 +293,31 @@ class SupportedZetaSqlBuiltinFunctions {
FunctionSignatureId.FN_PARSE_DATETIME, // parse_datetime
FunctionSignatureId.FN_PARSE_TIME, // parse_time
FunctionSignatureId.FN_PARSE_TIMESTAMP, // parse_timestamp
+ // FunctionSignatureId.FN_LAST_DAY_DATE, // last_day date
+ // FunctionSignatureId.FN_LAST_DAY_DATETIME, // last_day datetime
// Math functions
FunctionSignatureId.FN_ABS_INT64, // abs
- // FunctionSignatureId.FN_ABS_DOUBLE, // abs
+ FunctionSignatureId.FN_ABS_DOUBLE, // abs
FunctionSignatureId.FN_ABS_NUMERIC, // abs
// FunctionSignatureId.FN_ABS_BIGNUMERIC, // abs
- // FunctionSignatureId.FN_SIGN_INT64, // sign
- // FunctionSignatureId.FN_SIGN_DOUBLE, // sign
- // FunctionSignatureId.FN_SIGN_NUMERIC, // sign
+ FunctionSignatureId.FN_SIGN_INT64, // sign
+ FunctionSignatureId.FN_SIGN_DOUBLE, // sign
+ FunctionSignatureId.FN_SIGN_NUMERIC, // sign
// FunctionSignatureId.FN_SIGN_BIGNUMERIC, // sign
- // FunctionSignatureId.FN_ROUND_DOUBLE, // round(double) -> double
- // FunctionSignatureId.FN_ROUND_NUMERIC, // round(numeric) -> numeric
+ FunctionSignatureId.FN_ROUND_DOUBLE, // round(double) -> double
+ FunctionSignatureId.FN_ROUND_NUMERIC, // round(numeric) -> numeric
// FunctionSignatureId.FN_ROUND_BIGNUMERIC, // round(bignumeric) ->
bignumeric
- // FunctionSignatureId.FN_ROUND_WITH_DIGITS_DOUBLE, // round(double,
int64) -> double
- // FunctionSignatureId.FN_ROUND_WITH_DIGITS_NUMERIC, //
round(numeric, int64) -> numeric
+ FunctionSignatureId.FN_ROUND_WITH_DIGITS_DOUBLE, // round(double,
int64) -> double
+ FunctionSignatureId.FN_ROUND_WITH_DIGITS_NUMERIC, // round(numeric,
int64) -> numeric
// round(bignumeric, int64) -> bignumeric
// FunctionSignatureId.FN_ROUND_WITH_DIGITS_BIGNUMERIC,
- // FunctionSignatureId.FN_TRUNC_DOUBLE, // trunc(double) -> double
+ FunctionSignatureId.FN_TRUNC_DOUBLE, // trunc(double) -> double
FunctionSignatureId.FN_TRUNC_NUMERIC, // trunc(numeric) -> numeric
// FunctionSignatureId.FN_TRUNC_BIGNUMERIC, // trunc(bignumeric) ->
bignumeric
- // FunctionSignatureId.FN_TRUNC_WITH_DIGITS_DOUBLE, // trunc(double,
int64) -> double
- // FunctionSignatureId.FN_TRUNC_WITH_DIGITS_NUMERIC, //
trunc(numeric, int64) -> numeric
+ FunctionSignatureId.FN_TRUNC_WITH_DIGITS_DOUBLE, // trunc(double,
int64) -> double
+ FunctionSignatureId.FN_TRUNC_WITH_DIGITS_NUMERIC, // trunc(numeric,
int64) -> numeric
// trunc(bignumeric, int64) -> bignumeric
// FunctionSignatureId.FN_TRUNC_WITH_DIGITS_BIGNUMERIC,
FunctionSignatureId.FN_CEIL_DOUBLE, // ceil(double) -> double
@@ -312,52 +330,63 @@ class SupportedZetaSqlBuiltinFunctions {
FunctionSignatureId.FN_MOD_INT64, // mod(int64, int64) -> int64
FunctionSignatureId.FN_MOD_NUMERIC, // mod(numeric, numeric) ->
numeric
// FunctionSignatureId.FN_MOD_BIGNUMERIC, // mod(bignumeric,
bignumeric) -> bignumeric
- // FunctionSignatureId.FN_DIV_INT64, // div(int64, int64) -> int64
- // FunctionSignatureId.FN_DIV_NUMERIC, // div(numeric, numeric) ->
numeric
+ FunctionSignatureId.FN_DIV_INT64, // div(int64, int64) -> int64
+ FunctionSignatureId.FN_DIV_NUMERIC, // div(numeric, numeric) ->
numeric
// FunctionSignatureId.FN_DIV_BIGNUMERIC, // div(bignumeric,
bignumeric) -> bignumeric
FunctionSignatureId.FN_IS_INF, // is_inf
FunctionSignatureId.FN_IS_NAN, // is_nan
- // FunctionSignatureId.FN_IEEE_DIVIDE_DOUBLE, // ieee_divide
+ FunctionSignatureId.FN_IEEE_DIVIDE_DOUBLE, // ieee_divide
FunctionSignatureId.FN_SAFE_DIVIDE_DOUBLE, // safe_divide
FunctionSignatureId.FN_SAFE_DIVIDE_NUMERIC, // safe_divide
// FunctionSignatureId.FN_SAFE_DIVIDE_BIGNUMERIC, // safe_divide
- // FunctionSignatureId.FN_SAFE_ADD_INT64, // safe_add
- // FunctionSignatureId.FN_SAFE_ADD_DOUBLE, // safe_add
- // FunctionSignatureId.FN_SAFE_ADD_NUMERIC, // safe_add
+ FunctionSignatureId.FN_SAFE_ADD_INT64, // safe_add
+ FunctionSignatureId.FN_SAFE_ADD_DOUBLE, // safe_add
+ FunctionSignatureId.FN_SAFE_ADD_NUMERIC, // safe_add
// FunctionSignatureId.FN_SAFE_ADD_BIGNUMERIC, // safe_add
- // FunctionSignatureId.FN_SAFE_SUBTRACT_INT64, // safe_subtract
- // FunctionSignatureId.FN_SAFE_SUBTRACT_DOUBLE, // safe_subtract
- // FunctionSignatureId.FN_SAFE_SUBTRACT_NUMERIC, // safe_subtract
+ FunctionSignatureId.FN_SAFE_SUBTRACT_INT64, // safe_subtract
+ FunctionSignatureId.FN_SAFE_SUBTRACT_DOUBLE, // safe_subtract
+ FunctionSignatureId.FN_SAFE_SUBTRACT_NUMERIC, // safe_subtract
// FunctionSignatureId.FN_SAFE_SUBTRACT_BIGNUMERIC, // safe_subtract
- // FunctionSignatureId.FN_SAFE_MULTIPLY_INT64, // safe_multiply
- // FunctionSignatureId.FN_SAFE_MULTIPLY_DOUBLE, // safe_multiply
- // FunctionSignatureId.FN_SAFE_MULTIPLY_NUMERIC, // safe_multiply
+ FunctionSignatureId.FN_SAFE_MULTIPLY_INT64, // safe_multiply
+ FunctionSignatureId.FN_SAFE_MULTIPLY_DOUBLE, // safe_multiply
+ FunctionSignatureId.FN_SAFE_MULTIPLY_NUMERIC, // safe_multiply
// FunctionSignatureId.FN_SAFE_MULTIPLY_BIGNUMERIC, // safe_multiply
- // FunctionSignatureId.FN_SAFE_UNARY_MINUS_INT64, // safe_negate
- // FunctionSignatureId.FN_SAFE_UNARY_MINUS_DOUBLE, // safe_negate
- // FunctionSignatureId.FN_SAFE_UNARY_MINUS_NUMERIC, // safe_negate
+ FunctionSignatureId.FN_SAFE_UNARY_MINUS_INT64, // safe_negate
+ FunctionSignatureId.FN_SAFE_UNARY_MINUS_DOUBLE, // safe_negate
+ FunctionSignatureId.FN_SAFE_UNARY_MINUS_NUMERIC, // safe_negate
// FunctionSignatureId.FN_SAFE_UNARY_MINUS_BIGNUMERIC, // safe_negate
// FunctionSignatureId.FN_GREATEST, // greatest
// FunctionSignatureId.FN_LEAST, // least
- // FunctionSignatureId.FN_SQRT_DOUBLE, // sqrt
+ FunctionSignatureId.FN_SQRT_DOUBLE, // sqrt
+ FunctionSignatureId.FN_SQRT_NUMERIC, // sqrt(numeric) -> numeric
+ // FunctionSignatureId.FN_SQRT_BIGNUMERIC, // sqrt(bignumeric) ->
bignumeric
FunctionSignatureId.FN_POW_DOUBLE, // pow
FunctionSignatureId.FN_POW_NUMERIC, // pow(numeric, numeric) ->
numeric
// FunctionSignatureId.FN_POW_BIGNUMERIC, // pow(bignumeric,
bignumeric) -> bignumeric
FunctionSignatureId.FN_EXP_DOUBLE, // exp
- // FunctionSignatureId.FN_NATURAL_LOGARITHM_DOUBLE, // ln and log
- // FunctionSignatureId.FN_DECIMAL_LOGARITHM_DOUBLE, // log10
+ FunctionSignatureId.FN_EXP_NUMERIC, // exp(numeric) -> numeric
+ // FunctionSignatureId.FN_EXP_BIGNUMERIC, // exp(bignumeric) ->
bignumeric
+ FunctionSignatureId.FN_NATURAL_LOGARITHM_DOUBLE, // ln
+ FunctionSignatureId.FN_NATURAL_LOGARITHM_NUMERIC, // ln(numeric) ->
numeric
+ // FunctionSignatureId.FN_NATURAL_LOGARITHM_BIGNUMERIC, //
ln(bignumeric) -> bignumeric
+ FunctionSignatureId.FN_DECIMAL_LOGARITHM_DOUBLE, // log10
+ FunctionSignatureId.FN_DECIMAL_LOGARITHM_NUMERIC, // log10(numeric)
-> numeric
+ // FunctionSignatureId.FN_DECIMAL_LOGARITHM_BIGNUMERIC, //
log10(bignumeric) -> bignumeric
FunctionSignatureId.FN_LOGARITHM_DOUBLE, // log
+ FunctionSignatureId.FN_LOGARITHM_NUMERIC, // log(numeric, numeric)
-> numeric
+ // FunctionSignatureId.FN_LOGARITHM_BIGNUMERIC,//log(bignumeric,
bignumeric) -> bignumeric
+
FunctionSignatureId.FN_COS_DOUBLE, // cos
FunctionSignatureId.FN_COSH_DOUBLE, // cosh
FunctionSignatureId.FN_ACOS_DOUBLE, // acos
- // FunctionSignatureId.FN_ACOSH_DOUBLE, // acosh
+ FunctionSignatureId.FN_ACOSH_DOUBLE, // acosh
FunctionSignatureId.FN_SIN_DOUBLE, // sin
- // FunctionSignatureId.FN_SINH_DOUBLE, // sinh
+ FunctionSignatureId.FN_SINH_DOUBLE, // sinh
FunctionSignatureId.FN_ASIN_DOUBLE, // asin
- // FunctionSignatureId.FN_ASINH_DOUBLE, // asinh
+ FunctionSignatureId.FN_ASINH_DOUBLE, // asinh
FunctionSignatureId.FN_TAN_DOUBLE, // tan
FunctionSignatureId.FN_TANH_DOUBLE, // tanh
FunctionSignatureId.FN_ATAN_DOUBLE, // atan
@@ -412,18 +441,25 @@ class SupportedZetaSqlBuiltinFunctions {
// Statistical aggregate functions.
// FunctionSignatureId.FN_CORR, // corr
// FunctionSignatureId.FN_CORR_NUMERIC, // corr
+ // FunctionSignatureId.FN_CORR_BIGNUMERIC, // corr
// FunctionSignatureId.FN_COVAR_POP, // covar_pop
// FunctionSignatureId.FN_COVAR_POP_NUMERIC, // covar_pop
+ // FunctionSignatureId.FN_COVAR_POP_BIGNUMERIC, // covar_pop
// FunctionSignatureId.FN_COVAR_SAMP, // covar_samp
// FunctionSignatureId.FN_COVAR_SAMP_NUMERIC, // covar_samp
+ // FunctionSignatureId.FN_COVAR_SAMP_BIGNUMERIC, // covar_samp
// FunctionSignatureId.FN_STDDEV_POP, // stddev_pop
// FunctionSignatureId.FN_STDDEV_POP_NUMERIC, // stddev_pop
+ // FunctionSignatureId.FN_STDDEV_POP_BIGNUMERIC, // stddev_pop
// FunctionSignatureId.FN_STDDEV_SAMP, // stddev_samp
// FunctionSignatureId.FN_STDDEV_SAMP_NUMERIC, // stddev_samp
+ // FunctionSignatureId.FN_STDDEV_SAMP_BIGNUMERIC, // stddev_samp
// FunctionSignatureId.FN_VAR_POP, // var_pop
// FunctionSignatureId.FN_VAR_POP_NUMERIC, // var_pop
+ // FunctionSignatureId.FN_VAR_POP_BIGNUMERIC, // var_pop
// FunctionSignatureId.FN_VAR_SAMP, // var_samp
// FunctionSignatureId.FN_VAR_SAMP_NUMERIC, // var_samp
+ // FunctionSignatureId.FN_VAR_SAMP_BIGNUMERIC, // var_samp
// FunctionSignatureId.FN_COUNTIF, // countif
@@ -455,8 +491,10 @@ class SupportedZetaSqlBuiltinFunctions {
// FunctionSignatureId.FN_NTH_VALUE, // nth_value
// FunctionSignatureId.FN_PERCENTILE_CONT, // percentile_cont
// FunctionSignatureId.FN_PERCENTILE_CONT_NUMERIC, // percentile_cont
+ // FunctionSignatureId.FN_PERCENTILE_CONT_BIGNUMERIC, //
percentile_cont
// FunctionSignatureId.FN_PERCENTILE_DISC, // percentile_disc
// FunctionSignatureId.FN_PERCENTILE_DISC_NUMERIC, // percentile_disc
+ // FunctionSignatureId.FN_PERCENTILE_DISC_BIGNUMERIC, //
percentile_disc
// Misc functions.
// FunctionSignatureId.FN_BIT_CAST_INT64_TO_INT64, //
bit_cast_to_int64(int64)
@@ -474,16 +512,21 @@ class SupportedZetaSqlBuiltinFunctions {
// FunctionSignatureId.FN_RANGE_BUCKET, // range_bucket(T,
array<T>) -> int64
- // FunctionSignatureId.FN_RAND, // rand() -> double
+ // FunctionSignatureId.FN_RAND // rand() -> double
// FunctionSignatureId.FN_GENERATE_UUID, // generate_uuid() -> string
FunctionSignatureId.FN_JSON_EXTRACT, // json_extract(string, string)
+ // FunctionSignatureId.FN_JSON_EXTRACT_JSON, // json_extract(json,
string) -> json
FunctionSignatureId.FN_JSON_EXTRACT_SCALAR, //
json_extract_scalar(string, string)
+ // json_extract_scalar(json, string) -> string
+ // FunctionSignatureId.FN_JSON_EXTRACT_SCALAR_JSON,
// json_extract_array(string[, string]) -> array
FunctionSignatureId.FN_JSON_EXTRACT_ARRAY,
FunctionSignatureId.FN_TO_JSON_STRING, // to_json_string(any[,
bool]) -> string
- FunctionSignatureId.FN_JSON_QUERY, // json_query(string, string)
- FunctionSignatureId.FN_JSON_VALUE // json_value(string, string)
+ FunctionSignatureId.FN_JSON_QUERY, // json_query(string, string) ->
string
+ // FunctionSignatureId.FN_JSON_QUERY_JSON, // json_query(json,
string) -> json
+ FunctionSignatureId.FN_JSON_VALUE // json_value(string, string) ->
string
+ // FunctionSignatureId.FN_JSON_VALUE_JSON, // json_value(json,
string) -> json
// Net functions. These are all found in the "net.*" namespace.
// FunctionSignatureId.FN_NET_FORMAT_IP,
@@ -594,9 +637,11 @@ class SupportedZetaSqlBuiltinFunctions {
// FunctionSignatureId.FN_ST_UNION_AGG,
// FunctionSignatureId.FN_ST_ACCUM,
// FunctionSignatureId.FN_ST_CENTROID_AGG,
+ // FunctionSignatureId.FN_ST_NEAREST_NEIGHBORS,
// Other geography functions
// FunctionSignatureId.FN_ST_X,
// FunctionSignatureId.FN_ST_Y,
+ // FunctionSignatureId.FN_ST_CLUSTERDBSCAN,
// Array functions.
// FunctionSignatureId.FN_FLATTEN, // flatten(array path) -> array
@@ -612,5 +657,8 @@ class SupportedZetaSqlBuiltinFunctions {
// FunctionSignatureId.FN_MAKE_ARRAY, // $make_array
// FunctionSignatureId.FN_SAFE_ARRAY_AT_OFFSET, //
$safe_array_at_offset
// FunctionSignatureId.FN_SAFE_ARRAY_AT_ORDINAL, //
$safe_array_at_ordinal
+ // FunctionSignatureId.FN_ARRAY_IS_DISTINCT, //
array_is_distinct(array) -> bool
+ // FunctionSignatureId.FN_PROTO_MAP_AT_KEY, // $proto_map_at_key
+ // FunctionSignatureId.FN_SAFE_PROTO_MAP_AT_KEY, //
$safe_proto_map_at_key
);
}
diff --git
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
index 8ddf628..e40f69b 100644
---
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
+++
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
@@ -208,24 +208,23 @@ public final class ZetaSqlCalciteTranslationUtils {
return rexBuilder.makeExactLiteral(
new BigDecimal(value.getInt64Value()), toCalciteType(type, false,
rexBuilder));
case TYPE_DOUBLE:
- // Cannot simply call makeApproxLiteral() for ZetaSQL DOUBLE type
because positive infinity,
- // negative infinity and NaN cannot be directly converted to
BigDecimal. So we create three
- // wrapper functions here for these three cases such that we can later
recognize it and
- // customize its unparsing in BeamBigQuerySqlDialect.
+ // Cannot simply call makeApproxLiteral() because +inf, -inf, and NaN
cannot be represented
+ // as BigDecimal. So we create wrapper functions here for these three
cases such that we can
+ // later recognize them and customize their unparsing in
BeamBigQuerySqlDialect.
double val = value.getDoubleValue();
String wrapperFun = null;
if (val == Double.POSITIVE_INFINITY) {
- wrapperFun = BeamBigQuerySqlDialect.DOUBLE_POSITIVE_INF_FUNCTION;
+ wrapperFun = BeamBigQuerySqlDialect.DOUBLE_POSITIVE_INF_WRAPPER;
} else if (val == Double.NEGATIVE_INFINITY) {
- wrapperFun = BeamBigQuerySqlDialect.DOUBLE_NEGATIVE_INF_FUNCTION;
+ wrapperFun = BeamBigQuerySqlDialect.DOUBLE_NEGATIVE_INF_WRAPPER;
} else if (Double.isNaN(val)) {
- wrapperFun = BeamBigQuerySqlDialect.DOUBLE_NAN_FUNCTION;
+ wrapperFun = BeamBigQuerySqlDialect.DOUBLE_NAN_WRAPPER;
}
RelDataType returnType = toCalciteType(type, false, rexBuilder);
if (wrapperFun == null) {
return rexBuilder.makeApproxLiteral(new BigDecimal(val), returnType);
- } else if
(BeamBigQuerySqlDialect.DOUBLE_NAN_FUNCTION.equals(wrapperFun)) {
+ } else if
(BeamBigQuerySqlDialect.DOUBLE_NAN_WRAPPER.equals(wrapperFun)) {
// TODO[BEAM-10550]: Update the temporary workaround below after
vendored Calcite version.
// Adding an additional random parameter for the wrapper function of
NaN, to avoid
// triggering Calcite operation simplification. (e.g. 'NaN == NaN'
would be simplify to
@@ -248,14 +247,14 @@ public final class ZetaSqlCalciteTranslationUtils {
case TYPE_BYTES:
return rexBuilder.makeBinaryLiteral(new
ByteString(value.getBytesValue().toByteArray()));
case TYPE_NUMERIC:
- // Cannot simply call makeExactLiteral() for ZetaSQL NUMERIC type
because later it will be
- // unparsed to the string representation of the BigDecimal itself
(e.g. "SELECT NUMERIC '0'"
- // will be unparsed to "SELECT 0E-9"), and Calcite does not allow
customize unparsing of
- // SqlNumericLiteral. So we create a wrapper function here such that
we can later recognize
- // it and customize its unparsing in BeamBigQuerySqlDialect.
+ // Cannot simply call makeExactLiteral() because later it will be
unparsed to the string
+ // representation of the BigDecimal itself (e.g. "SELECT NUMERIC '0'"
will be unparsed to
+ // "SELECT 0E-9"), and Calcite does not allow customizing the unparsing of
SqlNumericLiteral.
+ // So we create a wrapper function here such that we can later
recognize it and customize
+ // its unparsing in BeamBigQuerySqlDialect.
return rexBuilder.makeCall(
SqlOperators.createZetaSqlFunction(
- BeamBigQuerySqlDialect.NUMERIC_LITERAL_FUNCTION,
+ BeamBigQuerySqlDialect.NUMERIC_LITERAL_WRAPPER,
toCalciteType(type, false, rexBuilder).getSqlTypeName()),
rexBuilder.makeExactLiteral(
value.getNumericValue(), toCalciteType(type, false,
rexBuilder)));
diff --git
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index fe07193..083b236 100644
---
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -203,21 +203,6 @@ public class ZetaSqlDialectSpecTest extends
ZetaSqlTestBase {
}
@Test
- public void testFloat() {
- String sql = "SELECT 3.0";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- final Schema schema = Schema.builder().addNullableField("ColA",
FieldType.DOUBLE).build();
-
-
PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(3.0).build());
-
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
public void testStringLiterals() {
String sql = "SELECT '\"America/Los_Angeles\"\\n'";
@@ -802,20 +787,6 @@ public class ZetaSqlDialectSpecTest extends
ZetaSqlTestBase {
}
@Test
- public void testMod() {
- String sql = "SELECT MOD(4, 2)";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- final Schema schema = Schema.builder().addInt64Field("field1").build();
-
-
PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(0L).build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
public void testSimpleUnionAll() {
String sql =
"SELECT CAST (1243 as INT64), "
@@ -2165,376 +2136,6 @@ public class ZetaSqlDialectSpecTest extends
ZetaSqlTestBase {
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}
- /////////////////////////////////////////////////////////////////////////////
- // DOUBLE INF and NAN tests
- /////////////////////////////////////////////////////////////////////////////
-
- @Test
- public void testDoubleINF() {
- String sql = "SELECT CAST('+inf' AS FLOAT64), CAST('-inf' AS FLOAT64)";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
- Row.withSchema(
- Schema.builder()
- .addDoubleField("f_double1")
- .addDoubleField("f_double2")
- .build())
- .addValues(Double.POSITIVE_INFINITY)
- .addValues(Double.NEGATIVE_INFINITY)
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- public void testDoubleINFEQ() {
- String sql =
- "SELECT CAST('+inf' AS FLOAT64) = CAST('+inf' AS FLOAT64), CAST('+inf'
AS FLOAT64) = CAST('-inf' AS FLOAT64)";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
- Row.withSchema(
- Schema.builder()
- .addBooleanField("f_boolean1")
- .addBooleanField("f_boolean2")
- .build())
- .addValues(true)
- .addValues(false)
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- public void testDoubleNAN() {
- String sql = "SELECT CAST('NaN' AS FLOAT64)";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
- Row.withSchema(Schema.builder().addDoubleField("f_double").build())
- .addValues(Double.NaN)
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- public void testDoubleNaNEQ() {
- String sql = "SELECT CAST('NaN' AS FLOAT64) = CAST('NaN' AS FLOAT64)";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
-
Row.withSchema(Schema.builder().addBooleanField("f_boolean").build())
- .addValues(false)
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- public void testDoubleIsINF() {
- String sql =
- "SELECT IS_INF(CAST('+inf' AS FLOAT64)), IS_INF(CAST('-inf' AS
FLOAT64)), IS_INF(3.0)";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
- Row.withSchema(
- Schema.builder()
- .addBooleanField("f_boolean1")
- .addBooleanField("f_boolean2")
- .addBooleanField("f_boolean3")
- .build())
- .addValues(true)
- .addValues(true)
- .addValues(false)
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- public void testDoubleIsNAN() {
- String sql = "SELECT IS_NAN(CAST('NaN' AS FLOAT64)), IS_NAN(3.0)";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
- Row.withSchema(
- Schema.builder()
- .addBooleanField("f_boolean1")
- .addBooleanField("f_boolean2")
- .build())
- .addValues(true)
- .addValues(false)
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- /////////////////////////////////////////////////////////////////////////////
- // NUMERIC type tests
- /////////////////////////////////////////////////////////////////////////////
-
- @Test
- public void testNumericLiteral() {
- String sql =
- "SELECT NUMERIC '0', "
- + "NUMERIC '123456', "
- + "NUMERIC '-3.14', "
- + "NUMERIC '-0.54321', "
- + "NUMERIC '1.23456e05', "
- + "NUMERIC '-9.876e-3', "
- // min value for ZetaSQL NUMERIC type
- + "NUMERIC '-99999999999999999999999999999.999999999', "
- // max value for ZetaSQL NUMERIC type
- + "NUMERIC '99999999999999999999999999999.999999999'";
- ;
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
- Row.withSchema(
- Schema.builder()
- .addDecimalField("f_numeric1")
- .addDecimalField("f_numeric2")
- .addDecimalField("f_numeric3")
- .addDecimalField("f_numeric4")
- .addDecimalField("f_numeric5")
- .addDecimalField("f_numeric6")
- .addDecimalField("f_numeric7")
- .addDecimalField("f_numeric8")
- .build())
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("0"))
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123456"))
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-3.14"))
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.54321"))
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123456"))
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.009876"))
- .addValues(
- ZetaSqlTypesUtils.bigDecimalAsNumeric(
- "-99999999999999999999999999999.999999999"))
- .addValues(
- ZetaSqlTypesUtils.bigDecimalAsNumeric(
- "99999999999999999999999999999.999999999"))
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- public void testNumericColumn() {
- String sql = "SELECT numeric_field FROM table_with_numeric";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
-
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123.4567"))
- .build(),
-
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("765.4321"))
- .build(),
-
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-555.5555"))
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- public void testUnaryMinusNumeric() {
- String sql = "SELECT - NUMERIC '1.23456e05'";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
-
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-123456"))
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- public void testAddNumeric() {
- String sql = "SELECT NUMERIC '1.23456e05' + NUMERIC '9.876e-3'";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
-
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
-
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123456.009876"))
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- public void testSubNumeric() {
- String sql = "SELECT NUMERIC '1.23456e05' - NUMERIC '-9.876e-3'";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
-
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
-
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123456.009876"))
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- public void testMultiNumeric() {
- String sql = "SELECT NUMERIC '1.23e02' * NUMERIC '-1.001e-3'";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
-
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.123123"))
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- public void testDivNumeric() {
- String sql = "SELECT NUMERIC '-1.23123e-1' / NUMERIC '-1.001e-3'";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
-
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123"))
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- public void testModNumeric() {
- String sql = "SELECT MOD(NUMERIC '1.23456e05', NUMERIC '5')";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
-
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("1"))
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- public void testFloorNumeric() {
- String sql = "SELECT FLOOR(NUMERIC '1.23456e04'), FLOOR(NUMERIC
'-1.23456e04')";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
- Row.withSchema(
- Schema.builder()
- .addDecimalField("f_numeric1")
- .addDecimalField("f_numeric2")
- .build())
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("12345"))
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-12346"))
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- public void testCeilNumeric() {
- String sql = "SELECT CEIL(NUMERIC '1.23456e04'), CEIL(NUMERIC
'-1.23456e04')";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
- Row.withSchema(
- Schema.builder()
- .addDecimalField("f_numeric1")
- .addDecimalField("f_numeric2")
- .build())
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("12346"))
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-12345"))
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- @Ignore("https://jira.apache.org/jira/browse/BEAM-10459")
- public void testSumNumeric() {
- String sql = "SELECT SUM(numeric_field) FROM table_with_numeric";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
-
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("333.3333"))
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
- @Test
- @Ignore("https://jira.apache.org/jira/browse/BEAM-10459")
- public void testAvgNumeric() {
- String sql = "SELECT AVG(numeric_field) FROM table_with_numeric";
-
- ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
- BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
- PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline,
beamRelNode);
-
- PAssert.that(stream)
- .containsInAnyOrder(
-
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
- .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("111.1111"))
- .build());
-
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
- }
-
@Test
public void testMultipleSelectStatementsThrowsException() {
String sql = "SELECT 1; SELECT 2;";
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlMathFunctionsTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlMathFunctionsTest.java
new file mode 100644
index 0000000..c2ee278
--- /dev/null
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlMathFunctionsTest.java
@@ -0,0 +1,1032 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for ZetaSQL Math functions (on INT64, DOUBLE, NUMERIC types). */
+@RunWith(JUnit4.class)
+public class ZetaSqlMathFunctionsTest extends ZetaSqlTestBase {
+
+ @Rule public transient TestPipeline pipeline = TestPipeline.create();
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ /** Runs {@code initialize()} before each test (presumably inherited from ZetaSqlTestBase). */
+ @Before
+ public void setUp() {
+ initialize();
+ }
+
+ /////////////////////////////////////////////////////////////////////////////
+ // INT64 type tests
+ /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Checks unary minus and the four binary arithmetic operators on INT64 literals. The expected
+   * row declares the first four columns as INT64 and {@code 1 / 2} as a DOUBLE ({@code 0.5}).
+   */
+  @Test
+  public void testArithmeticOperatorsInt64() {
+    final String query = "SELECT -1, 1 + 2, 1 - 2, 1 * 2, 1 / 2";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder()
+            .addInt64Field("f_int64_1")
+            .addInt64Field("f_int64_2")
+            .addInt64Field("f_int64_3")
+            .addInt64Field("f_int64_4")
+            .addDoubleField("f_double")
+            .build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema).addValues(-1L, 3L, -1L, 2L, 0.5).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that ABS on the INT64 literals 1 and -1 yields the single row (1, 1). */
+  @Test
+  public void testAbsInt64() {
+    final String query = "SELECT ABS(1), ABS(-1)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder().addInt64Field("f_int64_1").addInt64Field("f_int64_2").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(1L, 1L).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that SIGN on the INT64 literals 0, 5 and -5 yields the single row (0, 1, -1). */
+  @Test
+  public void testSignInt64() {
+    final String query = "SELECT SIGN(0), SIGN(5), SIGN(-5)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder()
+            .addInt64Field("f_int64_1")
+            .addInt64Field("f_int64_2")
+            .addInt64Field("f_int64_3")
+            .build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(0L, 1L, -1L).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that {@code MOD(4, 2)} on INT64 literals yields a single INT64 value of 0. */
+  @Test
+  public void testModInt64() {
+    final String query = "SELECT MOD(4, 2)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema = Schema.builder().addInt64Field("f_int64").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(0L).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that {@code DIV(1, 2)} and {@code DIV(2, 1)} yield the INT64 row (0, 2). */
+  @Test
+  public void testDivInt64() {
+    final String query = "SELECT DIV(1, 2), DIV(2, 1)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder().addInt64Field("f_int64_1").addInt64Field("f_int64_2").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(0L, 2L).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * Checks that the SAFE_* arithmetic functions return NULL for each of the five inputs below
+   * (INT64 boundary values and a division by zero).
+   */
+  @Test
+  public void testSafeArithmeticFunctionsInt64() {
+    final String query =
+        "SELECT SAFE_ADD(9223372036854775807, 1), "
+            + "SAFE_SUBTRACT(-9223372036854775808, 1), "
+            + "SAFE_MULTIPLY(9223372036854775807, 2), "
+            + "SAFE_DIVIDE(1, 0), "
+            + "SAFE_NEGATE(-9223372036854775808)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    // NOTE(review): the SAFE_DIVIDE column (f_int64_4) is declared INT64 here, whereas
+    // testArithmeticOperatorsInt64 declares `1 / 2` as DOUBLE -- confirm the intended type.
+    final Schema expectedSchema =
+        Schema.builder()
+            .addNullableField("f_int64_1", Schema.FieldType.INT64)
+            .addNullableField("f_int64_2", Schema.FieldType.INT64)
+            .addNullableField("f_int64_3", Schema.FieldType.INT64)
+            .addNullableField("f_int64_4", Schema.FieldType.INT64)
+            .addNullableField("f_int64_5", Schema.FieldType.INT64)
+            .build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema).addValues(null, null, null, null, null).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+ /////////////////////////////////////////////////////////////////////////////
+ // DOUBLE (FLOAT64) type tests
+ /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Checks a plain FLOAT64 literal plus the string casts {@code '+inf'}, {@code '-inf'} and
+   * {@code 'NaN'} against 3.0, positive infinity, negative infinity and NaN respectively.
+   */
+  @Test
+  public void testDoubleLiteral() {
+    final String query =
+        "SELECT 3.0, CAST('+inf' AS FLOAT64), CAST('-inf' AS FLOAT64), CAST('NaN' AS FLOAT64)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder()
+            .addDoubleField("f_double1")
+            .addDoubleField("f_double2")
+            .addDoubleField("f_double3")
+            .addDoubleField("f_double4")
+            .build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(3.0, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY, Double.NaN)
+            .build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * Checks unary minus and the four binary arithmetic operators on FLOAT64 literals; every
+   * result column is expected to be a DOUBLE.
+   */
+  @Test
+  public void testArithmeticOperatorsDouble() {
+    final String query = "SELECT -1.5, 1.5 + 2.5, 1.5 - 2.5, 1.5 * 2.5, 1.5 / 2.5";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder()
+            .addDoubleField("f_double1")
+            .addDoubleField("f_double2")
+            .addDoubleField("f_double3")
+            .addDoubleField("f_double4")
+            .addDoubleField("f_double5")
+            .build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema).addValues(-1.5, 4.0, -1.0, 3.75, 0.6).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that {@code +inf = +inf} is true and {@code +inf = -inf} is false for FLOAT64. */
+  @Test
+  public void testEqualsInf() {
+    final String query =
+        "SELECT CAST('+inf' AS FLOAT64) = CAST('+inf' AS FLOAT64), "
+            + "CAST('+inf' AS FLOAT64) = CAST('-inf' AS FLOAT64)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder().addBooleanField("f_boolean1").addBooleanField("f_boolean2").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(true, false).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that comparing a FLOAT64 NaN with itself using {@code =} yields false. */
+  @Test
+  public void testEqualsNaN() {
+    final String query = "SELECT CAST('NaN' AS FLOAT64) = CAST('NaN' AS FLOAT64)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema = Schema.builder().addBooleanField("f_boolean").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(false).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks ABS on FLOAT64 input: 1.5 stays 1.5, -1.0 becomes 1.0, and NaN stays NaN. */
+  @Test
+  public void testAbsDouble() {
+    final String query = "SELECT ABS(1.5), ABS(-1.0), ABS(CAST('NaN' AS FLOAT64))";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder()
+            .addDoubleField("f_double1")
+            .addDoubleField("f_double2")
+            .addDoubleField("f_double3")
+            .build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(1.5, 1.0, Double.NaN).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * Checks SIGN on FLOAT64 input: -0.0, 1.5, -1.5 and NaN are expected to give 0.0, 1.0, -1.0
+   * and NaN respectively.
+   */
+  @Test
+  public void testSignDouble() {
+    final String query =
+        "SELECT SIGN(-0.0), SIGN(1.5), SIGN(-1.5), SIGN(CAST('NaN' AS FLOAT64))";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder()
+            .addDoubleField("f_double1")
+            .addDoubleField("f_double2")
+            .addDoubleField("f_double3")
+            .addDoubleField("f_double4")
+            .build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema).addValues(0.0, 1.0, -1.0, Double.NaN).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * Checks ROUND on FLOAT64 with and without an explicit second argument: {@code ROUND(1.23)}
+   * is expected to be 1.0 and {@code ROUND(-1.27, 1)} to be -1.3.
+   */
+  @Test
+  public void testRoundDouble() {
+    final String query = "SELECT ROUND(1.23), ROUND(-1.27, 1)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder().addDoubleField("f_double1").addDoubleField("f_double2").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(1.0, -1.3).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * Checks TRUNC on FLOAT64 with and without an explicit second argument: {@code TRUNC(1.23)}
+   * is expected to be 1.0 and {@code TRUNC(-1.27, 1)} to be -1.2.
+   */
+  @Test
+  public void testTruncDouble() {
+    final String query = "SELECT TRUNC(1.23), TRUNC(-1.27, 1)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder().addDoubleField("f_double1").addDoubleField("f_double2").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(1.0, -1.2).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks CEIL on FLOAT64: 1.2 is expected to give 2.0 and -1.2 to give -1.0. */
+  @Test
+  public void testCeilDouble() {
+    final String query = "SELECT CEIL(1.2), CEIL(-1.2)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder().addDoubleField("f_double1").addDoubleField("f_double2").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(2.0, -1.0).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks FLOOR on FLOAT64: 1.2 is expected to give 1.0 and -1.2 to give -2.0. */
+  @Test
+  public void testFloorDouble() {
+    final String query = "SELECT FLOOR(1.2), FLOOR(-1.2)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder().addDoubleField("f_double1").addDoubleField("f_double2").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(1.0, -2.0).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that IS_INF is true for both FLOAT64 infinities and false for 3.0. */
+  @Test
+  public void testIsInf() {
+    final String query =
+        "SELECT IS_INF(CAST('+inf' AS FLOAT64)), IS_INF(CAST('-inf' AS FLOAT64)), IS_INF(3.0)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder()
+            .addBooleanField("f_boolean1")
+            .addBooleanField("f_boolean2")
+            .addBooleanField("f_boolean3")
+            .build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(true, true, false).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that IS_NAN is true for a FLOAT64 NaN and false for 3.0. */
+  @Test
+  public void testIsNaN() {
+    final String query = "SELECT IS_NAN(CAST('NaN' AS FLOAT64)), IS_NAN(3.0)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder().addBooleanField("f_boolean1").addBooleanField("f_boolean2").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(true, false).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that {@code IEEE_DIVIDE(1.0, 0.0)} yields positive infinity. */
+  @Test
+  public void testIeeeDivide() {
+    final String query = "SELECT IEEE_DIVIDE(1.0, 0.0)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema = Schema.builder().addDoubleField("f_double").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema).addValues(Double.POSITIVE_INFINITY).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that {@code SAFE_DIVIDE(1.0, 0.0)} yields NULL in a nullable DOUBLE column. */
+  @Test
+  public void testSafeDivide() {
+    final String query = "SELECT SAFE_DIVIDE(1.0, 0.0)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder().addNullableField("f_double", Schema.FieldType.DOUBLE).build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValue(null).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that {@code SQRT(4.0)} yields 2.0. */
+  @Test
+  public void testSqrtDouble() {
+    final String query = "SELECT SQRT(4.0)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema = Schema.builder().addDoubleField("f_double").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(2.0).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that {@code POW(2.0, 3.0)} yields 8.0. */
+  @Test
+  public void testPowDouble() {
+    final String query = "SELECT POW(2.0, 3.0)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema = Schema.builder().addDoubleField("f_double").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(8.0).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that {@code EXP(2.0)} yields the DOUBLE value 7.38905609893065. */
+  @Test
+  public void testExpDouble() {
+    final String query = "SELECT EXP(2.0)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema = Schema.builder().addDoubleField("f_double").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(7.38905609893065).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that {@code LN(7.38905609893065)} yields 2.0. */
+  @Test
+  public void testLnDouble() {
+    final String query = "SELECT LN(7.38905609893065)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    // NOTE(review): this expects exactly 2.0 from a rounded decimal input -- confirm the
+    // comparison is stable across platforms.
+    final Schema expectedSchema = Schema.builder().addDoubleField("f_double").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(2.0).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that {@code LOG10(100.0)} yields 2.0. */
+  @Test
+  public void testLog10Double() {
+    final String query = "SELECT LOG10(100.0)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema = Schema.builder().addDoubleField("f_double").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(2.0).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that the two-argument form {@code LOG(2.25, 1.5)} yields 2.0. */
+  @Test
+  public void testLogDouble() {
+    final String query = "SELECT LOG(2.25, 1.5)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema = Schema.builder().addDoubleField("f_double").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(2.0).build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * Checks thirteen trigonometric and hyperbolic functions at inputs whose expected results are
+   * exactly 0.0 or 1.0 (COS(0.0) and COSH(0.0) give 1.0; all the others give 0.0).
+   */
+  @Test
+  public void testTrigonometricFunctions() {
+    final String query =
+        "SELECT COS(0.0), COSH(0.0), ACOS(1.0), ACOSH(1.0), "
+            + "SIN(0.0), SINH(0.0), ASIN(0.0), ASINH(0.0), "
+            + "TAN(0.0), TANH(0.0), ATAN(0.0), ATANH(0.0), ATAN2(0.0, 0.0)";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema expectedSchema =
+        Schema.builder()
+            .addDoubleField("f_double1")
+            .addDoubleField("f_double2")
+            .addDoubleField("f_double3")
+            .addDoubleField("f_double4")
+            .addDoubleField("f_double5")
+            .addDoubleField("f_double6")
+            .addDoubleField("f_double7")
+            .addDoubleField("f_double8")
+            .addDoubleField("f_double9")
+            .addDoubleField("f_double10")
+            .addDoubleField("f_double11")
+            .addDoubleField("f_double12")
+            .addDoubleField("f_double13")
+            .build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
+            .build();
+
+    PAssert.that(actual).containsInAnyOrder(expectedRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+ /////////////////////////////////////////////////////////////////////////////
+ // NUMERIC type tests
+ /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Checks NUMERIC literals in plain, fractional and exponent notation, plus the two literals
+   * commented below as the minimum and maximum of the ZetaSQL NUMERIC type, which are compared
+   * against {@code ZetaSqlTypesUtils.NUMERIC_MIN_VALUE} and {@code NUMERIC_MAX_VALUE}.
+   */
+  @Test
+  public void testNumericLiteral() {
+    String sql =
+        "SELECT NUMERIC '0', "
+            + "NUMERIC '123456', "
+            + "NUMERIC '-3.14', "
+            + "NUMERIC '-0.54321', "
+            + "NUMERIC '1.23456e05', "
+            + "NUMERIC '-9.876e-3', "
+            // min value for ZetaSQL NUMERIC type
+            + "NUMERIC '-99999999999999999999999999999.999999999', "
+            // max value for ZetaSQL NUMERIC type
+            + "NUMERIC '99999999999999999999999999999.999999999'";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addDecimalField("f_numeric1")
+                        .addDecimalField("f_numeric2")
+                        .addDecimalField("f_numeric3")
+                        .addDecimalField("f_numeric4")
+                        .addDecimalField("f_numeric5")
+                        .addDecimalField("f_numeric6")
+                        .addDecimalField("f_numeric7")
+                        .addDecimalField("f_numeric8")
+                        .build())
+                .addValues(
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("0"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("123456"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("-3.14"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.54321"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("123456"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.009876"),
+                    ZetaSqlTypesUtils.NUMERIC_MIN_VALUE,
+                    ZetaSqlTypesUtils.NUMERIC_MAX_VALUE)
+                .build());
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * Checks that selecting {@code numeric_field} from {@code table_with_numeric} returns the three
+   * NUMERIC values 123.4567, 765.4321 and -555.5555, in any order.
+   */
+  @Test
+  public void testNumericColumn() {
+    final String query = "SELECT numeric_field FROM table_with_numeric";
+
+    final BeamRelNode plan = new ZetaSQLQueryPlanner(config).convertToBeamRel(query);
+    final PCollection<Row> actual = BeamSqlRelUtils.toPCollection(pipeline, plan);
+
+    final Schema numericSchema = Schema.builder().addDecimalField("f_numeric").build();
+    final Row firstRow =
+        Row.withSchema(numericSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123.4567"))
+            .build();
+    final Row secondRow =
+        Row.withSchema(numericSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("765.4321"))
+            .build();
+    final Row thirdRow =
+        Row.withSchema(numericSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-555.5555"))
+            .build();
+
+    PAssert.that(actual).containsInAnyOrder(firstRow, secondRow, thirdRow);
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * Checks unary minus and the four binary arithmetic operators on NUMERIC literals; all five
+   * result columns are expected to be NUMERIC (decimal) values.
+   */
+  @Test
+  public void testArithmeticOperatorsNumeric() {
+    // The final select item carries no trailing comma, so the query does not depend on the
+    // parser tolerating a dangling comma at the end of the select list.
+    String sql =
+        "SELECT - NUMERIC '1.23456e05', "
+            + "NUMERIC '1.23456e05' + NUMERIC '9.876e-3', "
+            + "NUMERIC '1.23456e05' - NUMERIC '-9.876e-3', "
+            + "NUMERIC '1.23e02' * NUMERIC '-1.001e-3', "
+            + "NUMERIC '-1.23123e-1' / NUMERIC '-1.001e-3'";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addDecimalField("f_numeric1")
+                        .addDecimalField("f_numeric2")
+                        .addDecimalField("f_numeric3")
+                        .addDecimalField("f_numeric4")
+                        .addDecimalField("f_numeric5")
+                        .build())
+                .addValues(
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("-123456"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("123456.009876"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("123456.009876"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.123123"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("123"))
+                .build());
+
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * Evaluates {@code ABS} on a positive and a negative NUMERIC literal of the same magnitude and
+   * expects 12345.6 in both columns.
+   */
+  @Test
+  public void testAbsNumeric() {
+    String sql =
+        "SELECT ABS(NUMERIC '1.23456e04'), "
+            + "ABS(NUMERIC '-1.23456e04')";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema =
+        Schema.builder().addDecimalField("f_numeric1").addDecimalField("f_numeric2").build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("12345.6"),
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("12345.6"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /**
+   * Evaluates {@code SIGN} on a zero, a positive and a negative NUMERIC literal and expects 0, 1
+   * and -1 respectively.
+   */
+  @Test
+  public void testSignNumeric() {
+    String sql =
+        "SELECT SIGN(NUMERIC '0'), "
+            + "SIGN(NUMERIC '1.23e01'), "
+            + "SIGN(NUMERIC '-1.23e01')";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema =
+        Schema.builder()
+            .addDecimalField("f_numeric1")
+            .addDecimalField("f_numeric2")
+            .addDecimalField("f_numeric3")
+            .build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("0"),
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("1"),
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("-1"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /**
+   * Evaluates {@code ROUND} once with a single argument and once with a digits argument of 1,
+   * expecting 12346 and -12345.7.
+   */
+  @Test
+  public void testRoundNumeric() {
+    String sql =
+        "SELECT ROUND(NUMERIC '1.23456e04'), "
+            + "ROUND(NUMERIC '-1.234567e04', 1)";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema =
+        Schema.builder().addDecimalField("f_numeric1").addDecimalField("f_numeric2").build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("12346"),
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("-12345.7"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /**
+   * Evaluates {@code TRUNC} once with a single argument and once with a digits argument of 1,
+   * expecting 12345 and -12345.6.
+   */
+  @Test
+  public void testTruncNumeric() {
+    String sql =
+        "SELECT TRUNC(NUMERIC '1.23456e04'), "
+            + "TRUNC(NUMERIC '-1.234567e04', 1)";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema =
+        Schema.builder().addDecimalField("f_numeric1").addDecimalField("f_numeric2").build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("12345"),
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("-12345.6"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /**
+   * Evaluates {@code CEIL} on a positive and a negative NUMERIC literal, expecting 12346 and
+   * -12345.
+   */
+  @Test
+  public void testCeilNumeric() {
+    String sql =
+        "SELECT CEIL(NUMERIC '1.23456e04'), "
+            + "CEIL(NUMERIC '-1.23456e04')";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema =
+        Schema.builder().addDecimalField("f_numeric1").addDecimalField("f_numeric2").build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("12346"),
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("-12345"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /**
+   * Evaluates {@code FLOOR} on a positive and a negative NUMERIC literal, expecting 12345 and
+   * -12346.
+   */
+  @Test
+  public void testFloorNumeric() {
+    String sql =
+        "SELECT FLOOR(NUMERIC '1.23456e04'), "
+            + "FLOOR(NUMERIC '-1.23456e04')";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema =
+        Schema.builder().addDecimalField("f_numeric1").addDecimalField("f_numeric2").build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("12345"),
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("-12346"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /** Evaluates {@code MOD} of NUMERIC 1.23456e05 by NUMERIC 5 and expects 1. */
+  @Test
+  public void testModNumeric() {
+    String sql = "SELECT MOD(NUMERIC '1.23456e05', NUMERIC '5')";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("1"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /** Evaluates {@code DIV} of NUMERIC 1.23456e05 by NUMERIC 5 and expects 24691. */
+  @Test
+  public void testDivNumeric() {
+    String sql = "SELECT DIV(NUMERIC '1.23456e05', NUMERIC '5')";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("24691"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /**
+   * Calls {@code SAFE_ADD}, {@code SAFE_SUBTRACT}, {@code SAFE_MULTIPLY}, {@code SAFE_DIVIDE} and
+   * {@code SAFE_NEGATE} on NUMERIC literals. The test expects NULL in the first four columns and
+   * {@link ZetaSqlTypesUtils#NUMERIC_MIN_VALUE} for the negation.
+   */
+  @Test
+  public void testSafeArithmeticFunctionsNumeric() {
+    // The select items are joined with ", ", which yields the same query text as spelling the
+    // separators out inside each literal.
+    String sql =
+        String.join(
+            ", ",
+            "SELECT SAFE_ADD(NUMERIC '99999999999999999999999999999.999999999', NUMERIC '1')",
+            "SAFE_SUBTRACT(NUMERIC '-99999999999999999999999999999.999999999', NUMERIC '1')",
+            "SAFE_MULTIPLY(NUMERIC '99999999999999999999999999999.999999999', NUMERIC '2')",
+            "SAFE_DIVIDE(NUMERIC '1.23456e05', NUMERIC '0')",
+            "SAFE_NEGATE(NUMERIC '99999999999999999999999999999.999999999')");
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    // Every column is nullable because four of the five expected values are null.
+    Schema.FieldType decimalType = Schema.FieldType.DECIMAL;
+    Schema expectedSchema =
+        Schema.builder()
+            .addNullableField("f_numeric1", decimalType)
+            .addNullableField("f_numeric2", decimalType)
+            .addNullableField("f_numeric3", decimalType)
+            .addNullableField("f_numeric4", decimalType)
+            .addNullableField("f_numeric5", decimalType)
+            .build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(null, null, null, null, ZetaSqlTypesUtils.NUMERIC_MIN_VALUE)
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /** Evaluates {@code SQRT} of NUMERIC 4 and expects 2. */
+  @Test
+  public void testSqrtNumeric() {
+    String sql = "SELECT SQRT(NUMERIC '4')";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("2"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /** Evaluates {@code POW} of NUMERIC 2 raised to NUMERIC 3 and expects 8. */
+  @Test
+  public void testPowNumeric() {
+    String sql = "SELECT POW(NUMERIC '2', NUMERIC '3')";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("8"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /** Evaluates {@code EXP} of NUMERIC 2 and expects 7.389056099. */
+  @Test
+  public void testExpNumeric() {
+    String sql = "SELECT EXP(NUMERIC '2')";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("7.389056099"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /** Evaluates {@code LN} of NUMERIC 7.389056099 and expects 2. */
+  @Test
+  public void testLnNumeric() {
+    String sql = "SELECT LN(NUMERIC '7.389056099')";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("2"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /** Evaluates {@code LOG10} of NUMERIC 100 and expects 2. */
+  @Test
+  public void testLog10Numeric() {
+    String sql = "SELECT LOG10(NUMERIC '100')";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("2"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /** Evaluates the two-argument {@code LOG} on NUMERIC 2.25 and NUMERIC 1.5 and expects 2. */
+  @Test
+  public void testLogNumeric() {
+    String sql = "SELECT LOG(NUMERIC '2.25', NUMERIC '1.5')";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("2"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /**
+   * Sums {@code numeric_field} over {@code table_with_numeric} and expects 333.3333. The test is
+   * disabled through {@code @Ignore}, which cites BEAM-10459.
+   */
+  @Test
+  @Ignore("[BEAM-10459] Aggregation functions on NUMERIC is not supported yet")
+  public void testSumNumeric() {
+    String sql = "SELECT SUM(numeric_field) FROM table_with_numeric";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("333.3333"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+
+  /**
+   * Averages {@code numeric_field} over {@code table_with_numeric} and expects 111.1111. The test
+   * is disabled through {@code @Ignore}, which cites BEAM-10459.
+   */
+  @Test
+  @Ignore("[BEAM-10459] Aggregation functions on NUMERIC is not supported yet")
+  public void testAvgNumeric() {
+    String sql = "SELECT AVG(numeric_field) FROM table_with_numeric";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> rows = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("111.1111"))
+            .build();
+    PAssert.that(rows).containsInAnyOrder(expectedRow);
+
+    Duration timeout = Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES);
+    pipeline.run().waitUntilFinish(timeout);
+  }
+}
diff --git
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTypesUtils.java
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTypesUtils.java
index 3d8d478..02893b7 100644
---
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTypesUtils.java
+++
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTypesUtils.java
@@ -24,6 +24,11 @@ import org.apache.beam.sdk.annotations.Internal;
@Internal
public class ZetaSqlTypesUtils {
+ public static final BigDecimal NUMERIC_MAX_VALUE =
+ bigDecimalAsNumeric("99999999999999999999999999999.999999999");
+ public static final BigDecimal NUMERIC_MIN_VALUE =
+ bigDecimalAsNumeric("-99999999999999999999999999999.999999999");
+
private ZetaSqlTypesUtils() {}
/**