This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 94c48f428be [FLINK-35318][table-planner] Use UTC time zone for 
TIMESTAMP_LTZ type in RexNodeToExpressionConverter
94c48f428be is described below

commit 94c48f428bedcc07840d243ccf3f0c31e5831794
Author: 林尚泉 <[email protected]>
AuthorDate: Mon Jul 15 14:23:10 2024 +0800

    [FLINK-35318][table-planner] Use UTC time zone for TIMESTAMP_LTZ type in 
RexNodeToExpressionConverter
    
    This closes #24787.
---
 .../planner/operations/DeletePushDownUtils.java    |  6 +----
 .../plan/abilities/source/FilterPushDownSpec.java  |  4 ----
 .../PushFilterIntoLegacyTableSourceScanRule.java   |  6 +----
 .../PushPartitionIntoTableSourceScanRule.java      |  6 +----
 ...ushPartitionIntoLegacyTableSourceScanRule.scala |  6 ++---
 .../table/planner/plan/utils/FlinkRexUtil.scala    |  5 ++--
 .../planner/plan/utils/RexNodeExtractor.scala      | 22 +++++++-----------
 .../operations/DeletePushDownUtilsTest.java        | 17 ++++++++++++++
 .../PushFilterInCalcIntoTableSourceRuleTest.java   | 25 ++++++++++++++++++++
 .../PushFilterIntoTableSourceScanRuleTest.java     | 24 +++++++++++++++++++
 .../PushFilterIntoTableSourceScanRuleTestBase.java |  5 ++++
 .../PushFilterInCalcIntoTableSourceRuleTest.xml    | 17 ++++++++++++++
 ...PushFilterIntoLegacyTableSourceScanRuleTest.xml | 18 +++++++++++++++
 .../PushFilterIntoTableSourceScanRuleTest.xml      | 18 +++++++++++++++
 ...shFilterIntoLegacyTableSourceScanRuleTest.scala | 27 ++++++++++++++++++++++
 .../planner/plan/utils/RexNodeExtractorTest.scala  | 23 +++++++-----------
 16 files changed, 174 insertions(+), 55 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
index d5270cb89b7..6c4f72e6b0d 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
@@ -38,7 +38,6 @@ import 
org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterCondition
 import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
 import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
-import org.apache.flink.table.planner.utils.TableConfigUtils;
 
 import org.apache.calcite.plan.RelOptPredicateList;
 import org.apache.calcite.plan.RelOptRuleCall;
@@ -56,7 +55,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
-import java.util.TimeZone;
 import java.util.stream.Collectors;
 
 import scala.Option;
@@ -229,9 +227,7 @@ public class DeletePushDownUtils {
                         filter.getCluster().getRexBuilder(),
                         
filter.getInput().getRowType().getFieldNames().toArray(new String[0]),
                         context.getFunctionCatalog(),
-                        context.getCatalogManager(),
-                        TimeZone.getTimeZone(
-                                
TableConfigUtils.getLocalTimeZone(context.getTableConfig())));
+                        context.getCatalogManager());
         List<Expression> filters =
                 Arrays.stream(convertiblePredicates)
                         .map(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
index 007fa0a4054..4a0cb5b2baf 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.table.expressions.resolver.ExpressionResolver;
 import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
 import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.planner.utils.TableConfigUtils;
 import org.apache.flink.table.types.logical.RowType;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -42,7 +41,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.TimeZone;
 import java.util.stream.Collectors;
 
 import scala.Option;
@@ -105,8 +103,6 @@ public final class FilterPushDownSpec extends 
SourceAbilitySpecBase {
                             
context.getSourceRowType().getFieldNames().toArray(new String[0]),
                             context.getFunctionCatalog(),
                             context.getCatalogManager(),
-                            TimeZone.getTimeZone(
-                                    
TableConfigUtils.getLocalTimeZone(context.getTableConfig())),
                             Option.apply(
                                     context.getTypeFactory()
                                             
.buildRelNodeRowType(context.getSourceRowType())));
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.java
index 5a07d90828f..2a4045dbfe8 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.java
@@ -30,7 +30,6 @@ import 
org.apache.flink.table.planner.plan.stats.FlinkStatistic;
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
 import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
 import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
-import org.apache.flink.table.planner.utils.TableConfigUtils;
 import org.apache.flink.table.sources.FilterableTableSource;
 import org.apache.flink.table.sources.TableSource;
 
@@ -46,7 +45,6 @@ import org.immutables.value.Value;
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.TimeZone;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;
@@ -120,9 +118,7 @@ public class PushFilterIntoLegacyTableSourceScanRule
                         filter.getInput().getRowType().getFieldNames(),
                         relBuilder.getRexBuilder(),
                         context.getFunctionCatalog(),
-                        context.getCatalogManager(),
-                        TimeZone.getTimeZone(
-                                
TableConfigUtils.getLocalTimeZone(unwrapTableConfig(scan))));
+                        context.getCatalogManager());
 
         Expression[] predicates = extracted._1;
         RexNode[] unconvertedRexNodes = extracted._2;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
index 4d87e25e0ce..ea39984c56c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
@@ -43,7 +43,6 @@ import 
org.apache.flink.table.planner.plan.utils.PartitionPruner;
 import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
 import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
-import org.apache.flink.table.planner.utils.TableConfigUtils;
 import org.apache.flink.table.types.logical.LogicalType;
 
 import org.apache.calcite.plan.RelOptRule;
@@ -64,7 +63,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.TimeZone;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -320,9 +318,7 @@ public class PushPartitionIntoTableSourceScanRule extends 
RelOptRule {
                         rexBuilder,
                         allFieldNames.toArray(new String[0]),
                         context.getFunctionCatalog(),
-                        context.getCatalogManager(),
-                        TimeZone.getTimeZone(
-                                
TableConfigUtils.getLocalTimeZone(context.getTableConfig())));
+                        context.getCatalogManager());
         ArrayList<Expression> partitionFilters = new ArrayList<>();
         Option<ResolvedExpression> subExpr;
         for (RexNode node : JavaConversions.seqAsJavaList(partitionPredicate)) 
{
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRule.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRule.scala
index 373b1d7feea..1ae85c17a30 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRule.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRule.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, 
PartitionPruner, RexNodeExtractor, RexNodeToExpressionConverter}
-import org.apache.flink.table.planner.utils.{CatalogTableStatisticsConverter, 
TableConfigUtils}
+import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
 import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext
 import org.apache.flink.table.sources.PartitionableTableSource
@@ -38,7 +38,6 @@ import org.apache.calcite.rel.logical.LogicalTableScan
 import org.apache.calcite.rex.{RexInputRef, RexNode, RexShuttle, RexUtil}
 
 import java.util
-import java.util.TimeZone
 
 import scala.collection.{mutable, JavaConversions}
 import scala.collection.JavaConversions._
@@ -157,8 +156,7 @@ class PushPartitionIntoLegacyTableSourceScanRule
               rexBuilder,
               inputFields,
               context.getFunctionCatalog,
-              context.getCatalogManager,
-              
TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(tableConfig)))
+              context.getCatalogManager)
             def toExpressions: Option[Seq[Expression]] = {
               val expressions = new mutable.ArrayBuffer[Expression]()
               for (predicate <- partitionPredicates) {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
index 5c2efec9072..c05a9d13443 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
@@ -25,7 +25,7 @@ import 
org.apache.flink.table.planner.plan.nodes.calcite.{LegacySink, Sink}
 import org.apache.flink.table.planner.plan.optimize.RelNodeBlock
 import 
org.apache.flink.table.planner.plan.utils.ExpressionDetail.ExpressionDetail
 import 
org.apache.flink.table.planner.plan.utils.ExpressionFormat.ExpressionFormat
-import org.apache.flink.table.planner.utils.{ShortcutUtils, TableConfigUtils}
+import org.apache.flink.table.planner.utils.ShortcutUtils
 
 import com.google.common.base.Function
 import com.google.common.collect.{ImmutableList, Lists}
@@ -45,7 +45,7 @@ import org.apache.calcite.util._
 import java.lang.{Iterable => JIterable}
 import java.math.BigDecimal
 import java.util
-import java.util.{Optional, TimeZone}
+import java.util.Optional
 import java.util.function.Predicate
 
 import scala.collection.JavaConversions._
@@ -675,7 +675,6 @@ object FlinkRexUtil {
         inputNames,
         context.getFunctionCatalog,
         context.getCatalogManager,
-        
TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(context.getTableConfig)),
         Some(rel.getRowType));
 
     RexNodeExtractor.extractConjunctiveConditions(
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
index 481cbda8b82..d27e34a664e 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
@@ -45,8 +45,9 @@ import org.apache.calcite.sql.{SqlFunction, SqlKind, 
SqlPostfixOperator}
 import org.apache.calcite.sql.fun.{SqlStdOperatorTable, SqlTrimFunction}
 import org.apache.calcite.util.{TimestampString, Util}
 
+import java.time.{ZoneId, ZoneOffset}
 import java.util
-import java.util.{Collections, List => JList, TimeZone}
+import java.util.{Collections, List => JList}
 
 import scala.collection.{mutable, JavaConverters}
 import scala.collection.JavaConversions._
@@ -110,15 +111,10 @@ object RexNodeExtractor extends Logging {
       inputFieldNames: JList[String],
       rexBuilder: RexBuilder,
       functionCatalog: FunctionCatalog,
-      catalogManager: CatalogManager,
-      timeZone: TimeZone): (Array[Expression], Array[RexNode]) = {
+      catalogManager: CatalogManager): (Array[Expression], Array[RexNode]) = {
     val inputNames = inputFieldNames.asScala.toArray
-    val converter = new RexNodeToExpressionConverter(
-      rexBuilder,
-      inputNames,
-      functionCatalog,
-      catalogManager,
-      timeZone)
+    val converter =
+      new RexNodeToExpressionConverter(rexBuilder, inputNames, 
functionCatalog, catalogManager)
     val (convertibleRexNodes, unconvertedRexNodes) =
       extractConjunctiveConditions(expr, maxCnfNodeCount, rexBuilder, 
converter)
     val convertedExpressions = convertibleRexNodes.map(_.accept(converter).get)
@@ -399,7 +395,6 @@ class RexNodeToExpressionConverter(
     inputNames: Array[String],
     functionCatalog: FunctionCatalog,
     catalogManager: CatalogManager,
-    timeZone: TimeZone,
     relDataType: Option[RelDataType] = None)
   extends RexVisitor[Option[ResolvedExpression]] {
 
@@ -407,9 +402,8 @@ class RexNodeToExpressionConverter(
       rexBuilder: RexBuilder,
       inputNames: Array[String],
       functionCatalog: FunctionCatalog,
-      catalogManager: CatalogManager,
-      timeZone: TimeZone) = {
-    this(rexBuilder, inputNames, functionCatalog, catalogManager, timeZone, 
None)
+      catalogManager: CatalogManager) = {
+    this(rexBuilder, inputNames, functionCatalog, catalogManager, None)
   }
 
   override def visitInputRef(inputRef: RexInputRef): 
Option[ResolvedExpression] = {
@@ -455,7 +449,7 @@ class RexNodeToExpressionConverter(
 
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
         val v = literal.getValueAs(classOf[TimestampString])
-        toLocalDateTime(v).atZone(timeZone.toZoneId).toInstant
+        toLocalDateTime(v).atZone(ZoneId.of(ZoneOffset.UTC.getId)).toInstant
 
       case INTERVAL_DAY_TIME =>
         val v = literal.getValueAs(classOf[java.lang.Long])
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java
index f134223d243..90aeb61c031 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java
@@ -38,13 +38,18 @@ import 
org.apache.flink.table.planner.delegation.PlannerContext;
 import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory;
 import org.apache.flink.table.planner.parse.CalciteParser;
 import org.apache.flink.table.planner.utils.PlannerMocks;
+import org.apache.flink.table.planner.utils.TimestampStringUtils;
 import org.apache.flink.table.utils.CatalogManagerMocks;
 
 import org.apache.calcite.rel.RelRoot;
 import org.apache.calcite.rel.logical.LogicalTableModify;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.TimestampString;
 import org.junit.jupiter.api.Test;
 
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -123,6 +128,7 @@ public class DeletePushDownUtilsTest {
                                 .column("f0", DataTypes.INT().notNull())
                                 .column("f1", DataTypes.STRING().nullable())
                                 .column("f2", DataTypes.BIGINT().nullable())
+                                .column("f3", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().nullable())
                                 .build(),
                         null,
                         Collections.emptyList(),
@@ -154,6 +160,17 @@ public class DeletePushDownUtilsTest {
         tableModify = getTableModifyFromSql("DELETE FROM t where f0 > (select 
count(1) from t)");
         optionalResolvedExpressions = 
DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
         assertThat(optionalResolvedExpressions).isEmpty();
+
+        String dateTime = "2024-05-13 08:00:00";
+        tableModify =
+                getTableModifyFromSql(String.format("DELETE FROM t where f3 > 
'%s'", dateTime));
+        LocalDateTime ldt = TimestampStringUtils.toLocalDateTime(new 
TimestampString(dateTime));
+        Instant instant = 
ldt.toInstant(ZoneId.systemDefault().getRules().getOffset(ldt));
+        optionalResolvedExpressions = 
DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
+        assertThat(optionalResolvedExpressions).isPresent();
+        verifyExpression(
+                optionalResolvedExpressions,
+                String.format("[greaterThan(f3, %s)]", instant.toString()));
     }
 
     private CatalogTable createTestCatalogTable(Map<String, String> options) {
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java
index 3e3bbb76fa7..a01ae2bd7e5 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java
@@ -37,6 +37,8 @@ import org.apache.calcite.tools.RuleSets;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.time.ZoneId;
+
 /** Test for {@link PushFilterInCalcIntoTableSourceRuleTest}. */
 class PushFilterInCalcIntoTableSourceRuleTest extends 
PushFilterIntoTableSourceScanRuleTestBase {
 
@@ -146,4 +148,27 @@ class PushFilterInCalcIntoTableSourceRuleTest extends 
PushFilterIntoTableSourceS
         util.tableEnv().executeSql(ddl);
         super.testWithInterval();
     }
+
+    @Test
+    public void testWithTimestampWithTimeZone() {
+        String ddl =
+                "CREATE TABLE MTable (\n"
+                        + "a TIMESTAMP_LTZ(3),\n"
+                        + "b TIMESTAMP(3)\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'bounded' = 'false',\n"
+                        + " 'filterable-fields' = 'a',\n"
+                        + " 'disable-lookup' = 'true'"
+                        + ")";
+
+        util.tableEnv().executeSql(ddl);
+        ZoneId preZoneId = util.tableEnv().getConfig().getLocalTimeZone();
+        
util.tableEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
+        try {
+            super.testWithTimestampWithTimeZone();
+        } finally {
+            util.tableEnv().getConfig().setLocalTimeZone(preZoneId);
+        }
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java
index f4969a6fe88..da21dbc5b33 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java
@@ -33,6 +33,8 @@ import org.apache.calcite.tools.RuleSets;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.time.ZoneId;
+
 /** Test for {@link PushFilterIntoTableSourceScanRule}. */
 class PushFilterIntoTableSourceScanRuleTest extends 
PushFilterIntoTableSourceScanRuleTestBase {
 
@@ -151,6 +153,28 @@ class PushFilterIntoTableSourceScanRuleTest extends 
PushFilterIntoTableSourceSca
         super.testWithInterval();
     }
 
+    @Test
+    public void testWithTimestampWithTimeZone() {
+        String ddl =
+                "CREATE TABLE MTable (\n"
+                        + "a TIMESTAMP_LTZ(3),\n"
+                        + "b TIMESTAMP(3)\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'values',\n"
+                        + " 'bounded' = 'true',\n"
+                        + " 'filterable-fields' = 'a',\n"
+                        + " 'disable-lookup' = 'true'"
+                        + ")";
+        util.tableEnv().executeSql(ddl);
+        ZoneId preZoneId = util.tableEnv().getConfig().getLocalTimeZone();
+        
util.tableEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
+        try {
+            super.testWithTimestampWithTimeZone();
+        } finally {
+            util.tableEnv().getConfig().setLocalTimeZone(preZoneId);
+        }
+    }
+
     @Test
     void testBasicNestedFilter() {
         util.verifyRelPlan("SELECT * FROM NestedTable WHERE 
deepNested.nested1.`value` > 2");
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java
index 3debdf2d481..573bd8d6e78 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java
@@ -129,6 +129,11 @@ abstract class PushFilterIntoTableSourceScanRuleTestBase 
extends TableTestBase {
                         + "TIMESTAMPADD(YEAR, 2, b) >= a");
     }
 
+    @Test
+    public void testWithTimestampWithTimeZone() {
+        util.verifyRelPlan("SELECT * FROM MTable WHERE a > '2024-05-13 
08:00:00'");
+    }
+
     @Test
     void testCannotPushDownIn() {
         // this test is to avoid filter push down rules throwing exceptions
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.xml
index 8160f58a149..ceae952923d 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.xml
@@ -288,6 +288,23 @@ LogicalProject(a=[$0], b=[$1])
       <![CDATA[
 FlinkLogicalCalc(select=[a, b], where=[OR(>=(+(a, *(3600000:INTERVAL DAY TO 
SECOND, 5)), b), >=(+(b, *(12:INTERVAL YEAR TO MONTH, 2)), a))])
 +- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, 
MTable, filter=[]]], fields=[a, b])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWithTimestampWithTimeZone">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MTable WHERE a > '2024-05-13 08:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($0, CAST(_UTF-16LE'2024-05-13 
08:00:00'):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MTable, 
filter=[>(a, 2024-05-13 00:00:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))]]], 
fields=[a, b])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.xml
index b40aebe273c..e49909ed7a9 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.xml
@@ -284,6 +284,24 @@ LogicalProject(a=[$0], b=[$1])
 LogicalProject(a=[$0], b=[$1])
 +- LogicalFilter(condition=[OR(>=(+($0, *(3600000:INTERVAL DAY TO SECOND, 5)), 
$1), >=(+($1, *(12:INTERVAL YEAR TO MONTH, 2)), $0))])
    +- LogicalTableScan(table=[[default_catalog, default_database, MTable, 
source: [filterPushedDown=[true], filter=[]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWithTimestampWithTimeZone">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MTable WHERE a > '2024-05-13 08:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($0, CAST(_UTF-16LE'2024-05-13 
08:00:00'):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MTable, 
source: [filterPushedDown=[false], filter=[]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalTableScan(table=[[default_catalog, default_database, MTable, source: 
[filterPushedDown=[true], filter=[greaterThan(a, 2024-05-13T00:00:00Z)]]]])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml
index b27c92d52bf..8a27b0a7d68 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml
@@ -284,6 +284,24 @@ LogicalProject(a=[$0], b=[$1])
 LogicalProject(a=[$0], b=[$1])
 +- LogicalFilter(condition=[OR(>=(+($0, *(3600000:INTERVAL DAY TO SECOND, 5)), 
$1), >=(+($1, *(12:INTERVAL YEAR TO MONTH, 2)), $0))])
    +- LogicalTableScan(table=[[default_catalog, default_database, MTable, 
filter=[]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testWithTimestampWithTimeZone">
+    <Resource name="sql">
+      <![CDATA[SELECT * FROM MTable WHERE a > '2024-05-13 08:00:00']]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($0, CAST(_UTF-16LE'2024-05-13 
08:00:00'):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL)])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MTable]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalTableScan(table=[[default_catalog, default_database, MTable, 
filter=[>(a, 2024-05-13 00:00:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))]]])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.scala
index d418dbeeffb..276b15fb08c 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.scala
@@ -28,6 +28,8 @@ import org.apache.calcite.rel.rules.CoreRules
 import org.apache.calcite.tools.RuleSets
 import org.junit.jupiter.api.{BeforeEach, Test}
 
+import java.time.ZoneId
+
 /** Test for [[PushFilterIntoLegacyTableSourceScanRule]]. */
 class PushFilterIntoLegacyTableSourceScanRuleTest
   extends PushFilterIntoTableSourceScanRuleTestBase {
@@ -111,4 +113,29 @@ class PushFilterIntoLegacyTableSourceScanRuleTest
 
     super.testWithInterval()
   }
+
+  @Test
+  override def testWithTimestampWithTimeZone(): Unit = {
+    val schema = TableSchema
+      .builder()
+      .field("a", DataTypes.TIMESTAMP_LTZ(3))
+      .field("b", DataTypes.TIMESTAMP)
+      .build()
+    val data = List(
+      Row.of(localDateTime("2024-05-13 08:00:00"), localDateTime("2024-05-13 
08:00:00")))
+    TestLegacyFilterableTableSource.createTemporaryTable(
+      util.tableEnv,
+      schema,
+      "MTable",
+      isBounded = true,
+      data,
+      Set("a", "b"))
+    val preZoneId = util.tableEnv.getConfig.getLocalTimeZone
+    util.tableEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
+    try {
+      super.testWithTimestampWithTimeZone()
+    } finally {
+      util.tableEnv.getConfig.setLocalTimeZone(preZoneId)
+    }
+  }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 26c70a34f46..991d7fe3700 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -45,8 +45,8 @@ import org.junit.jupiter.api.Test
 
 import java.math.BigDecimal
 import java.net.URL
-import java.time.ZoneId
-import java.util.{Arrays, List => JList, TimeZone}
+import java.time.{ZoneId, ZoneOffset}
+import java.util.{Arrays, List => JList}
 
 import scala.collection.JavaConverters._
 
@@ -716,17 +716,12 @@ class RexNodeExtractorTest extends RexNodeTestBase {
 
     val relBuilder: RexBuilder = new FlinkRexBuilder(typeFactory)
 
-    val shanghai = ZoneId.of("Asia/Shanghai")
-    val (converted, _) = extractConjunctiveConditions(
-      and,
-      -1,
-      fieldNames,
-      relBuilder,
-      functionCatalog,
-      TimeZone.getTimeZone(shanghai))
+    val (converted, _) =
+      extractConjunctiveConditions(and, -1, fieldNames, relBuilder, 
functionCatalog)
 
     val datetime = DateTimeTestUtil.localDateTime("2017-09-10 14:23:01.123456")
-    val instant = datetime.toInstant(shanghai.getRules.getOffset(datetime))
+    val instant =
+      
datetime.toInstant(ZoneId.of(ZoneOffset.UTC.getId).getRules.getOffset(datetime))
 
     {
       val expected = Array[Expression](
@@ -775,16 +770,14 @@ class RexNodeExtractorTest extends RexNodeTestBase {
       maxCnfNodeCount: Int,
       inputFieldNames: JList[String],
       rexBuilder: RexBuilder,
-      catalog: FunctionCatalog,
-      tz: TimeZone = TimeZone.getDefault): (Array[Expression], Array[RexNode]) 
= {
+      catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
     RexNodeExtractor.extractConjunctiveConditions(
       expr,
       maxCnfNodeCount,
       inputFieldNames,
       rexBuilder,
       catalog,
-      catalogManager,
-      tz)
+      catalogManager)
   }
 
 }

Reply via email to