This is an automated email from the ASF dual-hosted git repository.

robinyqiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 0bff5fc  Support ZetaSQL DOUBLE +inf, -inf and NaN.
     new 1e569fa  Merge pull request #12292 from ZijieSong946/DoubleBugFixed
0bff5fc is described below

commit 0bff5fca7bf533b39bc04a97a48c208bed82c5a9
Author: zijiesong <[email protected]>
AuthorDate: Wed Jul 22 15:59:45 2020 -0500

    Support ZetaSQL DOUBLE +inf, -inf and NaN.
---
 .../provider/bigquery/BeamBigQuerySqlDialect.java  |  26 ++++-
 .../zetasql/SupportedZetaSqlBuiltinFunctions.java  |   4 +-
 .../zetasql/translation/ExpressionConverter.java   |  44 +++++--
 .../sql/zetasql/ZetaSqlDialectSpecTest.java        | 127 ++++++++++++++++++++-
 4 files changed, 187 insertions(+), 14 deletions(-)

diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
index c791e1a..d606b8b 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/bigquery/BeamBigQuerySqlDialect.java
@@ -87,6 +87,15 @@ public class BeamBigQuerySqlDialect extends 
BigQuerySqlDialect {
           .put("$extract_time", "TIME")
           .put("$extract_datetime", "DATETIME")
           .build();
+  public static final String DOUBLE_POSITIVE_INF_FUNCTION = 
"double_positive_inf";
+  public static final String DOUBLE_NEGATIVE_INF_FUNCTION = 
"double_negative_inf";
+  public static final String DOUBLE_NAN_FUNCTION = "double_nan";
+  private static final Map<String, String> DOUBLE_FUNCTIONS =
+      ImmutableMap.<String, String>builder()
+          .put(DOUBLE_POSITIVE_INF_FUNCTION, "CAST('+inf' AS FLOAT64)")
+          .put(DOUBLE_NEGATIVE_INF_FUNCTION, "CAST('-inf' AS FLOAT64)")
+          .put(DOUBLE_NAN_FUNCTION, "CAST('NaN' AS FLOAT64)")
+          .build();
   public static final String NUMERIC_LITERAL_FUNCTION = "numeric_literal";
 
   public BeamBigQuerySqlDialect(Context context) {
@@ -157,8 +166,13 @@ public class BeamBigQuerySqlDialect extends 
BigQuerySqlDialect {
         break;
       case OTHER_FUNCTION:
         String funName = call.getOperator().getName();
-        if (NUMERIC_LITERAL_FUNCTION.equals(funName)) {
-          // self-designed function dealing with the unparsing of ZetaSQL 
numeric literal
+        if (DOUBLE_FUNCTIONS.containsKey(funName)) {
+          // self-designed function dealing with the unparsing of ZetaSQL 
DOUBLE positive
+          // infinity, negative infinity and NaN
+          unparseDoubleWrapperFunction(writer, funName);
+          break;
+        } else if (NUMERIC_LITERAL_FUNCTION.equals(funName)) {
+          // self-designed function dealing with the unparsing of ZetaSQL 
NUMERIC literal
           unparseNumericLiteralWrapperFunction(writer, call, leftPrec, 
rightPrec);
           break;
         } else if (FUNCTIONS_USING_INTERVAL.contains(funName)) {
@@ -239,6 +253,14 @@ public class BeamBigQuerySqlDialect extends 
BigQuerySqlDialect {
     writer.endFunCall(trimFrame);
   }
 
+  /**
+   * As there is no direct ZetaSQL literal representation of NaN or infinity, 
we cast String "+inf",
+   * "-inf" and "NaN" to FLOAT64 representing positive infinity, negative 
infinity and NaN.
+   */
+  private void unparseDoubleWrapperFunction(SqlWriter writer, String funName) {
+    writer.literal(DOUBLE_FUNCTIONS.get(funName));
+  }
+
   private void unparseNumericLiteralWrapperFunction(
       SqlWriter writer, SqlCall call, int leftPrec, int rightPrec) {
     writer.literal("NUMERIC '");
diff --git 
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
 
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
index 2ccbb1b..6ccfb26 100644
--- 
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
+++ 
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/SupportedZetaSqlBuiltinFunctions.java
@@ -316,8 +316,8 @@ class SupportedZetaSqlBuiltinFunctions {
           // FunctionSignatureId.FN_DIV_NUMERIC, // div(numeric, numeric) -> 
numeric
           // FunctionSignatureId.FN_DIV_BIGNUMERIC, // div(bignumeric, 
bignumeric) -> bignumeric
 
-          // FunctionSignatureId.FN_IS_INF, // is_inf
-          // FunctionSignatureId.FN_IS_NAN, // is_nan
+          FunctionSignatureId.FN_IS_INF, // is_inf
+          FunctionSignatureId.FN_IS_NAN, // is_nan
           // FunctionSignatureId.FN_IEEE_DIVIDE_DOUBLE, // ieee_divide
           // FunctionSignatureId.FN_SAFE_DIVIDE_DOUBLE, // safe_divide
           // FunctionSignatureId.FN_SAFE_DIVIDE_NUMERIC, // safe_divide
diff --git 
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
 
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
index de393b2..19f18cd 100644
--- 
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
+++ 
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/translation/ExpressionConverter.java
@@ -763,15 +763,45 @@ public class ExpressionConverter {
                     ZetaSqlCalciteTranslationUtils.toSimpleRelDataType(kind, 
rexBuilder()));
         break;
       case TYPE_DOUBLE:
+        // Cannot simply call makeApproxLiteral() for ZetaSQL DOUBLE type 
because positive infinity,
+        // negative infinity and Nan cannot be directly converted to 
BigDecimal. So we create three
+        // wrapper functions here for these three cases such that we can later 
recognize it and
+        // customize its unparsing in BeamBigQuerySqlDialect.
         double val = value.getDoubleValue();
-        if (Double.isInfinite(val) || Double.isNaN(val)) {
-          throw new UnsupportedOperationException("Does not support Infinite 
or NaN literals.");
+        String wrapperFun = null;
+        if (val == Double.POSITIVE_INFINITY) {
+          wrapperFun = BeamBigQuerySqlDialect.DOUBLE_POSITIVE_INF_FUNCTION;
+        } else if (val == Double.NEGATIVE_INFINITY) {
+          wrapperFun = BeamBigQuerySqlDialect.DOUBLE_NEGATIVE_INF_FUNCTION;
+        } else if (Double.isNaN(val)) {
+          wrapperFun = BeamBigQuerySqlDialect.DOUBLE_NAN_FUNCTION;
+        }
+
+        RelDataType returnType =
+            ZetaSqlCalciteTranslationUtils.toSimpleRelDataType(kind, 
rexBuilder());
+        if (wrapperFun == null) {
+          ret = rexBuilder().makeApproxLiteral(new BigDecimal(val), 
returnType);
+        } else if 
(BeamBigQuerySqlDialect.DOUBLE_NAN_FUNCTION.equals(wrapperFun)) {
+          // TODO[BEAM-10550]: Update the temporary workaround below after 
vendored Calcite version
+
+          // Adding an additional random parameter for the wrapper function of 
NaN, to avoid
+          // triggering Calcite operation simplification. (e.g. 'NaN == NaN' 
would be simplify to
+          // 'null or NaN is not null' in Calcite. This would miscalculate the 
expression to be
+          // true, which should be false.)
+          ret =
+              rexBuilder()
+                  .makeCall(
+                      SqlOperators.createZetaSqlFunction(wrapperFun, 
returnType.getSqlTypeName()),
+                      ImmutableList.of(
+                          rexBuilder()
+                              .makeApproxLiteral(new 
BigDecimal(Math.random()), returnType)));
+          ;
+        } else {
+          ret =
+              rexBuilder()
+                  .makeCall(
+                      SqlOperators.createZetaSqlFunction(wrapperFun, 
returnType.getSqlTypeName()));
         }
-        ret =
-            rexBuilder()
-                .makeApproxLiteral(
-                    new BigDecimal(val),
-                    ZetaSqlCalciteTranslationUtils.toSimpleRelDataType(kind, 
rexBuilder()));
         break;
       case TYPE_STRING:
         // has to allow CAST because Calcite create CHAR type first and does a 
CAST to VARCHAR.
diff --git 
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
 
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
index b70be26..e9c51c7 100644
--- 
a/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
+++ 
b/sdks/java/extensions/sql/zetasql/src/test/java/org/apache/beam/sdk/extensions/sql/zetasql/ZetaSqlDialectSpecTest.java
@@ -274,9 +274,6 @@ public class ZetaSqlDialectSpecTest extends ZetaSqlTestBase 
{
   }
 
   @Test
-  @Ignore(
-      "Does not support inf/-inf/nan in double/float literals because 
double/float literals are"
-          + " converted to BigDecimal in Calcite codegen.")
   public void testEQ2() {
     String sql = "SELECT @p0 = @p1 AS ColA";
 
@@ -2175,6 +2172,130 @@ public class ZetaSqlDialectSpecTest extends 
ZetaSqlTestBase {
   }
 
   /////////////////////////////////////////////////////////////////////////////
+  // DOUBLE INF and NAN tests
+  /////////////////////////////////////////////////////////////////////////////
+
+  @Test
+  public void testDoubleINF() {
+    String sql = "SELECT CAST('+inf' AS FLOAT64), CAST('-inf' AS FLOAT64)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addDoubleField("f_double1")
+                        .addDoubleField("f_double2")
+                        .build())
+                .addValues(Double.POSITIVE_INFINITY)
+                .addValues(Double.NEGATIVE_INFINITY)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDoubleINFEQ() {
+    String sql =
+        "SELECT CAST('+inf' AS FLOAT64) = CAST('+inf' AS FLOAT64), CAST('+inf' 
AS FLOAT64) = CAST('-inf' AS FLOAT64)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addBooleanField("f_boolean1")
+                        .addBooleanField("f_boolean2")
+                        .build())
+                .addValues(true)
+                .addValues(false)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDoubleNAN() {
+    String sql = "SELECT CAST('NaN' AS FLOAT64)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(Schema.builder().addDoubleField("f_double").build())
+                .addValues(Double.NaN)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDoubleNaNEQ() {
+    String sql = "SELECT CAST('NaN' AS FLOAT64) = CAST('NaN' AS FLOAT64)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            
Row.withSchema(Schema.builder().addBooleanField("f_boolean").build())
+                .addValues(false)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDoubleIsINF() {
+    String sql =
+        "SELECT IS_INF(CAST('+inf' AS FLOAT64)), IS_INF(CAST('-inf' AS 
FLOAT64)), IS_INF(3.0)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addBooleanField("f_boolean1")
+                        .addBooleanField("f_boolean2")
+                        .addBooleanField("f_boolean3")
+                        .build())
+                .addValues(true)
+                .addValues(true)
+                .addValues(false)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  @Test
+  public void testDoubleIsNAN() {
+    String sql = "SELECT IS_NAN(CAST('NaN' AS FLOAT64)), IS_NAN(3.0)";
+
+    ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
+    BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
+    PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, 
beamRelNode);
+
+    PAssert.that(stream)
+        .containsInAnyOrder(
+            Row.withSchema(
+                    Schema.builder()
+                        .addBooleanField("f_boolean1")
+                        .addBooleanField("f_boolean2")
+                        .build())
+                .addValues(true)
+                .addValues(false)
+                .build());
+    
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
   // NUMERIC type tests
   /////////////////////////////////////////////////////////////////////////////
 

Reply via email to