This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.8
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 92e2f00f82edaad2340634eb6180e08d97da65bf
Author: yuzelin <[email protected]>
AuthorDate: Mon May 13 15:34:04 2024 +0800

    [core] FileStoreWrite.State should get max sequence number from 
RecordWriter instead of from data files (#3327)
---
 .../org/apache/paimon/append/AppendOnlyWriter.java |  5 +++
 .../apache/paimon/mergetree/MergeTreeWriter.java   |  5 +++
 .../paimon/operation/AbstractFileStoreWrite.java   |  6 ++++
 .../paimon/operation/AppendOnlyFileStoreWrite.java |  6 ++--
 .../apache/paimon/operation/FileStoreWrite.java    |  6 +++-
 .../paimon/operation/KeyValueFileStoreWrite.java   |  4 +--
 .../java/org/apache/paimon/utils/RecordWriter.java |  3 ++
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 42 ++++++++++++++++++++++
 .../src/test/resources/mysql/sync_table_setup.sql  | 11 ++++++
 9 files changed, 81 insertions(+), 7 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 0ae21e6b2..52f6241a9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -173,6 +173,11 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
         return compactManager.allFiles();
     }
 
+    @Override
+    public long maxSequenceNumber() {
+        return seqNumCounter.getValue() - 1;
+    }
+
     @Override
     public CommitIncrement prepareCommit(boolean waitCompaction) throws 
Exception {
         flush(false, false);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index 17673fc06..6becbf731 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -182,6 +182,11 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
         return compactManager.allFiles();
     }
 
+    @Override
+    public long maxSequenceNumber() {
+        return newSequenceNumber - 1;
+    }
+
     @Override
     public long memoryOccupancy() {
         return writeBuffer.memoryOccupancy();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 30ee9c6c1..e4ef21b55 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -52,6 +52,8 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
+
 /**
  * Base {@link FileStoreWrite} implementation.
  *
@@ -297,6 +299,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
                                 writerContainer.baseSnapshotId,
                                 writerContainer.lastModifiedCommitIdentifier,
                                 dataFiles,
+                                writerContainer.writer.maxSequenceNumber(),
                                 writerContainer.indexMaintainer,
                                 writerContainer.deletionVectorsMaintainer,
                                 increment));
@@ -317,6 +320,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
                             state.partition,
                             state.bucket,
                             state.dataFiles,
+                            state.maxSequenceNumber,
                             state.commitIncrement,
                             compactExecutor(),
                             state.deletionVectorsMaintainer);
@@ -382,6 +386,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
                         partition.copy(),
                         bucket,
                         restoreFiles,
+                        getMaxSequenceNumber(restoreFiles),
                         null,
                         compactExecutor(),
                         deletionVectorsMaintainer);
@@ -432,6 +437,7 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
             BinaryRow partition,
             int bucket,
             List<DataFileMeta> restoreFiles,
+            long restoredMaxSeqNumber,
             @Nullable CommitIncrement restoreIncrement,
             ExecutorService compactExecutor,
             @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index bb3074ac7..e7ae19919 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -54,8 +54,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 
-import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
-
 /** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */
 public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow> {
 
@@ -119,12 +117,12 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
             BinaryRow partition,
             int bucket,
             List<DataFileMeta> restoredFiles,
+            long restoredMaxSeqNumber,
             @Nullable CommitIncrement restoreIncrement,
             ExecutorService compactExecutor,
             @Nullable DeletionVectorsMaintainer ignore) {
         // let writer and compact manager hold the same reference
         // and make restore files mutable to update
-        long maxSequenceNumber = getMaxSequenceNumber(restoredFiles);
         DataFilePathFactory factory = 
pathFactory.createDataFilePathFactory(partition, bucket);
         CompactManager compactManager =
                 skipCompaction
@@ -147,7 +145,7 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
                 fileFormat,
                 targetFileSize,
                 rowType,
-                maxSequenceNumber,
+                restoredMaxSeqNumber,
                 compactManager,
                 bucketReader(partition, bucket),
                 commitForceCompact,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index 1391b6916..7f8fee45f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -146,6 +146,7 @@ public interface FileStoreWrite<T> extends 
Restorable<List<FileStoreWrite.State<
         protected final long baseSnapshotId;
         protected final long lastModifiedCommitIdentifier;
         protected final List<DataFileMeta> dataFiles;
+        protected final long maxSequenceNumber;
         @Nullable protected final IndexMaintainer<T> indexMaintainer;
         @Nullable protected final DeletionVectorsMaintainer 
deletionVectorsMaintainer;
         protected final CommitIncrement commitIncrement;
@@ -156,6 +157,7 @@ public interface FileStoreWrite<T> extends 
Restorable<List<FileStoreWrite.State<
                 long baseSnapshotId,
                 long lastModifiedCommitIdentifier,
                 Collection<DataFileMeta> dataFiles,
+                long maxSequenceNumber,
                 @Nullable IndexMaintainer<T> indexMaintainer,
                 @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
                 CommitIncrement commitIncrement) {
@@ -164,6 +166,7 @@ public interface FileStoreWrite<T> extends 
Restorable<List<FileStoreWrite.State<
             this.baseSnapshotId = baseSnapshotId;
             this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier;
             this.dataFiles = new ArrayList<>(dataFiles);
+            this.maxSequenceNumber = maxSequenceNumber;
             this.indexMaintainer = indexMaintainer;
             this.deletionVectorsMaintainer = deletionVectorsMaintainer;
             this.commitIncrement = commitIncrement;
@@ -172,12 +175,13 @@ public interface FileStoreWrite<T> extends 
Restorable<List<FileStoreWrite.State<
         @Override
         public String toString() {
             return String.format(
-                    "{%s, %d, %d, %d, %s, %s,  %s, %s}",
+                    "{%s, %d, %d, %d, %s, %d, %s, %s, %s}",
                     partition,
                     bucket,
                     baseSnapshotId,
                     lastModifiedCommitIdentifier,
                     dataFiles,
+                    maxSequenceNumber,
                     indexMaintainer,
                     deletionVectorsMaintainer,
                     commitIncrement);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 314019a60..b7702ef8a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -84,7 +84,6 @@ import java.util.function.Supplier;
 import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION;
 import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
-import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber;
 import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator;
 
 /** {@link FileStoreWrite} for {@link KeyValueFileStore}. */
@@ -168,6 +167,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             BinaryRow partition,
             int bucket,
             List<DataFileMeta> restoreFiles,
+            long restoredMaxSeqNumber,
             @Nullable CommitIncrement restoreIncrement,
             ExecutorService compactExecutor,
             @Nullable DeletionVectorsMaintainer dvMaintainer) {
@@ -204,7 +204,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 options.spillCompression(),
                 ioManager,
                 compactManager,
-                getMaxSequenceNumber(restoreFiles),
+                restoredMaxSeqNumber,
                 keyComparator,
                 mfFactory.create(),
                 writerFactory,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
index d2df1be46..5795290f1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java
@@ -53,6 +53,9 @@ public interface RecordWriter<T> {
     /** Get all data files maintained by this writer. */
     Collection<DataFileMeta> dataFiles();
 
+    /** Get max sequence number of records written by this writer. */
+    long maxSequenceNumber();
+
     /**
      * Prepare for a commit.
      *
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 58ed2098e..d4f847b0a 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.action.cdc.mysql;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.FileSystemCatalogOptions;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.schema.SchemaChange;
@@ -1364,4 +1365,45 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         assertThat(actual.get("c1").description()).isEqualTo("c1 comment");
         assertThat(actual.get("c2").description()).isEqualTo("c2 comment");
     }
+
+    @Test
+    @Timeout(60)
+    public void testWriteOnlyAndSchemaEvolution() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", "write_only_and_schema_evolution");
+        mySqlConfig.put("table-name", "t");
+
+        Map<String, String> tableConfig = getBasicTableConfig();
+        tableConfig.put(CoreOptions.WRITE_ONLY.key(), "true");
+
+        MySqlSyncTableAction action =
+                
syncTableActionBuilder(mySqlConfig).withTableConfig(tableConfig).build();
+
+        runActionWithDefaultEnv(action);
+        FileStoreTable table = getFileStoreTable();
+
+        try (Statement statement = getStatement()) {
+            statement.executeUpdate("USE write_only_and_schema_evolution");
+            statement.executeUpdate("INSERT INTO t VALUES (1, 'one'), (2, 
'two')");
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(10)},
+                            new String[] {"k", "v1"});
+            List<String> primaryKeys = Collections.singletonList("k");
+            List<String> expected = Arrays.asList("+I[1, one]", "+I[2, two]");
+            waitForResult(expected, table, rowType, primaryKeys);
+
+            statement.executeUpdate("ALTER TABLE t ADD COLUMN v2 INT");
+            statement.executeUpdate("UPDATE t SET v2 = 1 WHERE k = 1");
+
+            rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(), 
DataTypes.VARCHAR(10), DataTypes.INT()
+                            },
+                            new String[] {"k", "v1", "v2"});
+            expected = Arrays.asList("+I[1, one, 1]", "+I[2, two, NULL]");
+            waitForResult(expected, table, rowType, primaryKeys);
+        }
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index 5cf9cc1d9..676185fb9 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -420,3 +420,14 @@ USE invalid_alter_bucket;
 CREATE TABLE t (
     k INT PRIMARY KEY
 );
+
+-- 
################################################################################
+--  testInvalidAlterBucket
+-- 
################################################################################
+
+CREATE DATABASE write_only_and_schema_evolution;
+USE write_only_and_schema_evolution;
+CREATE TABLE t (
+    k INT PRIMARY KEY,
+    v1 VARCHAR(10)
+);

Reply via email to