This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 384674c9aa [cdc] Support computed expression of now (#5138)
384674c9aa is described below
commit 384674c9aa940bd5dc886297a3a146db11c49c8d
Author: JackeyLee007 <[email protected]>
AuthorDate: Sat Mar 15 19:22:14 2025 +0800
[cdc] Support computed expression of now (#5138)
---
.../apache/paimon/flink/action/cdc/Expression.java | 21 ++++-
.../flink/action/cdc/SyncDatabaseActionBase.java | 9 ++-
.../action/cdc/SyncDatabaseActionFactoryBase.java | 7 ++
.../flink/action/cdc/CdcActionITCaseBase.java | 36 ++++++++-
.../format/aliyun/AliyunJsonRecordParserTest.java | 66 +++++++++++++--
.../kafka/KafkaCanalSyncDatabaseActionITCase.java | 94 ++++++++++++++++++++++
.../canal/database/audit-time/canal-data-1.txt | 19 +++++
.../canal/database/audit-time/canal-data-2.txt | 19 +++++
.../canal/database/audit-time/canal-data-3.txt | 19 +++++
.../paimon/flink/action/ActionITCaseBase.java | 19 +++++
10 files changed, 300 insertions(+), 9 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
index 3290ec1829..50bd57da36 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/Expression.java
@@ -141,7 +141,8 @@ public interface Expression extends Serializable {
referencedField.fieldType(),
referencedField.literals());
}),
- CAST((typeMapping, caseSensitive, args) -> cast(args));
+ CAST((typeMapping, caseSensitive, args) -> cast(args)),
+ NOW((typeMapping, caseSensitive, args) -> new NowExpression());
public final ExpressionCreator creator;
@@ -608,4 +609,22 @@ public interface Expression extends Serializable {
return value;
}
}
+
+ /** Get current timestamp. */
+ final class NowExpression implements Expression {
+ @Override
+ public String fieldReference() {
+ return null;
+ }
+
+ @Override
+ public DataType outputType() {
+ return DataTypes.TIMESTAMP(3);
+ }
+
+ @Override
+ public String eval(String input) {
+ return DateTimeUtils.formatLocalDateTime(LocalDateTime.now(), 3);
+ }
+ }
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index 267d682ca6..4ce4e7c250 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -45,6 +45,7 @@ import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
+import static
org.apache.paimon.flink.action.cdc.ComputedColumnUtils.buildComputedColumns;
/** Base {@link Action} for synchronizing into one Paimon database. */
public abstract class SyncDatabaseActionBase extends SynchronizationActionBase
{
@@ -60,6 +61,7 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
protected String includingTables = ".*";
protected List<String> partitionKeys = new ArrayList<>();
protected List<String> primaryKeys = new ArrayList<>();
+ protected List<ComputedColumn> computedColumns = new ArrayList<>();
@Nullable protected String excludingTables;
protected String includingDbs = ".*";
@Nullable protected String excludingDbs;
@@ -172,10 +174,15 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
return this;
}
+ public SyncDatabaseActionBase withComputedColumnArgs(List<String>
computedColumnArgs) {
+ this.computedColumns = buildComputedColumns(computedColumnArgs,
Collections.emptyList());
+ return this;
+ }
+
@Override
protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord>
recordParse() {
return syncJobHandler.provideRecordParser(
- Collections.emptyList(), typeMapping, metadataConverters);
+ this.computedColumns, typeMapping, metadataConverters);
}
public SyncDatabaseActionBase withPartitionKeyMultiple(
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
index 52bfb7271c..8d0b8b9cef 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java
@@ -22,8 +22,10 @@ import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.ActionFactory;
import org.apache.paimon.flink.action.MultipleParameterToolAdapter;
+import java.util.ArrayList;
import java.util.Optional;
+import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EAGER_INIT;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_DBS;
import static
org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.EXCLUDING_TABLES;
@@ -79,5 +81,10 @@ public abstract class SyncDatabaseActionFactoryBase<T
extends SyncDatabaseAction
String[] options = params.get(TYPE_MAPPING).split(",");
action.withTypeMapping(TypeMapping.parse(options));
}
+
+ if (params.has(COMPUTED_COLUMN)) {
+ action.withComputedColumnArgs(
+ new
ArrayList<>(params.getMultiParameter(COMPUTED_COLUMN)));
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
index 00a8b23617..855623b1af 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/CdcActionITCaseBase.java
@@ -130,6 +130,16 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
protected void waitForResult(
List<String> expected, FileStoreTable table, RowType rowType,
List<String> primaryKeys)
throws Exception {
+ waitForResult(false, expected, table, rowType, primaryKeys);
+ }
+
+ protected void waitForResult(
+ boolean withRegx,
+ List<String> expected,
+ FileStoreTable table,
+ RowType rowType,
+ List<String> primaryKeys)
+ throws Exception {
assertThat(table.schema().primaryKeys()).isEqualTo(primaryKeys);
// wait for table schema to become our expected schema
@@ -165,7 +175,8 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
rowType);
List<String> sortedActual = new ArrayList<>(result);
Collections.sort(sortedActual);
- if (sortedExpected.equals(sortedActual)) {
+ if (withRegx && isRegxMatchList(sortedActual, sortedExpected)
+ || sortedExpected.equals(sortedActual)) {
break;
}
LOG.info("actual: " + sortedActual);
@@ -174,6 +185,20 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
}
}
+ private boolean isRegxMatchList(List<String> actual, List<String>
expected) {
+ if (actual.size() != expected.size()) {
+ return false;
+ }
+
+ for (int i = 0; i < actual.size(); i++) {
+ if (!actual.get(i).matches(expected.get(i))) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
protected Map<String, String> getBasicTableConfig() {
Map<String, String> config = new HashMap<>();
ThreadLocalRandom random = ThreadLocalRandom.current();
@@ -392,6 +417,7 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
private final List<String> partitionKeys = new ArrayList<>();
private final List<String> primaryKeys = new ArrayList<>();
private final List<String> metadataColumn = new ArrayList<>();
+ private final List<String> computedColumnArgs = new ArrayList<>();
protected Map<String, String> partitionKeyMultiple = new HashMap<>();
public SyncDatabaseActionBuilder(Class<T> clazz, Map<String, String>
sourceConfig) {
@@ -464,6 +490,12 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
return this;
}
+ public SyncDatabaseActionBuilder<T> withComputedColumnArgs(
+ List<String> computedColumnArgs) {
+ this.computedColumnArgs.addAll(computedColumnArgs);
+ return this;
+ }
+
public SyncDatabaseActionBuilder<T> withPartitionKeyMultiple(
Map<String, String> partitionKeyMultiple) {
if (partitionKeyMultiple != null) {
@@ -500,6 +532,8 @@ public class CdcActionITCaseBase extends ActionITCaseBase {
args.addAll(listToArgs("--primary-keys", primaryKeys));
args.addAll(listToArgs("--metadata-column", metadataColumn));
+ args.addAll(listToMultiArgs("--computed-column",
computedColumnArgs));
+
return createAction(clazz, args);
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java
index f06268d700..6dea3ee547 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/format/aliyun/AliyunJsonRecordParserTest.java
@@ -18,7 +18,10 @@
package org.apache.paimon.flink.action.cdc.format.aliyun;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
+import org.apache.paimon.flink.action.cdc.ComputedColumn;
+import org.apache.paimon.flink.action.cdc.ComputedColumnUtils;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
import
org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
@@ -26,6 +29,7 @@ import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.utils.BinaryStringUtils;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -43,6 +47,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
/** Test for AliyunJsonRecordParser. */
public class AliyunJsonRecordParserTest extends KafkaActionITCaseBase {
@@ -51,14 +56,20 @@ public class AliyunJsonRecordParserTest extends
KafkaActionITCaseBase {
private static List<String> insertList = new ArrayList<>();
private static List<String> updateList = new ArrayList<>();
private static List<String> deleteList = new ArrayList<>();
+ private static List<ComputedColumn> computedColumns = new ArrayList<>();
private static ObjectMapper objMapper = new ObjectMapper();
+ String dateTimeRegex =
"\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}";
+
@Before
public void setup() {
String insertRes = "kafka/aliyun/table/event/event-insert.txt";
String updateRes = "kafka/aliyun/table/event/event-update-in-one.txt";
String deleteRes = "kafka/aliyun/table/event/event-delete.txt";
+
+ String[] computedColumnArgs = {"etl_create_time=now()",
"etl_update_time=now()"};
+
URL url;
try {
url =
AliyunJsonRecordParserTest.class.getClassLoader().getResource(insertRes);
@@ -76,6 +87,10 @@ public class AliyunJsonRecordParserTest extends
KafkaActionITCaseBase {
.filter(this::isRecordLine)
.forEach(e -> deleteList.add(e));
+ computedColumns =
+ ComputedColumnUtils.buildComputedColumns(
+ Arrays.asList(computedColumnArgs),
Collections.emptyList());
+
} catch (Exception e) {
log.error("Fail to init aliyun-json cases", e);
}
@@ -83,10 +98,11 @@ public class AliyunJsonRecordParserTest extends
KafkaActionITCaseBase {
@Test
public void extractInsertRecord() throws Exception {
+
AliyunRecordParser parser =
- new AliyunRecordParser(TypeMapping.defaultMapping(),
Collections.emptyList());
+ new AliyunRecordParser(TypeMapping.defaultMapping(),
computedColumns);
for (String json : insertList) {
- // 将json解析为JsonNode对象
+
JsonNode rootNode = objMapper.readValue(json, JsonNode.class);
CdcSourceRecord cdcRecord = new CdcSourceRecord(rootNode);
Schema schema = parser.buildSchema(cdcRecord);
@@ -106,15 +122,31 @@ public class AliyunJsonRecordParserTest extends
KafkaActionITCaseBase {
MessageQueueCdcTimestampExtractor extractor = new
MessageQueueCdcTimestampExtractor();
Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+
+ Map<String, String> data =
records.get(0).toRichCdcRecord().toCdcRecord().data();
+ String createTime = data.get("etl_create_time");
+ String updateTime = data.get("etl_update_time");
+
+ // Mock the real timestamp string which retrieved from store and
convert through paimon
+ // Timestamp
+ createTime =
+
BinaryStringUtils.toTimestamp(BinaryString.fromString(createTime), 6)
+ .toString();
+ updateTime =
+
BinaryStringUtils.toTimestamp(BinaryString.fromString(updateTime), 6)
+ .toString();
+
+ Assert.assertTrue(createTime.matches(dateTimeRegex));
+ Assert.assertTrue(updateTime.matches(dateTimeRegex));
}
}
@Test
public void extractUpdateRecord() throws Exception {
AliyunRecordParser parser =
- new AliyunRecordParser(TypeMapping.defaultMapping(),
Collections.emptyList());
+ new AliyunRecordParser(TypeMapping.defaultMapping(),
computedColumns);
for (String json : updateList) {
- // 将json解析为JsonNode对象
+
JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode);
Schema schema = parser.buildSchema(cdcRecord);
@@ -134,15 +166,26 @@ public class AliyunJsonRecordParserTest extends
KafkaActionITCaseBase {
MessageQueueCdcTimestampExtractor extractor = new
MessageQueueCdcTimestampExtractor();
Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+
+ Map<String, String> data =
records.get(0).toRichCdcRecord().toCdcRecord().data();
+ String createTime = data.get("etl_create_time");
+ String updateTime = data.get("etl_update_time");
+ Assert.assertNotNull(createTime);
+
+ updateTime =
+
BinaryStringUtils.toTimestamp(BinaryString.fromString(updateTime), 6)
+ .toString();
+
+ Assert.assertTrue(updateTime.matches(dateTimeRegex));
}
}
@Test
public void extractDeleteRecord() throws Exception {
AliyunRecordParser parser =
- new AliyunRecordParser(TypeMapping.defaultMapping(),
Collections.emptyList());
+ new AliyunRecordParser(TypeMapping.defaultMapping(),
computedColumns);
for (String json : deleteList) {
- // 将json解析为JsonNode对象
+
JsonNode jsonNode = objMapper.readValue(json, JsonNode.class);
CdcSourceRecord cdcRecord = new CdcSourceRecord(jsonNode);
Schema schema = parser.buildSchema(cdcRecord);
@@ -162,6 +205,17 @@ public class AliyunJsonRecordParserTest extends
KafkaActionITCaseBase {
MessageQueueCdcTimestampExtractor extractor = new
MessageQueueCdcTimestampExtractor();
Assert.assertTrue(extractor.extractTimestamp(cdcRecord) > 0);
+
+ Map<String, String> data =
records.get(0).toRichCdcRecord().toCdcRecord().data();
+ String createTime = data.get("etl_create_time");
+ String updateTime = data.get("etl_update_time");
+ Assert.assertNotNull(createTime);
+
+ updateTime =
+
BinaryStringUtils.toTimestamp(BinaryString.fromString(updateTime), 6)
+ .toString();
+
+ Assert.assertTrue(updateTime.matches(dateTimeRegex));
}
}
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
index 6e37c589ac..60aa70c34b 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/kafka/KafkaCanalSyncDatabaseActionITCase.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.action.cdc.kafka;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
@@ -643,6 +645,98 @@ public class KafkaCanalSyncDatabaseActionITCase extends
KafkaActionITCaseBase {
Collections.singletonList("k"));
}
+ @Test
+ @Timeout(120)
+ public void testExpressionNow() throws Exception {
+ final String topic = "expression-now";
+ createTestTopic(topic, 1, 1);
+ writeRecordsToKafka(topic,
"kafka/canal/database/audit-time/canal-data-1.txt");
+
+ Map<String, String> kafkaConfig = getBasicKafkaConfig();
+ kafkaConfig.put(VALUE_FORMAT.key(), "canal-json");
+ kafkaConfig.put(TOPIC.key(), topic);
+
+ KafkaSyncDatabaseAction action =
+ syncDatabaseActionBuilder(kafkaConfig)
+ .withTableConfig(getBasicTableConfig())
+ .withPrimaryKeys("k")
+ .withComputedColumnArgs(
+ Arrays.asList("etl_create_time=now()",
"etl_update_time=now()"))
+ .build();
+ runActionWithDefaultEnv(action);
+
+ waitingTables("t1");
+
+ FileStoreTable table1 = getFileStoreTable("t1");
+ assertThat(table1.primaryKeys()).containsExactly("k");
+
+ RowType rowType1 =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(),
+ DataTypes.VARCHAR(10),
+ DataTypes.TIMESTAMP(3),
+ DataTypes.TIMESTAMP(3)
+ },
+ new String[] {"k", "v1", "etl_create_time",
"etl_update_time"});
+
+ // INSERT
+ waitForResult(
+ true,
+ Collections.singletonList(
+ "\\+I\\[1, A,
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3},
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\]"),
+ table1,
+ rowType1,
+ Collections.singletonList("k"));
+
+ List<InternalRow> data = getData("t1");
+ Timestamp createTime1 = data.get(0).getTimestamp(2, 3);
+ Timestamp updateTime1 = data.get(0).getTimestamp(3, 3);
+
+
assertThat(createTime1.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
+
assertThat(updateTime1.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
+
+ Thread.sleep(1000);
+
+ // UPDATE1
+ writeRecordsToKafka(topic,
"kafka/canal/database/audit-time/canal-data-2.txt");
+ waitForResult(
+ true,
+ Collections.singletonList(
+ "\\+I\\[1, B,
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3},
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\]"),
+ table1,
+ rowType1,
+ Collections.singletonList("k"));
+
+ data = getData("t1");
+ Timestamp createTime2 = data.get(0).getTimestamp(2, 3);
+ Timestamp updateTime2 = data.get(0).getTimestamp(3, 3);
+
+
assertThat(createTime2.toLocalDateTime()).isAfter(createTime1.toLocalDateTime());
+
assertThat(updateTime2.toLocalDateTime()).isAfter(updateTime1.toLocalDateTime());
+
assertThat(updateTime2.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
+
+ Thread.sleep(1000);
+
+ // UPDATE2
+ writeRecordsToKafka(topic,
"kafka/canal/database/audit-time/canal-data-3.txt");
+ waitForResult(
+ true,
+ Collections.singletonList(
+ "\\+I\\[1, C,
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3},
\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{3}\\]"),
+ table1,
+ rowType1,
+ Collections.singletonList("k"));
+
+ data = getData("t1");
+ Timestamp createTime3 = data.get(0).getTimestamp(2, 3);
+ Timestamp updateTime3 = data.get(0).getTimestamp(3, 3);
+
+
assertThat(createTime3.toLocalDateTime()).isAfter(createTime1.toLocalDateTime());
+
assertThat(updateTime3.toLocalDateTime()).isAfter(updateTime2.toLocalDateTime());
+
assertThat(updateTime3.toLocalDateTime()).isBefore(Timestamp.now().toLocalDateTime());
+ }
+
@Test
@Timeout(60)
public void testMultipleTablePartitionKeys() throws Exception {
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-1.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-1.txt
new file mode 100644
index 0000000000..92935ccb20
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-1.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k":"1","v1":"A"}],"database":"test_audit_time","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k":"INT","v1":"VARCHAR(10)"},"old":[],"pkNames":["k"],"sql":"","sqlType":{"k":4,"v1":12},"table":"t1","ts":1684770072286,"type":"INSERT"}
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-2.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-2.txt
new file mode 100644
index 0000000000..b7b9d9b635
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-2.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k":"1","v1":"B"}],"database":"test_audit_time","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k":"INT","v1":"VARCHAR(10)"},"old":[{"k":"1","v1":"A"}],"pkNames":["k"],"sql":"","sqlType":{"k":4,"v1":12},"table":"t1","ts":1684770072286,"type":"UPDATE"}
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-3.txt
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-3.txt
new file mode 100644
index 0000000000..acd51960e4
--- /dev/null
+++
b/paimon-flink/paimon-flink-cdc/src/test/resources/kafka/canal/database/audit-time/canal-data-3.txt
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"data":[{"k":"1","v1":"C"}],"database":"test_audit_time","es":1684770072000,"id":81,"isDdl":false,"mysqlType":{"k":"INT","v1":"VARCHAR(10)"},"old":[{"k":"1","v1":"B"}],"pkNames":["k"],"sql":"","sqlType":{"k":4,"v1":12},"table":"t1","ts":1684770072286,"type":"UPDATE"}
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
index f8ea189500..6be78e041a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ActionITCaseBase.java
@@ -32,8 +32,10 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.RowType;
import org.apache.flink.table.api.TableEnvironment;
@@ -45,6 +47,7 @@ import org.junit.jupiter.api.BeforeEach;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -142,6 +145,22 @@ public abstract class ActionITCaseBase extends
AbstractTestBase {
incrementalIdentifier++;
}
+ protected List<InternalRow> getData(String tableName) throws Exception {
+ List<InternalRow> result = new ArrayList<>();
+
+ FileStoreTable table = this.getFileStoreTable(tableName);
+
+ ReadBuilder readBuilder = table.newReadBuilder();
+ TableScan.Plan plan = readBuilder.newScan().plan();
+ List<Split> splits = plan == null ? Collections.emptyList() :
plan.splits();
+ TableRead read = readBuilder.newRead();
+ try (RecordReader<InternalRow> recordReader =
read.createReader(splits)) {
+ recordReader.forEachRemaining(result::add);
+ }
+
+ return result;
+ }
+
protected List<String> getResult(TableRead read, List<Split> splits,
RowType rowType)
throws Exception {
try (RecordReader<InternalRow> recordReader =
read.createReader(splits)) {