twalthr commented on a change in pull request #15280:
URL: https://github.com/apache/flink/pull/15280#discussion_r605478496



##########
File path: 
flink-python/src/main/java/org/apache/flink/table/runtime/operators/python/aggregate/PythonStreamGroupWindowAggregateOperator.java
##########
@@ -117,6 +122,11 @@
     /** Session Window gap. */
     private long gap;
 
+    /** The shift timeZone of the window. */
+    private final String shiftTimeZone;
+
+    protected transient TimeZone timeZone;

Review comment:
       Why are we using this old time class instead of `java.time.ZoneId`. The 
`java.time` package is well desinged.

##########
File path: 
flink-python/src/test/java/org/apache/flink/table/runtime/operators/python/aggregate/arrow/stream/StreamArrowPythonGroupWindowAggregateFunctionOperatorTest.java
##########
@@ -458,7 +458,8 @@ public AbstractArrowPythonAggregateFunctionOperator 
getTestOperator(
                     new PlannerNamedWindowProperty("end", new 
PlannerWindowEnd(null))
                 },
                 groupingSet,
-                udafInputOffsets);
+                udafInputOffsets,
+                "UTC");

Review comment:
       We should avoid using strings but real constants and concrete instances 
of classes.

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DefaultSchemaResolver.java
##########
@@ -248,13 +246,29 @@ private Column adjustRowtimeAttribute(List<WatermarkSpec> 
watermarkSpecs, Column
         final boolean hasWatermarkSpec =
                 watermarkSpecs.stream().anyMatch(s -> 
s.getRowtimeAttribute().equals(name));
         if (hasWatermarkSpec && isStreamingMode) {
-            final TimestampType originalType = (TimestampType) 
dataType.getLogicalType();
-            final LogicalType rowtimeType =
-                    new TimestampType(
-                            originalType.isNullable(),
-                            TimestampKind.ROWTIME,
-                            originalType.getPrecision());
-            return column.copy(replaceLogicalType(dataType, rowtimeType));
+            switch (dataType.getLogicalType().getTypeRoot()) {

Review comment:
       This change and the change above has nothing to do with `PROCTIME()`. It 
should be in a different PR that updates rowtime.

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
##########
@@ -81,7 +82,8 @@
                     int.class.getName(),
                     Long.class.getName(),
                     long.class.getName(),
-                    TimestampData.class.getName());
+                    TimestampData.class.getName(),
+                    java.sql.Timestamp.class.getName());

Review comment:
       Please also update the documentation table.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.TimeZone;
+
+/** Time util to deals window start and end in different timezone. */
+public class TimeWindowUtil {

Review comment:
       mark as `@Internal` 

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
##########
@@ -207,6 +208,10 @@ public StreamExecGroupWindowAggregate(
             inputTimeFieldIndex = -1;
         }
 
+        final String shiftTimeZone =
+                TimeWindowUtil.getShiftTimeZone(

Review comment:
       +1 to Jark's question

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
##########
@@ -755,7 +758,7 @@ private static boolean isRowtimeField(FieldInfo field) {
 
     private static boolean isProctimeField(FieldInfo field) {
         DataType type = field.getType();
-        return hasRoot(type.getLogicalType(), 
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
+        return hasFamily(type.getLogicalType(), LogicalTypeFamily.TIMESTAMP)

Review comment:
       we could also change `isProctimeAttribute` such that the default returns 
`false`. Then the first predicate is not necessary anymore and simplifies the 
code.

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
##########
@@ -120,6 +120,25 @@ public static boolean isProctimeAttribute(LogicalType 
logicalType) {
         return logicalType.accept(TIMESTAMP_KIND_EXTRACTOR) == 
TimestampKind.PROCTIME;
     }
 
+    public static boolean supportedTimeAttributeType(LogicalType logicalType) {

Review comment:
       nit: it would be nice to start methods with verbs e.g. 
`isTimeAttributeType` or `isWatermarkType` which is also shorter

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
##########
@@ -48,7 +48,9 @@
      * Returns a {@code Collection} of windows that should be assigned to the 
element.
      *
      * @param element The element to which windows should be assigned.
-     * @param timestamp The timestamp of the element.
+     * @param timestamp The timestamp value has been shifted to local 
timestamp to simplify window
+     *     assigning. That means the timestamp has been adjusted to 
milliseconds after 1970-01-01
+     *     00:00:00 in local time zone.

Review comment:
       Is this change correct in the DataStream API? It actually doesn't know 
the concept of a "local time zone". It simply works in epoch millis.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecTemporalSort.java
##########
@@ -104,10 +105,15 @@ public StreamExecTemporalSort(
             TimestampType keyType = (TimestampType) timeType;
             if (keyType.getKind() == TimestampKind.ROWTIME) {
                 return createSortRowTime(inputType, inputTransform, config);
-            } else if (keyType.getKind() == TimestampKind.PROCTIME) {
+            }
+        }
+        if (timeType instanceof LocalZonedTimestampType) {
+            LocalZonedTimestampType keyType = (LocalZonedTimestampType) 
timeType;
+            if (keyType.getKind() == TimestampKind.PROCTIME) {

Review comment:
       If `PROCTIME` will always be `LocalZonedTimestampType` from now on, 
shall we forbid this kind in the constructor of `ZonedTimestampType` and 
`TimestampType`? Would it make our life easier?

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/LocalZonedTimestampType.java
##########
@@ -72,7 +72,8 @@
                     java.time.Instant.class.getName(),
                     Integer.class.getName(),
                     Long.class.getName(),
-                    TimestampData.class.getName());
+                    TimestampData.class.getName(),
+                    java.sql.Timestamp.class.getName());

Review comment:
       Are we still planning to support `LocalDateTime` as well? Or is 
`java.sql.Timestamp` enough for us?  Supporting `LocalDateTime` would mean that 
we need access to configuration in converters, correct?

##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ConnectorCatalogTable.java
##########
@@ -166,7 +167,9 @@ private static void updateProctimeIndicator(
             if (fieldNames[i].equals(proctimeAttribute)) {
                 // bridged to timestamp for compatible flink-planner
                 types[i] =
-                        new AtomicDataType(new TimestampType(true, 
TimestampKind.PROCTIME, 3))
+                        new AtomicDataType(

Review comment:
       Create `DataTypeUtils.createProctimeDataType()` to simplify this to:
   
   `DataTypeUtils.createProctimeDataType().bridgedTo(java.sql.Timestamp.class)`

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.scala
##########
@@ -602,10 +612,17 @@ class RexTimeIndicatorMaterializer(
     private val input: Seq[RelDataType])
   extends RexShuttle {
 
-  private def timestamp(isNullable: Boolean): RelDataType = rexBuilder
-    .getTypeFactory
-    .asInstanceOf[FlinkTypeFactory]
-    .createFieldTypeFromLogicalType(new TimestampType(isNullable, 3))
+  private def timestamp(isNullable: Boolean, isTimestampLtz: Boolean = false): 
RelDataType = {

Review comment:
       reduce 3 code duplication and refactor this to a global method in this 
file that takes `RexBuilder`?

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/data/util/DataFormatConverters.java
##########
@@ -900,6 +902,36 @@ Timestamp toExternalImpl(RowData row, int column) {
         }
     }
 
+    /** Converter for timestamp which doesn't consider the time zone. */
+    public static final class TimestampLtzConverter

Review comment:
       Please also update the non-legacy 
`org.apache.flink.table.data.conversion.DataStructureConverters`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to