This is an automated email from the ASF dual-hosted git repository.

robinyqiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 287067e  Update SupportedZetaSqlBuiltinFunctions and support math functions
     new e9dcd54  Merge pull request #12643 from robinyqiu/math
287067e is described below

commit 287067eeed7b7f48ef25ff5ddcd408ee572861c0
Author: Yueyang Qiu <[email protected]>
AuthorDate: Tue Oct 27 00:33:05 2020 -0700

    Update SupportedZetaSqlBuiltinFunctions and support math functions
---
 .../provider/bigquery/BeamBigQuerySqlDialect.java  |   31 +-
 .../zetasql/SupportedZetaSqlBuiltinFunctions.java  |  118 ++-
 .../zetasql/ZetaSqlCalciteTranslationUtils.java    |   27 +-
 .../sql/zetasql/ZetaSqlDialectSpecTest.java        |  399 --------
 .../sql/zetasql/ZetaSqlMathFunctionsTest.java      | 1032 ++++++++++++++++++++
 .../extensions/sql/zetasql/ZetaSqlTypesUtils.java  |    5 +
 6 files changed, 1147 insertions(+), 465 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
index c32ad7c..561eba6 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
@@ -88,16 +88,17 @@ public class BeamBigQuerySqlDialect extends 
BigQuerySqlDialect {
           .put("$extract_time", "TIME")
           .put("$extract_datetime", "DATETIME")
           .build();
-  public static final String DOUBLE_POSITIVE_INF_FUNCTION = 
"double_positive_inf";
-  public static final String DOUBLE_NEGATIVE_INF_FUNCTION = 
"double_negative_inf";
-  public static final String DOUBLE_NAN_FUNCTION = "double_nan";
-  private static final Map<String, String> DOUBLE_FUNCTIONS =
+  public static final String DOUBLE_POSITIVE_INF_WRAPPER = 
"double_positive_inf";
+  public static final String DOUBLE_NEGATIVE_INF_WRAPPER = 
"double_negative_inf";
+  public static final String DOUBLE_NAN_WRAPPER = "double_nan";
+  // ZetaSQL has no literal representation of NaN and infinity, so we need to 
CAST from strings
+  private static final Map<String, String> DOUBLE_LITERAL_WRAPPERS =
       ImmutableMap.<String, String>builder()
-          .put(DOUBLE_POSITIVE_INF_FUNCTION, "CAST('+inf' AS FLOAT64)")
-          .put(DOUBLE_NEGATIVE_INF_FUNCTION, "CAST('-inf' AS FLOAT64)")
-          .put(DOUBLE_NAN_FUNCTION, "CAST('NaN' AS FLOAT64)")
+          .put(DOUBLE_POSITIVE_INF_WRAPPER, "CAST('+inf' AS FLOAT64)")
+          .put(DOUBLE_NEGATIVE_INF_WRAPPER, "CAST('-inf' AS FLOAT64)")
+          .put(DOUBLE_NAN_WRAPPER, "CAST('NaN' AS FLOAT64)")
           .build();
-  public static final String NUMERIC_LITERAL_FUNCTION = "numeric_literal";
+  public static final String NUMERIC_LITERAL_WRAPPER = "numeric_literal";
 
   public BeamBigQuerySqlDialect(Context context) {
     super(context);
@@ -167,12 +168,12 @@ public class BeamBigQuerySqlDialect extends 
BigQuerySqlDialect {
         break;
       case OTHER_FUNCTION:
         String funName = call.getOperator().getName();
-        if (DOUBLE_FUNCTIONS.containsKey(funName)) {
+        if (DOUBLE_LITERAL_WRAPPERS.containsKey(funName)) {
           // self-designed function dealing with the unparsing of ZetaSQL 
DOUBLE positive
           // infinity, negative infinity and NaN
-          unparseDoubleWrapperFunction(writer, funName);
+          unparseDoubleLiteralWrapperFunction(writer, funName);
           break;
-        } else if (NUMERIC_LITERAL_FUNCTION.equals(funName)) {
+        } else if (NUMERIC_LITERAL_WRAPPER.equals(funName)) {
           // self-designed function dealing with the unparsing of ZetaSQL 
NUMERIC literal
           unparseNumericLiteralWrapperFunction(writer, call, leftPrec, 
rightPrec);
           break;
@@ -254,12 +255,8 @@ public class BeamBigQuerySqlDialect extends 
BigQuerySqlDialect {
     writer.endFunCall(trimFrame);
   }
 
-  /**
-   * As there is no direct ZetaSQL literal representation of NaN or infinity, 
we cast String "+inf",
-   * "-inf" and "NaN" to FLOAT64 representing positive infinity, negative 
infinity and NaN.
-   */
-  private void unparseDoubleWrapperFunction(SqlWriter writer, String funName) {
-    writer.literal(DOUBLE_FUNCTIONS.get(funName));
+  private void unparseDoubleLiteralWrapperFunction(SqlWriter writer, String 
funName) {
+    writer.literal(DOUBLE_LITERAL_WRAPPERS.get(funName));
   }
 
   private void unparseNumericLiteralWrapperFunction(
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
index 8555e01..d0e88ec 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
@@ -35,6 +35,8 @@ class SupportedZetaSqlBuiltinFunctions {
           FunctionSignatureId.FN_ADD_INT64, // $add
           FunctionSignatureId.FN_ADD_NUMERIC, // $add
           // FunctionSignatureId.FN_ADD_BIGNUMERIC, // $add
+          // FunctionSignatureId.FN_ADD_DATE_INT64, // $add
+          // FunctionSignatureId.FN_ADD_INT64_DATE, // $add
           FunctionSignatureId.FN_AND, // $and
           FunctionSignatureId.FN_CASE_NO_VALUE, // $case_no_value
           FunctionSignatureId.FN_CASE_WITH_VALUE, // $case_with_value
@@ -65,6 +67,7 @@ class SupportedZetaSqlBuiltinFunctions {
           FunctionSignatureId.FN_SUBTRACT_INT64, // $subtract
           FunctionSignatureId.FN_SUBTRACT_NUMERIC, // $subtract
           // FunctionSignatureId.FN_SUBTRACT_BIGNUMERIC, // $subtract
+          // FunctionSignatureId.FN_SUBTRACT_DATE_INT64, // $subtract
           FunctionSignatureId.FN_UNARY_MINUS_INT64, // $unary_minus
           FunctionSignatureId.FN_UNARY_MINUS_DOUBLE, // $unary_minus
           FunctionSignatureId.FN_UNARY_MINUS_NUMERIC, // $unary_minus
@@ -144,6 +147,7 @@ class SupportedZetaSqlBuiltinFunctions {
           // semantically identical to FN_LENGTH_BYTES
           FunctionSignatureId.FN_CHAR_LENGTH_STRING, // char_length(string) -> 
int64
           // semantically identical to FN_LENGTH_STRING
+          // FunctionSignatureId.FN_FORMAT_STRING, // format(string, ...) -> 
string
           // FunctionSignatureId.FN_SPLIT_STRING, // split(string, string) -> 
array of string
           // FunctionSignatureId.FN_SPLIT_BYTES, // split(bytes, bytes) -> 
array of bytes
           // 
FunctionSignatureId.FN_REGEXP_CONTAINS_STRING,//regexp_contains(string, string) 
-> bool
@@ -187,6 +191,8 @@ class SupportedZetaSqlBuiltinFunctions {
           // FunctionSignatureId.FN_TRANSLATE_STRING, // translate(string, 
string, string) -> string
           // FunctionSignatureId.FN_TRANSLATE_BYTES, // soundex(bytes, bytes, 
bytes) -> bytes
           // FunctionSignatureId.FN_INITCAP_STRING, // initcap(string[, 
string]) -> string
+          // FunctionSignatureId.FN_UNICODE_STRING, // unicode(string) -> int64
+          // FunctionSignatureId.FN_CHR_STRING, // chr(int64) -> string
 
           // Control flow functions
           FunctionSignatureId.FN_IF, // if
@@ -235,18 +241,28 @@ class SupportedZetaSqlBuiltinFunctions {
           // FunctionSignatureId.FN_UNIX_MICROS_FROM_TIMESTAMP,
           FunctionSignatureId.FN_DATE_FROM_TIMESTAMP, // date
           FunctionSignatureId.FN_DATE_FROM_DATETIME, // date
+          // FunctionSignatureId.FN_DATE_FROM_DATE, // date
+          // FunctionSignatureId.FN_DATE_FROM_STRING, // date
           FunctionSignatureId.FN_DATE_FROM_YEAR_MONTH_DAY, // date
           FunctionSignatureId.FN_TIMESTAMP_FROM_STRING, // timestamp
           FunctionSignatureId.FN_TIMESTAMP_FROM_DATE, // timestamp
           FunctionSignatureId.FN_TIMESTAMP_FROM_DATETIME, // timestamp
+          // FunctionSignatureId.FN_TIMESTAMP_FROM_TIMESTAMP, // timestamp
           FunctionSignatureId.FN_TIME_FROM_HOUR_MINUTE_SECOND, // time
           FunctionSignatureId.FN_TIME_FROM_TIMESTAMP, // time
           FunctionSignatureId.FN_TIME_FROM_DATETIME, // time
+          // FunctionSignatureId.FN_TIME_FROM_TIME, // time
+          // FunctionSignatureId.FN_TIME_FROM_STRING, // time
           FunctionSignatureId.FN_DATETIME_FROM_DATE_AND_TIME, // datetime
           
FunctionSignatureId.FN_DATETIME_FROM_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND, // 
datetime
           FunctionSignatureId.FN_DATETIME_FROM_TIMESTAMP, // datetime
           FunctionSignatureId.FN_DATETIME_FROM_DATE, // datetime
+          // FunctionSignatureId.FN_DATETIME_FROM_DATETIME, // datetime
+          // FunctionSignatureId.FN_DATETIME_FROM_STRING, // datetime
+          // FunctionSignatureId.FN_STRING_FROM_DATE, // string
           FunctionSignatureId.FN_STRING_FROM_TIMESTAMP, // string
+          // FunctionSignatureId.FN_STRING_FROM_DATETIME, // string
+          // FunctionSignatureId.FN_STRING_FROM_TIME, // string
 
           // Signatures for extracting date parts, taking a date/timestamp
           // and the target date part as arguments.
@@ -277,29 +293,31 @@ class SupportedZetaSqlBuiltinFunctions {
           FunctionSignatureId.FN_PARSE_DATETIME, // parse_datetime
           FunctionSignatureId.FN_PARSE_TIME, // parse_time
           FunctionSignatureId.FN_PARSE_TIMESTAMP, // parse_timestamp
+          // FunctionSignatureId.FN_LAST_DAY_DATE, // last_day date
+          // FunctionSignatureId.FN_LAST_DAY_DATETIME, // last_day datetime
 
           // Math functions
           FunctionSignatureId.FN_ABS_INT64, // abs
-          // FunctionSignatureId.FN_ABS_DOUBLE, // abs
+          FunctionSignatureId.FN_ABS_DOUBLE, // abs
           FunctionSignatureId.FN_ABS_NUMERIC, // abs
           // FunctionSignatureId.FN_ABS_BIGNUMERIC, // abs
-          // FunctionSignatureId.FN_SIGN_INT64, // sign
-          // FunctionSignatureId.FN_SIGN_DOUBLE, // sign
-          // FunctionSignatureId.FN_SIGN_NUMERIC, // sign
+          FunctionSignatureId.FN_SIGN_INT64, // sign
+          FunctionSignatureId.FN_SIGN_DOUBLE, // sign
+          FunctionSignatureId.FN_SIGN_NUMERIC, // sign
           // FunctionSignatureId.FN_SIGN_BIGNUMERIC, // sign
 
-          // FunctionSignatureId.FN_ROUND_DOUBLE, // round(double) -> double
-          // FunctionSignatureId.FN_ROUND_NUMERIC, // round(numeric) -> numeric
+          FunctionSignatureId.FN_ROUND_DOUBLE, // round(double) -> double
+          FunctionSignatureId.FN_ROUND_NUMERIC, // round(numeric) -> numeric
           // FunctionSignatureId.FN_ROUND_BIGNUMERIC, // round(bignumeric) -> 
bignumeric
-          // FunctionSignatureId.FN_ROUND_WITH_DIGITS_DOUBLE, // round(double, 
int64) -> double
-          // FunctionSignatureId.FN_ROUND_WITH_DIGITS_NUMERIC, // 
round(numeric, int64) -> numeric
+          FunctionSignatureId.FN_ROUND_WITH_DIGITS_DOUBLE, // round(double, 
int64) -> double
+          FunctionSignatureId.FN_ROUND_WITH_DIGITS_NUMERIC, // round(numeric, 
int64) -> numeric
           // round(bignumeric, int64) -> bignumeric
           // FunctionSignatureId.FN_ROUND_WITH_DIGITS_BIGNUMERIC,
-          // FunctionSignatureId.FN_TRUNC_DOUBLE, // trunc(double) -> double
+          FunctionSignatureId.FN_TRUNC_DOUBLE, // trunc(double) -> double
           FunctionSignatureId.FN_TRUNC_NUMERIC, // trunc(numeric) -> numeric
           // FunctionSignatureId.FN_TRUNC_BIGNUMERIC, // trunc(bignumeric) -> 
bignumeric
-          // FunctionSignatureId.FN_TRUNC_WITH_DIGITS_DOUBLE, // trunc(double, 
int64) -> double
-          // FunctionSignatureId.FN_TRUNC_WITH_DIGITS_NUMERIC, // 
trunc(numeric, int64) -> numeric
+          FunctionSignatureId.FN_TRUNC_WITH_DIGITS_DOUBLE, // trunc(double, 
int64) -> double
+          FunctionSignatureId.FN_TRUNC_WITH_DIGITS_NUMERIC, // trunc(numeric, 
int64) -> numeric
           // trunc(bignumeric, int64) -> bignumeric
           // FunctionSignatureId.FN_TRUNC_WITH_DIGITS_BIGNUMERIC,
           FunctionSignatureId.FN_CEIL_DOUBLE, // ceil(double) -> double
@@ -312,52 +330,63 @@ class SupportedZetaSqlBuiltinFunctions {
           FunctionSignatureId.FN_MOD_INT64, // mod(int64, int64) -> int64
           FunctionSignatureId.FN_MOD_NUMERIC, // mod(numeric, numeric) -> 
numeric
           // FunctionSignatureId.FN_MOD_BIGNUMERIC, // mod(bignumeric, 
bignumeric) -> bignumeric
-          // FunctionSignatureId.FN_DIV_INT64, // div(int64, int64) -> int64
-          // FunctionSignatureId.FN_DIV_NUMERIC, // div(numeric, numeric) -> 
numeric
+          FunctionSignatureId.FN_DIV_INT64, // div(int64, int64) -> int64
+          FunctionSignatureId.FN_DIV_NUMERIC, // div(numeric, numeric) -> 
numeric
           // FunctionSignatureId.FN_DIV_BIGNUMERIC, // div(bignumeric, 
bignumeric) -> bignumeric
 
           FunctionSignatureId.FN_IS_INF, // is_inf
           FunctionSignatureId.FN_IS_NAN, // is_nan
-          // FunctionSignatureId.FN_IEEE_DIVIDE_DOUBLE, // ieee_divide
+          FunctionSignatureId.FN_IEEE_DIVIDE_DOUBLE, // ieee_divide
           FunctionSignatureId.FN_SAFE_DIVIDE_DOUBLE, // safe_divide
           FunctionSignatureId.FN_SAFE_DIVIDE_NUMERIC, // safe_divide
           // FunctionSignatureId.FN_SAFE_DIVIDE_BIGNUMERIC, // safe_divide
-          // FunctionSignatureId.FN_SAFE_ADD_INT64, // safe_add
-          // FunctionSignatureId.FN_SAFE_ADD_DOUBLE, // safe_add
-          // FunctionSignatureId.FN_SAFE_ADD_NUMERIC, // safe_add
+          FunctionSignatureId.FN_SAFE_ADD_INT64, // safe_add
+          FunctionSignatureId.FN_SAFE_ADD_DOUBLE, // safe_add
+          FunctionSignatureId.FN_SAFE_ADD_NUMERIC, // safe_add
           // FunctionSignatureId.FN_SAFE_ADD_BIGNUMERIC, // safe_add
-          // FunctionSignatureId.FN_SAFE_SUBTRACT_INT64, // safe_subtract
-          // FunctionSignatureId.FN_SAFE_SUBTRACT_DOUBLE, // safe_subtract
-          // FunctionSignatureId.FN_SAFE_SUBTRACT_NUMERIC, // safe_subtract
+          FunctionSignatureId.FN_SAFE_SUBTRACT_INT64, // safe_subtract
+          FunctionSignatureId.FN_SAFE_SUBTRACT_DOUBLE, // safe_subtract
+          FunctionSignatureId.FN_SAFE_SUBTRACT_NUMERIC, // safe_subtract
           // FunctionSignatureId.FN_SAFE_SUBTRACT_BIGNUMERIC, // safe_subtract
-          // FunctionSignatureId.FN_SAFE_MULTIPLY_INT64, // safe_multiply
-          // FunctionSignatureId.FN_SAFE_MULTIPLY_DOUBLE, // safe_multiply
-          // FunctionSignatureId.FN_SAFE_MULTIPLY_NUMERIC, // safe_multiply
+          FunctionSignatureId.FN_SAFE_MULTIPLY_INT64, // safe_multiply
+          FunctionSignatureId.FN_SAFE_MULTIPLY_DOUBLE, // safe_multiply
+          FunctionSignatureId.FN_SAFE_MULTIPLY_NUMERIC, // safe_multiply
           // FunctionSignatureId.FN_SAFE_MULTIPLY_BIGNUMERIC, // safe_multiply
-          // FunctionSignatureId.FN_SAFE_UNARY_MINUS_INT64, // safe_negate
-          // FunctionSignatureId.FN_SAFE_UNARY_MINUS_DOUBLE, // safe_negate
-          // FunctionSignatureId.FN_SAFE_UNARY_MINUS_NUMERIC, // safe_negate
+          FunctionSignatureId.FN_SAFE_UNARY_MINUS_INT64, // safe_negate
+          FunctionSignatureId.FN_SAFE_UNARY_MINUS_DOUBLE, // safe_negate
+          FunctionSignatureId.FN_SAFE_UNARY_MINUS_NUMERIC, // safe_negate
           // FunctionSignatureId.FN_SAFE_UNARY_MINUS_BIGNUMERIC, // safe_negate
 
           // FunctionSignatureId.FN_GREATEST, // greatest
           // FunctionSignatureId.FN_LEAST, // least
 
-          // FunctionSignatureId.FN_SQRT_DOUBLE, // sqrt
+          FunctionSignatureId.FN_SQRT_DOUBLE, // sqrt
+          FunctionSignatureId.FN_SQRT_NUMERIC, // sqrt(numeric) -> numeric
+          // FunctionSignatureId.FN_SQRT_BIGNUMERIC, // sqrt(bignumeric) -> 
bignumeric
           FunctionSignatureId.FN_POW_DOUBLE, // pow
           FunctionSignatureId.FN_POW_NUMERIC, // pow(numeric, numeric) -> 
numeric
           // FunctionSignatureId.FN_POW_BIGNUMERIC, // pow(bignumeric, 
bignumeric) -> bignumeric
           FunctionSignatureId.FN_EXP_DOUBLE, // exp
-          // FunctionSignatureId.FN_NATURAL_LOGARITHM_DOUBLE, // ln and log
-          // FunctionSignatureId.FN_DECIMAL_LOGARITHM_DOUBLE, // log10
+          FunctionSignatureId.FN_EXP_NUMERIC, // exp(numeric) -> numeric
+          // FunctionSignatureId.FN_EXP_BIGNUMERIC, // exp(bignumeric) -> 
bignumeric
+          FunctionSignatureId.FN_NATURAL_LOGARITHM_DOUBLE, // ln
+          FunctionSignatureId.FN_NATURAL_LOGARITHM_NUMERIC, // ln(numeric) -> 
numeric
+          // FunctionSignatureId.FN_NATURAL_LOGARITHM_BIGNUMERIC, // 
ln(bignumeric) -> bignumeric
+          FunctionSignatureId.FN_DECIMAL_LOGARITHM_DOUBLE, // log10
+          FunctionSignatureId.FN_DECIMAL_LOGARITHM_NUMERIC, // log10(numeric) 
-> numeric
+          // FunctionSignatureId.FN_DECIMAL_LOGARITHM_BIGNUMERIC, // 
log10(bignumeric) -> bignumeric
           FunctionSignatureId.FN_LOGARITHM_DOUBLE, // log
+          FunctionSignatureId.FN_LOGARITHM_NUMERIC, // log(numeric, numeric) 
-> numeric
+          // FunctionSignatureId.FN_LOGARITHM_BIGNUMERIC,//log(bignumeric, 
bignumeric) -> bignumeric
+
           FunctionSignatureId.FN_COS_DOUBLE, // cos
           FunctionSignatureId.FN_COSH_DOUBLE, // cosh
           FunctionSignatureId.FN_ACOS_DOUBLE, // acos
-          // FunctionSignatureId.FN_ACOSH_DOUBLE, // acosh
+          FunctionSignatureId.FN_ACOSH_DOUBLE, // acosh
           FunctionSignatureId.FN_SIN_DOUBLE, // sin
-          // FunctionSignatureId.FN_SINH_DOUBLE, // sinh
+          FunctionSignatureId.FN_SINH_DOUBLE, // sinh
           FunctionSignatureId.FN_ASIN_DOUBLE, // asin
-          // FunctionSignatureId.FN_ASINH_DOUBLE, // asinh
+          FunctionSignatureId.FN_ASINH_DOUBLE, // asinh
           FunctionSignatureId.FN_TAN_DOUBLE, // tan
           FunctionSignatureId.FN_TANH_DOUBLE, // tanh
           FunctionSignatureId.FN_ATAN_DOUBLE, // atan
@@ -412,18 +441,25 @@ class SupportedZetaSqlBuiltinFunctions {
           // Statistical aggregate functions.
           // FunctionSignatureId.FN_CORR, // corr
           // FunctionSignatureId.FN_CORR_NUMERIC, // corr
+          // FunctionSignatureId.FN_CORR_BIGNUMERIC, // corr
           // FunctionSignatureId.FN_COVAR_POP, // covar_pop
           // FunctionSignatureId.FN_COVAR_POP_NUMERIC, // covar_pop
+          // FunctionSignatureId.FN_COVAR_POP_BIGNUMERIC, // covar_pop
           // FunctionSignatureId.FN_COVAR_SAMP, // covar_samp
           // FunctionSignatureId.FN_COVAR_SAMP_NUMERIC, // covar_samp
+          // FunctionSignatureId.FN_COVAR_SAMP_BIGNUMERIC, // covar_samp
           // FunctionSignatureId.FN_STDDEV_POP, // stddev_pop
           // FunctionSignatureId.FN_STDDEV_POP_NUMERIC, // stddev_pop
+          // FunctionSignatureId.FN_STDDEV_POP_BIGNUMERIC, // stddev_pop
           // FunctionSignatureId.FN_STDDEV_SAMP, // stddev_samp
           // FunctionSignatureId.FN_STDDEV_SAMP_NUMERIC, // stddev_samp
+          // FunctionSignatureId.FN_STDDEV_SAMP_BIGNUMERIC, // stddev_samp
           // FunctionSignatureId.FN_VAR_POP, // var_pop
           // FunctionSignatureId.FN_VAR_POP_NUMERIC, // var_pop
+          // FunctionSignatureId.FN_VAR_POP_BIGNUMERIC, // var_pop
           // FunctionSignatureId.FN_VAR_SAMP, // var_samp
           // FunctionSignatureId.FN_VAR_SAMP_NUMERIC, // var_samp
+          // FunctionSignatureId.FN_VAR_SAMP_BIGNUMERIC, // var_samp
 
           // FunctionSignatureId.FN_COUNTIF, // countif
 
@@ -455,8 +491,10 @@ class SupportedZetaSqlBuiltinFunctions {
           // FunctionSignatureId.FN_NTH_VALUE, // nth_value
           // FunctionSignatureId.FN_PERCENTILE_CONT, // percentile_cont
           // FunctionSignatureId.FN_PERCENTILE_CONT_NUMERIC, // percentile_cont
+          // FunctionSignatureId.FN_PERCENTILE_CONT_BIGNUMERIC, // 
percentile_cont
           // FunctionSignatureId.FN_PERCENTILE_DISC, // percentile_disc
           // FunctionSignatureId.FN_PERCENTILE_DISC_NUMERIC, // percentile_disc
+          // FunctionSignatureId.FN_PERCENTILE_DISC_BIGNUMERIC, // 
percentile_disc
 
           // Misc functions.
           // FunctionSignatureId.FN_BIT_CAST_INT64_TO_INT64, // 
bit_cast_to_int64(int64)
@@ -474,16 +512,21 @@ class SupportedZetaSqlBuiltinFunctions {
 
           // FunctionSignatureId.FN_RANGE_BUCKET, //  range_bucket(T, 
array<T>) -> int64
 
-          // FunctionSignatureId.FN_RAND, // rand() -> double
+          // FunctionSignatureId.FN_RAND // rand() -> double
           // FunctionSignatureId.FN_GENERATE_UUID, // generate_uuid() -> string
 
           FunctionSignatureId.FN_JSON_EXTRACT, // json_extract(string, string)
+          // FunctionSignatureId.FN_JSON_EXTRACT_JSON, // json_extract(json, 
string) -> json
           FunctionSignatureId.FN_JSON_EXTRACT_SCALAR, // 
json_extract_scalar(string, string)
+          // json_extract_scalar(json, string) -> string
+          // FunctionSignatureId.FN_JSON_EXTRACT_SCALAR_JSON,
           // json_extract_array(string[, string]) -> array
           FunctionSignatureId.FN_JSON_EXTRACT_ARRAY,
           FunctionSignatureId.FN_TO_JSON_STRING, // to_json_string(any[, 
bool]) -> string
-          FunctionSignatureId.FN_JSON_QUERY, // json_query(string, string)
-          FunctionSignatureId.FN_JSON_VALUE // json_value(string, string)
+          FunctionSignatureId.FN_JSON_QUERY, // json_query(string, string) -> 
string
+          // FunctionSignatureId.FN_JSON_QUERY_JSON, // json_query(json, 
string) -> json
+          FunctionSignatureId.FN_JSON_VALUE // json_value(string, string) -> 
string
+          // FunctionSignatureId.FN_JSON_VALUE_JSON, // json_value(json, 
string) -> json
 
           // Net functions. These are all found in the "net.*" namespace.
           // FunctionSignatureId.FN_NET_FORMAT_IP,
@@ -594,9 +637,11 @@ class SupportedZetaSqlBuiltinFunctions {
           // FunctionSignatureId.FN_ST_UNION_AGG,
           // FunctionSignatureId.FN_ST_ACCUM,
           // FunctionSignatureId.FN_ST_CENTROID_AGG,
+          // FunctionSignatureId.FN_ST_NEAREST_NEIGHBORS,
           // Other geography functions
           // FunctionSignatureId.FN_ST_X,
           // FunctionSignatureId.FN_ST_Y,
+          // FunctionSignatureId.FN_ST_CLUSTERDBSCAN,
 
           // Array functions.
           // FunctionSignatureId.FN_FLATTEN, // flatten(array path) -> array
@@ -612,5 +657,8 @@ class SupportedZetaSqlBuiltinFunctions {
           // FunctionSignatureId.FN_MAKE_ARRAY, // $make_array
           // FunctionSignatureId.FN_SAFE_ARRAY_AT_OFFSET, // 
$safe_array_at_offset
           // FunctionSignatureId.FN_SAFE_ARRAY_AT_ORDINAL, // 
$safe_array_at_ordinal
+          // FunctionSignatureId.FN_ARRAY_IS_DISTINCT, // 
array_is_distinct(array) -> bool
+          // FunctionSignatureId.FN_PROTO_MAP_AT_KEY, // $proto_map_at_key
+          // FunctionSignatureId.FN_SAFE_PROTO_MAP_AT_KEY, // 
$safe_proto_map_at_key
           );
 }
diff --git a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
index 8ddf628..e40f69b 100644
--- a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
+++ b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlCalciteTranslationUtils.java
@@ -208,24 +208,23 @@ public final class ZetaSqlCalciteTranslationUtils {
         return rexBuilder.makeExactLiteral(
             new BigDecimal(value.getInt64Value()), toCalciteType(type, false, 
rexBuilder));
       case TYPE_DOUBLE:
-        // Cannot simply call makeApproxLiteral() for ZetaSQL DOUBLE type 
because positive infinity,
-        // negative infinity and NaN cannot be directly converted to 
BigDecimal. So we create three
-        // wrapper functions here for these three cases such that we can later 
recognize it and
-        // customize its unparsing in BeamBigQuerySqlDialect.
+        // Cannot simply call makeApproxLiteral() because +inf, -inf, and NaN 
cannot be represented
+        // as BigDecimal. So we create wrapper functions here for these three 
cases such that we can
+        // later recognize it and customize its unparsing in 
BeamBigQuerySqlDialect.
         double val = value.getDoubleValue();
         String wrapperFun = null;
         if (val == Double.POSITIVE_INFINITY) {
-          wrapperFun = BeamBigQuerySqlDialect.DOUBLE_POSITIVE_INF_FUNCTION;
+          wrapperFun = BeamBigQuerySqlDialect.DOUBLE_POSITIVE_INF_WRAPPER;
         } else if (val == Double.NEGATIVE_INFINITY) {
-          wrapperFun = BeamBigQuerySqlDialect.DOUBLE_NEGATIVE_INF_FUNCTION;
+          wrapperFun = BeamBigQuerySqlDialect.DOUBLE_NEGATIVE_INF_WRAPPER;
         } else if (Double.isNaN(val)) {
-          wrapperFun = BeamBigQuerySqlDialect.DOUBLE_NAN_FUNCTION;
+          wrapperFun = BeamBigQuerySqlDialect.DOUBLE_NAN_WRAPPER;
         }
 
         RelDataType returnType = toCalciteType(type, false, rexBuilder);
         if (wrapperFun == null) {
           return rexBuilder.makeApproxLiteral(new BigDecimal(val), returnType);
-        } else if 
(BeamBigQuerySqlDialect.DOUBLE_NAN_FUNCTION.equals(wrapperFun)) {
+        } else if 
(BeamBigQuerySqlDialect.DOUBLE_NAN_WRAPPER.equals(wrapperFun)) {
           // TODO[BEAM-10550]: Update the temporary workaround below after 
vendored Calcite version.
           // Adding an additional random parameter for the wrapper function of 
NaN, to avoid
           // triggering Calcite operation simplification. (e.g. 'NaN == NaN' 
would be simplify to
@@ -248,14 +247,14 @@ public final class ZetaSqlCalciteTranslationUtils {
       case TYPE_BYTES:
         return rexBuilder.makeBinaryLiteral(new 
ByteString(value.getBytesValue().toByteArray()));
       case TYPE_NUMERIC:
-        // Cannot simply call makeExactLiteral() for ZetaSQL NUMERIC type 
because later it will be
-        // unparsed to the string representation of the BigDecimal itself 
(e.g. "SELECT NUMERIC '0'"
-        // will be unparsed to "SELECT 0E-9"), and Calcite does not allow 
customize unparsing of
-        // SqlNumericLiteral. So we create a wrapper function here such that 
we can later recognize
-        // it and customize its unparsing in BeamBigQuerySqlDialect.
+        // Cannot simply call makeExactLiteral() because later it will be 
unparsed to the string
+        // representation of the BigDecimal itself (e.g. "SELECT NUMERIC '0'" 
will be unparsed to
+        // "SELECT 0E-9"), and Calcite does not allow customize unparsing of 
SqlNumericLiteral.
+        // So we create a wrapper function here such that we can later 
recognize it and customize
+        // its unparsing in BeamBigQuerySqlDialect.
         return rexBuilder.makeCall(
             SqlOperators.createZetaSqlFunction(
-                BeamBigQuerySqlDialect.NUMERIC_LITERAL_FUNCTION,
+                BeamBigQuerySqlDialect.NUMERIC_LITERAL_WRAPPER,
                 toCalciteType(type, false, rexBuilder).getSqlTypeName()),
             rexBuilder.makeExactLiteral(
                 value.getNumericValue(), toCalciteType(type, false, 
rexBuilder)));
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index fe07193..083b236 100644
--- a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -203,21 +203,6 @@ public class ZetaSqlDialectSpecTest extends 
ZetaSqlTestBase {
   }
 
   @Test
-  public void testFloat() {
-    String sql = "SELECT 3.0";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    final Schema schema = Schema.builder().addNullableField("ColA", 
FieldType.DOUBLE).build();
-
-    
PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(3.0).build());
-
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
   public void testStringLiterals() {
     String sql = "SELECT '\"America/Los_Angeles\"\\n'";
 
@@ -802,20 +787,6 @@ public class ZetaSqlDialectSpecTest extends 
ZetaSqlTestBase {
   }
 
   @Test
-  public void testMod() {
-    String sql = "SELECT MOD(4, 2)";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    final Schema schema = Schema.builder().addInt64Field("field1").build();
-
-    
PAssert.that(stream).containsInAnyOrder(Row.withSchema(schema).addValues(0L).build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
   public void testSimpleUnionAll() {
     String sql =
         "SELECT CAST (1243 as INT64), "
@@ -2165,376 +2136,6 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase {
     
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
   }
 
-  /////////////////////////////////////////////////////////////////////////////
-  // DOUBLE INF and NAN tests
-  /////////////////////////////////////////////////////////////////////////////
-
-  @Test
-  public void testDoubleINF() {
-    String sql = "SELECT CAST('+inf' AS FLOAT64), CAST('-inf' AS FLOAT64)";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            Row.withSchema(
-                    Schema.builder()
-                        .addDoubleField("f_double1")
-                        .addDoubleField("f_double2")
-                        .build())
-                .addValues(Double.POSITIVE_INFINITY)
-                .addValues(Double.NEGATIVE_INFINITY)
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  public void testDoubleINFEQ() {
-    String sql =
-        "SELECT CAST('+inf' AS FLOAT64) = CAST('+inf' AS FLOAT64), CAST('+inf' AS FLOAT64) = CAST('-inf' AS FLOAT64)";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            Row.withSchema(
-                    Schema.builder()
-                        .addBooleanField("f_boolean1")
-                        .addBooleanField("f_boolean2")
-                        .build())
-                .addValues(true)
-                .addValues(false)
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  public void testDoubleNAN() {
-    String sql = "SELECT CAST('NaN' AS FLOAT64)";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            Row.withSchema(Schema.builder().addDoubleField("f_double").build())
-                .addValues(Double.NaN)
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  public void testDoubleNaNEQ() {
-    String sql = "SELECT CAST('NaN' AS FLOAT64) = CAST('NaN' AS FLOAT64)";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            
Row.withSchema(Schema.builder().addBooleanField("f_boolean").build())
-                .addValues(false)
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  public void testDoubleIsINF() {
-    String sql =
-        "SELECT IS_INF(CAST('+inf' AS FLOAT64)), IS_INF(CAST('-inf' AS FLOAT64)), IS_INF(3.0)";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            Row.withSchema(
-                    Schema.builder()
-                        .addBooleanField("f_boolean1")
-                        .addBooleanField("f_boolean2")
-                        .addBooleanField("f_boolean3")
-                        .build())
-                .addValues(true)
-                .addValues(true)
-                .addValues(false)
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  public void testDoubleIsNAN() {
-    String sql = "SELECT IS_NAN(CAST('NaN' AS FLOAT64)), IS_NAN(3.0)";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            Row.withSchema(
-                    Schema.builder()
-                        .addBooleanField("f_boolean1")
-                        .addBooleanField("f_boolean2")
-                        .build())
-                .addValues(true)
-                .addValues(false)
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  /////////////////////////////////////////////////////////////////////////////
-  // NUMERIC type tests
-  /////////////////////////////////////////////////////////////////////////////
-
-  @Test
-  public void testNumericLiteral() {
-    String sql =
-        "SELECT NUMERIC '0', "
-            + "NUMERIC '123456', "
-            + "NUMERIC '-3.14', "
-            + "NUMERIC '-0.54321', "
-            + "NUMERIC '1.23456e05', "
-            + "NUMERIC '-9.876e-3', "
-            // min value for ZetaSQL NUMERIC type
-            + "NUMERIC '-99999999999999999999999999999.999999999', "
-            // max value for ZetaSQL NUMERIC type
-            + "NUMERIC '99999999999999999999999999999.999999999'";
-    ;
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            Row.withSchema(
-                    Schema.builder()
-                        .addDecimalField("f_numeric1")
-                        .addDecimalField("f_numeric2")
-                        .addDecimalField("f_numeric3")
-                        .addDecimalField("f_numeric4")
-                        .addDecimalField("f_numeric5")
-                        .addDecimalField("f_numeric6")
-                        .addDecimalField("f_numeric7")
-                        .addDecimalField("f_numeric8")
-                        .build())
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("0"))
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123456"))
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-3.14"))
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.54321"))
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123456"))
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.009876"))
-                .addValues(
-                    ZetaSqlTypesUtils.bigDecimalAsNumeric(
-                        "-99999999999999999999999999999.999999999"))
-                .addValues(
-                    ZetaSqlTypesUtils.bigDecimalAsNumeric(
-                        "99999999999999999999999999999.999999999"))
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  public void testNumericColumn() {
-    String sql = "SELECT numeric_field FROM table_with_numeric";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123.4567"))
-                .build(),
-            
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("765.4321"))
-                .build(),
-            
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-555.5555"))
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  public void testUnaryMinusNumeric() {
-    String sql = "SELECT - NUMERIC '1.23456e05'";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-123456"))
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  public void testAddNumeric() {
-    String sql = "SELECT NUMERIC '1.23456e05' + NUMERIC '9.876e-3'";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
-                
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123456.009876"))
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  public void testSubNumeric() {
-    String sql = "SELECT NUMERIC '1.23456e05' - NUMERIC '-9.876e-3'";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
-                
.addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123456.009876"))
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  public void testMultiNumeric() {
-    String sql = "SELECT NUMERIC '1.23e02' * NUMERIC '-1.001e-3'";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.123123"))
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  public void testDivNumeric() {
-    String sql = "SELECT NUMERIC '-1.23123e-1' / NUMERIC '-1.001e-3'";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123"))
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  public void testModNumeric() {
-    String sql = "SELECT MOD(NUMERIC '1.23456e05', NUMERIC '5')";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("1"))
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  public void testFloorNumeric() {
-    String sql = "SELECT FLOOR(NUMERIC '1.23456e04'), FLOOR(NUMERIC '-1.23456e04')";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            Row.withSchema(
-                    Schema.builder()
-                        .addDecimalField("f_numeric1")
-                        .addDecimalField("f_numeric2")
-                        .build())
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("12345"))
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-12346"))
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  public void testCeilNumeric() {
-    String sql = "SELECT CEIL(NUMERIC '1.23456e04'), CEIL(NUMERIC '-1.23456e04')";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            Row.withSchema(
-                    Schema.builder()
-                        .addDecimalField("f_numeric1")
-                        .addDecimalField("f_numeric2")
-                        .build())
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("12346"))
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-12345"))
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  @Ignore("https://jira.apache.org/jira/browse/BEAM-10459")
-  public void testSumNumeric() {
-    String sql = "SELECT SUM(numeric_field) FROM table_with_numeric";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("333.3333"))
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
-  @Test
-  @Ignore("https://jira.apache.org/jira/browse/BEAM-10459")
-  public void testAvgNumeric() {
-    String sql = "SELECT AVG(numeric_field) FROM table_with_numeric";
-
-    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
-    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
-    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
-
-    PAssert.that(stream)
-        .containsInAnyOrder(
-            
Row.withSchema(Schema.builder().addDecimalField("f_numeric").build())
-                .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("111.1111"))
-                .build());
-    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
-  }
-
   @Test
   public void testMultipleSelectStatementsThrowsException() {
     String sql = "SELECT 1; SELECT 2;";
diff --git a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlMathFunctionsTest.java b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlMathFunctionsTest.java
new file mode 100644
index 0000000..c2ee278
--- /dev/null
+++ b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlMathFunctionsTest.java
@@ -0,0 +1,1032 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.zetasql;
+
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
+import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for ZetaSQL Math functions (on INT64, DOUBLE, NUMERIC types). */
+@RunWith(JUnit4.class)
+public class ZetaSqlMathFunctionsTest extends ZetaSqlTestBase {
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Before
+  public void setUp() {
+    initialize();
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+  // INT64 type tests
+  /////////////////////////////////////////////////////////////////////////////
+
+  @Test
+  public void testArithmeticOperatorsInt64() {
+    String sql = "SELECT -1, 1 + 2, 1 - 2, 1 * 2, 1 / 2";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addInt64Field("f_int64_1")
+                        .addInt64Field("f_int64_2")
+                        .addInt64Field("f_int64_3")
+                        .addInt64Field("f_int64_4")
+                        .addDoubleField("f_double")
+                        .build())
+                .addValues(-1L, 3L, -1L, 2L, 0.5)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testAbsInt64() {
+    String sql = "SELECT ABS(1), ABS(-1)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    
Schema.builder().addInt64Field("f_int64_1").addInt64Field("f_int64_2").build())
+                .addValues(1L, 1L)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testSignInt64() {
+    String sql = "SELECT SIGN(0), SIGN(5), SIGN(-5)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addInt64Field("f_int64_1")
+                        .addInt64Field("f_int64_2")
+                        .addInt64Field("f_int64_3")
+                        .build())
+                .addValues(0L, 1L, -1L)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testModInt64() {
+    String sql = "SELECT MOD(4, 2)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addInt64Field("f_int64").build())
+                .addValues(0L)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDivInt64() {
+    String sql = "SELECT DIV(1, 2), DIV(2, 1)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    
Schema.builder().addInt64Field("f_int64_1").addInt64Field("f_int64_2").build())
+                .addValues(0L, 2L)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testSafeArithmeticFunctionsInt64() {
+    String sql =
+        "SELECT SAFE_ADD(9223372036854775807, 1), "
+            + "SAFE_SUBTRACT(-9223372036854775808, 1), "
+            + "SAFE_MULTIPLY(9223372036854775807, 2), "
+            + "SAFE_DIVIDE(1, 0), "
+            + "SAFE_NEGATE(-9223372036854775808)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addNullableField("f_int64_1", Schema.FieldType.INT64)
+                        .addNullableField("f_int64_2", Schema.FieldType.INT64)
+                        .addNullableField("f_int64_3", Schema.FieldType.INT64)
+                        .addNullableField("f_int64_4", Schema.FieldType.INT64)
+                        .addNullableField("f_int64_5", Schema.FieldType.INT64)
+                        .build())
+                .addValues(null, null, null, null, null)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+  // DOUBLE (FLOAT64) type tests
+  /////////////////////////////////////////////////////////////////////////////
+
+  @Test
+  public void testDoubleLiteral() {
+    String sql =
+        "SELECT 3.0, CAST('+inf' AS FLOAT64), CAST('-inf' AS FLOAT64), CAST('NaN' AS FLOAT64)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addDoubleField("f_double1")
+                        .addDoubleField("f_double2")
+                        .addDoubleField("f_double3")
+                        .addDoubleField("f_double4")
+                        .build())
+                .addValues(3.0, Double.POSITIVE_INFINITY, 
Double.NEGATIVE_INFINITY, Double.NaN)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testArithmeticOperatorsDouble() {
+    String sql = "SELECT -1.5, 1.5 + 2.5, 1.5 - 2.5, 1.5 * 2.5, 1.5 / 2.5";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addDoubleField("f_double1")
+                        .addDoubleField("f_double2")
+                        .addDoubleField("f_double3")
+                        .addDoubleField("f_double4")
+                        .addDoubleField("f_double5")
+                        .build())
+                .addValues(-1.5, 4.0, -1.0, 3.75, 0.6)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testEqualsInf() {
+    String sql =
+        "SELECT CAST('+inf' AS FLOAT64) = CAST('+inf' AS FLOAT64), "
+            + "CAST('+inf' AS FLOAT64) = CAST('-inf' AS FLOAT64)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addBooleanField("f_boolean1")
+                        .addBooleanField("f_boolean2")
+                        .build())
+                .addValues(true, false)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testEqualsNaN() {
+    String sql = "SELECT CAST('NaN' AS FLOAT64) = CAST('NaN' AS FLOAT64)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            
Row.withSchema(Schema.builder().addBooleanField("f_boolean").build())
+                .addValues(false)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testAbsDouble() {
+    String sql = "SELECT ABS(1.5), ABS(-1.0), ABS(CAST('NaN' AS FLOAT64))";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addDoubleField("f_double1")
+                        .addDoubleField("f_double2")
+                        .addDoubleField("f_double3")
+                        .build())
+                .addValues(1.5, 1.0, Double.NaN)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testSignDouble() {
+    String sql = "SELECT SIGN(-0.0), SIGN(1.5), SIGN(-1.5), SIGN(CAST('NaN' AS FLOAT64))";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addDoubleField("f_double1")
+                        .addDoubleField("f_double2")
+                        .addDoubleField("f_double3")
+                        .addDoubleField("f_double4")
+                        .build())
+                .addValues(0.0, 1.0, -1.0, Double.NaN)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testRoundDouble() {
+    String sql = "SELECT ROUND(1.23), ROUND(-1.27, 1)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addDoubleField("f_double1")
+                        .addDoubleField("f_double2")
+                        .build())
+                .addValues(1.0, -1.3)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testTruncDouble() {
+    String sql = "SELECT TRUNC(1.23), TRUNC(-1.27, 1)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addDoubleField("f_double1")
+                        .addDoubleField("f_double2")
+                        .build())
+                .addValues(1.0, -1.2)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks CEIL on a positive and a negative FLOAT64 literal. */
+  @Test
+  public void testCeilDouble() {
+    final String sql = "SELECT CEIL(1.2), CEIL(-1.2)";
+    final Schema expectedSchema =
+        Schema.builder().addDoubleField("f_double1").addDoubleField("f_double2").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(2.0, -1.0).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks FLOOR on a positive and a negative FLOAT64 literal. */
+  @Test
+  public void testFloorDouble() {
+    final String sql = "SELECT FLOOR(1.2), FLOOR(-1.2)";
+    final Schema expectedSchema =
+        Schema.builder().addDoubleField("f_double1").addDoubleField("f_double2").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(1.0, -2.0).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks IS_INF for positive infinity, negative infinity and a finite value. */
+  @Test
+  public void testIsInf() {
+    final String sql =
+        "SELECT IS_INF(CAST('+inf' AS FLOAT64)), IS_INF(CAST('-inf' AS FLOAT64)), IS_INF(3.0)";
+    final Schema expectedSchema =
+        Schema.builder()
+            .addBooleanField("f_boolean1")
+            .addBooleanField("f_boolean2")
+            .addBooleanField("f_boolean3")
+            .build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(true, true, false).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks IS_NAN for a NaN value and for an ordinary FLOAT64 literal. */
+  @Test
+  public void testIsNaN() {
+    final String sql = "SELECT IS_NAN(CAST('NaN' AS FLOAT64)), IS_NAN(3.0)";
+    final Schema expectedSchema =
+        Schema.builder().addBooleanField("f_boolean1").addBooleanField("f_boolean2").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(true, false).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that IEEE_DIVIDE(1.0, 0.0) produces positive infinity. */
+  @Test
+  public void testIeeeDivide() {
+    final String sql = "SELECT IEEE_DIVIDE(1.0, 0.0)";
+    final Schema expectedSchema = Schema.builder().addDoubleField("f_double").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema).addValues(Double.POSITIVE_INFINITY).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that SAFE_DIVIDE(1.0, 0.0) yields NULL in a nullable DOUBLE column. */
+  @Test
+  public void testSafeDivide() {
+    final String sql = "SELECT SAFE_DIVIDE(1.0, 0.0)";
+    final Schema expectedSchema =
+        Schema.builder().addNullableField("f_double", Schema.FieldType.DOUBLE).build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValue(null).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that SQRT(4.0) evaluates to 2.0. */
+  @Test
+  public void testSqrtDouble() {
+    final String sql = "SELECT SQRT(4.0)";
+    final Schema expectedSchema = Schema.builder().addDoubleField("f_double").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(2.0).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that POW(2.0, 3.0) evaluates to 8.0. */
+  @Test
+  public void testPowDouble() {
+    final String sql = "SELECT POW(2.0, 3.0)";
+    final Schema expectedSchema = Schema.builder().addDoubleField("f_double").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(8.0).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that EXP(2.0) evaluates to 7.38905609893065. */
+  @Test
+  public void testExpDouble() {
+    final String sql = "SELECT EXP(2.0)";
+    final Schema expectedSchema = Schema.builder().addDoubleField("f_double").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(7.38905609893065).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that LN(7.38905609893065) evaluates to 2.0. */
+  @Test
+  public void testLnDouble() {
+    final String sql = "SELECT LN(7.38905609893065)";
+    final Schema expectedSchema = Schema.builder().addDoubleField("f_double").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(2.0).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that LOG10(100.0) evaluates to 2.0. */
+  @Test
+  public void testLog10Double() {
+    final String sql = "SELECT LOG10(100.0)";
+    final Schema expectedSchema = Schema.builder().addDoubleField("f_double").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(2.0).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that the two-argument LOG(2.25, 1.5) evaluates to 2.0. */
+  @Test
+  public void testLogDouble() {
+    final String sql = "SELECT LOG(2.25, 1.5)";
+    final Schema expectedSchema = Schema.builder().addDoubleField("f_double").build();
+    final Row expectedRow = Row.withSchema(expectedSchema).addValues(2.0).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * Evaluates thirteen trigonometric and hyperbolic functions in a single query. COS(0.0) and
+   * COSH(0.0) are expected to be 1.0; every other column is expected to be 0.0.
+   */
+  @Test
+  public void testTrigonometricFunctions() {
+    final String sql =
+        "SELECT COS(0.0), COSH(0.0), ACOS(1.0), ACOSH(1.0), "
+            + "SIN(0.0), SINH(0.0), ASIN(0.0), ASINH(0.0), "
+            + "TAN(0.0), TANH(0.0), ATAN(0.0), ATANH(0.0), ATAN2(0.0, 0.0)";
+    final Schema expectedSchema =
+        Schema.builder()
+            .addDoubleField("f_double1")
+            .addDoubleField("f_double2")
+            .addDoubleField("f_double3")
+            .addDoubleField("f_double4")
+            .addDoubleField("f_double5")
+            .addDoubleField("f_double6")
+            .addDoubleField("f_double7")
+            .addDoubleField("f_double8")
+            .addDoubleField("f_double9")
+            .addDoubleField("f_double10")
+            .addDoubleField("f_double11")
+            .addDoubleField("f_double12")
+            .addDoubleField("f_double13")
+            .build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
+            .build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+  // NUMERIC type tests
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Checks that NUMERIC literals are translated to decimal values: plain integers, fractions,
+   * scientific notation, and the minimum and maximum values of the ZetaSQL NUMERIC type.
+   */
+  @Test
+  public void testNumericLiteral() {
+    String sql =
+        "SELECT NUMERIC '0', "
+            + "NUMERIC '123456', "
+            + "NUMERIC '-3.14', "
+            + "NUMERIC '-0.54321', "
+            + "NUMERIC '1.23456e05', "
+            + "NUMERIC '-9.876e-3', "
+            // min value for ZetaSQL NUMERIC type
+            + "NUMERIC '-99999999999999999999999999999.999999999', "
+            // max value for ZetaSQL NUMERIC type
+            + "NUMERIC '99999999999999999999999999999.999999999'";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addDecimalField("f_numeric1")
+                        .addDecimalField("f_numeric2")
+                        .addDecimalField("f_numeric3")
+                        .addDecimalField("f_numeric4")
+                        .addDecimalField("f_numeric5")
+                        .addDecimalField("f_numeric6")
+                        .addDecimalField("f_numeric7")
+                        .addDecimalField("f_numeric8")
+                        .build())
+                .addValues(
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("0"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("123456"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("-3.14"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.54321"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("123456"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.009876"),
+                    ZetaSqlTypesUtils.NUMERIC_MIN_VALUE,
+                    ZetaSqlTypesUtils.NUMERIC_MAX_VALUE)
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Reads the NUMERIC column of table_with_numeric and expects its three values back. */
+  @Test
+  public void testNumericColumn() {
+    final String sql = "SELECT numeric_field FROM table_with_numeric";
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    final Schema schema = Schema.builder().addDecimalField("f_numeric").build();
+    final Row first =
+        Row.withSchema(schema).addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("123.4567")).build();
+    final Row second =
+        Row.withSchema(schema).addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("765.4321")).build();
+    final Row third =
+        Row.withSchema(schema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("-555.5555"))
+            .build();
+
+    PAssert.that(result).containsInAnyOrder(first, second, third);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * Checks unary minus, addition, subtraction, multiplication and division on NUMERIC operands.
+   *
+   * <p>The select list deliberately ends without a dangling comma so the query does not depend on
+   * the parser accepting a trailing comma after the last item.
+   */
+  @Test
+  public void testArithmeticOperatorsNumeric() {
+    String sql =
+        "SELECT - NUMERIC '1.23456e05', "
+            + "NUMERIC '1.23456e05' + NUMERIC '9.876e-3', "
+            + "NUMERIC '1.23456e05' - NUMERIC '-9.876e-3', "
+            + "NUMERIC '1.23e02' * NUMERIC '-1.001e-3', "
+            + "NUMERIC '-1.23123e-1' / NUMERIC '-1.001e-3'";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addDecimalField("f_numeric1")
+                        .addDecimalField("f_numeric2")
+                        .addDecimalField("f_numeric3")
+                        .addDecimalField("f_numeric4")
+                        .addDecimalField("f_numeric5")
+                        .build())
+                .addValues(
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("-123456"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("123456.009876"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("123456.009876"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("-0.123123"),
+                    ZetaSqlTypesUtils.bigDecimalAsNumeric("123"))
+                .build());
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks ABS on a positive and a negative NUMERIC literal of equal magnitude. */
+  @Test
+  public void testAbsNumeric() {
+    final String sql = "SELECT ABS(NUMERIC '1.23456e04'), ABS(NUMERIC '-1.23456e04')";
+    final Schema expectedSchema =
+        Schema.builder().addDecimalField("f_numeric1").addDecimalField("f_numeric2").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("12345.6"),
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("12345.6"))
+            .build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks SIGN on zero, positive and negative NUMERIC literals. */
+  @Test
+  public void testSignNumeric() {
+    final String sql =
+        "SELECT SIGN(NUMERIC '0'), SIGN(NUMERIC '1.23e01'), SIGN(NUMERIC '-1.23e01')";
+    final Schema expectedSchema =
+        Schema.builder()
+            .addDecimalField("f_numeric1")
+            .addDecimalField("f_numeric2")
+            .addDecimalField("f_numeric3")
+            .build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("0"),
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("1"),
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("-1"))
+            .build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks ROUND on NUMERIC input, both without and with an explicit digit count. */
+  @Test
+  public void testRoundNumeric() {
+    final String sql = "SELECT ROUND(NUMERIC '1.23456e04'), ROUND(NUMERIC '-1.234567e04', 1)";
+    final Schema expectedSchema =
+        Schema.builder().addDecimalField("f_numeric1").addDecimalField("f_numeric2").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("12346"),
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("-12345.7"))
+            .build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks TRUNC on NUMERIC input, both without and with an explicit digit count. */
+  @Test
+  public void testTruncNumeric() {
+    final String sql = "SELECT TRUNC(NUMERIC '1.23456e04'), TRUNC(NUMERIC '-1.234567e04', 1)";
+    final Schema expectedSchema =
+        Schema.builder().addDecimalField("f_numeric1").addDecimalField("f_numeric2").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("12345"),
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("-12345.6"))
+            .build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks CEIL on a positive and a negative NUMERIC literal. */
+  @Test
+  public void testCeilNumeric() {
+    final String sql = "SELECT CEIL(NUMERIC '1.23456e04'), CEIL(NUMERIC '-1.23456e04')";
+    final Schema expectedSchema =
+        Schema.builder().addDecimalField("f_numeric1").addDecimalField("f_numeric2").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("12346"),
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("-12345"))
+            .build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks FLOOR on a positive and a negative NUMERIC literal. */
+  @Test
+  public void testFloorNumeric() {
+    final String sql = "SELECT FLOOR(NUMERIC '1.23456e04'), FLOOR(NUMERIC '-1.23456e04')";
+    final Schema expectedSchema =
+        Schema.builder().addDecimalField("f_numeric1").addDecimalField("f_numeric2").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("12345"),
+                ZetaSqlTypesUtils.bigDecimalAsNumeric("-12346"))
+            .build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that MOD of NUMERIC 123456 by NUMERIC 5 is 1. */
+  @Test
+  public void testModNumeric() {
+    final String sql = "SELECT MOD(NUMERIC '1.23456e05', NUMERIC '5')";
+    final Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema).addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("1")).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that DIV of NUMERIC 123456 by NUMERIC 5 is 24691. */
+  @Test
+  public void testDivNumeric() {
+    final String sql = "SELECT DIV(NUMERIC '1.23456e05', NUMERIC '5')";
+    final Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("24691"))
+            .build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /**
+   * Checks the SAFE_* arithmetic functions on NUMERIC operands. The add, subtract, multiply and
+   * divide columns are expected to be NULL; SAFE_NEGATE of the maximum NUMERIC value is expected
+   * to be the minimum NUMERIC value.
+   */
+  @Test
+  public void testSafeArithmeticFunctionsNumeric() {
+    final String sql =
+        "SELECT SAFE_ADD(NUMERIC '99999999999999999999999999999.999999999', NUMERIC '1'), "
+            + "SAFE_SUBTRACT(NUMERIC '-99999999999999999999999999999.999999999', NUMERIC '1'), "
+            + "SAFE_MULTIPLY(NUMERIC '99999999999999999999999999999.999999999', NUMERIC '2'), "
+            + "SAFE_DIVIDE(NUMERIC '1.23456e05', NUMERIC '0'), "
+            + "SAFE_NEGATE(NUMERIC '99999999999999999999999999999.999999999')";
+    final Schema expectedSchema =
+        Schema.builder()
+            .addNullableField("f_numeric1", Schema.FieldType.DECIMAL)
+            .addNullableField("f_numeric2", Schema.FieldType.DECIMAL)
+            .addNullableField("f_numeric3", Schema.FieldType.DECIMAL)
+            .addNullableField("f_numeric4", Schema.FieldType.DECIMAL)
+            .addNullableField("f_numeric5", Schema.FieldType.DECIMAL)
+            .build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(null, null, null, null, ZetaSqlTypesUtils.NUMERIC_MIN_VALUE)
+            .build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that SQRT(NUMERIC '4') evaluates to NUMERIC 2. */
+  @Test
+  public void testSqrtNumeric() {
+    final String sql = "SELECT SQRT(NUMERIC '4')";
+    final Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema).addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("2")).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that POW(NUMERIC '2', NUMERIC '3') evaluates to NUMERIC 8. */
+  @Test
+  public void testPowNumeric() {
+    final String sql = "SELECT POW(NUMERIC '2', NUMERIC '3')";
+    final Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema).addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("8")).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that EXP(NUMERIC '2') evaluates to NUMERIC 7.389056099. */
+  @Test
+  public void testExpNumeric() {
+    final String sql = "SELECT EXP(NUMERIC '2')";
+    final Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("7.389056099"))
+            .build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that LN(NUMERIC '7.389056099') evaluates to NUMERIC 2. */
+  @Test
+  public void testLnNumeric() {
+    final String sql = "SELECT LN(NUMERIC '7.389056099')";
+    final Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema).addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("2")).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that LOG10(NUMERIC '100') evaluates to NUMERIC 2. */
+  @Test
+  public void testLog10Numeric() {
+    final String sql = "SELECT LOG10(NUMERIC '100')";
+    final Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema).addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("2")).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Checks that the two-argument LOG(NUMERIC '2.25', NUMERIC '1.5') evaluates to NUMERIC 2. */
+  @Test
+  public void testLogNumeric() {
+    final String sql = "SELECT LOG(NUMERIC '2.25', NUMERIC '1.5')";
+    final Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema).addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("2")).build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Sums the NUMERIC column of table_with_numeric; currently disabled, see BEAM-10459. */
+  @Test
+  @Ignore("[BEAM-10459] Aggregation functions on NUMERIC is not supported yet")
+  public void testSumNumeric() {
+    final String sql = "SELECT SUM(numeric_field) FROM table_with_numeric";
+    final Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("333.3333"))
+            .build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /** Averages the NUMERIC column of table_with_numeric; currently disabled, see BEAM-10459. */
+  @Test
+  @Ignore("[BEAM-10459] Aggregation functions on NUMERIC is not supported yet")
+  public void testAvgNumeric() {
+    final String sql = "SELECT AVG(numeric_field) FROM table_with_numeric";
+    final Schema expectedSchema = Schema.builder().addDecimalField("f_numeric").build();
+    final Row expectedRow =
+        Row.withSchema(expectedSchema)
+            .addValues(ZetaSqlTypesUtils.bigDecimalAsNumeric("111.1111"))
+            .build();
+
+    BeamRelNode relNode = new ZetaSQLQueryPlanner(config).convertToBeamRel(sql);
+    PCollection<Row> result = BeamSqlRelUtils.toPCollection(pipeline, relNode);
+
+    PAssert.that(result).containsInAnyOrder(expectedRow);
+    pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+}
diff --git 
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTypesUtils.java
 
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTypesUtils.java
index 3d8d478..02893b7 100644
--- 
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTypesUtils.java
+++ 
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlTypesUtils.java
@@ -24,6 +24,11 @@ import org.apache.beam.sdk.annotations.Internal;
 @Internal
 public class ZetaSqlTypesUtils {
 
+  public static final BigDecimal NUMERIC_MAX_VALUE =
+      bigDecimalAsNumeric("99999999999999999999999999999.999999999");
+  public static final BigDecimal NUMERIC_MIN_VALUE =
+      bigDecimalAsNumeric("-99999999999999999999999999999.999999999");
+
   private ZetaSqlTypesUtils() {}
 
   /**

Reply via email to