This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 19eaeadc9 [core] Rename ChangelogWithKey to PrimaryKey (#2172)
19eaeadc9 is described below

commit 19eaeadc9b57b4bb7e004c57d215d708f9d731ae
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Oct 24 13:00:49 2023 +0800

    [core] Rename ChangelogWithKey to PrimaryKey (#2172)
---
 .../org/apache/paimon/operation/AbstractFileStoreScan.java |  2 +-
 .../org/apache/paimon/table/FileStoreTableFactory.java     |  2 +-
 ...eyFileStoreTable.java => PrimaryKeyFileStoreTable.java} | 14 +++++++-------
 ...logWithKeyTableUtils.java => PrimaryKeyTableUtils.java} | 11 +++++------
 ...DataTest.java => PrimaryKeyColumnTypeFileDataTest.java} |  4 ++--
 ...DataTableTest.java => PrimaryKeyFileDataTableTest.java} |  6 +++---
 ...taFilterTest.java => PrimaryKeyFileMetaFilterTest.java} | 14 +++++++-------
 ...oreTableTest.java => PrimaryKeyFileStoreTableTest.java} | 10 ++++------
 ...est.java => PrimaryKeyTableColumnTypeFileMetaTest.java} |  6 +++---
 .../org/apache/paimon/table/WritePreemptMemoryTest.java    |  8 ++++----
 .../java/org/apache/paimon/flink/sink/FlinkTableSink.java  |  8 ++++----
 .../org/apache/paimon/flink/sink/FlinkTableSinkBase.java   |  4 ++--
 .../org/apache/paimon/flink/sink/LocalMergeOperator.java   |  7 +++----
 .../java/org/apache/paimon/flink/sorter/OrderSorter.java   |  2 +-
 .../org/apache/paimon/flink/source/DataTableSource.java    |  4 ++--
 ...ableITCase.java => PrimaryKeyFileStoreTableITCase.java} |  2 +-
 .../action/SortCompactActionForDynamicBucketITCase.java    | 14 +++++++-------
 ...tisticsTest.java => PrimaryKeyTableStatisticsTest.java} |  8 ++++----
 18 files changed, 61 insertions(+), 65 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 356d002db..dd638a187 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -280,7 +280,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
         // We group files by bucket here, and filter them by the whole bucket 
filter.
         // Why do this: because in primary key table, we can't just filter the 
value
-        // by the stat in files (see 
`ChangelogWithKeyFileStoreTable.nonPartitionFilterConsumer`),
+        // by the stat in files (see 
`PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
         // but we can do this by filter the whole bucket files
         files =
                 files.stream()
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 0680ad4e1..638253585 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -97,7 +97,7 @@ public class FileStoreTableFactory {
                 tableSchema.primaryKeys().isEmpty()
                         ? new AppendOnlyFileStoreTable(
                                 fileIO, tablePath, tableSchema, 
catalogEnvironment)
-                        : new ChangelogWithKeyFileStoreTable(
+                        : new PrimaryKeyFileStoreTable(
                                 fileIO, tablePath, tableSchema, 
catalogEnvironment);
         return table.copy(dynamicOptions.toMap());
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
similarity index 93%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
rename to 
paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 0d112cc29..4a84553a4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -53,17 +53,17 @@ import static 
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMap
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
 
 /** {@link FileStoreTable} for primary key table. */
-public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
+public class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
 
     private static final long serialVersionUID = 1L;
 
     private transient KeyValueFileStore lazyStore;
 
-    ChangelogWithKeyFileStoreTable(FileIO fileIO, Path path, TableSchema 
tableSchema) {
+    PrimaryKeyFileStoreTable(FileIO fileIO, Path path, TableSchema 
tableSchema) {
         this(fileIO, path, tableSchema, new 
CatalogEnvironment(Lock.emptyFactory(), null, null));
     }
 
-    ChangelogWithKeyFileStoreTable(
+    PrimaryKeyFileStoreTable(
             FileIO fileIO,
             Path path,
             TableSchema tableSchema,
@@ -73,7 +73,7 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
 
     @Override
     protected FileStoreTable copy(TableSchema newTableSchema) {
-        return new ChangelogWithKeyFileStoreTable(fileIO, path, 
newTableSchema, catalogEnvironment);
+        return new PrimaryKeyFileStoreTable(fileIO, path, newTableSchema, 
catalogEnvironment);
     }
 
     @Override
@@ -83,10 +83,10 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
             Options conf = Options.fromMap(tableSchema.options());
             CoreOptions options = new CoreOptions(conf);
             KeyValueFieldsExtractor extractor =
-                    
ChangelogWithKeyTableUtils.ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
+                    PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR;
 
             MergeFunctionFactory<KeyValue> mfFactory =
-                    
ChangelogWithKeyTableUtils.createMergeFunctionFactory(tableSchema);
+                    
PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema);
             if (options.changelogProducer() == ChangelogProducer.LOOKUP) {
                 mfFactory =
                         LookupMergeFunction.wrap(
@@ -101,7 +101,7 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
                             tableSchema.crossPartitionUpdate(),
                             options,
                             tableSchema.logicalPartitionType(),
-                            ChangelogWithKeyTableUtils.addKeyNamePrefix(
+                            PrimaryKeyTableUtils.addKeyNamePrefix(
                                     tableSchema.logicalBucketKeyType()),
                             new RowType(extractor.keyFields(tableSchema)),
                             rowType,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
 b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
similarity index 89%
rename from 
paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
rename to 
paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index 0ae8cc3cd..dd8fb9a75 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -37,7 +37,7 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
 
 /** Utils for creating changelog table with primary keys. */
-public class ChangelogWithKeyTableUtils {
+public class PrimaryKeyTableUtils {
 
     public static RowType addKeyNamePrefix(RowType type) {
         return new RowType(addKeyNamePrefix(type.getFields()));
@@ -61,7 +61,7 @@ public class ChangelogWithKeyTableUtils {
         Options conf = Options.fromMap(tableSchema.options());
         CoreOptions options = new CoreOptions(conf);
         CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
-        KeyValueFieldsExtractor extractor = 
ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
+        KeyValueFieldsExtractor extractor = 
PrimaryKeyFieldsExtractor.EXTRACTOR;
 
         switch (mergeEngine) {
             case DEDUPLICATE:
@@ -82,14 +82,13 @@ public class ChangelogWithKeyTableUtils {
         }
     }
 
-    static class ChangelogWithKeyKeyValueFieldsExtractor implements 
KeyValueFieldsExtractor {
+    static class PrimaryKeyFieldsExtractor implements KeyValueFieldsExtractor {
 
         private static final long serialVersionUID = 1L;
 
-        static final ChangelogWithKeyKeyValueFieldsExtractor EXTRACTOR =
-                new ChangelogWithKeyKeyValueFieldsExtractor();
+        static final PrimaryKeyFieldsExtractor EXTRACTOR = new 
PrimaryKeyFieldsExtractor();
 
-        private ChangelogWithKeyKeyValueFieldsExtractor() {}
+        private PrimaryKeyFieldsExtractor() {}
 
         @Override
         public List<DataField> keyFields(TableSchema schema) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyColumnTypeFileDataTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
similarity index 95%
rename from 
paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyColumnTypeFileDataTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
index edf5ef7ae..8ba25c661 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyColumnTypeFileDataTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
@@ -34,7 +34,7 @@ import java.util.Map;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Column type evolution for file data in changelog with key table. */
-public class ChangelogWithKeyColumnTypeFileDataTest extends 
ColumnTypeFileDataTestBase {
+public class PrimaryKeyColumnTypeFileDataTest extends 
ColumnTypeFileDataTestBase {
 
     @BeforeEach
     public void before() throws Exception {
@@ -89,7 +89,7 @@ public class ChangelogWithKeyColumnTypeFileDataTest extends 
ColumnTypeFileDataTe
     @Override
     protected FileStoreTable createFileStoreTable(Map<Long, TableSchema> 
tableSchemas) {
         SchemaManager schemaManager = new TestingSchemaManager(tablePath, 
tableSchemas);
-        return new ChangelogWithKeyFileStoreTable(fileIO, tablePath, 
schemaManager.latest().get()) {
+        return new PrimaryKeyFileStoreTable(fileIO, tablePath, 
schemaManager.latest().get()) {
             @Override
             protected SchemaManager schemaManager() {
                 return schemaManager;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileDataTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java
similarity index 97%
rename from 
paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileDataTableTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java
index 79d9472b3..4c75b23dc 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileDataTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java
@@ -36,8 +36,8 @@ import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests of {@link ChangelogWithKeyFileStoreTable} for schema evolution. */
-public class ChangelogWithKeyFileDataTableTest extends FileDataFilterTestBase {
+/** Tests of {@link PrimaryKeyFileStoreTable} for schema evolution. */
+public class PrimaryKeyFileDataTableTest extends FileDataFilterTestBase {
 
     @BeforeEach
     public void before() throws Exception {
@@ -244,7 +244,7 @@ public class ChangelogWithKeyFileDataTableTest extends 
FileDataFilterTestBase {
     @Override
     protected FileStoreTable createFileStoreTable(Map<Long, TableSchema> 
tableSchemas) {
         SchemaManager schemaManager = new TestingSchemaManager(tablePath, 
tableSchemas);
-        return new ChangelogWithKeyFileStoreTable(fileIO, tablePath, 
schemaManager.latest().get()) {
+        return new PrimaryKeyFileStoreTable(fileIO, tablePath, 
schemaManager.latest().get()) {
 
             @Override
             protected SchemaManager schemaManager() {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileMetaFilterTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java
similarity index 90%
rename from 
paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileMetaFilterTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java
index d56e45188..99b23c247 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileMetaFilterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java
@@ -33,8 +33,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-/** Tests for meta files in {@link ChangelogWithKeyFileStoreTable} with schema 
evolution. */
-public class ChangelogWithKeyFileMetaFilterTest extends FileMetaFilterTestBase 
{
+/** Tests for meta files in {@link PrimaryKeyFileStoreTable} with schema 
evolution. */
+public class PrimaryKeyFileMetaFilterTest extends FileMetaFilterTestBase {
 
     @BeforeEach
     public void before() throws Exception {
@@ -59,8 +59,8 @@ public class ChangelogWithKeyFileMetaFilterTest extends 
FileMetaFilterTestBase {
                     checkFilterRowCount(toDataFileMetas(splits), 12L);
 
                     /**
-                     * TODO ChangelogWithKeyFileStoreTable doesn't support 
value predicate and can't
-                     * get value stats. The test for filtering the primary key 
and partition already
+                     * TODO PrimaryKeyFileStoreTable doesn't support value 
predicate and can't get
+                     * value stats. The test for filtering the primary key and 
partition already
                      * exists.
                      */
                 },
@@ -98,8 +98,8 @@ public class ChangelogWithKeyFileMetaFilterTest extends 
FileMetaFilterTestBase {
                     checkFilterRowCount(toDataFileMetas(splits), 12L);
 
                     /**
-                     * TODO ChangelogWithKeyFileStoreTable doesn't support 
value predicate and can't
-                     * get value stats. The test for filtering the primary key 
and partition already
+                     * TODO PrimaryKeyFileStoreTable doesn't support value 
predicate and can't get
+                     * value stats. The test for filtering the primary key and 
partition already
                      * exists.
                      */
                 },
@@ -144,7 +144,7 @@ public class ChangelogWithKeyFileMetaFilterTest extends 
FileMetaFilterTestBase {
     @Override
     protected FileStoreTable createFileStoreTable(Map<Long, TableSchema> 
tableSchemas) {
         SchemaManager schemaManager = new TestingSchemaManager(tablePath, 
tableSchemas);
-        return new ChangelogWithKeyFileStoreTable(fileIO, tablePath, 
schemaManager.latest().get()) {
+        return new PrimaryKeyFileStoreTable(fileIO, tablePath, 
schemaManager.latest().get()) {
             @Override
             protected SchemaManager schemaManager() {
                 return schemaManager;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
similarity index 99%
rename from 
paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 5bbdfa220..a06af7b74 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -78,8 +78,8 @@ import static 
org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-/** Tests for {@link ChangelogWithKeyFileStoreTable}. */
-public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase 
{
+/** Tests for {@link PrimaryKeyFileStoreTable}. */
+public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
 
     protected static final RowType COMPATIBILITY_ROW_TYPE =
             RowType.of(
@@ -1119,8 +1119,7 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 Arrays.asList("pk", "pt0", "pt1"),
                                 conf.toMap(),
                                 ""));
-        return new ChangelogWithKeyFileStoreTable(
-                FileIOFinder.find(tablePath), tablePath, tableSchema);
+        return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, tableSchema);
     }
 
     private FileStoreTable createFileStoreTable(Consumer<Options> configure, 
RowType rowType)
@@ -1137,7 +1136,6 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
                                 Arrays.asList("pt", "a"),
                                 conf.toMap(),
                                 ""));
-        return new ChangelogWithKeyFileStoreTable(
-                FileIOFinder.find(tablePath), tablePath, tableSchema);
+        return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, tableSchema);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
similarity index 95%
rename from 
paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
index 3a91bfd4a..8b3e3726f 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
@@ -37,8 +37,8 @@ import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** File meta tests for column type evolution in {@link 
ChangelogWithKeyFileStoreTable}. */
-public class ChangelogWithKeyTableColumnTypeFileMetaTest extends 
ColumnTypeFileMetaTestBase {
+/** File meta tests for column type evolution in {@link 
PrimaryKeyFileStoreTable}. */
+public class PrimaryKeyTableColumnTypeFileMetaTest extends 
ColumnTypeFileMetaTestBase {
 
     @BeforeEach
     public void before() throws Exception {
@@ -48,7 +48,7 @@ public class ChangelogWithKeyTableColumnTypeFileMetaTest 
extends ColumnTypeFileM
     @Override
     protected FileStoreTable createFileStoreTable(Map<Long, TableSchema> 
tableSchemas) {
         SchemaManager schemaManager = new TestingSchemaManager(tablePath, 
tableSchemas);
-        return new ChangelogWithKeyFileStoreTable(fileIO, tablePath, 
schemaManager.latest().get()) {
+        return new PrimaryKeyFileStoreTable(fileIO, tablePath, 
schemaManager.latest().get()) {
             @Override
             protected SchemaManager schemaManager() {
                 return schemaManager;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
index 114777609..adb0c6b8b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
@@ -44,7 +44,7 @@ import java.util.function.Consumer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link ChangelogWithKeyFileStoreTable}. */
+/** Tests for {@link PrimaryKeyFileStoreTable}. */
 public class WritePreemptMemoryTest extends FileStoreTableTestBase {
 
     @Test
@@ -57,7 +57,7 @@ public class WritePreemptMemoryTest extends 
FileStoreTableTestBase {
         testWritePreemptMemory(true);
     }
 
-    @Override // this has been tested in ChangelogWithKeyFileStoreTableTest
+    @Override // this has been tested in PrimaryKeyFileStoreTableTest
     @Test
     public void testReadFilter() {}
 
@@ -106,7 +106,7 @@ public class WritePreemptMemoryTest extends 
FileStoreTableTestBase {
                                 Arrays.asList("pt", "a"),
                                 conf.toMap(),
                                 ""));
-        return new 
ChangelogWithKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, schema);
+        return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, schema);
     }
 
     @Override
@@ -127,6 +127,6 @@ public class WritePreemptMemoryTest extends 
FileStoreTableTestBase {
                                 Arrays.asList("pk", "pt0", "pt1"),
                                 conf.toMap(),
                                 ""));
-        return new 
ChangelogWithKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, schema);
+        return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath), 
tablePath, schema);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
index 9e6b861dd..4193ce145 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -30,7 +30,7 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.AbstractFileStoreTable;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
-import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
+import org.apache.paimon.table.PrimaryKeyFileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableUtils;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
@@ -91,9 +91,9 @@ public class FlinkTableSink extends FlinkTableSinkBase
         // Since only UPDATE_AFTER type messages can be received at present,
         // AppendOnlyFileStoreTable cannot correctly handle old data, so they 
are marked as
         // unsupported. Similarly, it is not allowed to update the primary key 
column when updating
-        // the column of ChangelogWithKeyFileStoreTable, because the old data 
cannot be handled
+        // the column of PrimaryKeyFileStoreTable, because the old data cannot 
be handled
         // correctly.
-        if (table instanceof ChangelogWithKeyFileStoreTable) {
+        if (table instanceof PrimaryKeyFileStoreTable) {
             Options options = Options.fromMap(table.options());
             Set<String> primaryKeys = new HashSet<>(table.primaryKeys());
             updatedColumns.forEach(
@@ -184,7 +184,7 @@ public class FlinkTableSink extends FlinkTableSinkBase
     }
 
     private void validateDeletable() {
-        if (table instanceof ChangelogWithKeyFileStoreTable) {
+        if (table instanceof PrimaryKeyFileStoreTable) {
             Options options = Options.fromMap(table.options());
             if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE) {
                 return;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index 042b736b7..be6628bde 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -27,8 +27,8 @@ import org.apache.paimon.flink.log.LogSinkProvider;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
-import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.PrimaryKeyFileStoreTable;
 import org.apache.paimon.table.Table;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -79,7 +79,7 @@ public abstract class FlinkTableSinkBase
             // Don't check this, for example, only inserts are available from 
the database, but the
             // plan phase contains all changelogs
             return requestedMode;
-        } else if (table instanceof ChangelogWithKeyFileStoreTable) {
+        } else if (table instanceof PrimaryKeyFileStoreTable) {
             Options options = Options.fromMap(table.options());
             if (options.get(CHANGELOG_PRODUCER) == ChangelogProducer.INPUT) {
                 return requestedMode;
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index aab614f03..b113ab09c 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -28,7 +28,7 @@ import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.mergetree.SortBufferWriteBuffer;
 import org.apache.paimon.mergetree.compact.MergeFunction;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.ChangelogWithKeyTableUtils;
+import org.apache.paimon.table.PrimaryKeyTableUtils;
 import org.apache.paimon.table.sink.SequenceGenerator;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
@@ -77,8 +77,7 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
     public void open() throws Exception {
         super.open();
 
-        RowType keyType =
-                
ChangelogWithKeyTableUtils.addKeyNamePrefix(schema.logicalPrimaryKeysType());
+        RowType keyType = 
PrimaryKeyTableUtils.addKeyNamePrefix(schema.logicalPrimaryKeysType());
         RowType valueType = schema.logicalRowType();
         CoreOptions options = new CoreOptions(schema.options());
 
@@ -89,7 +88,7 @@ public class LocalMergeOperator extends 
AbstractStreamOperator<InternalRow>
 
         recordCount = 0;
         sequenceGenerator = SequenceGenerator.create(schema, options);
-        mergeFunction = 
ChangelogWithKeyTableUtils.createMergeFunctionFactory(schema).create();
+        mergeFunction = 
PrimaryKeyTableUtils.createMergeFunctionFactory(schema).create();
 
         buffer =
                 new SortBufferWriteBuffer(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
index 7c38d9a8c..dadd4c413 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
@@ -33,7 +33,7 @@ import org.apache.flink.table.data.RowData;
 
 import java.util.List;
 
-import static 
org.apache.paimon.table.ChangelogWithKeyTableUtils.addKeyNamePrefix;
+import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
 
 /** Alphabetical order sorter to sort records by the given `orderColNames`. */
 public class OrderSorter extends TableSorter {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index ef9fb5a12..db5b10cae 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -32,7 +32,7 @@ import 
org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
-import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
+import org.apache.paimon.table.PrimaryKeyFileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Projection;
@@ -140,7 +140,7 @@ public class DataTableSource extends FlinkTableSource {
 
         if (table instanceof AppendOnlyFileStoreTable) {
             return ChangelogMode.insertOnly();
-        } else if (table instanceof ChangelogWithKeyFileStoreTable) {
+        } else if (table instanceof PrimaryKeyFileStoreTable) {
             Options options = Options.fromMap(table.options());
 
             if (new CoreOptions(options).mergeEngine() == 
CoreOptions.MergeEngine.FIRST_ROW) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
similarity index 99%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
rename to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index e49c830d0..eba3fae70 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -57,7 +57,7 @@ import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingO
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for changelog table with primary keys. */
-public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
+public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
 
     // ------------------------------------------------------------------------
     //  Test Utilities
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
index 0c5cb408b..2c079f379 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
@@ -27,8 +27,8 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
-import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.PrimaryKeyFileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.BatchTableWrite;
@@ -60,7 +60,7 @@ public class SortCompactActionForDynamicBucketITCase extends 
ActionITCaseBase {
 
         List<ManifestEntry> files = ((FileStoreTable) 
getTable()).store().newScan().plan().files();
         List<ManifestEntry> filesFilter =
-                ((ChangelogWithKeyFileStoreTable) getTable())
+                ((PrimaryKeyFileStoreTable) getTable())
                         .store()
                         .newScan()
                         .withValueFilter(predicate)
@@ -72,7 +72,7 @@ public class SortCompactActionForDynamicBucketITCase extends 
ActionITCaseBase {
         List<ManifestEntry> filesZorder =
                 ((FileStoreTable) getTable()).store().newScan().plan().files();
         List<ManifestEntry> filesFilterZorder =
-                ((ChangelogWithKeyFileStoreTable) getTable())
+                ((PrimaryKeyFileStoreTable) getTable())
                         .store()
                         .newScan()
                         .withValueFilter(predicate)
@@ -94,7 +94,7 @@ public class SortCompactActionForDynamicBucketITCase extends 
ActionITCaseBase {
         order(Arrays.asList("f2", "f1"));
         List<ManifestEntry> files = ((FileStoreTable) 
getTable()).store().newScan().plan().files();
         List<ManifestEntry> filesFilter =
-                ((ChangelogWithKeyFileStoreTable) getTable())
+                ((PrimaryKeyFileStoreTable) getTable())
                         .store()
                         .newScan()
                         .withValueFilter(predicate)
@@ -106,7 +106,7 @@ public class SortCompactActionForDynamicBucketITCase 
extends ActionITCaseBase {
         List<ManifestEntry> filesZorder =
                 ((FileStoreTable) getTable()).store().newScan().plan().files();
         List<ManifestEntry> filesFilterZorder =
-                ((ChangelogWithKeyFileStoreTable) getTable())
+                ((PrimaryKeyFileStoreTable) getTable())
                         .store()
                         .newScan()
                         .withValueFilter(predicate)
@@ -131,7 +131,7 @@ public class SortCompactActionForDynamicBucketITCase 
extends ActionITCaseBase {
 
         List<ManifestEntry> files = ((FileStoreTable) 
getTable()).store().newScan().plan().files();
         List<ManifestEntry> filesFilter =
-                ((ChangelogWithKeyFileStoreTable) getTable())
+                ((PrimaryKeyFileStoreTable) getTable())
                         .store()
                         .newScan()
                         .withValueFilter(predicate)
@@ -143,7 +143,7 @@ public class SortCompactActionForDynamicBucketITCase 
extends ActionITCaseBase {
         List<ManifestEntry> filesZorder =
                 ((FileStoreTable) getTable()).store().newScan().plan().files();
         List<ManifestEntry> filesFilterZorder =
-                ((ChangelogWithKeyFileStoreTable) getTable())
+                ((PrimaryKeyFileStoreTable) getTable())
                         .store()
                         .newScan()
                         .withValueFilter(predicate)
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/ChangelogWithKeyTableStatisticsTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
similarity index 89%
rename from 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/ChangelogWithKeyTableStatisticsTest.java
rename to 
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
index 489a91d65..bb58ea823 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/ChangelogWithKeyTableStatisticsTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
@@ -25,17 +25,17 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.PrimaryKeyFileStoreTable;
 
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
 
-/** Statistics tests for {@link ChangelogWithKeyFileStoreTable}. */
-public class ChangelogWithKeyTableStatisticsTest extends 
FileStoreTableStatisticsTestBase {
+/** Statistics tests for {@link PrimaryKeyFileStoreTable}. */
+public class PrimaryKeyTableStatisticsTest extends 
FileStoreTableStatisticsTestBase {
 
-    /** {@link ChangelogWithKeyFileStoreTable} does not support filter value. 
*/
+    /** {@link PrimaryKeyFileStoreTable} does not support filter value. */
     @Test
     public void testTableFilterValueStatistics() throws Exception {
         FileStoreTable table = writeData();


Reply via email to