This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-0.8 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 92e2f00f82edaad2340634eb6180e08d97da65bf Author: yuzelin <[email protected]> AuthorDate: Mon May 13 15:34:04 2024 +0800 [core] FileStoreWrite.State should get max sequence number from RecordWriter instead of from data files (#3327) --- .../org/apache/paimon/append/AppendOnlyWriter.java | 5 +++ .../apache/paimon/mergetree/MergeTreeWriter.java | 5 +++ .../paimon/operation/AbstractFileStoreWrite.java | 6 ++++ .../paimon/operation/AppendOnlyFileStoreWrite.java | 6 ++-- .../apache/paimon/operation/FileStoreWrite.java | 6 +++- .../paimon/operation/KeyValueFileStoreWrite.java | 4 +-- .../java/org/apache/paimon/utils/RecordWriter.java | 3 ++ .../cdc/mysql/MySqlSyncTableActionITCase.java | 42 ++++++++++++++++++++++ .../src/test/resources/mysql/sync_table_setup.sql | 11 ++++++ 9 files changed, 81 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 0ae21e6b2..52f6241a9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -173,6 +173,11 @@ public class AppendOnlyWriter implements RecordWriter<InternalRow>, MemoryOwner return compactManager.allFiles(); } + @Override + public long maxSequenceNumber() { + return seqNumCounter.getValue() - 1; + } + @Override public CommitIncrement prepareCommit(boolean waitCompaction) throws Exception { flush(false, false); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index 17673fc06..6becbf731 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -182,6 +182,11 @@ public class MergeTreeWriter implements RecordWriter<KeyValue>, MemoryOwner { return 
compactManager.allFiles(); } + @Override + public long maxSequenceNumber() { + return newSequenceNumber - 1; + } + @Override public long memoryOccupancy() { return writeBuffer.memoryOccupancy(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java index 30ee9c6c1..e4ef21b55 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java @@ -52,6 +52,8 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber; + /** * Base {@link FileStoreWrite} implementation. * @@ -297,6 +299,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { writerContainer.baseSnapshotId, writerContainer.lastModifiedCommitIdentifier, dataFiles, + writerContainer.writer.maxSequenceNumber(), writerContainer.indexMaintainer, writerContainer.deletionVectorsMaintainer, increment)); @@ -317,6 +320,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { state.partition, state.bucket, state.dataFiles, + state.maxSequenceNumber, state.commitIncrement, compactExecutor(), state.deletionVectorsMaintainer); @@ -382,6 +386,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { partition.copy(), bucket, restoreFiles, + getMaxSequenceNumber(restoreFiles), null, compactExecutor(), deletionVectorsMaintainer); @@ -432,6 +437,7 @@ public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T> { BinaryRow partition, int bucket, List<DataFileMeta> restoreFiles, + long restoredMaxSeqNumber, @Nullable CommitIncrement restoreIncrement, ExecutorService compactExecutor, @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer); diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index bb3074ac7..e7ae19919 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -54,8 +54,6 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; -import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber; - /** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow> { @@ -119,12 +117,12 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow> BinaryRow partition, int bucket, List<DataFileMeta> restoredFiles, + long restoredMaxSeqNumber, @Nullable CommitIncrement restoreIncrement, ExecutorService compactExecutor, @Nullable DeletionVectorsMaintainer ignore) { // let writer and compact manager hold the same reference // and make restore files mutable to update - long maxSequenceNumber = getMaxSequenceNumber(restoredFiles); DataFilePathFactory factory = pathFactory.createDataFilePathFactory(partition, bucket); CompactManager compactManager = skipCompaction @@ -147,7 +145,7 @@ public class AppendOnlyFileStoreWrite extends MemoryFileStoreWrite<InternalRow> fileFormat, targetFileSize, rowType, - maxSequenceNumber, + restoredMaxSeqNumber, compactManager, bucketReader(partition, bucket), commitForceCompact, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java index 1391b6916..7f8fee45f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java @@ -146,6 +146,7 @@ public interface FileStoreWrite<T> 
extends Restorable<List<FileStoreWrite.State< protected final long baseSnapshotId; protected final long lastModifiedCommitIdentifier; protected final List<DataFileMeta> dataFiles; + protected final long maxSequenceNumber; @Nullable protected final IndexMaintainer<T> indexMaintainer; @Nullable protected final DeletionVectorsMaintainer deletionVectorsMaintainer; protected final CommitIncrement commitIncrement; @@ -156,6 +157,7 @@ public interface FileStoreWrite<T> extends Restorable<List<FileStoreWrite.State< long baseSnapshotId, long lastModifiedCommitIdentifier, Collection<DataFileMeta> dataFiles, + long maxSequenceNumber, @Nullable IndexMaintainer<T> indexMaintainer, @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer, CommitIncrement commitIncrement) { @@ -164,6 +166,7 @@ public interface FileStoreWrite<T> extends Restorable<List<FileStoreWrite.State< this.baseSnapshotId = baseSnapshotId; this.lastModifiedCommitIdentifier = lastModifiedCommitIdentifier; this.dataFiles = new ArrayList<>(dataFiles); + this.maxSequenceNumber = maxSequenceNumber; this.indexMaintainer = indexMaintainer; this.deletionVectorsMaintainer = deletionVectorsMaintainer; this.commitIncrement = commitIncrement; @@ -172,12 +175,13 @@ public interface FileStoreWrite<T> extends Restorable<List<FileStoreWrite.State< @Override public String toString() { return String.format( - "{%s, %d, %d, %d, %s, %s, %s, %s}", + "{%s, %d, %d, %d, %s, %d, %s, %s, %s}", partition, bucket, baseSnapshotId, lastModifiedCommitIdentifier, dataFiles, + maxSequenceNumber, indexMaintainer, deletionVectorsMaintainer, commitIncrement); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java index 314019a60..b7702ef8a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java @@ -84,7 +84,6 @@ import java.util.function.Supplier; import static org.apache.paimon.CoreOptions.ChangelogProducer.FULL_COMPACTION; import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE; import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW; -import static org.apache.paimon.io.DataFileMeta.getMaxSequenceNumber; import static org.apache.paimon.lookup.LookupStoreFactory.bfGenerator; /** {@link FileStoreWrite} for {@link KeyValueFileStore}. */ @@ -168,6 +167,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { BinaryRow partition, int bucket, List<DataFileMeta> restoreFiles, + long restoredMaxSeqNumber, @Nullable CommitIncrement restoreIncrement, ExecutorService compactExecutor, @Nullable DeletionVectorsMaintainer dvMaintainer) { @@ -204,7 +204,7 @@ public class KeyValueFileStoreWrite extends MemoryFileStoreWrite<KeyValue> { options.spillCompression(), ioManager, compactManager, - getMaxSequenceNumber(restoreFiles), + restoredMaxSeqNumber, keyComparator, mfFactory.create(), writerFactory, diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java index d2df1be46..5795290f1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/RecordWriter.java @@ -53,6 +53,9 @@ public interface RecordWriter<T> { /** Get all data files maintained by this writer. */ Collection<DataFileMeta> dataFiles(); + /** Get max sequence number of records written by this writer. */ + long maxSequenceNumber(); + /** * Prepare for a commit. 
* diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java index 58ed2098e..d4f847b0a 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.action.cdc.mysql; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.FileSystemCatalogOptions; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.schema.SchemaChange; @@ -1364,4 +1365,45 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase { assertThat(actual.get("c1").description()).isEqualTo("c1 comment"); assertThat(actual.get("c2").description()).isEqualTo("c2 comment"); } + + @Test + @Timeout(60) + public void testWriteOnlyAndSchemaEvolution() throws Exception { + Map<String, String> mySqlConfig = getBasicMySqlConfig(); + mySqlConfig.put("database-name", "write_only_and_schema_evolution"); + mySqlConfig.put("table-name", "t"); + + Map<String, String> tableConfig = getBasicTableConfig(); + tableConfig.put(CoreOptions.WRITE_ONLY.key(), "true"); + + MySqlSyncTableAction action = + syncTableActionBuilder(mySqlConfig).withTableConfig(tableConfig).build(); + + runActionWithDefaultEnv(action); + FileStoreTable table = getFileStoreTable(); + + try (Statement statement = getStatement()) { + statement.executeUpdate("USE write_only_and_schema_evolution"); + statement.executeUpdate("INSERT INTO t VALUES (1, 'one'), (2, 'two')"); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, + new String[] {"k", "v1"}); + List<String> primaryKeys = Collections.singletonList("k"); + List<String> 
expected = Arrays.asList("+I[1, one]", "+I[2, two]"); + waitForResult(expected, table, rowType, primaryKeys); + + statement.executeUpdate("ALTER TABLE t ADD COLUMN v2 INT"); + statement.executeUpdate("UPDATE t SET v2 = 1 WHERE k = 1"); + + rowType = + RowType.of( + new DataType[] { + DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT() + }, + new String[] {"k", "v1", "v2"}); + expected = Arrays.asList("+I[1, one, 1]", "+I[2, two, NULL]"); + waitForResult(expected, table, rowType, primaryKeys); + } + } } diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql index 5cf9cc1d9..676185fb9 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql @@ -420,3 +420,14 @@ USE invalid_alter_bucket; CREATE TABLE t ( k INT PRIMARY KEY ); + +-- ################################################################################ +-- testWriteOnlyAndSchemaEvolution +-- ################################################################################ + +CREATE DATABASE write_only_and_schema_evolution; +USE write_only_and_schema_evolution; +CREATE TABLE t ( + k INT PRIMARY KEY, + v1 VARCHAR(10) +);
