This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 37a2e0228 [FLINK-36973][cdc-transform] DateFormat function supports 
LocalZonedTimestampData type
37a2e0228 is described below

commit 37a2e02280d10e7d9b691fbed51cfd41a5a7579e
Author: yuxiqian <[email protected]>
AuthorDate: Fri Apr 25 15:38:50 2025 +0800

    [FLINK-36973][cdc-transform] DateFormat function supports 
LocalZonedTimestampData type
    
    This closes #4003
    
    Co-authored-by: hiliuxg <[email protected]>
---
 .../flink/FlinkPipelineTransformITCase.java        | 113 +++++++++++++++++++++
 .../cdc/runtime/functions/SystemFunctionUtils.java |  15 ++-
 .../flink/cdc/runtime/parser/JaninoCompiler.java   |   6 +-
 .../transform/PostTransformOperatorTest.java       |   2 +-
 .../cdc/runtime/parser/TransformParserTest.java    |   3 +-
 5 files changed, 134 insertions(+), 5 deletions(-)

diff --git 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index e1df23a11..08c6da71d 100644
--- 
a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ 
b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -19,6 +19,8 @@ package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.TimestampData;
 import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
@@ -65,12 +67,15 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.math.BigDecimal;
+import java.sql.Timestamp;
 import java.time.Instant;
 import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.time.ZoneOffset;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
@@ -1024,6 +1029,110 @@ class FlinkPipelineTransformITCase {
                         
"DataChangeEvent{tableId=default_namespace.default_schema.table1, 
before=[class2, 2, , type2], after=[new-class2, 20, new-package2, type2], 
op=UPDATE, meta=({timestamp-type=type2})}");
     }
 
+    /** This tests if transform temporal functions works as expected. */
+    @ParameterizedTest
+    @ValueSource(strings = {"America/Los_Angeles", "UTC", "Asia/Shanghai"})
+    void testTransformWithTimestamps(String timezone) throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId myTable = TableId.tableId("default_namespace", 
"default_schema", "mytable1");
+        Schema tableSchema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.INT().notNull())
+                        .physicalColumn("ts", DataTypes.TIMESTAMP(6))
+                        .physicalColumn("ts_ltz", DataTypes.TIMESTAMP_LTZ(6))
+                        .primaryKey("id")
+                        .build();
+
+        BinaryRecordDataGenerator generator =
+                new BinaryRecordDataGenerator(
+                        tableSchema.getColumnDataTypes().toArray(new 
DataType[0]));
+
+        List<Event> events =
+                Arrays.asList(
+                        new CreateTableEvent(myTable, tableSchema),
+                        DataChangeEvent.insertEvent(
+                                myTable,
+                                generator.generate(
+                                        new Object[] {
+                                            1,
+                                            TimestampData.fromTimestamp(
+                                                    
Timestamp.valueOf("2023-11-27 20:12:31")),
+                                            
LocalZonedTimestampData.fromInstant(
+                                                    toInstant("2020-07-17 
18:00:22", timezone)),
+                                        })),
+                        DataChangeEvent.insertEvent(
+                                myTable,
+                                generator.generate(
+                                        new Object[] {
+                                            2,
+                                            TimestampData.fromTimestamp(
+                                                    
Timestamp.valueOf("2018-02-01 04:14:01")),
+                                            
LocalZonedTimestampData.fromInstant(
+                                                    toInstant("2019-12-31 
21:00:22", timezone)),
+                                        })),
+                        DataChangeEvent.insertEvent(
+                                myTable, generator.generate(new Object[] {3, 
null, null})));
+
+        
ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", 
sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value 
Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, timezone);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                new TransformDef(
+                                        
"default_namespace.default_schema.\\.*",
+                                        "id, "
+                                                + "DATE_FORMAT(ts, 
'yyyy~MM~dd') AS df1, "
+                                                + "DATE_FORMAT(ts_ltz, 
'yyyy~MM~dd') AS df2, "
+                                                + "DATE_FORMAT(ts, 
'yyyy->MM->dd / HH->mm->ss') AS df3, "
+                                                + "DATE_FORMAT(ts_ltz, 
'yyyy->MM->dd / HH->mm->ss') AS df4, "
+                                                + 
"DATE_FORMAT(TIMESTAMPADD(SECOND, 17, ts), 'yyyy->MM->dd / HH->mm->ss') AS df5, 
"
+                                                + 
"DATE_FORMAT(TIMESTAMPADD(SECOND, 17, ts_ltz), 'yyyy->MM->dd / HH->mm->ss') AS 
df6",
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null)),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        Assertions.assertThat(outputEvents)
+                .containsExactly(
+                        
"CreateTableEvent{tableId=default_namespace.default_schema.mytable1, 
schema=columns={`id` INT NOT NULL,`df1` STRING,`df2` STRING,`df3` STRING,`df4` 
STRING,`df5` STRING,`df6` STRING}, primaryKeys=id, options=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[1, 2023~11~27, 2020~07~17, 2023->11->27 / 20->12->31, 2020->07->17 / 
18->00->22, 2023->11->27 / 20->12->48, 2020->07->17 / 18->00->39], op=INSERT, 
meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[2, 2018~02~01, 2019~12~31, 2018->02->01 / 04->14->01, 2019->12->31 / 
21->00->22, 2018->02->01 / 04->14->18, 2019->12->31 / 21->00->39], op=INSERT, 
meta=()}",
+                        
"DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], 
after=[3, null, null, null, null, null, null], op=INSERT, meta=()}");
+    }
+
     void runGenericTransformTest(
             ValuesDataSink.SinkApi sinkApi,
             List<TransformDef> transformDefs,
@@ -3043,4 +3152,8 @@ class FlinkPipelineTransformITCase {
                                                         : e)
                                 .toArray());
     }
+
+    private Instant toInstant(String ts, String timezone) {
+        return 
Timestamp.valueOf(ts).toLocalDateTime().atZone(ZoneId.of(timezone)).toInstant();
+    }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index ac91e7f24..bdf767485 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -100,11 +100,24 @@ public class SystemFunctionUtils {
         return DateTimeUtils.unixTimestamp(dateTimeStr, format, 
TimeZone.getTimeZone(timezone));
     }
 
-    public static String dateFormat(TimestampData timestamp, String format) {
+    public static String dateFormat(TimestampData timestamp, String format, 
String timezone) {
+        if (timestamp == null) {
+            return null;
+        }
+        // `timezone` is ignored since TimestampData is time-zone insensitive
         return DateTimeUtils.formatTimestampMillis(
                 timestamp.getMillisecond(), format, 
TimeZone.getTimeZone("UTC"));
     }
 
+    public static String dateFormat(
+            LocalZonedTimestampData timestamp, String format, String timezone) 
{
+        if (timestamp == null) {
+            return null;
+        }
+        return DateTimeUtils.formatTimestampMillis(
+                timestamp.getEpochMillisecond(), format, 
TimeZone.getTimeZone(timezone));
+    }
+
     public static int toDate(String str, String timezone) {
         return toDate(str, "yyyy-MM-dd", timezone);
     }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index c5bcb3139..0fec90d39 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -41,6 +41,7 @@ import org.codehaus.janino.Java;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -66,7 +67,7 @@ public class JaninoCompiler {
                     "UNIX_TIMESTAMP");
 
     private static final List<String> 
TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS =
-            Arrays.asList("DATE_FORMAT");
+            Collections.emptyList();
 
     private static final List<String> 
TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
             Arrays.asList(
@@ -75,7 +76,8 @@ public class JaninoCompiler {
                     "FROM_UNIXTIME",
                     "TIMESTAMPADD",
                     "TIMESTAMPDIFF",
-                    "TIMESTAMP_DIFF");
+                    "TIMESTAMP_DIFF",
+                    "DATE_FORMAT");
 
     public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
     public static final String DEFAULT_TIME_ZONE = "__time_zone__";
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 0de8ad17d..b3c585ef7 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -1669,7 +1669,7 @@ class PostTransformOperatorTest {
     }
 
     @Test
-    void testTimestampaddTransform() throws Exception {
+    void testTimestampAddTransform() throws Exception {
         PostTransformOperator transform =
                 PostTransformOperator.newBuilder()
                         .addTransform(
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index dc3db2912..bebb533ba 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -231,7 +231,8 @@ class TransformParserTest {
         testFilterExpression("QUARTER(dt)", "quarter(dt)");
         testFilterExpression("MONTH(dt)", "month(dt)");
         testFilterExpression("WEEK(dt)", "week(dt)");
-        testFilterExpression("DATE_FORMAT(dt,'yyyy-MM-dd')", "dateFormat(dt, 
\"yyyy-MM-dd\")");
+        testFilterExpression(
+                "DATE_FORMAT(dt,'yyyy-MM-dd')", "dateFormat(dt, 
\"yyyy-MM-dd\", __time_zone__)");
         testFilterExpression(
                 "TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\", 
__time_zone__)");
         testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt, 
__time_zone__)");

Reply via email to