This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new cb6b5c1  [FLINK-21978][table-planner] Disable cast conversion between 
Numeric type and TIMESTAMP_LTZ type
cb6b5c1 is described below

commit cb6b5c1282f2d7ac226fa5c04ff650107382bb6d
Author: Leonard Xu <xbjt...@163.com>
AuthorDate: Sat Mar 27 20:01:57 2021 +0800

    [FLINK-21978][table-planner] Disable cast conversion between Numeric type 
and TIMESTAMP_LTZ type
    
    This closes #15374
---
 .../planner/codegen/calls/ScalarOperatorGens.scala | 103 +++----------
 .../planner/expressions/TemporalTypesTest.scala    | 166 ++++++++-------------
 .../apache/flink/table/data/DecimalDataUtils.java  |  22 ---
 .../apache/flink/table/data/DecimalDataTest.java   |   7 -
 4 files changed, 85 insertions(+), 213 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index 8dea49d..fa188b7 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -1241,7 +1241,7 @@ object ScalarOperatorGens {
         s"$method($operandTerm.getMillisecond(), $zone)"
       }
 
-    // Disable cast conversion between Numeric type and  Timestamp type
+    // Disable cast conversion between Numeric type and Timestamp type
     case (TINYINT, TIMESTAMP_WITHOUT_TIME_ZONE) |
          (SMALLINT, TIMESTAMP_WITHOUT_TIME_ZONE) |
          (INTEGER, TIMESTAMP_WITHOUT_TIME_ZONE) |
@@ -1267,91 +1267,30 @@ object ScalarOperatorGens {
       }
     }
 
-    // Tinyint -> TimestampLtz
-    // Smallint -> TimestampLtz
-    // Int -> TimestampLtz
-    // Bigint -> TimestampLtz
+    // Disable cast conversion between Numeric type and TimestampLtz type
     case (TINYINT, TIMESTAMP_WITH_LOCAL_TIME_ZONE) |
          (SMALLINT, TIMESTAMP_WITH_LOCAL_TIME_ZONE) |
          (INTEGER, TIMESTAMP_WITH_LOCAL_TIME_ZONE) |
-         (BIGINT, TIMESTAMP_WITH_LOCAL_TIME_ZONE) =>
-      generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-        operandTerm => s"$TIMESTAMP_DATA.fromEpochMillis(((long) $operandTerm) 
* 1000)"
-      }
-
-    // Float -> TimestampLtz
-    // Double -> TimestampLtz
-    case (FLOAT, TIMESTAMP_WITH_LOCAL_TIME_ZONE) |
-         (DOUBLE, TIMESTAMP_WITH_LOCAL_TIME_ZONE) =>
-      generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-        operandTerm =>
-          s"""
-              |$TIMESTAMP_DATA.fromEpochMillis((long) ($operandTerm * 1000),
-              |(int) (($operandTerm - (int) $operandTerm) * 1000_000_000 % 
1000_000))
-              """.stripMargin
-      }
-
-    // DECIMAL -> TimestampLtz
-    case (DECIMAL, TIMESTAMP_WITH_LOCAL_TIME_ZONE) =>
-      generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-        operandTerm =>
-          s"$DECIMAL_UTIL.castToTimestamp($operandTerm)"
-      }
-
-    // TimestampLtz -> Tinyint
-    case (TIMESTAMP_WITH_LOCAL_TIME_ZONE, TINYINT) =>
-      generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-        operandTerm => s"((byte) ($operandTerm.getMillisecond() / 1000))"
-      }
-
-    // TimestampLtz -> Smallint
-    case (TIMESTAMP_WITH_LOCAL_TIME_ZONE, SMALLINT) =>
-      generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-        operandTerm => s"((short) ($operandTerm.getMillisecond() / 1000))"
-      }
-
-    // TimestampLtz -> Int
-    case (TIMESTAMP_WITH_LOCAL_TIME_ZONE, INTEGER) =>
-      generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-        operandTerm => s"((int) ($operandTerm.getMillisecond() / 1000))"
-      }
-
-    // TimestampLtz -> BigInt
-    case (TIMESTAMP_WITH_LOCAL_TIME_ZONE, BIGINT) =>
-      generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-        operandTerm => s"((long) ($operandTerm.getMillisecond() / 1000))"
-      }
-
-    // TimestampLtz -> Float
-    case (TIMESTAMP_WITH_LOCAL_TIME_ZONE, FLOAT) =>
-      generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-        operandTerm =>
-          s"""
-             |((float) ($operandTerm.getMillisecond() / 1000.0 +
-             |$operandTerm.getNanoOfMillisecond() / 1000_000_000.0))
-          """.stripMargin
-      }
-
-    // TimestampLtz -> Double
-    case (TIMESTAMP_WITH_LOCAL_TIME_ZONE, DOUBLE) =>
-      generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-        operandTerm =>
-          s"""
-             |((double) ($operandTerm.getMillisecond() / 1000.0 +
-             |$operandTerm.getNanoOfMillisecond() / 1000_000_000.0))
-          """.stripMargin
-      }
-
-    // TimestampLtz -> Decimal
-    case  (TIMESTAMP_WITH_LOCAL_TIME_ZONE, DECIMAL) =>
-      val dt = targetType.asInstanceOf[DecimalType]
-      generateUnaryOperatorIfNotNull(ctx, targetType, operand) {
-        operandTerm =>
-          s"""
-             |$DECIMAL_UTIL.castFrom(
-             |  $operandTerm, ${dt.getPrecision}, ${dt.getScale})
-           """.stripMargin
+         (BIGINT, TIMESTAMP_WITH_LOCAL_TIME_ZONE) |
+         (FLOAT, TIMESTAMP_WITH_LOCAL_TIME_ZONE) |
+         (DOUBLE, TIMESTAMP_WITH_LOCAL_TIME_ZONE) |
+         (DECIMAL, TIMESTAMP_WITH_LOCAL_TIME_ZONE) |
+         (TIMESTAMP_WITH_LOCAL_TIME_ZONE, TINYINT) |
+         (TIMESTAMP_WITH_LOCAL_TIME_ZONE, SMALLINT) |
+         (TIMESTAMP_WITH_LOCAL_TIME_ZONE, INTEGER) |
+         (TIMESTAMP_WITH_LOCAL_TIME_ZONE, BIGINT) |
+         (TIMESTAMP_WITH_LOCAL_TIME_ZONE, FLOAT) |
+         (TIMESTAMP_WITH_LOCAL_TIME_ZONE, DOUBLE) |
+         (TIMESTAMP_WITH_LOCAL_TIME_ZONE, DECIMAL) => {
+      if (TIMESTAMP_WITH_LOCAL_TIME_ZONE.equals(targetType.getTypeRoot)) {
+        throw new ValidationException("The cast conversion from NUMERIC type" +
+          " to TIMESTAMP_LTZ type is not allowed, it's recommended to use" +
+          " TO_TIMESTAMP_LTZ(numeric_col, precision) instead.")
+      } else {
+        throw new ValidationException("The cast conversion from" +
+          " TIMESTAMP_LTZ type to NUMERIC type is not allowed.")
       }
+    }
 
     // internal temporal casting
     // Date -> Integer
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
index ac8e11f..b41ad79 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/expressions/TemporalTypesTest.scala
@@ -254,149 +254,111 @@ class TemporalTypesTest extends ExpressionTestBase {
       "CAST(f2 AS TIMESTAMP_LTZ(3))",
       "1990-10-14 10:20:45.123")
 
-    // TINYINT -> TIMESTAMP_LTZ
+    // TIMESTAMP_LTZ -> TIME
+    testSqlApi(
+      s"CAST(${timestampLtz("2018-03-14 01:02:03")} AS TIME)",
+      "01:02:03")
+
+    // TIMESTAMP_LTZ -> DATE
     testSqlApi(
+      s"CAST(${timestampLtz("2018-03-14 01:02:03")} AS DATE)",
+      "2018-03-14")
+
+    // TIMESTAMP_LTZ -> TIMESTAMP
+    testSqlApi(
+      s"CAST(${timestampLtz("2018-03-14 01:02:03")} AS TIMESTAMP(3))",
+      "2018-03-14 01:02:03.000")
+
+    // test precision when cast between TIMESTAMP_LTZ and TIMESTAMP_LTZ
+    testSqlApi(
+      s"CAST(${timestampLtz("1970-01-01 08:00:01.123456")} AS 
TIMESTAMP_LTZ(3))",
+      "1970-01-01 08:00:01.123")
+    testSqlApi(
+      s"CAST(${timestampLtz("1970-01-01 08:00:01.123456")} AS 
TIMESTAMP_LTZ(6))",
+      "1970-01-01 08:00:01.123456")
+    testSqlApi(
+      s"CAST(${timestampLtz("1970-01-01 08:00:01.123456")} AS 
TIMESTAMP_LTZ(9))",
+      "1970-01-01 08:00:01.123456000")
+  }
+
+  @Test
+  def tesInvalidCastBetweenNumericAndTimestampLtz(): Unit = {
+    val castFromTimestampLtzExceptionMsg = "The cast conversion from 
TIMESTAMP_LTZ type to" +
+      " NUMERIC type is not allowed."
+
+    val castToTimestampLtzExceptionMsg = "The cast conversion from NUMERIC 
type to TIMESTAMP_LTZ" +
+      " type is not allowed, it's recommended to use" +
+      " TO_TIMESTAMP_LTZ(numeric_col, precision) instead."
+
+    // TINYINT -> TIMESTAMP_LTZ
+    testExpectedSqlException(
       "CAST(CAST(100 AS TINYINT) AS TIMESTAMP_LTZ(3))",
-      "1970-01-01 08:01:40.000")
+      castToTimestampLtzExceptionMsg)
 
     // SMALLINT -> TIMESTAMP_LTZ
-    testSqlApi(
+    testExpectedSqlException(
       "CAST(CAST(100 AS SMALLINT) AS TIMESTAMP_LTZ(3))",
-      "1970-01-01 08:01:40.000")
+      castToTimestampLtzExceptionMsg)
 
     // INT -> TIMESTAMP_LTZ
-    testSqlApi(
+    testExpectedSqlException(
       "CAST(100 AS TIMESTAMP_LTZ(3))",
-      "1970-01-01 08:01:40.000")
+      castToTimestampLtzExceptionMsg)
 
     // BIGINT -> TIMESTAMP_LTZ
-    testSqlApi(
+    testExpectedSqlException(
       "CAST(CAST(100 AS BIGINT) AS TIMESTAMP_LTZ(3))",
-      "1970-01-01 08:01:40.000")
+      castToTimestampLtzExceptionMsg)
 
     // FLOAT -> TIMESTAMP_LTZ
-    testSqlApi(
+    testExpectedSqlException(
       "CAST(CAST(100.01 AS FLOAT) AS TIMESTAMP_LTZ(3))",
-      "1970-01-01 08:01:40.010")
+      castToTimestampLtzExceptionMsg)
 
     // DOUBLE -> TIMESTAMP_LTZ
-    testSqlApi(
+    testExpectedSqlException(
       "CAST(CAST(100.123 AS DOUBLE) AS TIMESTAMP_LTZ(3))",
-      "1970-01-01 08:01:40.123")
+      castToTimestampLtzExceptionMsg)
 
     // DECIMAL -> TIMESTAMP_LTZ
-    testSqlApi(
+    testExpectedSqlException(
       "CAST(CAST(100.1234 as DECIMAL(38, 18)) AS TIMESTAMP_LTZ(3))",
-      "1970-01-01 08:01:40.123")
-    testSqlApi(
-      "CAST(CAST(1616490480.123 AS DECIMAL(38, 18)) AS TIMESTAMP_LTZ(9))",
-      "2021-03-23 17:08:00.123000000")
-
-    // TIMESTAMP_LTZ -> TIME
-    testSqlApi(
-      s"CAST(${timestampLtz("2018-03-14 01:02:03")} AS TIME)",
-      "01:02:03")
-
-    // TIMESTAMP_LTZ -> DATE
-    testSqlApi(
-      s"CAST(${timestampLtz("2018-03-14 01:02:03")} AS DATE)",
-      "2018-03-14")
-
-    // TIMESTAMP_LTZ -> TIMESTAMP
-    testSqlApi(
-      s"CAST(${timestampLtz("2018-03-14 01:02:03")} AS TIMESTAMP(3))",
-      "2018-03-14 01:02:03.000")
+      castToTimestampLtzExceptionMsg)
 
     // TIMESTAMP_LTZ -> TINYINT
-    testSqlApi(
+    testExpectedSqlException(
       s"CAST(${timestampLtz("1970-01-01 08:02:03.123")} AS TINYINT)",
-      "123")
+      castFromTimestampLtzExceptionMsg)
 
     // TIMESTAMP_LTZ -> SMALLINT
-    testSqlApi(
+    testExpectedSqlException(
       s"CAST(${timestampLtz("1970-01-01 08:02:03.123")} AS SMALLINT)",
-      "123")
+      castFromTimestampLtzExceptionMsg)
 
     // TIMESTAMP_LTZ -> INT
-    testSqlApi(
+    testExpectedSqlException(
       s"CAST(${timestampLtz("1970-01-01 08:02:03.123")} AS INT)",
-      "123")
+      castFromTimestampLtzExceptionMsg)
 
     // TIMESTAMP_LTZ -> BIGINT
-    testSqlApi(
+    testExpectedSqlException(
       s"CAST(${timestampLtz("1970-01-01 08:02:03.123")} AS BIGINT)",
-      "123")
+      castFromTimestampLtzExceptionMsg)
 
     // TIMESTAMP_LTZ -> FLOAT
-    testSqlApi(
+    testExpectedSqlException(
       s"CAST(${timestampLtz("1970-01-01 08:02:03.123")} AS FLOAT)",
-      "123.123")
+      castFromTimestampLtzExceptionMsg)
 
     // TIMESTAMP_LTZ -> DOUBLE
-    testSqlApi(
+    testExpectedSqlException(
       s"CAST(${timestampLtz("1970-01-01 08:02:03.123")} AS DOUBLE)",
-      "123.123")
+      castFromTimestampLtzExceptionMsg)
 
     // TIMESTAMP_LTZ -> DECIMAL
-    testSqlApi(
+    testExpectedSqlException(
       s"CAST(${timestampLtz("1970-01-01 08:02:03.123")} AS DECIMAL(38, 3))",
-      "123.123")
-
-    // test precision when cast to TIMESTAMP_LTZ
-    testSqlApi(
-      "CAST(CAST(1.1234567 AS FLOAT) AS TIMESTAMP_LTZ(6))",
-      "1970-01-01 08:00:01.123456")
-    testSqlApi(
-      "CAST(CAST(1.12 AS FLOAT) AS TIMESTAMP_LTZ(6))",
-      "1970-01-01 08:00:01.120000")
-    testSqlApi(
-      "CAST(CAST(1.1234567899 AS DOUBLE) AS TIMESTAMP_LTZ(9))",
-      "1970-01-01 08:00:01.123456789")
-    testSqlApi(
-      "CAST(CAST(1.12 AS DOUBLE) AS TIMESTAMP_LTZ(6))",
-      "1970-01-01 08:00:01.120000")
-    testSqlApi(
-      "CAST(CAST(1.1234567899 AS DECIMAL(38, 18)) AS TIMESTAMP_LTZ(9))",
-      "1970-01-01 08:00:01.123456789")
-    testSqlApi(
-      "CAST(CAST(1.12 AS DECIMAL(38, 18)) AS TIMESTAMP_LTZ(6))",
-      "1970-01-01 08:00:01.120000")
-
-    // test precision when cast from TIMESTAMP_LTZ
-    testSqlApi(
-      s"CAST(${timestampLtz("1970-01-01 08:00:01.123456")} AS FLOAT)",
-      "1.123456")
-    testSqlApi(
-      s"CAST(${timestampLtz("1970-01-01 08:02:03.123456")} AS DOUBLE)",
-      "123.123456")
-    testSqlApi(
-      s"CAST(${timestampLtz("1970-01-01 08:02:03.123456")} AS DECIMAL(38, 6))",
-      "123.123456")
-    testSqlApi(
-      s"CAST(${timestampLtz("1970-01-01 08:02:03.123456789")} AS DECIMAL(38, 
9))",
-      "123.123456789")
-    testSqlApi(
-      s"CAST(${timestampLtz("1970-01-01 08:02:03.123456")} AS DECIMAL(38, 4))",
-      "123.1235")
-
-    // test cast between TIMESTAMP_LTZ and TIMESTAMP_LTZ
-    testSqlApi(
-      s"CAST(${timestampLtz("1970-01-01 08:00:01.123456")} AS 
TIMESTAMP_LTZ(3))",
-      "1970-01-01 08:00:01.123")
-    testSqlApi(
-      s"CAST(${timestampLtz("1970-01-01 08:00:01.123456")} AS 
TIMESTAMP_LTZ(6))",
-      "1970-01-01 08:00:01.123456")
-    testSqlApi(
-      s"CAST(${timestampLtz("1970-01-01 08:00:01.123456")} AS 
TIMESTAMP_LTZ(9))",
-      "1970-01-01 08:00:01.123456000")
-
-    // test cast with null value
-    testSqlApi(
-      s"CAST(CAST(null AS BIGINT) AS TIMESTAMP_LTZ(3))",
-      "null")
-    testSqlApi(
-      s"CAST(CAST(null AS TIMESTAMP_LTZ(3)) AS BIGINT)",
-      "null")
+      castFromTimestampLtzExceptionMsg)
   }
 
   @Test
diff --git 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/DecimalDataUtils.java
 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/DecimalDataUtils.java
index 72a7193..9ea7055 100644
--- 
a/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/DecimalDataUtils.java
+++ 
b/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/DecimalDataUtils.java
@@ -38,9 +38,6 @@ import static 
org.apache.flink.table.data.DecimalData.fromBigDecimal;
 public final class DecimalDataUtils {
 
     private static final MathContext MC_DIVIDE = new MathContext(38, 
RoundingMode.HALF_UP);
-    private static final long MILLS_PER_SECOND = 1000L;
-    private static final long NANOS_PER_SECOND = 1000_000_000L;
-    private static final long NANOS_PER_MILLISECOND = 1000_000L;
 
     public static final DecimalType DECIMAL_SYSTEM_DEFAULT =
             new DecimalType(DecimalType.MAX_PRECISION, 18);
@@ -208,25 +205,6 @@ public final class DecimalDataUtils {
         return dec.toBigDecimal().compareTo(BigDecimal.ZERO) != 0;
     }
 
-    public static TimestampData castToTimestamp(DecimalData dec) {
-        BigDecimal bd = dec.toBigDecimal().multiply(new 
BigDecimal(MILLS_PER_SECOND));
-        long mills = bd.longValue();
-        int nanos =
-                bd.remainder(BigDecimal.ONE)
-                        .multiply(new BigDecimal(NANOS_PER_MILLISECOND))
-                        .intValue();
-        return TimestampData.fromEpochMillis(mills, nanos);
-    }
-
-    public static DecimalData castFrom(TimestampData val, int precision, int 
scale) {
-        BigDecimal decimalSeconds =
-                new BigDecimal(val.getMillisecond())
-                        .multiply(new BigDecimal(NANOS_PER_MILLISECOND))
-                        .add(new BigDecimal(val.getNanoOfMillisecond()))
-                        .divide(new BigDecimal(NANOS_PER_SECOND));
-        return fromBigDecimal(decimalSeconds, precision, scale);
-    }
-
     public static DecimalData castFrom(DecimalData dec, int precision, int 
scale) {
         return fromBigDecimal(dec.toBigDecimal(), precision, scale);
     }
diff --git 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DecimalDataTest.java
 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DecimalDataTest.java
index 9854183..5f5b5f0 100644
--- 
a/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DecimalDataTest.java
+++ 
b/flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/data/DecimalDataTest.java
@@ -28,7 +28,6 @@ import static 
org.apache.flink.table.data.DecimalDataUtils.castFrom;
 import static org.apache.flink.table.data.DecimalDataUtils.castToBoolean;
 import static org.apache.flink.table.data.DecimalDataUtils.castToDecimal;
 import static org.apache.flink.table.data.DecimalDataUtils.castToIntegral;
-import static org.apache.flink.table.data.DecimalDataUtils.castToTimestamp;
 import static org.apache.flink.table.data.DecimalDataUtils.ceil;
 import static org.apache.flink.table.data.DecimalDataUtils.compare;
 import static org.apache.flink.table.data.DecimalDataUtils.divide;
@@ -115,10 +114,6 @@ public class DecimalDataTest {
         assertTrue(castToBoolean(castFrom(true, 5, 0)));
         assertEquals(5, castToIntegral(castFrom(5, 5, 0)));
         assertEquals(5, castToIntegral(castFrom("5", 5, 0)));
-        assertEquals(5000, castToTimestamp(castFrom("5", 5, 
0)).getMillisecond());
-        assertEquals(12300456, castToTimestamp(castFrom("12300.4567", 10, 
4)).getMillisecond());
-        assertEquals(
-                780000, castToTimestamp(castFrom("12300.45678", 10, 
5)).getNanoOfMillisecond());
 
         DecimalData newDecimal = castFrom(castFrom(10, 5, 2), 10, 4);
         assertEquals(10, newDecimal.precision());
@@ -173,8 +168,6 @@ public class DecimalDataTest {
         assertNull(DecimalData.fromBigDecimal(new BigDecimal(Long.MAX_VALUE), 
5, 0));
         assertEquals(0, DecimalData.zero(20, 2).toBigDecimal().intValue());
         assertEquals(0, DecimalData.zero(20, 2).toBigDecimal().intValue());
-        assertEquals(1234000, castToTimestamp(castFrom("1234", 20, 
4)).getMillisecond());
-        assertEquals(0, castToTimestamp(castFrom("1234", 20, 
4)).getNanoOfMillisecond());
     }
 
     @Test

Reply via email to