wuchong commented on a change in pull request #15280:
URL: https://github.com/apache/flink/pull/15280#discussion_r601122715



##########
File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/typeutils/FieldInfoUtils.java
##########
@@ -820,8 +821,13 @@ private static UnresolvedReferenceExpression getChildAsReference(Expression expr
     }
 
     private static DataType createTimeIndicatorType(TimestampKind kind) {
-        return new AtomicDataType(new TimestampType(true, kind, 3))
-                .bridgedTo(java.sql.Timestamp.class);
+        if (kind == TimestampKind.PROCTIME) {
+            return new AtomicDataType(new LocalZonedTimestampType(true, kind, 3))
+                    .bridgedTo(java.time.Instant.class);
+        } else {
+            return new AtomicDataType(new TimestampType(true, kind, 3))
+                    .bridgedTo(java.sql.Timestamp.class);

Review comment:
       Shouldn't `TimestampType` be bridged to `LocalDateTime` by default? It seems this conversion class is not used.
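
For illustration, a minimal sketch of what that could look like if the explicit bridging is dropped and the default conversion classes are relied on (assuming `java.time.LocalDateTime` for `TIMESTAMP` and `java.time.Instant` for `TIMESTAMP_LTZ` are the intended conversion classes here):

```java
private static DataType createTimeIndicatorType(TimestampKind kind) {
    if (kind == TimestampKind.PROCTIME) {
        // TIMESTAMP_LTZ(3): default conversion class is java.time.Instant
        return new AtomicDataType(new LocalZonedTimestampType(true, kind, 3));
    } else {
        // TIMESTAMP(3): default conversion class is java.time.LocalDateTime
        return new AtomicDataType(new TimestampType(true, kind, 3));
    }
}
```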

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.rel.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Snapshot;
+import org.apache.calcite.rel.metadata.RelMdCollation;
+import org.apache.calcite.rel.metadata.RelMdDistribution;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Litmus;
+
+/**
+ * Sub-class of {@link org.apache.calcite.rel.core.Snapshot} not targeted at any particular engine
+ * or calling convention.
+ *
+ * <p>Line 80 ~ 91: Calcite only supports timestamp type as period type, but Flink supports both
+ * Timestamp and TimestampLtz. Should be removed once calcite support TimestampLtz as period type.
+ */
+public class LogicalSnapshot extends Snapshot {
+
+    // ~ Constructors -----------------------------------------------------------
+
+    /**
+     * Creates a LogicalSnapshot.
+     *
+     * <p>Use {@link #create} unless you know what you're doing.
+     *
+     * @param cluster Cluster that this relational expression belongs to
+     * @param traitSet The traits of this relational expression
+     * @param input Input relational expression
+     * @param period Timestamp expression which as the table was at the given time in the past
+     */
+    public LogicalSnapshot(
+            RelOptCluster cluster, RelTraitSet traitSet, RelNode input, RexNode period) {
+        super(cluster, traitSet, input, period);
+    }
+
+    @Override
+    public Snapshot copy(RelTraitSet traitSet, RelNode input, RexNode period) {
+        return new LogicalSnapshot(getCluster(), traitSet, input, period);
+    }
+
+    /** Creates a LogicalSnapshot. */
+    public static LogicalSnapshot create(RelNode input, RexNode period) {
+        final RelOptCluster cluster = input.getCluster();
+        final RelMetadataQuery mq = cluster.getMetadataQuery();
+        final RelTraitSet traitSet =
+                cluster.traitSet()
+                        .replace(Convention.NONE)
+                        .replaceIfs(
+                                RelCollationTraitDef.INSTANCE,
+                                () -> RelMdCollation.snapshot(mq, input))
+                        .replaceIf(
+                                RelDistributionTraitDef.INSTANCE,
+                                () -> RelMdDistribution.snapshot(mq, input));
+        return new LogicalSnapshot(cluster, traitSet, input, period);
+    }
+
+    @Override
+    public boolean isValid(Litmus litmus, Context context) {
+        SqlTypeName periodTypeName = getPeriod().getType().getSqlTypeName();
+
+        if (!(periodTypeName == SqlTypeName.TIMESTAMP
+                || periodTypeName == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {

Review comment:
       It would be better to check `SqlTypeFamily.TIMESTAMP` here, for extensibility.
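
A possible shape of that check (a sketch only; it relies on `TIMESTAMP_WITH_LOCAL_TIME_ZONE` being declared with `SqlTypeFamily.TIMESTAMP` in Calcite's `SqlTypeName`, and keeps the existing failure handling):

```java
SqlTypeFamily periodFamily = getPeriod().getType().getSqlTypeName().getFamily();
// SqlTypeFamily.TIMESTAMP covers TIMESTAMP as well as TIMESTAMP_WITH_LOCAL_TIME_ZONE,
// so future timestamp-like types would be accepted without touching this check again
if (periodFamily != SqlTypeFamily.TIMESTAMP) {
    // ... same litmus failure as in the current version ...
}
```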

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/sql/validate/SqlValidatorImpl.java
##########
@@ -0,0 +1,6550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.sql.validate;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import org.apache.calcite.linq4j.Ord;
+import org.apache.calcite.linq4j.function.Function2;
+import org.apache.calcite.linq4j.function.Functions;
+import org.apache.calcite.plan.RelOptTable;
+import org.apache.calcite.plan.RelOptUtil;
+import org.apache.calcite.prepare.Prepare;
+import org.apache.calcite.rel.type.DynamicRecordType;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeSystem;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexPatternFieldRef;
+import org.apache.calcite.rex.RexVisitor;
+import org.apache.calcite.runtime.CalciteContextException;
+import org.apache.calcite.runtime.CalciteException;
+import org.apache.calcite.runtime.Feature;
+import org.apache.calcite.runtime.Resources;
+import org.apache.calcite.schema.ColumnStrategy;
+import org.apache.calcite.schema.Table;
+import org.apache.calcite.schema.impl.ModifiableViewTable;
+import org.apache.calcite.sql.JoinConditionType;
+import org.apache.calcite.sql.JoinType;
+import org.apache.calcite.sql.SqlAccessEnum;
+import org.apache.calcite.sql.SqlAccessType;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCallBinding;
+import org.apache.calcite.sql.SqlDataTypeSpec;
+import org.apache.calcite.sql.SqlDelete;
+import org.apache.calcite.sql.SqlDynamicParam;
+import org.apache.calcite.sql.SqlExplain;
+import org.apache.calcite.sql.SqlFunction;
+import org.apache.calcite.sql.SqlFunctionCategory;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlInsert;
+import org.apache.calcite.sql.SqlIntervalLiteral;
+import org.apache.calcite.sql.SqlIntervalQualifier;
+import org.apache.calcite.sql.SqlJoin;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlMatchRecognize;
+import org.apache.calcite.sql.SqlMerge;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlOperatorTable;
+import org.apache.calcite.sql.SqlOrderBy;
+import org.apache.calcite.sql.SqlPivot;
+import org.apache.calcite.sql.SqlSampleSpec;
+import org.apache.calcite.sql.SqlSelect;
+import org.apache.calcite.sql.SqlSelectKeyword;
+import org.apache.calcite.sql.SqlSnapshot;
+import org.apache.calcite.sql.SqlSyntax;
+import org.apache.calcite.sql.SqlTableFunction;
+import org.apache.calcite.sql.SqlUnresolvedFunction;
+import org.apache.calcite.sql.SqlUpdate;
+import org.apache.calcite.sql.SqlUtil;
+import org.apache.calcite.sql.SqlWindow;
+import org.apache.calcite.sql.SqlWindowTableFunction;
+import org.apache.calcite.sql.SqlWith;
+import org.apache.calcite.sql.SqlWithItem;
+import org.apache.calcite.sql.fun.SqlCase;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.AssignableOperandTypeChecker;
+import org.apache.calcite.sql.type.OperandTypes;
+import org.apache.calcite.sql.type.ReturnTypes;
+import org.apache.calcite.sql.type.SqlOperandTypeChecker;
+import org.apache.calcite.sql.type.SqlOperandTypeInference;
+import org.apache.calcite.sql.type.SqlTypeCoercionRule;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.sql.util.IdPair;
+import org.apache.calcite.sql.util.SqlBasicVisitor;
+import org.apache.calcite.sql.util.SqlShuttle;
+import org.apache.calcite.sql.util.SqlVisitor;
+import org.apache.calcite.sql.validate.implicit.TypeCoercion;
+import org.apache.calcite.util.BitString;
+import org.apache.calcite.util.Bug;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.Litmus;
+import org.apache.calcite.util.Pair;
+import org.apache.calcite.util.Static;
+import org.apache.calcite.util.Util;
+import org.apache.calcite.util.trace.CalciteTrace;
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.AbstractList;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+import static org.apache.calcite.sql.SqlUtil.stripAs;
+import static org.apache.calcite.util.Static.RESOURCE;
+
+/**
+ * Default implementation of {@link SqlValidator}.
+ *
+ * <p>Lines 4697 ~ 4716, Flink enables TIMESTAMP and TIMESTAMP_LTZ for system time period
+ * specification type.
+ *
+ * <p>Lines 5034 ~ 5038, Flink enables TIMESTAMP and TIMESTAMP_LTZ for first orderBy column in

Review comment:
       Add Calcite JIRA ID. 

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.TimeZone;
+
+/** Time util to deals window start and end in different timezone. */
+public class TimeWindowUtil {
+
+    private static final ZoneId UTC_ZONE_ID = TimeZone.getTimeZone("UTC").toZoneId();
+
+    /**
+     * Method to get the window start for a timestamp.
+     *
+     * @param timestamp epoch millisecond to get the window start.
+     * @param offset The offset which window start would be shifted by.
+     * @param windowSize The size of the generated windows.
+     * @param timezone The timeZone used to shift the window start.
+     * @return window start
+     */
+    public static long getWindowStartWithOffset(
+            long timestamp, long offset, long windowSize, TimeZone timezone) {
+        if (timezone.useDaylightTime()) {
+            // mapping to UTC timestamp string
+            long timestampMills = TimeWindowUtil.toTimestampMills(timestamp, timezone);
+            // calculate the window start in UTC
+            long utcWindStart =
+                    timestampMills
+                            - ((timestampMills - offset) % windowSize + windowSize) % windowSize;
+            // mapping back from UTC timestamp
+            return TimeWindowUtil.toEpochMills(utcWindStart, timezone);
+        } else {
+            int timeZoneOffset = timezone.getOffset(timestamp);
+            return timestamp
+                    - ((timestamp + timeZoneOffset - offset) % windowSize + windowSize)
+                            % windowSize;
+        }
+    }
+
+    /** Minus an interval for window boundary, considering the timeZone and daylight savings. */
+    public static long windowMinus(long windowBoundary, long interval, TimeZone timeZone) {
+        if (timeZone.useDaylightTime()) {
+            long utcMills = toTimestampMills(windowBoundary, timeZone);
+            return toEpochMills(utcMills - interval, timeZone);
+        } else {
+            return windowBoundary - interval;
+        }
+    }
+
+    /** Plus an interval for window boundary, considering the timeZone and daylight savings. */
+    public static long windowPlus(long windowBoundary, long interval, TimeZone timeZone) {
+        if (timeZone.useDaylightTime()) {
+            long utcMills = toTimestampMills(windowBoundary, timeZone);
+            return toEpochMills(utcMills + interval, timeZone);
+        } else {
+            return windowBoundary + interval;
+        }
+    }
+
+    /**
+     * Convert a epoch mills to timestamp mills which can describe a locate date time.

Review comment:
       ```suggestion
        * Convert a epoch mills to timestamp mills which can describe a local date time.
       ```

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.TimeZone;
+
+/** Time util to deals window start and end in different timezone. */
+public class TimeWindowUtil {
+
+    private static final ZoneId UTC_ZONE_ID = TimeZone.getTimeZone("UTC").toZoneId();
+
+    /**
+     * Method to get the window start for a timestamp.
+     *
+     * @param timestamp epoch millisecond to get the window start.
+     * @param offset The offset which window start would be shifted by.
+     * @param windowSize The size of the generated windows.
+     * @param timezone The timeZone used to shift the window start.
+     * @return window start
+     */
+    public static long getWindowStartWithOffset(
+            long timestamp, long offset, long windowSize, TimeZone timezone) {
+        if (timezone.useDaylightTime()) {
+            // mapping to UTC timestamp string
+            long timestampMills = TimeWindowUtil.toTimestampMills(timestamp, timezone);
+            // calculate the window start in UTC
+            long utcWindStart =
+                    timestampMills
+                            - ((timestampMills - offset) % windowSize + windowSize) % windowSize;
+            // mapping back from UTC timestamp
+            return TimeWindowUtil.toEpochMills(utcWindStart, timezone);
+        } else {
+            int timeZoneOffset = timezone.getOffset(timestamp);
+            return timestamp
+                    - ((timestamp + timeZoneOffset - offset) % windowSize + windowSize)
+                            % windowSize;
+        }
+    }
+
+    /** Minus an interval for window boundary, considering the timeZone and daylight savings. */
+    public static long windowMinus(long windowBoundary, long interval, TimeZone timeZone) {
+        if (timeZone.useDaylightTime()) {
+            long utcMills = toTimestampMills(windowBoundary, timeZone);
+            return toEpochMills(utcMills - interval, timeZone);
+        } else {
+            return windowBoundary - interval;
+        }
+    }
+
+    /** Plus an interval for window boundary, considering the timeZone and daylight savings. */
+    public static long windowPlus(long windowBoundary, long interval, TimeZone timeZone) {
+        if (timeZone.useDaylightTime()) {
+            long utcMills = toTimestampMills(windowBoundary, timeZone);
+            return toEpochMills(utcMills + interval, timeZone);
+        } else {
+            return windowBoundary + interval;
+        }
+    }
+
+    /**
+     * Convert a epoch mills to timestamp mills which can describe a locate date time.
+     *
+     * <p>For example: The timestamp string of epoch mills 5 in UTC+8 is 1970-01-01 08:00:05, the
+     * timestamp mills is 8 * 60 * 60 * 100 + 5.
+     *
+     * @param epochMills the epoch mills.
+     * @param timeZone the timezone
+     * @return the mills which can describe the local timestamp string in given timezone.
+     */
+    public static long toTimestampMills(long epochMills, TimeZone timeZone) {
+        LocalDateTime localDateTime =
+                LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMills), timeZone.toZoneId());
+        return localDateTime.atZone(UTC_ZONE_ID).toInstant().toEpochMilli();
+    }
+
+    /**
+     * Convert a timestamp mills with given timezone to epoch mills.
+     *
+     * @param timestampMills the timestamp mills.
+     * @param timeZone the timezone
+     * @return the epoch mills.
+     */
+    public static long toEpochMills(long timestampMills, TimeZone timeZone) {

Review comment:
       can be private

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/calcite/FlinkRexBuilder.java
##########
@@ -63,4 +65,34 @@ public RexNode makeFieldAccess(RexNode expr, int i) {
 
         return field;
     }
+
+    /**
+     * Creates a literal of the default value for the given type.
+     *
+     * <p>This value is:
+     *
+     * <ul>
+     *   <li>0 for numeric types;
+     *   <li>FALSE for BOOLEAN;
+     *   <li>The epoch for TIMESTAMP and DATE;
+     *   <li>Midnight for TIME;
+     *   <li>The empty string for string types (CHAR, BINARY, VARCHAR, VARBINARY).
+     * </ul>
+     *
+     * <p>Uses '19-70-01-01 00:00:00'(epoch 0 second) as zero value for TIMESTAMP_LTZ, the zero
+     * value '0000-00-00 00:00:00' in Calcite is an invalid time whose month and day is invalid, we
+     * workaround here. Stop override once Calcite fix this issue.

Review comment:
       Add Calcite JIRA issue here. 
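
The overridden method itself is cut off from this hunk; purely as an illustration, a sketch of what such a workaround might look like (the method and helper names below are taken from Calcite's `RexBuilder` and are an assumption, not the actual Flink code):

```java
@Override
public RexLiteral makeZeroLiteral(RelDataType type) {
    if (type.getSqlTypeName() == SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE) {
        // use the epoch (1970-01-01 00:00:00 UTC) instead of Calcite's invalid '0000-00-00 00:00:00'
        return makeTimestampWithLocalTimeZoneLiteral(
                TimestampString.fromMillisSinceEpoch(0), type.getPrecision());
    }
    return super.makeZeroLiteral(type);
}
```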

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/TableSchema.java
##########
@@ -519,18 +520,20 @@ private static void validateColumnsAndWatermarkSpecs(
                                                     String.format(
                                                             "Rowtime attribute 
'%s' is not defined in schema.",
                                                             
rowtimeAttribute)));
-            if (rowtimeType.getTypeRoot() != TIMESTAMP_WITHOUT_TIME_ZONE) {
+            if (!(rowtimeType.getTypeRoot() == TIMESTAMP_WITHOUT_TIME_ZONE
+                    || rowtimeType.getTypeRoot() == TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
                 throw new ValidationException(
                         String.format(
-                                "Rowtime attribute '%s' must be of type TIMESTAMP but is of type '%s'.",
+                                "Rowtime attribute '%s' must be of type TIMESTAMP or TIMESTAMP_LTZ but is of type '%s'.",
                                 rowtimeAttribute, rowtimeType));
             }
             LogicalType watermarkOutputType =
                     watermark.getWatermarkExprOutputType().getLogicalType();
-            if (watermarkOutputType.getTypeRoot() != TIMESTAMP_WITHOUT_TIME_ZONE) {
+            if (!(watermarkOutputType.getTypeRoot() == TIMESTAMP_WITHOUT_TIME_ZONE
+                    || watermarkOutputType.getTypeRoot() == TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {

Review comment:
       We can extract a method to do this work, e.g. `LogicalTypeChecks#allowTimeAttribute()`.
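
Roughly along these lines (a sketch; the helper name and its placement in `LogicalTypeChecks` follow the suggestion above and are not existing code):

```java
/** Returns true if the given type can be used as a time attribute (rowtime) type. */
public static boolean allowTimeAttribute(LogicalType type) {
    LogicalTypeRoot typeRoot = type.getTypeRoot();
    return typeRoot == LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE
            || typeRoot == LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE;
}
```

Both checks in `validateColumnsAndWatermarkSpecs` would then collapse to `if (!allowTimeAttribute(...)) { ... }`.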

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.TimeZone;
+
+/** Time util to deals window start and end in different timezone. */
+public class TimeWindowUtil {
+
+    private static final ZoneId UTC_ZONE_ID = TimeZone.getTimeZone("UTC").toZoneId();
+
+    /**
+     * Method to get the window start for a timestamp.
+     *
+     * @param timestamp epoch millisecond to get the window start.
+     * @param offset The offset which window start would be shifted by.
+     * @param windowSize The size of the generated windows.
+     * @param timezone The timeZone used to shift the window start.
+     * @return window start
+     */
+    public static long getWindowStartWithOffset(
+            long timestamp, long offset, long windowSize, TimeZone timezone) {
+        if (timezone.useDaylightTime()) {
+            // mapping to UTC timestamp string
+            long timestampMills = TimeWindowUtil.toTimestampMills(timestamp, timezone);
+            // calculate the window start in UTC
+            long utcWindStart =
+                    timestampMills
+                            - ((timestampMills - offset) % windowSize + windowSize) % windowSize;
+            // mapping back from UTC timestamp
+            return TimeWindowUtil.toEpochMills(utcWindStart, timezone);
+        } else {
+            int timeZoneOffset = timezone.getOffset(timestamp);
+            return timestamp
+                    - ((timestamp + timeZoneOffset - offset) % windowSize + windowSize)
+                            % windowSize;
+        }
+    }
+
+    /** Minus an interval for window boundary, considering the timeZone and daylight savings. */
+    public static long windowMinus(long windowBoundary, long interval, TimeZone timeZone) {
+        if (timeZone.useDaylightTime()) {
+            long utcMills = toTimestampMills(windowBoundary, timeZone);
+            return toEpochMills(utcMills - interval, timeZone);
+        } else {
+            return windowBoundary - interval;
+        }
+    }
+
+    /** Plus an interval for window boundary, considering the timeZone and daylight savings. */
+    public static long windowPlus(long windowBoundary, long interval, TimeZone timeZone) {

Review comment:
       `public static long timestampPlus(long epochMills, long plusMills, TimeZone timeZone)`
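
i.e. keeping the existing DST-aware branch, a renamed sketch could read:

```java
/** Plus a millisecond interval to an epoch timestamp, considering the timeZone and daylight saving time. */
public static long timestampPlus(long epochMills, long plusMills, TimeZone timeZone) {
    if (timeZone.useDaylightTime()) {
        long utcMills = toTimestampMills(epochMills, timeZone);
        return toEpochMills(utcMills + plusMills, timeZone);
    } else {
        return epochMills + plusMills;
    }
}
```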

##########
File path: 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
##########
@@ -260,25 +261,33 @@ private static DataType createLegacyType(
 
     private static DataType convertToTimeAttributeType(
             TimeIndicatorTypeInfo timeIndicatorTypeInfo) {
-        final TimestampKind kind;
         if (timeIndicatorTypeInfo.isEventTime()) {
-            kind = TimestampKind.ROWTIME;
+            return new AtomicDataType(new TimestampType(true, TimestampKind.ROWTIME, 3))
+                    .bridgedTo(java.sql.Timestamp.class);

Review comment:
       not sure. 

##########
File path: 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/util/TimeWindowUtil.java
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.util;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.TimeZone;
+
+/** Time util to deals window start and end in different timezone. */
+public class TimeWindowUtil {
+
+    private static final ZoneId UTC_ZONE_ID = TimeZone.getTimeZone("UTC").toZoneId();
+
+    /**
+     * Method to get the window start for a timestamp.
+     *
+     * @param timestamp epoch millisecond to get the window start.
+     * @param offset The offset which window start would be shifted by.
+     * @param windowSize The size of the generated windows.
+     * @param timezone The timeZone used to shift the window start.
+     * @return window start
+     */
+    public static long getWindowStartWithOffset(
+            long timestamp, long offset, long windowSize, TimeZone timezone) {
+        if (timezone.useDaylightTime()) {
+            // mapping to UTC timestamp string
+            long timestampMills = TimeWindowUtil.toTimestampMills(timestamp, timezone);
+            // calculate the window start in UTC
+            long utcWindStart =
+                    timestampMills
+                            - ((timestampMills - offset) % windowSize + windowSize) % windowSize;
+            // mapping back from UTC timestamp
+            return TimeWindowUtil.toEpochMills(utcWindStart, timezone);
+        } else {
+            int timeZoneOffset = timezone.getOffset(timestamp);
+            return timestamp
+                    - ((timestamp + timeZoneOffset - offset) % windowSize + windowSize)
+                            % windowSize;
+        }
+    }
+
+    /** Minus an interval for window boundary, considering the timeZone and daylight savings. */
+    public static long windowMinus(long windowBoundary, long interval, TimeZone timeZone) {
+        if (timeZone.useDaylightTime()) {
+            long utcMills = toTimestampMills(windowBoundary, timeZone);
+            return toEpochMills(utcMills - interval, timeZone);
+        } else {
+            return windowBoundary - interval;
+        }
+    }
+
+    /** Plus an interval for window boundary, considering the timeZone and daylight savings. */
+    public static long windowPlus(long windowBoundary, long interval, TimeZone timeZone) {
+        if (timeZone.useDaylightTime()) {
+            long utcMills = toTimestampMills(windowBoundary, timeZone);
+            return toEpochMills(utcMills + interval, timeZone);
+        } else {
+            return windowBoundary + interval;
+        }
+    }
+
+    /**
+     * Convert a epoch mills to timestamp mills which can describe a locate date time.
+     *
+     * <p>For example: The timestamp string of epoch mills 5 in UTC+8 is 1970-01-01 08:00:05, the
+     * timestamp mills is 8 * 60 * 60 * 100 + 5.
+     *
+     * @param epochMills the epoch mills.
+     * @param timeZone the timezone
+     * @return the mills which can describe the local timestamp string in given timezone.
+     */
+    public static long toTimestampMills(long epochMills, TimeZone timeZone) {

Review comment:
       can be private
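
As a side note, the javadoc example above reads like it means `8 * 60 * 60 * 1000 + 5` rather than `* 100`. A quick round-trip check of the two conversion helpers, as a JUnit-style sketch (assuming both stay reachable from a test):

```java
@Test
public void testTimestampMillsRoundTrip() {
    TimeZone tz = TimeZone.getTimeZone("Asia/Shanghai"); // UTC+8, no daylight saving time
    long epochMills = 5L;
    // epoch milli 5 is 1970-01-01 08:00:00.005 local time in UTC+8,
    // i.e. 8 * 60 * 60 * 1000 + 5 "local" timestamp mills
    assertEquals(8 * 60 * 60 * 1000 + 5, TimeWindowUtil.toTimestampMills(epochMills, tz));
    // the two conversions should be exact inverses of each other
    assertEquals(epochMills, TimeWindowUtil.toEpochMills(8 * 60 * 60 * 1000 + 5, tz));
}
```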

##########
File path: 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/window/assigners/SlidingWindowAssignerTest.java
##########
@@ -165,26 +182,76 @@ public void testWindowAssignmentWithOffset() {
                 assigner.getLastWindow(new TimeWindow(2100, 3100)), new TimeWindow(2100, 7100));
     }
 
+    @Test
+    public void testDstSaving() {
+        if (!timeZone.useDaylightTime()) {
+            return;
+        }
+        SlidingWindowAssigner assigner =
+                SlidingWindowAssigner.of(Duration.ofHours(4), Duration.ofHours(1), timeZone);
+        // Los_Angeles local time in epoch mills.
+        // The DaylightTime in Los_Angele start at time 2021-03-14 02:00:00
+        // long epoch1 = 1615708800000L; 2021-03-14 00:00:00
+        // long epoch2 = 1615712400000L; 2021-03-14 01:00:00
+        // long epoch3 = 1615716000000L; 2021-03-14 03:00:00, skip one hour (2021-03-14 02:00:00)
+        // long epoch4 = 1615719600000L; 2021-03-14 04:00:00
+
+        long epoch1 = 1615708800000L;

Review comment:
       Please test `assigner.assignWindows(ELEMENT, 1615719600000L)`.
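
For example (a sketch only; the exact expected windows around the DST gap are the interesting part to pin down, this just shows the call plus a basic sanity check):

```java
// 1615719600000L is 2021-03-14 04:00:00 in America/Los_Angeles, the first whole hour
// after the 02:00 -> 03:00 DST jump, so windows crossing the gap get exercised here
long epoch4 = 1615719600000L;
Collection<TimeWindow> windows = assigner.assignWindows(ELEMENT, epoch4);
// sanity check: every assigned window should still cover the element's timestamp
for (TimeWindow window : windows) {
    assertTrue(window.getStart() <= epoch4 && epoch4 < window.getEnd());
}
```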

##########
File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/calcite/rel/logical/LogicalSnapshot.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.calcite.rel.logical;
+
+import org.apache.calcite.plan.Convention;
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelTraitSet;
+import org.apache.calcite.rel.RelCollationTraitDef;
+import org.apache.calcite.rel.RelDistributionTraitDef;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Snapshot;
+import org.apache.calcite.rel.metadata.RelMdCollation;
+import org.apache.calcite.rel.metadata.RelMdDistribution;
+import org.apache.calcite.rel.metadata.RelMetadataQuery;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.Litmus;
+
+/**
+ * Sub-class of {@link org.apache.calcite.rel.core.Snapshot} not targeted at any particular engine
+ * or calling convention.
+ *
+ * <p>Line 80 ~ 91: Calcite only supports timestamp type as period type, but Flink supports both

Review comment:
       Add calcite issue ID here. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

