This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d6ce2808f [core] Introduce consumer expiration (#1348)
d6ce2808f is described below
commit d6ce2808f1b0b810de594fde99e6586773295b08
Author: yuzelin <[email protected]>
AuthorDate: Fri Jun 9 18:51:18 2023 +0800
[core] Introduce consumer expiration (#1348)
---
docs/content/how-to/querying-tables.md | 4 +--
.../shortcodes/generated/core_configuration.html | 6 ++++
.../generated/flink_connector_configuration.html | 2 +-
.../main/java/org/apache/paimon/fs/FileStatus.java | 8 +++++
.../org/apache/paimon/fs/hadoop/HadoopFileIO.java | 5 +++
.../org/apache/paimon/fs/local/LocalFileIO.java | 5 +++
.../org/apache/paimon/utils/DateTimeUtils.java | 10 ++++++
.../main/java/org/apache/paimon/CoreOptions.java | 12 ++++++++
.../apache/paimon/consumer/ConsumerManager.java | 36 ++++++++++++++++++++++
.../paimon/table/AbstractFileStoreTable.java | 5 ++-
.../apache/paimon/table/sink/TableCommitImpl.java | 16 +++++++++-
.../table/source/InnerStreamTableScanImpl.java | 2 +-
.../paimon/consumer/ConsumerManagerTest.java | 23 ++++++++++++++
.../paimon/table/FileStoreTableTestBase.java | 21 +++++++++++++
.../org/apache/paimon/utils/FailingFileIO.java | 5 +++
.../apache/paimon/oss/HadoopCompliantFileIO.java | 5 +++
.../apache/paimon/s3/HadoopCompliantFileIO.java | 5 +++
.../java/org/apache/paimon/flink/FlinkFileIO.java | 5 +++
.../cdc/mysql/MySqlDebeziumJsonEventParser.java | 4 +--
19 files changed, 170 insertions(+), 9 deletions(-)
diff --git a/docs/content/how-to/querying-tables.md
b/docs/content/how-to/querying-tables.md
index 8b0ca96cc..b55132df7 100644
--- a/docs/content/how-to/querying-tables.md
+++ b/docs/content/how-to/querying-tables.md
@@ -162,8 +162,8 @@ When stream read Paimon tables, the next snapshot id to be
recorded into the fil
and if there are consumers that still depend on this snapshot, then this
snapshot will not be deleted by expiration.
{{< hint info >}}
-NOTE: If there is a consumer that will not be used anymore, please delete it,
otherwise it will affect the expiration
-of the snapshot. The consumer file is in
`${table-path}/consumer/consumer-${consumer-id}`.
+NOTE: The consumer will prevent expiration of the snapshot. You can specify
'consumer.expiration-time' to manage the
+lifetime of consumers.
{{< /hint >}}
## System Tables
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 9bc9ea6df..872a8da15 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -98,6 +98,12 @@ under the License.
<td>String</td>
<td>Consumer id for recording the offset of consumption in the
storage.</td>
</tr>
+ <tr>
+ <td><h5>consumer.expiration-time</h5></td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>Duration</td>
+ <td>The expiration interval of consumer files. A consumer file
will be expired if its lifetime after last modification is over this
value.</td>
+ </tr>
<tr>
<td><h5>continuous.discovery-interval</h5></td>
<td style="word-wrap: break-word;">10 s</td>
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index c76abeccd..d1776e9f8 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -59,7 +59,7 @@ under the License.
<tr>
<td><h5>scan.split-enumerator.mode</h5></td>
<td style="word-wrap: break-word;">fair</td>
- <td>Enum</td>
+ <td><p>Enum</p></td>
<td>The mode used by StaticFileStoreSplitEnumerator to assign
splits.<br /><br />Possible values:<ul><li>"fair": Distribute splits evenly
when batch reading to prevent a few tasks from reading
all.</li><li>"preemptive": Distribute splits preemptively according to the
consumption speed of the task.</li></ul></td>
</tr>
<tr>
diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java
b/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java
index 53eb6023f..8308f5205 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileStatus.java
@@ -48,4 +48,12 @@ public interface FileStatus {
* @return the corresponding Path to the FileStatus
*/
Path getPath();
+
+ /**
+ * Get the last modification time of the file.
+ *
+ * @return A long value representing the time the file was last modified,
measured in
+ * milliseconds since the epoch (UTC January 1, 1970).
+ */
+ long getModificationTime();
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
index 3cb7cfee7..dd15b5280 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
@@ -284,5 +284,10 @@ public class HadoopFileIO implements FileIO {
public Path getPath() {
return new Path(status.getPath().toUri());
}
+
+ @Override
+ public long getModificationTime() {
+ return status.getModificationTime();
+ }
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
index bf6e3eab2..f1d941c6f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/local/LocalFileIO.java
@@ -337,5 +337,10 @@ public class LocalFileIO implements FileIO {
public Path getPath() {
return new Path(scheme + ":" + file.toURI().getPath());
}
+
+ @Override
+ public long getModificationTime() {
+ return file.lastModified();
+ }
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
index 42433a55b..819334b34 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/DateTimeUtils.java
@@ -21,9 +21,11 @@ package org.apache.paimon.utils;
import org.apache.paimon.data.Timestamp;
import java.time.DateTimeException;
+import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
+import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.TemporalAccessor;
@@ -416,6 +418,14 @@ public class DateTimeUtils {
fromTemporalAccessor(DEFAULT_TIMESTAMP_FORMATTER.parse(dateStr), precision));
}
+ public static LocalDateTime toLocalDateTime(long timeMills) {
+ return toLocalDateTime(timeMills, ZoneId.systemDefault());
+ }
+
+ public static LocalDateTime toLocalDateTime(long timeMills, ZoneId zoneId)
{
+ return
Instant.ofEpochMilli(timeMills).atZone(zoneId).toLocalDateTime();
+ }
+
public static LocalDateTime toLocalDateTime(String dateStr, int precision)
{
return
fromTemporalAccessor(DEFAULT_TIMESTAMP_FORMATTER.parse(dateStr), precision);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index 0d66e95e0..d38150de0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -640,6 +640,14 @@ public class CoreOptions implements Serializable {
+ ": Read from the
data of table log store."))
.build());
+ public static final ConfigOption<Duration> CONSUMER_EXPIRATION_TIME =
+ key("consumer.expiration-time")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
"The expiration interval of consumer files. A
consumer file will be expired if "
+ "its lifetime after last modification
is over this value.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -926,6 +934,10 @@ public class CoreOptions implements Serializable {
return options.get(STREAMING_READ_MODE);
}
+ public Duration consumerExpireTime() {
+ return options.get(CONSUMER_EXPIRATION_TIME);
+ }
+
/** Specifies the merge engine for table with primary key. */
public enum MergeEngine implements DescribedEnum {
DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
diff --git
a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
index 273a32dfc..d9565b0a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
@@ -22,12 +22,14 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.utils.DateTimeUtils;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
+import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Optional;
import java.util.OptionalLong;
@@ -90,6 +92,36 @@ public class ConsumerManager implements Serializable {
}
}
+ public void expire(LocalDateTime expireDateTime) {
+ try {
+ Path directory = consumerDirectory();
+ if (!fileIO.exists(directory)) {
+ return;
+ }
+
+ FileStatus[] statuses = fileIO.listStatus(directory);
+
+ if (statuses == null) {
+ throw new RuntimeException(
+ String.format(
+ "The return value is null of the listStatus
for the '%s' directory.",
+ directory));
+ }
+
+ for (FileStatus status : statuses) {
+ if (isConsumerFile(status.getPath().getName())) {
+ LocalDateTime modificationTime =
+
DateTimeUtils.toLocalDateTime(status.getModificationTime());
+ if (expireDateTime.isAfter(modificationTime)) {
+ fileIO.deleteQuietly(status.getPath());
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
private Path consumerDirectory() {
return new Path(tablePath + "/consumer");
}
@@ -97,4 +129,8 @@ public class ConsumerManager implements Serializable {
private Path consumerPath(String consumerId) {
return new Path(tablePath + "/consumer/" + CONSUMER_PREFIX +
consumerId);
}
+
+ private boolean isConsumerFile(String file) {
+ return file.startsWith(CONSUMER_PREFIX);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 7f2c9d602..db85d12b2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.FileStoreScan;
@@ -228,7 +229,9 @@ public abstract class AbstractFileStoreTable implements
FileStoreTable {
return new TableCommitImpl(
store().newCommit(commitUser),
coreOptions().writeOnly() ? null : store().newExpire(),
- coreOptions().writeOnly() ? null :
store().newPartitionExpire(commitUser));
+ coreOptions().writeOnly() ? null :
store().newPartitionExpire(commitUser),
+ CoreOptions.fromMap(options()).consumerExpireTime(),
+ new ConsumerManager(fileIO, path));
}
private Optional<TableSchema> tryTimeTravel(Options options) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 9fe8ea3fb..a8758fb92 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -18,6 +18,7 @@
package org.apache.paimon.table.sink;
+import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreExpire;
@@ -26,6 +27,8 @@ import org.apache.paimon.operation.PartitionExpire;
import javax.annotation.Nullable;
+import java.time.Duration;
+import java.time.LocalDateTime;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -43,6 +46,8 @@ public class TableCommitImpl implements InnerTableCommit {
private final FileStoreCommit commit;
@Nullable private final FileStoreExpire expire;
@Nullable private final PartitionExpire partitionExpire;
+ @Nullable private final Duration consumerExpireTime;
+ private final ConsumerManager consumerManager;
@Nullable private Map<String, String> overwritePartition = null;
@Nullable private Lock lock;
@@ -52,10 +57,14 @@ public class TableCommitImpl implements InnerTableCommit {
public TableCommitImpl(
FileStoreCommit commit,
@Nullable FileStoreExpire expire,
- @Nullable PartitionExpire partitionExpire) {
+ @Nullable PartitionExpire partitionExpire,
+ @Nullable Duration consumerExpireTime,
+ ConsumerManager consumerManager) {
this.commit = commit;
this.expire = expire;
this.partitionExpire = partitionExpire;
+ this.consumerExpireTime = consumerExpireTime;
+ this.consumerManager = consumerManager;
}
@Override
@@ -137,6 +146,11 @@ public class TableCommitImpl implements InnerTableCommit {
}
private void expire(long partitionExpireIdentifier) {
+ // expire consumer first to avoid preventing snapshot expiration
+ if (consumerExpireTime != null) {
+
consumerManager.expire(LocalDateTime.now().minus(consumerExpireTime));
+ }
+
if (expire != null) {
expire.expire();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
index 42f8714f7..b62978644 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java
@@ -120,7 +120,7 @@ public class InnerStreamTableScanImpl extends
AbstractInnerTableScan
if (earliestSnapshotId != null && earliestSnapshotId >
nextSnapshotId) {
throw new OutOfRangeException(
String.format(
- "The snapshot with id %d has expired., You
can: "
+ "The snapshot with id %d has expired. You
can: "
+ "1. increase the snapshot
expiration time. "
+ "2. use consumer-id to ensure
that unconsumed snapshots will not be expired.",
nextSnapshotId));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
index 572055f17..cab65809d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/consumer/ConsumerManagerTest.java
@@ -19,12 +19,14 @@
package org.apache.paimon.consumer;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.utils.DateTimeUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import java.nio.file.Path;
+import java.time.LocalDateTime;
import java.util.Optional;
import java.util.OptionalLong;
@@ -61,4 +63,25 @@ public class ConsumerManagerTest {
assertThat(manager.minNextSnapshot()).isEqualTo(OptionalLong.of(5L));
}
+
+ @Test
+ public void testExpire() throws Exception {
+ manager.recordConsumer("id1", new Consumer(1));
+ Thread.sleep(1000);
+ LocalDateTime expireDateTime =
DateTimeUtils.toLocalDateTime(System.currentTimeMillis());
+ Thread.sleep(1000);
+ manager.recordConsumer("id2", new Consumer(2));
+
+ // check expire
+ manager.expire(expireDateTime);
+ assertThat(manager.consumer("id1")).isEmpty();
+
assertThat(manager.consumer("id2")).map(Consumer::nextSnapshot).get().isEqualTo(2L);
+
+ // check last modification
+ expireDateTime =
DateTimeUtils.toLocalDateTime(System.currentTimeMillis());
+ Thread.sleep(1000);
+ manager.recordConsumer("id2", new Consumer(3));
+ manager.expire(expireDateTime);
+
assertThat(manager.consumer("id2")).map(Consumer::nextSnapshot).get().isEqualTo(3L);
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index 70418c778..e814d19a4 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -49,6 +49,7 @@ import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
@@ -630,6 +631,26 @@ public abstract class FileStoreTableTestBase {
assertThat(result)
.containsExactlyInAnyOrder("+1|40|400|binary|varbinary|mapKey:mapVal|multiset");
+ // expire consumer and then test snapshot expiration
+ Thread.sleep(1000);
+ table =
+ table.copy(
+ Collections.singletonMap(
+ CoreOptions.CONSUMER_EXPIRATION_TIME.key(), "1
s"));
+ // commit to trigger expiration
+ writeBuilder = table.newStreamWriteBuilder();
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+
+ write.write(rowData(1, 100, 1000L));
+ commit.commit(9, write.prepareCommit(true, 9));
+
+ StreamTableScan finalScan = scan;
+ assertThatThrownBy(finalScan::plan)
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ OutOfRangeException.class, "The snapshot with
id 5 has expired."));
+
write.close();
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/utils/FailingFileIO.java
b/paimon-core/src/test/java/org/apache/paimon/utils/FailingFileIO.java
index 0c45cedb7..7eb9ed3fc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/utils/FailingFileIO.java
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/FailingFileIO.java
@@ -167,6 +167,11 @@ public class FailingFileIO extends TraceableFileIO {
return path;
}
+ @Override
+ public long getModificationTime() {
+ return file.lastModified();
+ }
+
@Override
public String toString() {
return "FailingLocalFileStatus{file=" + this.file + ", path=" +
this.path + '}';
diff --git
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
index bb4810aa4..d6fef224b 100644
---
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
+++
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/HadoopCompliantFileIO.java
@@ -267,5 +267,10 @@ public abstract class HadoopCompliantFileIO implements
FileIO {
public Path getPath() {
return new Path(status.getPath().toUri());
}
+
+ @Override
+ public long getModificationTime() {
+ return status.getModificationTime();
+ }
}
}
diff --git
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
index 65fd1b103..fe86aca96 100644
---
a/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
+++
b/paimon-filesystems/paimon-s3-impl/src/main/java/org/apache/paimon/s3/HadoopCompliantFileIO.java
@@ -267,5 +267,10 @@ public abstract class HadoopCompliantFileIO implements
FileIO {
public Path getPath() {
return new Path(status.getPath().toUri());
}
+
+ @Override
+ public long getModificationTime() {
+ return status.getModificationTime();
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
index b65c66280..db66379f3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
@@ -222,5 +222,10 @@ public class FlinkFileIO implements FileIO {
public Path getPath() {
return new Path(status.getPath().toUri());
}
+
+ @Override
+ public long getModificationTime() {
+ return status.getModificationTime();
+ }
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
index 691d59a0f..e814e9701 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlDebeziumJsonEventParser.java
@@ -271,9 +271,7 @@ public class MySqlDebeziumJsonEventParser implements
EventParser<String> {
// RowDataDebeziumDeserializeSchema#convertToTimestamp in
flink-cdc-connector
// for implementation
LocalDateTime localDateTime =
- Instant.ofEpochMilli(Long.parseLong(oldValue))
- .atZone(ZoneOffset.UTC)
- .toLocalDateTime();
+
DateTimeUtils.toLocalDateTime(Long.parseLong(oldValue), ZoneOffset.UTC);
newValue = DateTimeUtils.formatLocalDateTime(localDateTime, 3);
} else if ("io.debezium.time.MicroTimestamp".equals(className)) {
// MySQL datetime (precision 4-6)