wuchong commented on a change in pull request #15280:
URL: https://github.com/apache/flink/pull/15280#discussion_r603840950



##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java
##########
@@ -163,8 +163,8 @@ public static RelDataType inferRowType(
                 .builder()
                 .kind(inputRowType.getStructKind())
                 .addAll(inputRowType.getFieldList())
-                .add("window_start", SqlTypeName.TIMESTAMP, 3)
-                .add("window_end", SqlTypeName.TIMESTAMP, 3)
+                .add("window_start", timeAttributeType.getSqlTypeName(), 3)
+                .add("window_end", timeAttributeType.getSqlTypeName(), 3)

Review comment:
       These columns should use the `TIMESTAMP` type.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupWindowAggregate.java
##########
@@ -207,6 +208,10 @@ public StreamExecGroupWindowAggregate(
             inputTimeFieldIndex = -1;
         }
 
+        final String shiftTimeZone =
+                TimeWindowUtil.getShiftTimeZone(

Review comment:
       Why not use the strongly typed `TimeZone` here instead of a `String`?

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.rel.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Snapshot;
+import org.apache.calcite.rel.metadata.RelMdCollation;
+import org.apache.calcite.rel.metadata.RelMdDistribution;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Litmus;
+
+/**
+ * Sub-class of {@link org.apache.calcite.rel.core.Snapshot} not targeted at any particular engine
+ * or calling convention.
+ *
+ * <p>Line 80 ~ 91: Calcite only supports timestamp type as period type, but Flink supports both

Review comment:
       Please add the Calcite JIRA ID.

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
##########
@@ -120,6 +120,10 @@ public static boolean isProctimeAttribute(LogicalType logicalType) {
         return logicalType.accept(TIMESTAMP_KIND_EXTRACTOR) == TimestampKind.PROCTIME;
     }
 
+    public static boolean isTimestampLtzType(LogicalType logicalType) {

Review comment:
       It is only used in one place, so it does not seem necessary to make it a util method.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala
##########
@@ -627,3 +633,14 @@ class WindowAggregateITCase(mode: StateBackendMode)
       sink.getAppendResults.sorted.mkString("\n"))
   }
 }
+
+object WindowAggregateITCase {
+  @Parameterized.Parameters(name = "StateBackend={0}, TimeZone={1}")
+  def parameters(): JCollection[Array[Object]] = {
+    Seq[Array[AnyRef]](
+      Array(HEAP_BACKEND, TimeZone.getTimeZone("UTC")),
+      Array(HEAP_BACKEND, TimeZone.getTimeZone("Asia/Shanghai")),
+      Array(ROCKSDB_BACKEND, TimeZone.getTimeZone("UTC")),
+      Array(ROCKSDB_BACKEND, TimeZone.getTimeZone("Asia/Shanghai")))

Review comment:
       We don't support LTZ for rowtime yet, so parameterizing the time zone
doesn't make much sense here. We can introduce it once we support LTZ
rowtime. If we do want to keep it, I suggest testing the different time
zones only on the heap backend to reduce test time, because I think the time
zone is not state-backend sensitive.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest.java
##########
@@ -85,7 +85,7 @@ public void testProcTimeTumbleWindow() {
         String sinkTableDdl =
                 "CREATE TABLE MySink (\n"
                         + " b BIGINT,\n"
-                        + " window_end TIMESTAMP(3),\n"
+                        + " window_end TIMESTAMP_LTZ(3),\n"

Review comment:
       Should be `TIMESTAMP(3)`. 

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.TimeZone;
+
+/** Time util to deals window start and end in different timezone. */
+public class TimeWindowUtil {
+
+    private static final ZoneId UTC_ZONE_ID = TimeZone.getTimeZone("UTC").toZoneId();
+
+    private static final long SECONDS_PER_HOUR = 60 * 60L;
+
+    private static final long MILLS_PER_HOUR = SECONDS_PER_HOUR * 1000L;
+
+    /**
+     * Convert a epoch mills to timestamp mills which can describe a locate date time.
+     *
+     * <p>For example: The timestamp string of epoch mills 5 in UTC+8 is 1970-01-01 08:00:05, the
+     * timestamp mills is 8 * 60 * 60 * 100 + 5.
+     *
+     * @param epochMills the epoch mills.
+     * @param timeZone the timezone
+     * @return the mills which can describe the local timestamp string in given timezone.
+     */
+    public static long toUtcTimestampMills(long epochMills, TimeZone timeZone) {
+        if (timeZone.toZoneId().equals(UTC_ZONE_ID)) {
+            return epochMills;
+        }
+        LocalDateTime localDateTime =
+                LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMills), timeZone.toZoneId());
+        return localDateTime.atZone(UTC_ZONE_ID).toInstant().toEpochMilli();
+    }
+
+    /**
+     * Convert a timestamp mills with given timezone to epoch mills.
+     *
+     * @param utcTimestampMills the timestamp mills.
+     * @param timeZone the timezone
+     * @param usedInTimer if the utc timestamp can map to multiple epoch mills(at most two), use the
+     *     bigger one for timer, use the smaller one for window property.
+     * @return the epoch mills.
+     */
+    public static long toEpochMills(
+            long utcTimestampMills, TimeZone timeZone, boolean usedInTimer) {

Review comment:
       Please split it into two different methods, e.g. `toEpochMills` and
`toEpochMillsForTimer`. That makes the call sites safer than passing a boolean flag.
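
   A minimal sketch of such a split, not the PR's implementation. Only the method name `toEpochMillsForTimer` comes from the comment above; the class name, the `ZoneId` parameter, and the use of `ZonedDateTime.withLaterOffsetAtOverlap()` to select the later instant are assumptions made for illustration.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Hypothetical holder class; it is not part of the PR. */
final class EpochMillsSplitSketch {

    /** Reads the shifted millis back as a wall-clock date time. */
    private static LocalDateTime toLocal(long utcTimestampMills) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(utcTimestampMills), ZoneOffset.UTC);
    }

    /** For window properties: on a DST overlap, atZone picks the earlier instant. */
    static long toEpochMills(long utcTimestampMills, ZoneId zone) {
        return toLocal(utcTimestampMills).atZone(zone).toInstant().toEpochMilli();
    }

    /** For timers: on a DST overlap, pick the later of the two instants. */
    static long toEpochMillsForTimer(long utcTimestampMills, ZoneId zone) {
        return toLocal(utcTimestampMills)
                .atZone(zone)
                .withLaterOffsetAtOverlap()
                .toInstant()
                .toEpochMilli();
    }

    public static void main(String[] args) {
        ZoneId losAngeles = ZoneId.of("America/Los_Angeles");
        long shifted = 1636246800000L; // 2021-11-07 01:00:00 read as UTC
        System.out.println(toEpochMills(shifted, losAngeles));
        System.out.println(toEpochMillsForTimer(shifted, losAngeles));
    }
}
```

   With two separate methods a caller cannot pass the wrong flag by accident. For the Los Angeles example the first call yields 1636272000000 and the second 1636275600000.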

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowUtil.scala
##########
@@ -176,7 +174,15 @@ object WindowUtil {
      throw new ValidationException("Window can only be defined on a time attribute column, " +
        "but is type of " + fieldType)
    }
-    val timeAttributeType = FlinkTypeFactory.toLogicalType(fieldType).asInstanceOf[TimestampType]
+    val timeAttributeType = FlinkTypeFactory.toLogicalType(fieldType) match {
+      case timestampType: TimestampType =>
+        timestampType
+      case timestampLtzType: LocalZonedTimestampType =>
+        timestampLtzType
+      case _ => throw new ValidationException("The supported time indicator type are" +
+        " timestamp and timestampLtz, but is " + FlinkTypeFactory.toLogicalType(fieldType)
+        + ", please file an issue.")
+    }

Review comment:
       It would be better to do the type check on a separate line. We could add a
`supportedTimeAttributeType(LogicalType)` method to `LogicalTypeChecks` to make
the check easier.

##########
File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/calcite/FlinkTypeFactory.scala
##########
@@ -464,6 +473,12 @@ object FlinkTypeFactory {
     case _ => false
   }
 
+  def isTimestampLtzIndicatorType(relDataType: RelDataType): Boolean =
+    relDataType.getSqlTypeName match {
+    case SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE => true
+    case _ => false

Review comment:
       This method only checks the type root, not the indicator type. It would be
better to call `isTimestampLtzType`.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.TimeZone;
+
+/** Time util to deals window start and end in different timezone. */
+public class TimeWindowUtil {
+
+    private static final ZoneId UTC_ZONE_ID = TimeZone.getTimeZone("UTC").toZoneId();
+
+    private static final long SECONDS_PER_HOUR = 60 * 60L;
+
+    private static final long MILLS_PER_HOUR = SECONDS_PER_HOUR * 1000L;
+
+    /**
+     * Convert a epoch mills to timestamp mills which can describe a locate date time.
+     *
+     * <p>For example: The timestamp string of epoch mills 5 in UTC+8 is 1970-01-01 08:00:05, the
+     * timestamp mills is 8 * 60 * 60 * 100 + 5.
+     *
+     * @param epochMills the epoch mills.
+     * @param timeZone the timezone
+     * @return the mills which can describe the local timestamp string in given timezone.
+     */
+    public static long toUtcTimestampMills(long epochMills, TimeZone timeZone) {
+        if (timeZone.toZoneId().equals(UTC_ZONE_ID)) {
+            return epochMills;
+        }
+        LocalDateTime localDateTime =
+                LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMills), timeZone.toZoneId());
+        return localDateTime.atZone(UTC_ZONE_ID).toInstant().toEpochMilli();

Review comment:
       Could we reduce the number of `toZoneId()` calls? They might affect
performance because this code is on the per-record path.
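
   One possible shape, as a sketch only: resolve the `ZoneId` once when the operator is set up and reuse it for every record. The class `CachedZoneShift` and its fields are hypothetical names, not part of the PR.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.TimeZone;

/** Hypothetical helper that converts TimeZone to ZoneId once instead of per record. */
final class CachedZoneShift {

    private static final ZoneId UTC_ZONE_ID = TimeZone.getTimeZone("UTC").toZoneId();

    private final ZoneId shiftZoneId;
    private final boolean isUtc;

    CachedZoneShift(TimeZone shiftTimeZone) {
        // toZoneId() runs once here, at construction time.
        this.shiftZoneId = shiftTimeZone.toZoneId();
        this.isUtc = UTC_ZONE_ID.equals(shiftZoneId);
    }

    /** Per-record path: no TimeZone-to-ZoneId conversion happens here. */
    long toUtcTimestampMills(long epochMills) {
        if (isUtc) {
            return epochMills;
        }
        LocalDateTime local =
                LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMills), shiftZoneId);
        return local.atZone(UTC_ZONE_ID).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        CachedZoneShift shift = new CachedZoneShift(TimeZone.getTimeZone("Asia/Shanghai"));
        System.out.println(shift.toUtcTimestampMills(5L));
    }
}
```

   Passing a `ZoneId` into the existing static methods would have the same effect; which variant fits better depends on how the callers hold the time zone.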

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala
##########
@@ -256,6 +256,82 @@ final class TableToDataStreamITCase extends StreamingTestBase {
     assertEquals(expected2.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
   }
 
+  @Test
+  def testFromTableToAppendStreamWithProctime(): Unit = {
+    val data = List(
+      rowOf(localDateTime(1L), "A"),
+      rowOf(localDateTime(2L), "B"),
+      rowOf(localDateTime(3L), "C"),
+      rowOf(localDateTime(4L), "D"),
+      rowOf(localDateTime(7L), "E"))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         |  ts TIMESTAMP(3),
+         |  a STRING,
+         |  proctime as PROCTIME()
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$dataId'
+         |)
+      """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    val dataStream = tEnv.sqlQuery("SELECT a, ts, proctime FROM src").toAppendStream[Row]
+
+    val sink = new TestingAppendSink
+    dataStream.addSink(sink)
+    env.execute("TableToAppendStream")
+    // the result should like as following, we only check the result
+    //  number because of the non-deterministic of proctime()
+    //    A,1970-01-01T00:00:01,2021-03-30T03:14:40.936Z

Review comment:
       I think we can just verify the result TypeInformation of `dataStream`.
Running the job doesn't verify anything beyond the row count.

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/window/assigners/WindowAssigner.java
##########
@@ -52,7 +52,8 @@ public void open(InternalWindowProcessFunction.Context<?, W> ctx) throws Excepti
      *
      * @param element The element to which windows should be assigned.
      * @param timestamp The timestamp of the element when {@link #isEventTime()} returns true, or
-     *     the current system time when {@link #isEventTime()} returns false.
+     *     the current system time when {@link #isEventTime()} returns false. The timestamp value is
+     *     mapping to UTC milliseconds for splitting windows simply.

Review comment:
       nit: we should mention that it **has been mapped**. What do you think about:
   
   The timestamp value has been shifted to local timestamp to simplify window 
assigning. That means the timestamp has been adjusted to milliseconds after 
1970-01-01 00:00:00 in local time zone.
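
   To illustrate what the shifted value means, here is a small sketch under stated assumptions: `ShiftedWindowSketch`, `shiftToLocal`, and `tumblingWindowStart` are hypothetical names, and the tumbling-window arithmetic is a simplified stand-in for a real assigner.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Hypothetical illustration of assigning windows on a shifted timestamp. */
final class ShiftedWindowSketch {

    /** Milliseconds between 1970-01-01 00:00:00 and the local wall-clock time of the instant. */
    static long shiftToLocal(long epochMillis, ZoneId zone) {
        LocalDateTime local = LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone);
        return local.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    /** Start of the tumbling window that contains the shifted timestamp. */
    static long tumblingWindowStart(long shiftedMillis, long sizeMillis) {
        return shiftedMillis - Math.floorMod(shiftedMillis, sizeMillis);
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai");
        long oneDay = 24 * 60 * 60 * 1000L;
        // 16:00 UTC on 1970-01-01 is local midnight of 1970-01-02 in UTC+8.
        long shifted = shiftToLocal(16 * 60 * 60 * 1000L, shanghai);
        System.out.println(tumblingWindowStart(shifted, oneDay));
    }
}
```

   Because the assigner only sees shifted values, a one-day window starts at local midnight without the assigner knowing anything about time zones.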
   
   
   

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.TimeZone;
+
+/** Time util to deals window start and end in different timezone. */
+public class TimeWindowUtil {
+
+    private static final ZoneId UTC_ZONE_ID = TimeZone.getTimeZone("UTC").toZoneId();
+
+    private static final long SECONDS_PER_HOUR = 60 * 60L;
+
+    private static final long MILLS_PER_HOUR = SECONDS_PER_HOUR * 1000L;
+
+    /**
+     * Convert a epoch mills to timestamp mills which can describe a locate date time.
+     *
+     * <p>For example: The timestamp string of epoch mills 5 in UTC+8 is 1970-01-01 08:00:05, the
+     * timestamp mills is 8 * 60 * 60 * 100 + 5.
+     *
+     * @param epochMills the epoch mills.
+     * @param timeZone the timezone
+     * @return the mills which can describe the local timestamp string in given timezone.
+     */
+    public static long toUtcTimestampMills(long epochMills, TimeZone timeZone) {
+        if (timeZone.toZoneId().equals(UTC_ZONE_ID)) {
+            return epochMills;
+        }
+        LocalDateTime localDateTime =
+                LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMills), timeZone.toZoneId());
+        return localDateTime.atZone(UTC_ZONE_ID).toInstant().toEpochMilli();
+    }
+
+    /**
+     * Convert a timestamp mills with given timezone to epoch mills.
+     *
+     * @param utcTimestampMills the timestamp mills.
+     * @param timeZone the timezone
+     * @param usedInTimer if the utc timestamp can map to multiple epoch mills(at most two), use the
+     *     bigger one for timer, use the smaller one for window property.
+     * @return the epoch mills.
+     */
+    public static long toEpochMills(
+            long utcTimestampMills, TimeZone timeZone, boolean usedInTimer) {
+        if (timeZone.toZoneId().equals(UTC_ZONE_ID)) {
+            return utcTimestampMills;
+        }
+
+        if (timeZone.useDaylightTime()) {
+            // return the larger epoch mills if the time is leaving the DST.
+            // eg. Los_Angeles has two timestamp 2021-11-07 01:00:00 when leaving DST.
+            //  long epoch0  = 1636268400000L;  2021-11-07 00:00:00
+            //  long epoch1  = 1636272000000L;  the first local timestamp 2021-11-07 01:00:00
+            //  long epoch2  = 1636275600000L;  rollback to  2021-11-07 01:00:00
+            //  long epoch3  = 1636279200000L;  2021-11-07 02:00:00
+            // we should use the epoch2 to register timer to ensure the two hours' data can be fired
+            // properly.
+            LocalDateTime utcTimestamp =
+                    LocalDateTime.ofInstant(Instant.ofEpochMilli(utcTimestampMills), UTC_ZONE_ID);
+            long epoch1 = utcTimestamp.atZone(timeZone.toZoneId()).toInstant().toEpochMilli();
+            long epoch2 =
+                    utcTimestamp
+                            .plusSeconds(SECONDS_PER_HOUR)
+                            .atZone(timeZone.toZoneId())
+                            .toInstant()
+                            .toEpochMilli();
+            boolean hasTwoEpochs = epoch2 - epoch1 > MILLS_PER_HOUR;
+            if (hasTwoEpochs && usedInTimer) {
+                return epoch2;

Review comment:
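       This should be `epoch1 + MILLS_PER_HOUR`: `epoch2` here is the epoch of the local time one hour later, not the second occurrence of the same local time. It would be better to add a test for this case.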
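
   The difference can be checked with plain `java.time`. This is a sketch of the arithmetic only, using the Los Angeles example from the code comment; the class and method names are hypothetical and this is not the test that should go into the PR.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

/** Hypothetical check of the DST-overlap arithmetic discussed above. */
final class DstOverlapCheck {

    static final long MILLS_PER_HOUR = 60 * 60 * 1000L;
    static final ZoneId LOS_ANGELES = ZoneId.of("America/Los_Angeles");

    /** On an overlap, atZone resolves to the first occurrence of the local time. */
    static long epochOf(LocalDateTime local) {
        return local.atZone(LOS_ANGELES).toInstant().toEpochMilli();
    }

    /** epoch2 as computed in the PR: the epoch of the local time one hour later. */
    static long epochOneLocalHourLater(LocalDateTime local) {
        return epochOf(local.plusHours(1));
    }

    /** The value to register as timer: the second occurrence if there are two. */
    static long timerEpoch(LocalDateTime local) {
        long epoch1 = epochOf(local);
        boolean hasTwoEpochs = epochOneLocalHourLater(local) - epoch1 > MILLS_PER_HOUR;
        return hasTwoEpochs ? epoch1 + MILLS_PER_HOUR : epoch1;
    }

    public static void main(String[] args) {
        LocalDateTime ambiguous = LocalDateTime.of(2021, 11, 7, 1, 0);
        System.out.println(epochOf(ambiguous));
        System.out.println(epochOneLocalHourLater(ambiguous));
        System.out.println(timerEpoch(ambiguous));
    }
}
```

   For 2021-11-07 01:00:00 the three values are 1636272000000, 1636279200000 and 1636275600000. Returning the second one would register the timer at 02:00:00 local time, one hour too late.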

##########
File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/TableToDataStreamITCase.scala
##########
@@ -256,6 +256,82 @@ final class TableToDataStreamITCase extends StreamingTestBase {
     assertEquals(expected2.sorted, TestSinkContextTableSink.ROWTIMES.sorted)
   }
 
+  @Test
+  def testFromTableToAppendStreamWithProctime(): Unit = {
+    val data = List(
+      rowOf(localDateTime(1L), "A"),
+      rowOf(localDateTime(2L), "B"),
+      rowOf(localDateTime(3L), "C"),
+      rowOf(localDateTime(4L), "D"),
+      rowOf(localDateTime(7L), "E"))
+
+    val dataId: String = TestValuesTableFactory.registerData(data)
+
+    val sourceDDL =
+      s"""
+         |CREATE TABLE src (
+         |  ts TIMESTAMP(3),
+         |  a STRING,
+         |  proctime as PROCTIME()
+         |) WITH (
+         |  'connector' = 'values',
+         |  'data-id' = '$dataId'
+         |)
+      """.stripMargin
+
+    tEnv.executeSql(sourceDDL)
+    val dataStream = tEnv.sqlQuery("SELECT a, ts, proctime FROM src").toAppendStream[Row]
+
+    val sink = new TestingAppendSink
+    dataStream.addSink(sink)
+    env.execute("TableToAppendStream")
+    // the result should like as following, we only check the result
+    //  number because of the non-deterministic of proctime()
+    //    A,1970-01-01T00:00:01,2021-03-30T03:14:40.936Z
+    //    B,1970-01-01T00:00:02,2021-03-30T03:14:40.946Z
+    //    C,1970-01-01T00:00:03,2021-03-30T03:14:40.946Z
+    //    D,1970-01-01T00:00:04,2021-03-30T03:14:40.946Z
+    //    E,1970-01-01T00:00:07,2021-03-30T03:14:40.946Z
+    assertEquals(5, sink.getAppendResults.size)
+  }
+
+  @Test
+  def testHasFromDataStreamToTableBackDataStreamWithProctime(): Unit = {
+    val data = Seq(
+      (1L, "A"),
+      (2L, "B"),
+      (3L, "C"),
+      (4L, "D"),
+      (7L, "E"))
+
+    val ds1 = env.fromCollection(data)
+      // second to millisecond
+      .assignAscendingTimestamps(_._1 * 1000L)
+    val table = ds1.toTable(tEnv, 'ts, 'a, 'proctime.proctime())
+    tEnv.registerTable("t1", table)
+
+    val ds2 = tEnv.sqlQuery(
+      """
+        | SELECT CONCAT(a, '_'), ts, proctime
+        | FROM t1
+      """.stripMargin
+    ).toAppendStream[Row]
+
+    val sink = new TestingAppendSink
+    ds2.addSink(sink)
+    env.execute("DataStreamToTableBackDataStream")
+
+    // the result should like as following, we only check the result

Review comment:
       ditto.

##########
File path: 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/WindowAggregateJsonPlanTest_jsonplan/testProcTimeCumulateWindow.out
##########
@@ -363,14 +363,14 @@
         "EXPR$1" : "BIGINT NOT NULL"
       }, {
         "window_start" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",
           "nullable" : false,
           "precision" : 3,
           "kind" : "REGULAR"
         }
       }, {
         "window_end" : {
-          "type" : "TIMESTAMP_WITHOUT_TIME_ZONE",
+          "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE",

Review comment:
       Should be `TIMESTAMP`. 
   

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.TimeZone;
+
+/** Time util to deals window start and end in different timezone. */
+public class TimeWindowUtil {
+
+    private static final ZoneId UTC_ZONE_ID = TimeZone.getTimeZone("UTC").toZoneId();
+
+    private static final long SECONDS_PER_HOUR = 60 * 60L;
+
+    private static final long MILLS_PER_HOUR = SECONDS_PER_HOUR * 1000L;
+
+    /**
+     * Convert a epoch mills to timestamp mills which can describe a locate date time.
+     *
+     * <p>For example: The timestamp string of epoch mills 5 in UTC+8 is 1970-01-01 08:00:05, the
+     * timestamp mills is 8 * 60 * 60 * 100 + 5.
+     *
+     * @param epochMills the epoch mills.
+     * @param timeZone the timezone
+     * @return the mills which can describe the local timestamp string in given timezone.
+     */
+    public static long toUtcTimestampMills(long epochMills, TimeZone timeZone) {
+        if (timeZone.toZoneId().equals(UTC_ZONE_ID)) {
+            return epochMills;
+        }
+        LocalDateTime localDateTime =
+                LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMills), timeZone.toZoneId());
+        return localDateTime.atZone(UTC_ZONE_ID).toInstant().toEpochMilli();
+    }
+
+    /**
+     * Convert a timestamp mills with given timezone to epoch mills.
+     *
+     * @param utcTimestampMills the timestamp mills.
+     * @param timeZone the timezone
+     * @param usedInTimer if the utc timestamp can map to multiple epoch mills(at most two), use the
+     *     bigger one for timer, use the smaller one for window property.
+     * @return the epoch mills.
+     */
+    public static long toEpochMills(
+            long utcTimestampMills, TimeZone timeZone, boolean usedInTimer) {
+        if (timeZone.toZoneId().equals(UTC_ZONE_ID)) {
+            return utcTimestampMills;
+        }
+
+        if (timeZone.useDaylightTime()) {
+            // return the larger epoch mills if the time is leaving the DST.
+            // eg. Los_Angeles has two timestamp 2021-11-07 01:00:00 when leaving DST.
+            //  long epoch0  = 1636268400000L;  2021-11-07 00:00:00
+            //  long epoch1  = 1636272000000L;  the first local timestamp 2021-11-07 01:00:00
+            //  long epoch2  = 1636275600000L;  rollback to  2021-11-07 01:00:00
+            //  long epoch3  = 1636279200000L;  2021-11-07 02:00:00
+            // we should use the epoch2 to register timer to ensure the two hours' data can be fired
+            // properly.
+            LocalDateTime utcTimestamp =

Review comment:
       Would be better to mention:
   
   ```
               // 2021-11-07 00:00:00 => long epoch0 = 1636268400000L;
               // 2021-11-07 01:00:00 => long epoch1 = 1636272000000L; register 1636275600000L
               // 2021-11-07 02:00:00 => long epoch2 = 1636279200000L;
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

