This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
     new 37a2e0228 [FLINK-36973][cdc-transform] DateFormat function supports LocalZonedTimestampData type
37a2e0228 is described below
commit 37a2e02280d10e7d9b691fbed51cfd41a5a7579e
Author: yuxiqian <[email protected]>
AuthorDate: Fri Apr 25 15:38:50 2025 +0800
    [FLINK-36973][cdc-transform] DateFormat function supports LocalZonedTimestampData type
This closes #4003
Co-authored-by: hiliuxg <[email protected]>
---
.../flink/FlinkPipelineTransformITCase.java | 113 +++++++++++++++++++++
.../cdc/runtime/functions/SystemFunctionUtils.java | 15 ++-
.../flink/cdc/runtime/parser/JaninoCompiler.java | 6 +-
.../transform/PostTransformOperatorTest.java | 2 +-
.../cdc/runtime/parser/TransformParserTest.java | 3 +-
5 files changed, 134 insertions(+), 5 deletions(-)
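In practice, DATE_FORMAT can now be applied to TIMESTAMP_LTZ columns in a transform projection. A minimal sketch of such a rule, modeled on the IT case added by this patch (the table pattern and column names are illustrative, not part of the change):

    // Illustrative sketch: format a TIMESTAMP_LTZ column. The output is now
    // rendered in the pipeline's configured local time zone.
    new TransformDef(
            "default_namespace.default_schema.\\.*",
            "id, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd HH:mm:ss') AS formatted",
            null, null, null, null, null, null);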
diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index e1df23a11..08c6da71d 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -19,6 +19,8 @@ package org.apache.flink.cdc.composer.flink;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.data.DecimalData;
+import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
+import org.apache.flink.cdc.common.data.TimestampData;
import org.apache.flink.cdc.common.data.binary.BinaryRecordData;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
@@ -65,12 +67,15 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import java.math.BigDecimal;
+import java.sql.Timestamp;
import java.time.Instant;
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
@@ -1024,6 +1029,110 @@ class FlinkPipelineTransformITCase {
"DataChangeEvent{tableId=default_namespace.default_schema.table1,
before=[class2, 2, , type2], after=[new-class2, 20, new-package2, type2],
op=UPDATE, meta=({timestamp-type=type2})}");
}
+    /** Tests whether transform temporal functions work as expected. */
+ @ParameterizedTest
+ @ValueSource(strings = {"America/Los_Angeles", "UTC", "Asia/Shanghai"})
+ void testTransformWithTimestamps(String timezone) throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId myTable = TableId.tableId("default_namespace", "default_schema", "mytable1");
+ Schema tableSchema =
+ Schema.newBuilder()
+ .physicalColumn("id", DataTypes.INT().notNull())
+ .physicalColumn("ts", DataTypes.TIMESTAMP(6))
+ .physicalColumn("ts_ltz", DataTypes.TIMESTAMP_LTZ(6))
+ .primaryKey("id")
+ .build();
+
+ BinaryRecordDataGenerator generator =
+ new BinaryRecordDataGenerator(
+                        tableSchema.getColumnDataTypes().toArray(new DataType[0]));
+
+ List<Event> events =
+ Arrays.asList(
+ new CreateTableEvent(myTable, tableSchema),
+ DataChangeEvent.insertEvent(
+ myTable,
+ generator.generate(
+ new Object[] {
+ 1,
+                                            TimestampData.fromTimestamp(
+                                                    Timestamp.valueOf("2023-11-27 20:12:31")),
+                                            LocalZonedTimestampData.fromInstant(
+                                                    toInstant("2020-07-17 18:00:22", timezone)),
+                                        })),
+                        DataChangeEvent.insertEvent(
+                                myTable,
+                                generator.generate(
+                                        new Object[] {
+                                            2,
+                                            TimestampData.fromTimestamp(
+                                                    Timestamp.valueOf("2018-02-01 04:14:01")),
+                                            LocalZonedTimestampData.fromInstant(
+                                                    toInstant("2019-12-31 21:00:22", timezone)),
+                                        })),
+                        DataChangeEvent.insertEvent(
+                                myTable, generator.generate(new Object[] {3, null, null})));
+
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+ SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+ pipelineConfig.set(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE, timezone);
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.singletonList(
+ new TransformDef(
+                                        "default_namespace.default_schema.\\.*",
+                                        "id, "
+                                                + "DATE_FORMAT(ts, 'yyyy~MM~dd') AS df1, "
+                                                + "DATE_FORMAT(ts_ltz, 'yyyy~MM~dd') AS df2, "
+                                                + "DATE_FORMAT(ts, 'yyyy->MM->dd / HH->mm->ss') AS df3, "
+                                                + "DATE_FORMAT(ts_ltz, 'yyyy->MM->dd / HH->mm->ss') AS df4, "
+                                                + "DATE_FORMAT(TIMESTAMPADD(SECOND, 17, ts), 'yyyy->MM->dd / HH->mm->ss') AS df5, "
+                                                + "DATE_FORMAT(TIMESTAMPADD(SECOND, 17, ts_ltz), 'yyyy->MM->dd / HH->mm->ss') AS df6",
+ null,
+ null,
+ null,
+ null,
+ null,
+ null)),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check the order and content of all received events
+ String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+ Assertions.assertThat(outputEvents)
+ .containsExactly(
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`df1` STRING,`df2` STRING,`df3` STRING,`df4` STRING,`df5` STRING,`df6` STRING}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, 2023~11~27, 2020~07~17, 2023->11->27 / 20->12->31, 2020->07->17 / 18->00->22, 2023->11->27 / 20->12->48, 2020->07->17 / 18->00->39], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, 2018~02~01, 2019~12~31, 2018->02->01 / 04->14->01, 2019->12->31 / 21->00->22, 2018->02->01 / 04->14->18, 2019->12->31 / 21->00->39], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, null, null, null, null, null, null], op=INSERT, meta=()}");
+ }
+
void runGenericTransformTest(
ValuesDataSink.SinkApi sinkApi,
List<TransformDef> transformDefs,
@@ -3043,4 +3152,8 @@ class FlinkPipelineTransformITCase {
: e)
.toArray());
}
+
+ private Instant toInstant(String ts, String timezone) {
+        return Timestamp.valueOf(ts).toLocalDateTime().atZone(ZoneId.of(timezone)).toInstant();
+ }
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
index ac91e7f24..bdf767485 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/functions/SystemFunctionUtils.java
@@ -100,11 +100,24 @@ public class SystemFunctionUtils {
        return DateTimeUtils.unixTimestamp(dateTimeStr, format, TimeZone.getTimeZone(timezone));
}
- public static String dateFormat(TimestampData timestamp, String format) {
+    public static String dateFormat(TimestampData timestamp, String format, String timezone) {
+ if (timestamp == null) {
+ return null;
+ }
+ // `timezone` is ignored since TimestampData is time-zone insensitive
return DateTimeUtils.formatTimestampMillis(
                timestamp.getMillisecond(), format, TimeZone.getTimeZone("UTC"));
}
+ public static String dateFormat(
+            LocalZonedTimestampData timestamp, String format, String timezone) {
+ if (timestamp == null) {
+ return null;
+ }
+ return DateTimeUtils.formatTimestampMillis(
+                timestamp.getEpochMillisecond(), format, TimeZone.getTimeZone(timezone));
+ }
+
public static int toDate(String str, String timezone) {
return toDate(str, "yyyy-MM-dd", timezone);
}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index c5bcb3139..0fec90d39 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -41,6 +41,7 @@ import org.codehaus.janino.Java;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -66,7 +67,7 @@ public class JaninoCompiler {
"UNIX_TIMESTAMP");
    private static final List<String> TIMEZONE_FREE_TEMPORAL_CONVERSION_FUNCTIONS =
- Arrays.asList("DATE_FORMAT");
+ Collections.emptyList();
    private static final List<String> TIMEZONE_REQUIRED_TEMPORAL_CONVERSION_FUNCTIONS =
Arrays.asList(
@@ -75,7 +76,8 @@ public class JaninoCompiler {
"FROM_UNIXTIME",
"TIMESTAMPADD",
"TIMESTAMPDIFF",
- "TIMESTAMP_DIFF");
+ "TIMESTAMP_DIFF",
+ "DATE_FORMAT");
public static final String DEFAULT_EPOCH_TIME = "__epoch_time__";
public static final String DEFAULT_TIME_ZONE = "__time_zone__";
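The net effect of moving DATE_FORMAT between the two lists, confirmed by the TransformParserTest update at the end of this patch, is that the compiler now appends the session time-zone argument to generated dateFormat calls:

    // Before: DATE_FORMAT(dt, 'yyyy-MM-dd') -> dateFormat(dt, "yyyy-MM-dd")
    // After:  DATE_FORMAT(dt, 'yyyy-MM-dd') -> dateFormat(dt, "yyyy-MM-dd", __time_zone__)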
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
index 0de8ad17d..b3c585ef7 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/transform/PostTransformOperatorTest.java
@@ -1669,7 +1669,7 @@ class PostTransformOperatorTest {
}
@Test
- void testTimestampaddTransform() throws Exception {
+ void testTimestampAddTransform() throws Exception {
PostTransformOperator transform =
PostTransformOperator.newBuilder()
.addTransform(
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index dc3db2912..bebb533ba 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -231,7 +231,8 @@ class TransformParserTest {
testFilterExpression("QUARTER(dt)", "quarter(dt)");
testFilterExpression("MONTH(dt)", "month(dt)");
testFilterExpression("WEEK(dt)", "week(dt)");
-        testFilterExpression("DATE_FORMAT(dt,'yyyy-MM-dd')", "dateFormat(dt, \"yyyy-MM-dd\")");
+        testFilterExpression(
+                "DATE_FORMAT(dt,'yyyy-MM-dd')", "dateFormat(dt, \"yyyy-MM-dd\", __time_zone__)");
testFilterExpression(
"TO_DATE(dt, 'yyyy-MM-dd')", "toDate(dt, \"yyyy-MM-dd\",
__time_zone__)");
testFilterExpression("TO_TIMESTAMP(dt)", "toTimestamp(dt,
__time_zone__)");