wuchong commented on a change in pull request #15280:
URL: https://github.com/apache/flink/pull/15280#discussion_r604753204



##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
##########
@@ -120,6 +120,17 @@ public static boolean isProctimeAttribute(LogicalType logicalType) {
         return logicalType.accept(TIMESTAMP_KIND_EXTRACTOR) == TimestampKind.PROCTIME;
     }
 
+    public static boolean supportedTimeAttributeType(LogicalType logicalType) {
+        if (isProctimeAttribute(logicalType)
+                && logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
+            return true;
+        }
+        if (isTimeAttribute(logicalType) && hasFamily(logicalType, LogicalTypeFamily.TIMESTAMP)) {

Review comment:
       nit: should we be more strict here and check the two supported type roots explicitly?
   
   ```java
   return isRowtimeAttribute(logicalType)
           && (logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE
                   || logicalType.getTypeRoot() == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
   ```
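
To illustrate the stricter check in isolation, here is a minimal, self-contained sketch. The `Root` and `Kind` enums and the class name are hypothetical stand-ins for `LogicalTypeRoot` and `TimestampKind`, not the Flink classes themselves. It assumes that a family-based check would also let a `TIMESTAMP_WITH_TIME_ZONE` rowtime attribute through, which the explicit root check rejects.

```java
import java.util.EnumSet;

class TimeAttributeCheckSketch {
    // Hypothetical stand-in for Flink's LogicalTypeRoot (only the roots needed here).
    enum Root {
        TIMESTAMP_WITHOUT_TIME_ZONE,
        TIMESTAMP_WITH_TIME_ZONE,
        TIMESTAMP_WITH_LOCAL_TIME_ZONE,
        BIGINT
    }

    // Hypothetical stand-in for Flink's TimestampKind.
    enum Kind { REGULAR, ROWTIME, PROCTIME }

    // Roots accepted for a rowtime attribute under the stricter check.
    private static final EnumSet<Root> ROWTIME_ROOTS =
            EnumSet.of(Root.TIMESTAMP_WITHOUT_TIME_ZONE, Root.TIMESTAMP_WITH_LOCAL_TIME_ZONE);

    static boolean supportedTimeAttributeType(Root root, Kind kind) {
        // Proctime attributes must be TIMESTAMP_WITH_LOCAL_TIME_ZONE.
        if (kind == Kind.PROCTIME) {
            return root == Root.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
        }
        // Rowtime attributes must use one of the two explicitly listed roots.
        return kind == Kind.ROWTIME && ROWTIME_ROOTS.contains(root);
    }

    public static void main(String[] args) {
        // Accepted: rowtime on a plain timestamp.
        System.out.println(
                supportedTimeAttributeType(Root.TIMESTAMP_WITHOUT_TIME_ZONE, Kind.ROWTIME));
        // Rejected: rowtime on a zoned timestamp, which is not one of the listed roots.
        System.out.println(
                supportedTimeAttributeType(Root.TIMESTAMP_WITH_TIME_ZONE, Kind.ROWTIME));
    }
}
```

Listing the roots explicitly keeps the set of supported time attribute types closed, so a new root added to the timestamp family later is not accepted by accident.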

##########
File path: flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
##########
@@ -258,27 +265,59 @@ private static DataType createLegacyType(
                 .bridgedTo(typeInfo.getTypeClass());
     }
 
+    // private static DataType convertToTimeAttributeType(

Review comment:
       Please remove this commented-out code.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
##########
@@ -182,7 +182,12 @@ object WindowUtil {
       throw new ValidationException("Window can only be defined on a time attribute column, " +
         "but is type of " + fieldType)
     }
-    val timeAttributeType = FlinkTypeFactory.toLogicalType(fieldType).asInstanceOf[TimestampType]
+    val timeAttributeType = FlinkTypeFactory.toLogicalType(fieldType)
+    if (!supportedTimeAttributeType(timeAttributeType)) {
+      throw new ValidationException("The supported time indicator type are" +
+        " timestamp and timestampLtz, but is " + FlinkTypeFactory.toLogicalType(fieldType)
+        + ", please file an issue.")

Review comment:
       nit: this is not a bug, so there is no need to ask the user to file an issue.
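
As a rough sketch of what the message could look like without the hint, assuming the rest of the wording stays the same (the class and method names below are hypothetical and exist only for illustration):

```java
class TimeIndicatorMessageSketch {
    // Hypothetical helper: builds the validation message without the
    // "please file an issue" suffix, since an unsupported type is a user
    // error rather than a bug.
    static String unsupportedTimeIndicator(String actualType) {
        return "The supported time indicator types are timestamp and timestampLtz, but is "
                + actualType
                + ".";
    }

    public static void main(String[] args) {
        // Prints the message for an example unsupported type.
        System.out.println(unsupportedTimeIndicator("INT"));
    }
}
```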

##########
File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala
##########
@@ -492,37 +492,37 @@ class TableEnvironmentTest extends TableTestBase {
   def testStreamAliasWithAddingTimeAttributesByName(): Unit = {
     val util = streamTestUtil()
 
-    // atomic
-    util.verifySchema(
-      util.addTable[Int]('new.proctime),
-      Seq("new" -> PROCTIME))
-
-    // case class
-    util.verifySchema(
-      util.addTable[CClassWithTime]('cf1, 'new.proctime, 'cf2),
-      Seq("cf1" -> INT, "new" -> PROCTIME, "cf2" -> LONG))
+//    // atomic

Review comment:
       Please revert this change; these test cases should not be commented out.

##########
File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/schema/LegacyCatalogSourceTable.scala
##########
@@ -234,11 +234,20 @@ class LegacyCatalogSourceTable[T](
       val fieldNames = logicalRowType.getFieldNames
       val fieldTypes = logicalRowType.getFields.map { f =>
         if (FlinkTypeFactory.isTimeIndicatorType(f.getType)) {
-          val timeIndicatorType = f.getType.asInstanceOf[TimestampType]
-          new TimestampType(
-            timeIndicatorType.isNullable,
-            TimestampKind.REGULAR,
-            timeIndicatorType.getPrecision)
+          f.getType match {
+            case ts: TimestampType =>
+              new TimestampType(
+                ts.isNullable,
+                TimestampKind.REGULAR,
+                ts.getPrecision)
+            case ltz: LocalZonedTimestampType =>
+              new LocalZonedTimestampType(
+                ltz.isNullable,
+                TimestampKind.REGULAR,
+                ltz.getPrecision)
+            case _ => throw new ValidationException("The supported time indicator type" +
+              " are timestamp and timestampLtz, but is " + f.getType + ", please file an issue.")

Review comment:
       nit: this is not a bug, so there is no need to ask the user to file an issue.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

