This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 94c48f428be [FLINK-35318][table-planner] Use UTC time zone for
TIMESTAMP_LTZ type in RexNodeToExpressionConverter
94c48f428be is described below
commit 94c48f428bedcc07840d243ccf3f0c31e5831794
Author: 林尚泉 <[email protected]>
AuthorDate: Mon Jul 15 14:23:10 2024 +0800
[FLINK-35318][table-planner] Use UTC time zone for TIMESTAMP_LTZ type in
RexNodeToExpressionConverter
This closes #24787.
---
.../planner/operations/DeletePushDownUtils.java | 6 +----
.../plan/abilities/source/FilterPushDownSpec.java | 4 ----
.../PushFilterIntoLegacyTableSourceScanRule.java | 6 +----
.../PushPartitionIntoTableSourceScanRule.java | 6 +----
...ushPartitionIntoLegacyTableSourceScanRule.scala | 6 ++---
.../table/planner/plan/utils/FlinkRexUtil.scala | 5 ++--
.../planner/plan/utils/RexNodeExtractor.scala | 22 +++++++-----------
.../operations/DeletePushDownUtilsTest.java | 17 ++++++++++++++
.../PushFilterInCalcIntoTableSourceRuleTest.java | 25 ++++++++++++++++++++
.../PushFilterIntoTableSourceScanRuleTest.java | 24 +++++++++++++++++++
.../PushFilterIntoTableSourceScanRuleTestBase.java | 5 ++++
.../PushFilterInCalcIntoTableSourceRuleTest.xml | 17 ++++++++++++++
...PushFilterIntoLegacyTableSourceScanRuleTest.xml | 18 +++++++++++++++
.../PushFilterIntoTableSourceScanRuleTest.xml | 18 +++++++++++++++
...shFilterIntoLegacyTableSourceScanRuleTest.scala | 27 ++++++++++++++++++++++
.../planner/plan/utils/RexNodeExtractorTest.scala | 23 +++++++-----------
16 files changed, 174 insertions(+), 55 deletions(-)
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
index d5270cb89b7..6c4f72e6b0d 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/DeletePushDownUtils.java
@@ -38,7 +38,6 @@ import
org.apache.flink.table.planner.plan.rules.logical.SimplifyFilterCondition
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
import org.apache.flink.table.planner.utils.ShortcutUtils;
-import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.calcite.plan.RelOptPredicateList;
import org.apache.calcite.plan.RelOptRuleCall;
@@ -56,7 +55,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
-import java.util.TimeZone;
import java.util.stream.Collectors;
import scala.Option;
@@ -229,9 +227,7 @@ public class DeletePushDownUtils {
filter.getCluster().getRexBuilder(),
filter.getInput().getRowType().getFieldNames().toArray(new String[0]),
context.getFunctionCatalog(),
- context.getCatalogManager(),
- TimeZone.getTimeZone(
-
TableConfigUtils.getLocalTimeZone(context.getTableConfig())));
+ context.getCatalogManager());
List<Expression> filters =
Arrays.stream(convertiblePredicates)
.map(
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
index 007fa0a4054..4a0cb5b2baf 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/source/FilterPushDownSpec.java
@@ -27,7 +27,6 @@ import
org.apache.flink.table.expressions.resolver.ExpressionResolver;
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
-import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.types.logical.RowType;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -42,7 +41,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
-import java.util.TimeZone;
import java.util.stream.Collectors;
import scala.Option;
@@ -105,8 +103,6 @@ public final class FilterPushDownSpec extends
SourceAbilitySpecBase {
context.getSourceRowType().getFieldNames().toArray(new String[0]),
context.getFunctionCatalog(),
context.getCatalogManager(),
- TimeZone.getTimeZone(
-
TableConfigUtils.getLocalTimeZone(context.getTableConfig())),
Option.apply(
context.getTypeFactory()
.buildRelNodeRowType(context.getSourceRowType())));
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.java
index 5a07d90828f..2a4045dbfe8 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRule.java
@@ -30,7 +30,6 @@ import
org.apache.flink.table.planner.plan.stats.FlinkStatistic;
import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil;
import org.apache.flink.table.planner.plan.utils.FlinkRexUtil;
import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
-import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.sources.FilterableTableSource;
import org.apache.flink.table.sources.TableSource;
@@ -46,7 +45,6 @@ import org.immutables.value.Value;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
-import java.util.TimeZone;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -120,9 +118,7 @@ public class PushFilterIntoLegacyTableSourceScanRule
filter.getInput().getRowType().getFieldNames(),
relBuilder.getRexBuilder(),
context.getFunctionCatalog(),
- context.getCatalogManager(),
- TimeZone.getTimeZone(
-
TableConfigUtils.getLocalTimeZone(unwrapTableConfig(scan))));
+ context.getCatalogManager());
Expression[] predicates = extracted._1;
RexNode[] unconvertedRexNodes = extracted._2;
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
index 4d87e25e0ce..ea39984c56c 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoTableSourceScanRule.java
@@ -43,7 +43,6 @@ import
org.apache.flink.table.planner.plan.utils.PartitionPruner;
import org.apache.flink.table.planner.plan.utils.RexNodeExtractor;
import org.apache.flink.table.planner.plan.utils.RexNodeToExpressionConverter;
import org.apache.flink.table.planner.utils.ShortcutUtils;
-import org.apache.flink.table.planner.utils.TableConfigUtils;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.calcite.plan.RelOptRule;
@@ -64,7 +63,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.TimeZone;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -320,9 +318,7 @@ public class PushPartitionIntoTableSourceScanRule extends
RelOptRule {
rexBuilder,
allFieldNames.toArray(new String[0]),
context.getFunctionCatalog(),
- context.getCatalogManager(),
- TimeZone.getTimeZone(
-
TableConfigUtils.getLocalTimeZone(context.getTableConfig())));
+ context.getCatalogManager());
ArrayList<Expression> partitionFilters = new ArrayList<>();
Option<ResolvedExpression> subExpr;
for (RexNode node : JavaConversions.seqAsJavaList(partitionPredicate))
{
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRule.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRule.scala
index 373b1d7feea..1ae85c17a30 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRule.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushPartitionIntoLegacyTableSourceScanRule.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.plan.schema.LegacyTableSourceTable
import org.apache.flink.table.planner.plan.stats.FlinkStatistic
import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil,
PartitionPruner, RexNodeExtractor, RexNodeToExpressionConverter}
-import org.apache.flink.table.planner.utils.{CatalogTableStatisticsConverter,
TableConfigUtils}
+import org.apache.flink.table.planner.utils.CatalogTableStatisticsConverter
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
import org.apache.flink.table.planner.utils.ShortcutUtils.unwrapContext
import org.apache.flink.table.sources.PartitionableTableSource
@@ -38,7 +38,6 @@ import org.apache.calcite.rel.logical.LogicalTableScan
import org.apache.calcite.rex.{RexInputRef, RexNode, RexShuttle, RexUtil}
import java.util
-import java.util.TimeZone
import scala.collection.{mutable, JavaConversions}
import scala.collection.JavaConversions._
@@ -157,8 +156,7 @@ class PushPartitionIntoLegacyTableSourceScanRule
rexBuilder,
inputFields,
context.getFunctionCatalog,
- context.getCatalogManager,
-
TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(tableConfig)))
+ context.getCatalogManager)
def toExpressions: Option[Seq[Expression]] = {
val expressions = new mutable.ArrayBuffer[Expression]()
for (predicate <- partitionPredicates) {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
index 5c2efec9072..c05a9d13443 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala
@@ -25,7 +25,7 @@ import
org.apache.flink.table.planner.plan.nodes.calcite.{LegacySink, Sink}
import org.apache.flink.table.planner.plan.optimize.RelNodeBlock
import
org.apache.flink.table.planner.plan.utils.ExpressionDetail.ExpressionDetail
import
org.apache.flink.table.planner.plan.utils.ExpressionFormat.ExpressionFormat
-import org.apache.flink.table.planner.utils.{ShortcutUtils, TableConfigUtils}
+import org.apache.flink.table.planner.utils.ShortcutUtils
import com.google.common.base.Function
import com.google.common.collect.{ImmutableList, Lists}
@@ -45,7 +45,7 @@ import org.apache.calcite.util._
import java.lang.{Iterable => JIterable}
import java.math.BigDecimal
import java.util
-import java.util.{Optional, TimeZone}
+import java.util.Optional
import java.util.function.Predicate
import scala.collection.JavaConversions._
@@ -675,7 +675,6 @@ object FlinkRexUtil {
inputNames,
context.getFunctionCatalog,
context.getCatalogManager,
-
TimeZone.getTimeZone(TableConfigUtils.getLocalTimeZone(context.getTableConfig)),
Some(rel.getRowType));
RexNodeExtractor.extractConjunctiveConditions(
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
index 481cbda8b82..d27e34a664e 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractor.scala
@@ -45,8 +45,9 @@ import org.apache.calcite.sql.{SqlFunction, SqlKind,
SqlPostfixOperator}
import org.apache.calcite.sql.fun.{SqlStdOperatorTable, SqlTrimFunction}
import org.apache.calcite.util.{TimestampString, Util}
+import java.time.{ZoneId, ZoneOffset}
import java.util
-import java.util.{Collections, List => JList, TimeZone}
+import java.util.{Collections, List => JList}
import scala.collection.{mutable, JavaConverters}
import scala.collection.JavaConversions._
@@ -110,15 +111,10 @@ object RexNodeExtractor extends Logging {
inputFieldNames: JList[String],
rexBuilder: RexBuilder,
functionCatalog: FunctionCatalog,
- catalogManager: CatalogManager,
- timeZone: TimeZone): (Array[Expression], Array[RexNode]) = {
+ catalogManager: CatalogManager): (Array[Expression], Array[RexNode]) = {
val inputNames = inputFieldNames.asScala.toArray
- val converter = new RexNodeToExpressionConverter(
- rexBuilder,
- inputNames,
- functionCatalog,
- catalogManager,
- timeZone)
+ val converter =
+ new RexNodeToExpressionConverter(rexBuilder, inputNames,
functionCatalog, catalogManager)
val (convertibleRexNodes, unconvertedRexNodes) =
extractConjunctiveConditions(expr, maxCnfNodeCount, rexBuilder,
converter)
val convertedExpressions = convertibleRexNodes.map(_.accept(converter).get)
@@ -399,7 +395,6 @@ class RexNodeToExpressionConverter(
inputNames: Array[String],
functionCatalog: FunctionCatalog,
catalogManager: CatalogManager,
- timeZone: TimeZone,
relDataType: Option[RelDataType] = None)
extends RexVisitor[Option[ResolvedExpression]] {
@@ -407,9 +402,8 @@ class RexNodeToExpressionConverter(
rexBuilder: RexBuilder,
inputNames: Array[String],
functionCatalog: FunctionCatalog,
- catalogManager: CatalogManager,
- timeZone: TimeZone) = {
- this(rexBuilder, inputNames, functionCatalog, catalogManager, timeZone,
None)
+ catalogManager: CatalogManager) = {
+ this(rexBuilder, inputNames, functionCatalog, catalogManager, None)
}
override def visitInputRef(inputRef: RexInputRef):
Option[ResolvedExpression] = {
@@ -455,7 +449,7 @@ class RexNodeToExpressionConverter(
case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
val v = literal.getValueAs(classOf[TimestampString])
- toLocalDateTime(v).atZone(timeZone.toZoneId).toInstant
+ toLocalDateTime(v).atZone(ZoneId.of(ZoneOffset.UTC.getId)).toInstant
case INTERVAL_DAY_TIME =>
val v = literal.getValueAs(classOf[java.lang.Long])
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java
index f134223d243..90aeb61c031 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/DeletePushDownUtilsTest.java
@@ -38,13 +38,18 @@ import
org.apache.flink.table.planner.delegation.PlannerContext;
import org.apache.flink.table.planner.factories.TestUpdateDeleteTableFactory;
import org.apache.flink.table.planner.parse.CalciteParser;
import org.apache.flink.table.planner.utils.PlannerMocks;
+import org.apache.flink.table.planner.utils.TimestampStringUtils;
import org.apache.flink.table.utils.CatalogManagerMocks;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.logical.LogicalTableModify;
import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.TimestampString;
import org.junit.jupiter.api.Test;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -123,6 +128,7 @@ public class DeletePushDownUtilsTest {
.column("f0", DataTypes.INT().notNull())
.column("f1", DataTypes.STRING().nullable())
.column("f2", DataTypes.BIGINT().nullable())
+ .column("f3",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE().nullable())
.build(),
null,
Collections.emptyList(),
@@ -154,6 +160,17 @@ public class DeletePushDownUtilsTest {
tableModify = getTableModifyFromSql("DELETE FROM t where f0 > (select
count(1) from t)");
optionalResolvedExpressions =
DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
assertThat(optionalResolvedExpressions).isEmpty();
+
+ String dateTime = "2024-05-13 08:00:00";
+ tableModify =
+ getTableModifyFromSql(String.format("DELETE FROM t where f3 >
'%s'", dateTime));
+ LocalDateTime ldt = TimestampStringUtils.toLocalDateTime(new
TimestampString(dateTime));
+ Instant instant =
ldt.toInstant(ZoneId.systemDefault().getRules().getOffset(ldt));
+ optionalResolvedExpressions =
DeletePushDownUtils.getResolvedFilterExpressions(tableModify);
+ assertThat(optionalResolvedExpressions).isPresent();
+ verifyExpression(
+ optionalResolvedExpressions,
+ String.format("[greaterThan(f3, %s)]", instant.toString()));
}
private CatalogTable createTestCatalogTable(Map<String, String> options) {
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java
index 3e3bbb76fa7..a01ae2bd7e5 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.java
@@ -37,6 +37,8 @@ import org.apache.calcite.tools.RuleSets;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.time.ZoneId;
+
/** Test for {@link PushFilterInCalcIntoTableSourceRuleTest}. */
class PushFilterInCalcIntoTableSourceRuleTest extends
PushFilterIntoTableSourceScanRuleTestBase {
@@ -146,4 +148,27 @@ class PushFilterInCalcIntoTableSourceRuleTest extends
PushFilterIntoTableSourceS
util.tableEnv().executeSql(ddl);
super.testWithInterval();
}
+
+ @Test
+ public void testWithTimestampWithTimeZone() {
+ String ddl =
+ "CREATE TABLE MTable (\n"
+ + "a TIMESTAMP_LTZ(3),\n"
+ + "b TIMESTAMP(3)\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'false',\n"
+ + " 'filterable-fields' = 'a',\n"
+ + " 'disable-lookup' = 'true'"
+ + ")";
+
+ util.tableEnv().executeSql(ddl);
+ ZoneId preZoneId = util.tableEnv().getConfig().getLocalTimeZone();
+
util.tableEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
+ try {
+ super.testWithTimestampWithTimeZone();
+ } finally {
+ util.tableEnv().getConfig().setLocalTimeZone(preZoneId);
+ }
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java
index f4969a6fe88..da21dbc5b33 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.java
@@ -33,6 +33,8 @@ import org.apache.calcite.tools.RuleSets;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.time.ZoneId;
+
/** Test for {@link PushFilterIntoTableSourceScanRule}. */
class PushFilterIntoTableSourceScanRuleTest extends
PushFilterIntoTableSourceScanRuleTestBase {
@@ -151,6 +153,28 @@ class PushFilterIntoTableSourceScanRuleTest extends
PushFilterIntoTableSourceSca
super.testWithInterval();
}
+ @Test
+ public void testWithTimestampWithTimeZone() {
+ String ddl =
+ "CREATE TABLE MTable (\n"
+ + "a TIMESTAMP_LTZ(3),\n"
+ + "b TIMESTAMP(3)\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'bounded' = 'true',\n"
+ + " 'filterable-fields' = 'a',\n"
+ + " 'disable-lookup' = 'true'"
+ + ")";
+ util.tableEnv().executeSql(ddl);
+ ZoneId preZoneId = util.tableEnv().getConfig().getLocalTimeZone();
+
util.tableEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
+ try {
+ super.testWithTimestampWithTimeZone();
+ } finally {
+ util.tableEnv().getConfig().setLocalTimeZone(preZoneId);
+ }
+ }
+
@Test
void testBasicNestedFilter() {
util.verifyRelPlan("SELECT * FROM NestedTable WHERE
deepNested.nested1.`value` > 2");
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java
index 3debdf2d481..573bd8d6e78 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTestBase.java
@@ -129,6 +129,11 @@ abstract class PushFilterIntoTableSourceScanRuleTestBase
extends TableTestBase {
+ "TIMESTAMPADD(YEAR, 2, b) >= a");
}
+ @Test
+ public void testWithTimestampWithTimeZone() {
+ util.verifyRelPlan("SELECT * FROM MTable WHERE a > '2024-05-13
08:00:00'");
+ }
+
@Test
void testCannotPushDownIn() {
// this test is to avoid filter push down rules throwing exceptions
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.xml
index 8160f58a149..ceae952923d 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterInCalcIntoTableSourceRuleTest.xml
@@ -288,6 +288,23 @@ LogicalProject(a=[$0], b=[$1])
<![CDATA[
FlinkLogicalCalc(select=[a, b], where=[OR(>=(+(a, *(3600000:INTERVAL DAY TO
SECOND, 5)), b), >=(+(b, *(12:INTERVAL YEAR TO MONTH, 2)), a))])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database,
MTable, filter=[]]], fields=[a, b])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testWithTimestampWithTimeZone">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MTable WHERE a > '2024-05-13 08:00:00']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($0, CAST(_UTF-16LE'2024-05-13
08:00:00'):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, MTable,
filter=[>(a, 2024-05-13 00:00:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))]]],
fields=[a, b])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.xml
index b40aebe273c..e49909ed7a9 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.xml
@@ -284,6 +284,24 @@ LogicalProject(a=[$0], b=[$1])
LogicalProject(a=[$0], b=[$1])
+- LogicalFilter(condition=[OR(>=(+($0, *(3600000:INTERVAL DAY TO SECOND, 5)),
$1), >=(+($1, *(12:INTERVAL YEAR TO MONTH, 2)), $0))])
+- LogicalTableScan(table=[[default_catalog, default_database, MTable,
source: [filterPushedDown=[true], filter=[]]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testWithTimestampWithTimeZone">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MTable WHERE a > '2024-05-13 08:00:00']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($0, CAST(_UTF-16LE'2024-05-13
08:00:00'):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MTable,
source: [filterPushedDown=[false], filter=[]]]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalTableScan(table=[[default_catalog, default_database, MTable, source:
[filterPushedDown=[true], filter=[greaterThan(a, 2024-05-13T00:00:00Z)]]]])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml
index b27c92d52bf..8a27b0a7d68 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRuleTest.xml
@@ -284,6 +284,24 @@ LogicalProject(a=[$0], b=[$1])
LogicalProject(a=[$0], b=[$1])
+- LogicalFilter(condition=[OR(>=(+($0, *(3600000:INTERVAL DAY TO SECOND, 5)),
$1), >=(+($1, *(12:INTERVAL YEAR TO MONTH, 2)), $0))])
+- LogicalTableScan(table=[[default_catalog, default_database, MTable,
filter=[]]])
+]]>
+ </Resource>
+ </TestCase>
+ <TestCase name="testWithTimestampWithTimeZone">
+ <Resource name="sql">
+ <![CDATA[SELECT * FROM MTable WHERE a > '2024-05-13 08:00:00']]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalFilter(condition=[>($0, CAST(_UTF-16LE'2024-05-13
08:00:00'):TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) NOT NULL)])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MTable]])
+]]>
+ </Resource>
+ <Resource name="optimized rel plan">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1])
++- LogicalTableScan(table=[[default_catalog, default_database, MTable,
filter=[>(a, 2024-05-13 00:00:00:TIMESTAMP_WITH_LOCAL_TIME_ZONE(3))]]])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.scala
index d418dbeeffb..276b15fb08c 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoLegacyTableSourceScanRuleTest.scala
@@ -28,6 +28,8 @@ import org.apache.calcite.rel.rules.CoreRules
import org.apache.calcite.tools.RuleSets
import org.junit.jupiter.api.{BeforeEach, Test}
+import java.time.ZoneId
+
/** Test for [[PushFilterIntoLegacyTableSourceScanRule]]. */
class PushFilterIntoLegacyTableSourceScanRuleTest
extends PushFilterIntoTableSourceScanRuleTestBase {
@@ -111,4 +113,29 @@ class PushFilterIntoLegacyTableSourceScanRuleTest
super.testWithInterval()
}
+
+ @Test
+ override def testWithTimestampWithTimeZone(): Unit = {
+ val schema = TableSchema
+ .builder()
+ .field("a", DataTypes.TIMESTAMP_LTZ(3))
+ .field("b", DataTypes.TIMESTAMP)
+ .build()
+ val data = List(
+ Row.of(localDateTime("2024-05-13 08:00:00"), localDateTime("2024-05-13
08:00:00")))
+ TestLegacyFilterableTableSource.createTemporaryTable(
+ util.tableEnv,
+ schema,
+ "MTable",
+ isBounded = true,
+ data,
+ Set("a", "b"))
+ val preZoneId = util.tableEnv.getConfig.getLocalTimeZone
+ util.tableEnv.getConfig.setLocalTimeZone(ZoneId.of("Asia/Shanghai"))
+ try {
+ super.testWithTimestampWithTimeZone()
+ } finally {
+ util.tableEnv.getConfig.setLocalTimeZone(preZoneId)
+ }
+ }
}
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
index 26c70a34f46..991d7fe3700 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/utils/RexNodeExtractorTest.scala
@@ -45,8 +45,8 @@ import org.junit.jupiter.api.Test
import java.math.BigDecimal
import java.net.URL
-import java.time.ZoneId
-import java.util.{Arrays, List => JList, TimeZone}
+import java.time.{ZoneId, ZoneOffset}
+import java.util.{Arrays, List => JList}
import scala.collection.JavaConverters._
@@ -716,17 +716,12 @@ class RexNodeExtractorTest extends RexNodeTestBase {
val relBuilder: RexBuilder = new FlinkRexBuilder(typeFactory)
- val shanghai = ZoneId.of("Asia/Shanghai")
- val (converted, _) = extractConjunctiveConditions(
- and,
- -1,
- fieldNames,
- relBuilder,
- functionCatalog,
- TimeZone.getTimeZone(shanghai))
+ val (converted, _) =
+ extractConjunctiveConditions(and, -1, fieldNames, relBuilder,
functionCatalog)
val datetime = DateTimeTestUtil.localDateTime("2017-09-10 14:23:01.123456")
- val instant = datetime.toInstant(shanghai.getRules.getOffset(datetime))
+ val instant =
+
datetime.toInstant(ZoneId.of(ZoneOffset.UTC.getId).getRules.getOffset(datetime))
{
val expected = Array[Expression](
@@ -775,16 +770,14 @@ class RexNodeExtractorTest extends RexNodeTestBase {
maxCnfNodeCount: Int,
inputFieldNames: JList[String],
rexBuilder: RexBuilder,
- catalog: FunctionCatalog,
- tz: TimeZone = TimeZone.getDefault): (Array[Expression], Array[RexNode])
= {
+ catalog: FunctionCatalog): (Array[Expression], Array[RexNode]) = {
RexNodeExtractor.extractConjunctiveConditions(
expr,
maxCnfNodeCount,
inputFieldNames,
rexBuilder,
catalog,
- catalogManager,
- tz)
+ catalogManager)
}
}