This is an automated email from the ASF dual-hosted git repository.

shuaixu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 73067f9681 [GLUTEN-10247][FLINK] Support date_format function (#10248)
73067f9681 is described below

commit 73067f9681e1cdc012cdd05206d1e269e077f350
Author: kevinyhzou <[email protected]>
AuthorDate: Thu Oct 9 17:09:34 2025 +0800

    [GLUTEN-10247][FLINK] Support date_format function (#10248)
    
    * Support date_format function
---
 .github/workflows/flink.yml                        |  3 +-
 gluten-flink/docs/Flink.md                         |  2 +-
 .../rexnode/functions/RexCallConverterFactory.java |  1 +
 .../table/runtime/config/VeloxQueryConfig.java     | 52 +++++++++++++++++
 .../runtime/operators/GlutenOneInputOperator.java  |  6 +-
 .../runtime/operators/GlutenSourceFunction.java    |  6 +-
 .../operators/GlutenVectorOneInputOperator.java    |  6 +-
 .../operators/GlutenVectorSourceFunction.java      | 11 +++-
 .../operators/GlutenVectorTwoInputOperator.java    |  8 ++-
 .../apache/gluten/util/LogicalTypeConverter.java   |  3 +
 .../gluten/vectorized/ArrowVectorWriter.java       |  1 -
 .../vectorized/FlinkRowToVLVectorConvertor.java    |  1 -
 .../runtime/stream/custom/ScalarFunctionsTest.java | 68 ++++++++++++++++++++++
 13 files changed, 152 insertions(+), 16 deletions(-)

diff --git a/.github/workflows/flink.yml b/.github/workflows/flink.yml
index c202d31d3d..d6de6118da 100644
--- a/.github/workflows/flink.yml
+++ b/.github/workflows/flink.yml
@@ -58,8 +58,9 @@ jobs:
         run: |
           source /opt/rh/gcc-toolset-11/enable
           sudo dnf install -y patchelf
+          sudo yum install https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/Packages/tzdata-2025a-1.el9.noarch.rpm -y
           git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
-          cd velox4j && git reset --hard 0180528e9b98fad22bc9da8a3864d2929ef73eec
+          cd velox4j && git reset --hard f389bafb05ebf3563eb3a06ea7574d06720b37e9
           git apply $GITHUB_WORKSPACE/gluten-flink/patches/fix-velox4j.patch
           mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
           cd ..
diff --git a/gluten-flink/docs/Flink.md b/gluten-flink/docs/Flink.md
index 6f98d3e143..92dc6fbc37 100644
--- a/gluten-flink/docs/Flink.md
+++ b/gluten-flink/docs/Flink.md
@@ -48,7 +48,7 @@ As some features have not been committed to upstream, you have to use the follow
 ## fetch velox4j code
 git clone -b gluten-0530 https://github.com/bigo-sg/velox4j.git
 cd velox4j
-git reset --hard 0180528e9b98fad22bc9da8a3864d2929ef73eec
+git reset --hard f389bafb05ebf3563eb3a06ea7574d06720b37e9
 mvn clean install -DskipTests -Dgpg.skip -Dspotless.skip=true
 ```
 **Get gluten**
diff --git 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
index 88f532544e..88ed5750c0 100644
--- 
a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
+++ 
b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/functions/RexCallConverterFactory.java
@@ -82,6 +82,7 @@ public class RexCallConverterFactory {
           Map.entry("AND", Arrays.asList(() -> new DefaultRexCallConverter("and"))),
           Map.entry("SPLIT_INDEX", Arrays.asList(() -> new SplitIndexRexCallConverter())),
           Map.entry("SEARCH", Arrays.asList(() -> new DefaultRexCallConverter("in"))),
+          Map.entry("DATE_FORMAT", Arrays.asList(() -> new DefaultRexCallConverter("date_format"))),
           Map.entry(
               ">=",
               Arrays.asList(
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java
new file mode 100644
index 0000000000..788ecb4a68
--- /dev/null
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.table.runtime.config;
+
+import io.github.zhztheplayer.velox4j.config.Config;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.table.api.config.TableConfigOptions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class VeloxQueryConfig {
+
+  private static final String keyVeloxAdjustTimestampToSessionTimeZone =
+      "adjust_timestamp_to_session_timezone";
+  private static final String keyVeloxSessionTimezone = "session_timezone";
+
+  public static Config getConfig(RuntimeContext context) {
+    if (!(context instanceof StreamingRuntimeContext)) {
+      return Config.empty();
+    }
+    Configuration config = ((StreamingRuntimeContext) context).getJobConfiguration();
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put(keyVeloxAdjustTimestampToSessionTimeZone, "true");
+    String localTimeZone = config.get(TableConfigOptions.LOCAL_TIME_ZONE);
+    // As flink's default timezone value is `default`, it is not a valid timezone id, so we should
+    // convert it to `UTC` timezone.
+    if (TableConfigOptions.LOCAL_TIME_ZONE.defaultValue().equals(localTimeZone)) {
+      configMap.put(keyVeloxSessionTimezone, "UTC");
+    } else {
+      configMap.put(keyVeloxSessionTimezone, localTimeZone);
+    }
+    return Config.create(configMap);
+  }
+}
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
index b5c73b2b68..d8f6f1a0b2 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOneInputOperator.java
@@ -17,10 +17,10 @@
 package org.apache.gluten.table.runtime.operators;
 
 import org.apache.gluten.streaming.api.operators.GlutenOperator;
+import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
 import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;
 
 import io.github.zhztheplayer.velox4j.Velox4j;
-import io.github.zhztheplayer.velox4j.config.Config;
 import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
@@ -99,7 +99,9 @@ public class GlutenOneInputOperator extends 
TableStreamOperator<RowData>
     mockInput.addTarget(glutenPlan);
     LOG.debug("Gluten Plan: {}", Serde.toJson(mockInput));
     LOG.debug("OutTypes: {}", outputTypes.keySet());
-    query = new Query(mockInput, Config.empty(), ConnectorConfig.empty());
+    query =
+        new Query(
+            mockInput, VeloxQueryConfig.getConfig(getRuntimeContext()), 
ConnectorConfig.empty());
     allocator = new RootAllocator(Long.MAX_VALUE);
     task = session.queryOps().execute(query);
     ExternalStreamConnectorSplit split =
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
index ddcd5cae94..360eb39a84 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenSourceFunction.java
@@ -16,10 +16,10 @@
  */
 package org.apache.gluten.table.runtime.operators;
 
+import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
 import org.apache.gluten.vectorized.FlinkRowToVLVectorConvertor;
 
 import io.github.zhztheplayer.velox4j.Velox4j;
-import io.github.zhztheplayer.velox4j.config.Config;
 import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
 import io.github.zhztheplayer.velox4j.connector.ConnectorSplit;
 import io.github.zhztheplayer.velox4j.data.RowVector;
@@ -92,7 +92,9 @@ public class GlutenSourceFunction extends 
RichParallelSourceFunction<RowData> {
     LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode));
     memoryManager = MemoryManager.create(AllocationListener.NOOP);
     session = Velox4j.newSession(memoryManager);
-    query = new Query(planNode, Config.empty(), ConnectorConfig.empty());
+    query =
+        new Query(
+            planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), 
ConnectorConfig.empty());
     allocator = new RootAllocator(Long.MAX_VALUE);
 
     SerialTask task = session.queryOps().execute(query);
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
index 5319c61b79..e859e15ca3 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorOneInputOperator.java
@@ -17,9 +17,9 @@
 package org.apache.gluten.table.runtime.operators;
 
 import org.apache.gluten.streaming.api.operators.GlutenOperator;
+import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
 
 import io.github.zhztheplayer.velox4j.Velox4j;
-import io.github.zhztheplayer.velox4j.config.Config;
 import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
@@ -94,7 +94,9 @@ public class GlutenVectorOneInputOperator extends 
TableStreamOperator<StatefulRe
     mockInput.addTarget(glutenPlan);
     LOG.debug("Gluten Plan: {}", Serde.toJson(mockInput));
     LOG.debug("OutTypes: {}", outputTypes.keySet());
-    query = new Query(mockInput, Config.empty(), ConnectorConfig.empty());
+    query =
+        new Query(
+            mockInput, VeloxQueryConfig.getConfig(getRuntimeContext()), 
ConnectorConfig.empty());
     task = session.queryOps().execute(query);
   }
 
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
index a4d6745163..399211ec1b 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorSourceFunction.java
@@ -16,8 +16,9 @@
  */
 package org.apache.gluten.table.runtime.operators;
 
+import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
+
 import io.github.zhztheplayer.velox4j.Velox4j;
-import io.github.zhztheplayer.velox4j.config.Config;
 import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
 import io.github.zhztheplayer.velox4j.connector.ConnectorSplit;
 import io.github.zhztheplayer.velox4j.iterator.UpIterator;
@@ -98,7 +99,9 @@ public class GlutenVectorSourceFunction extends 
RichParallelSourceFunction<State
     if (memoryManager == null) {
       memoryManager = MemoryManager.create(AllocationListener.NOOP);
       session = Velox4j.newSession(memoryManager);
-      query = new Query(planNode, Config.empty(), ConnectorConfig.empty());
+      query =
+          new Query(
+              planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), 
ConnectorConfig.empty());
       allocator = new RootAllocator(Long.MAX_VALUE);
 
       task = session.queryOps().execute(query);
@@ -150,7 +153,9 @@ public class GlutenVectorSourceFunction extends 
RichParallelSourceFunction<State
       LOG.debug("Running GlutenSourceFunction: " + Serde.toJson(planNode));
       memoryManager = MemoryManager.create(AllocationListener.NOOP);
       session = Velox4j.newSession(memoryManager);
-      query = new Query(planNode, Config.empty(), ConnectorConfig.empty());
+      query =
+          new Query(
+              planNode, VeloxQueryConfig.getConfig(getRuntimeContext()), 
ConnectorConfig.empty());
       allocator = new RootAllocator(Long.MAX_VALUE);
 
       task = session.queryOps().execute(query);
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
index 04ed555a3b..a5c4da5f0c 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenVectorTwoInputOperator.java
@@ -17,9 +17,9 @@
 package org.apache.gluten.table.runtime.operators;
 
 import org.apache.gluten.streaming.api.operators.GlutenOperator;
+import org.apache.gluten.table.runtime.config.VeloxQueryConfig;
 
 import io.github.zhztheplayer.velox4j.Velox4j;
-import io.github.zhztheplayer.velox4j.config.Config;
 import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit;
 import io.github.zhztheplayer.velox4j.connector.ExternalStreams;
@@ -94,11 +94,13 @@ public class GlutenVectorTwoInputOperator extends 
AbstractStreamOperator<Statefu
   private void initGlutenTask() {
     memoryManager = MemoryManager.create(AllocationListener.NOOP);
     session = Velox4j.newSession(memoryManager);
-    query = new Query(glutenPlan, Config.empty(), ConnectorConfig.empty());
+    query =
+        new Query(
+            glutenPlan, VeloxQueryConfig.getConfig(getRuntimeContext()), 
ConnectorConfig.empty());
     task = session.queryOps().execute(query);
     LOG.debug("Gluten Plan: {}", Serde.toJson(glutenPlan));
     LOG.debug("OutTypes: {}", outputTypes.keySet());
-    LOG.debug("RuntimeContex: {}", getRuntimeContext().getClass().getName());
+    LOG.debug("RuntimeContext: {}", getRuntimeContext().getClass().getName());
   }
 
   @Override
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
index e8cbc00eb5..8d684fab52 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java
@@ -107,6 +107,9 @@ public class LogicalTypeConverter {
                 Type valueType = toVLType(mapType.getValueType());
                 return io.github.zhztheplayer.velox4j.type.MapType.create(keyType, valueType);
               }),
+          // Map the flink's `TimestampLTZ` type to velox `Timestamp` type. And the timezone would
+          // be specified by using flink's table config `LOCAL_TIME_ZONE`, which would be passed to
+          // velox's `session_timezone` config.
           // TODO: may need precision
           Map.entry(
               LocalZonedTimestampType.class,
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorWriter.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorWriter.java
index d8a50081f8..d96f48ecbe 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorWriter.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/ArrowVectorWriter.java
@@ -441,7 +441,6 @@ class DateDayVectorWriter extends 
BaseVectorWriter<DateDayVector, Integer> {
 
 class StructVectorWriter extends BaseVectorWriter<StructVector, RowData> {
   private final int fieldCount;
-  private BufferAllocator allocator;
   private final List<ArrowVectorWriter> fieldWriters;
 
   public StructVectorWriter(Type fieldType, BufferAllocator allocator, 
FieldVector vector) {
diff --git 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
index e839806137..3abb06c511 100644
--- 
a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
+++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLVectorConvertor.java
@@ -41,7 +41,6 @@ public class FlinkRowToVLVectorConvertor {
     List<Type> fieldTypes = rowType.getChildren();
     List<String> fieldNames = rowType.getNames();
     for (int i = 0; i < rowType.size(); i++) {
-      Type fieldType = rowType.getChildren().get(i);
       ArrowVectorWriter writer =
           ArrowVectorWriter.create(fieldNames.get(i), fieldTypes.get(i), 
allocator);
       writer.write(i, row);
diff --git 
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
 
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
index ff99550b81..51f1b42e85 100644
--- 
a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
+++ 
b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
@@ -25,6 +25,10 @@ import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
 import java.util.Arrays;
 import java.util.List;
 
@@ -226,4 +230,68 @@ class ScalarFunctionsTest extends GlutenStreamingTestBase {
     query = "select b + e as x from tblDecimal where a > 0";
     runAndCheck(query, Arrays.asList("+I[2.0]", "+I[5.0]", "+I[7.0]"));
   }
+
+  @Test
+  void testDateFormat() {
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    List<Row> rows =
+        Arrays.asList(
+            Row.of(1, LocalDateTime.parse("2024-12-31 12:12:12", formatter)),
+            Row.of(2, LocalDateTime.parse("2025-02-28 12:12:12", formatter)));
+    createSimpleBoundedValuesTable("timestampTable", "a int, b Timestamp(3)", rows);
+    String query =
+        "select a, DATE_FORMAT(b, 'yyyy-MM-dd'), DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss') from timestampTable";
+    runAndCheck(
+        query,
+        Arrays.asList(
+            "+I[1, 2024-12-31, 2024-12-31 12:12:12]", "+I[2, 2025-02-28, 2025-02-28 12:12:12]"));
+    tEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
+    runAndCheck(
+        query,
+        Arrays.asList(
+            "+I[1, 2024-12-31, 2024-12-31 12:12:12]", "+I[2, 2025-02-28, 2025-02-28 12:12:12]"));
+
+    rows =
+        Arrays.asList(
+            Row.of(
+                1, LocalDateTime.parse("2024-12-31 12:12:12", 
formatter).toInstant(ZoneOffset.UTC)),
+            Row.of(
+                2,
+                LocalDateTime.parse("2025-02-28 12:12:12", 
formatter).toInstant(ZoneOffset.UTC)));
+    createSimpleBoundedValuesTable("timestampLtzTable", "a int, b Timestamp_LTZ(3)", rows);
+    query =
+        "select a, DATE_FORMAT(b, 'yyyy-MM-dd'), DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss') from timestampLtzTable";
+    tEnv().getConfig().setLocalTimeZone(ZoneId.of("America/Los_Angeles"));
+    runAndCheck(
+        query,
+        Arrays.asList(
+            "+I[1, 2024-12-31, 2024-12-31 04:12:12]", "+I[2, 2025-02-28, 2025-02-28 04:12:12]"));
+
+    tEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
+    runAndCheck(
+        query,
+        Arrays.asList(
+            "+I[1, 2024-12-31, 2024-12-31 20:12:12]", "+I[2, 2025-02-28, 2025-02-28 20:12:12]"));
+
+    rows =
+        Arrays.asList(
+            Row.of(
+                1,
+                LocalDateTime.parse("2024-12-31 12:12:12", formatter),
+                LocalDateTime.parse("2024-12-31 12:12:12", 
formatter).toInstant(ZoneOffset.UTC)),
+            Row.of(
+                2,
+                LocalDateTime.parse("2025-02-28 12:12:12", formatter),
+                LocalDateTime.parse("2024-02-28 12:12:12", 
formatter).toInstant(ZoneOffset.UTC)));
+    createSimpleBoundedValuesTable(
+        "timestampTable0", "a int, b Timestamp(3), c Timestamp_LTZ(3)", rows);
+    query =
+        "select a, DATE_FORMAT(b, 'yyyy-MM-dd HH:mm:ss'), DATE_FORMAT(c, 'yyyy-MM-dd HH:mm:ss') from timestampTable0";
+    tEnv().getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));
+    runAndCheck(
+        query,
+        Arrays.asList(
+            "+I[1, 2024-12-31 12:12:12, 2024-12-31 20:12:12]",
+            "+I[2, 2025-02-28 12:12:12, 2024-02-28 20:12:12]"));
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to