This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 19eaeadc9 [core] Rename ChangelogWithKey to PrimaryKey (#2172)
19eaeadc9 is described below
commit 19eaeadc9b57b4bb7e004c57d215d708f9d731ae
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Oct 24 13:00:49 2023 +0800
[core] Rename ChangelogWithKey to PrimaryKey (#2172)
---
.../org/apache/paimon/operation/AbstractFileStoreScan.java | 2 +-
.../org/apache/paimon/table/FileStoreTableFactory.java | 2 +-
...eyFileStoreTable.java => PrimaryKeyFileStoreTable.java} | 14 +++++++-------
...logWithKeyTableUtils.java => PrimaryKeyTableUtils.java} | 11 +++++------
...DataTest.java => PrimaryKeyColumnTypeFileDataTest.java} | 4 ++--
...DataTableTest.java => PrimaryKeyFileDataTableTest.java} | 6 +++---
...taFilterTest.java => PrimaryKeyFileMetaFilterTest.java} | 14 +++++++-------
...oreTableTest.java => PrimaryKeyFileStoreTableTest.java} | 10 ++++------
...est.java => PrimaryKeyTableColumnTypeFileMetaTest.java} | 6 +++---
.../org/apache/paimon/table/WritePreemptMemoryTest.java | 8 ++++----
.../java/org/apache/paimon/flink/sink/FlinkTableSink.java | 8 ++++----
.../org/apache/paimon/flink/sink/FlinkTableSinkBase.java | 4 ++--
.../org/apache/paimon/flink/sink/LocalMergeOperator.java | 7 +++----
.../java/org/apache/paimon/flink/sorter/OrderSorter.java | 2 +-
.../org/apache/paimon/flink/source/DataTableSource.java | 4 ++--
...ableITCase.java => PrimaryKeyFileStoreTableITCase.java} | 2 +-
.../action/SortCompactActionForDynamicBucketITCase.java | 14 +++++++-------
...tisticsTest.java => PrimaryKeyTableStatisticsTest.java} | 8 ++++----
18 files changed, 61 insertions(+), 65 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 356d002db..dd638a187 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -280,7 +280,7 @@ public abstract class AbstractFileStoreScan implements
FileStoreScan {
// We group files by bucket here, and filter them by the whole bucket
filter.
// Why do this: because in primary key table, we can't just filter the
value
- // by the stat in files (see
`ChangelogWithKeyFileStoreTable.nonPartitionFilterConsumer`),
+ // by the stat in files (see
`PrimaryKeyFileStoreTable.nonPartitionFilterConsumer`),
// but we can do this by filter the whole bucket files
files =
files.stream()
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
index 0680ad4e1..638253585 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTableFactory.java
@@ -97,7 +97,7 @@ public class FileStoreTableFactory {
tableSchema.primaryKeys().isEmpty()
? new AppendOnlyFileStoreTable(
fileIO, tablePath, tableSchema,
catalogEnvironment)
- : new ChangelogWithKeyFileStoreTable(
+ : new PrimaryKeyFileStoreTable(
fileIO, tablePath, tableSchema,
catalogEnvironment);
return table.copy(dynamicOptions.toMap());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
similarity index 93%
rename from paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
rename to paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 0d112cc29..4a84553a4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -53,17 +53,17 @@ import static
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMap
import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
/** {@link FileStoreTable} for primary key table. */
-public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
+public class PrimaryKeyFileStoreTable extends AbstractFileStoreTable {
private static final long serialVersionUID = 1L;
private transient KeyValueFileStore lazyStore;
- ChangelogWithKeyFileStoreTable(FileIO fileIO, Path path, TableSchema
tableSchema) {
+ PrimaryKeyFileStoreTable(FileIO fileIO, Path path, TableSchema
tableSchema) {
this(fileIO, path, tableSchema, new
CatalogEnvironment(Lock.emptyFactory(), null, null));
}
- ChangelogWithKeyFileStoreTable(
+ PrimaryKeyFileStoreTable(
FileIO fileIO,
Path path,
TableSchema tableSchema,
@@ -73,7 +73,7 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
@Override
protected FileStoreTable copy(TableSchema newTableSchema) {
- return new ChangelogWithKeyFileStoreTable(fileIO, path,
newTableSchema, catalogEnvironment);
+ return new PrimaryKeyFileStoreTable(fileIO, path, newTableSchema,
catalogEnvironment);
}
@Override
@@ -83,10 +83,10 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
Options conf = Options.fromMap(tableSchema.options());
CoreOptions options = new CoreOptions(conf);
KeyValueFieldsExtractor extractor =
-
ChangelogWithKeyTableUtils.ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
+ PrimaryKeyTableUtils.PrimaryKeyFieldsExtractor.EXTRACTOR;
MergeFunctionFactory<KeyValue> mfFactory =
-
ChangelogWithKeyTableUtils.createMergeFunctionFactory(tableSchema);
+
PrimaryKeyTableUtils.createMergeFunctionFactory(tableSchema);
if (options.changelogProducer() == ChangelogProducer.LOOKUP) {
mfFactory =
LookupMergeFunction.wrap(
@@ -101,7 +101,7 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
tableSchema.crossPartitionUpdate(),
options,
tableSchema.logicalPartitionType(),
- ChangelogWithKeyTableUtils.addKeyNamePrefix(
+ PrimaryKeyTableUtils.addKeyNamePrefix(
tableSchema.logicalBucketKeyType()),
new RowType(extractor.keyFields(tableSchema)),
rowType,
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
similarity index 89%
rename from paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
rename to paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
index 0ae8cc3cd..dd8fb9a75 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyTableUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java
@@ -37,7 +37,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.schema.SystemColumns.KEY_FIELD_PREFIX;
/** Utils for creating changelog table with primary keys. */
-public class ChangelogWithKeyTableUtils {
+public class PrimaryKeyTableUtils {
public static RowType addKeyNamePrefix(RowType type) {
return new RowType(addKeyNamePrefix(type.getFields()));
@@ -61,7 +61,7 @@ public class ChangelogWithKeyTableUtils {
Options conf = Options.fromMap(tableSchema.options());
CoreOptions options = new CoreOptions(conf);
CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
- KeyValueFieldsExtractor extractor =
ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
+ KeyValueFieldsExtractor extractor =
PrimaryKeyFieldsExtractor.EXTRACTOR;
switch (mergeEngine) {
case DEDUPLICATE:
@@ -82,14 +82,13 @@ public class ChangelogWithKeyTableUtils {
}
}
- static class ChangelogWithKeyKeyValueFieldsExtractor implements
KeyValueFieldsExtractor {
+ static class PrimaryKeyFieldsExtractor implements KeyValueFieldsExtractor {
private static final long serialVersionUID = 1L;
- static final ChangelogWithKeyKeyValueFieldsExtractor EXTRACTOR =
- new ChangelogWithKeyKeyValueFieldsExtractor();
+ static final PrimaryKeyFieldsExtractor EXTRACTOR = new
PrimaryKeyFieldsExtractor();
- private ChangelogWithKeyKeyValueFieldsExtractor() {}
+ private PrimaryKeyFieldsExtractor() {}
@Override
public List<DataField> keyFields(TableSchema schema) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyColumnTypeFileDataTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
similarity index 95%
rename from paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyColumnTypeFileDataTest.java
rename to paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
index edf5ef7ae..8ba25c661 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyColumnTypeFileDataTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java
@@ -34,7 +34,7 @@ import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
/** Column type evolution for file data in changelog with key table. */
-public class ChangelogWithKeyColumnTypeFileDataTest extends
ColumnTypeFileDataTestBase {
+public class PrimaryKeyColumnTypeFileDataTest extends
ColumnTypeFileDataTestBase {
@BeforeEach
public void before() throws Exception {
@@ -89,7 +89,7 @@ public class ChangelogWithKeyColumnTypeFileDataTest extends
ColumnTypeFileDataTe
@Override
protected FileStoreTable createFileStoreTable(Map<Long, TableSchema>
tableSchemas) {
SchemaManager schemaManager = new TestingSchemaManager(tablePath,
tableSchemas);
- return new ChangelogWithKeyFileStoreTable(fileIO, tablePath,
schemaManager.latest().get()) {
+ return new PrimaryKeyFileStoreTable(fileIO, tablePath,
schemaManager.latest().get()) {
@Override
protected SchemaManager schemaManager() {
return schemaManager;
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileDataTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java
similarity index 97%
rename from paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileDataTableTest.java
rename to paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java
index 79d9472b3..4c75b23dc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileDataTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileDataTableTest.java
@@ -36,8 +36,8 @@ import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests of {@link ChangelogWithKeyFileStoreTable} for schema evolution. */
-public class ChangelogWithKeyFileDataTableTest extends FileDataFilterTestBase {
+/** Tests of {@link PrimaryKeyFileStoreTable} for schema evolution. */
+public class PrimaryKeyFileDataTableTest extends FileDataFilterTestBase {
@BeforeEach
public void before() throws Exception {
@@ -244,7 +244,7 @@ public class ChangelogWithKeyFileDataTableTest extends
FileDataFilterTestBase {
@Override
protected FileStoreTable createFileStoreTable(Map<Long, TableSchema>
tableSchemas) {
SchemaManager schemaManager = new TestingSchemaManager(tablePath,
tableSchemas);
- return new ChangelogWithKeyFileStoreTable(fileIO, tablePath,
schemaManager.latest().get()) {
+ return new PrimaryKeyFileStoreTable(fileIO, tablePath,
schemaManager.latest().get()) {
@Override
protected SchemaManager schemaManager() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileMetaFilterTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java
similarity index 90%
rename from paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileMetaFilterTest.java
rename to paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java
index d56e45188..99b23c247 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileMetaFilterTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileMetaFilterTest.java
@@ -33,8 +33,8 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-/** Tests for meta files in {@link ChangelogWithKeyFileStoreTable} with schema
evolution. */
-public class ChangelogWithKeyFileMetaFilterTest extends FileMetaFilterTestBase
{
+/** Tests for meta files in {@link PrimaryKeyFileStoreTable} with schema
evolution. */
+public class PrimaryKeyFileMetaFilterTest extends FileMetaFilterTestBase {
@BeforeEach
public void before() throws Exception {
@@ -59,8 +59,8 @@ public class ChangelogWithKeyFileMetaFilterTest extends
FileMetaFilterTestBase {
checkFilterRowCount(toDataFileMetas(splits), 12L);
/**
- * TODO ChangelogWithKeyFileStoreTable doesn't support
value predicate and can't
- * get value stats. The test for filtering the primary key
and partition already
+ * TODO PrimaryKeyFileStoreTable doesn't support value
predicate and can't get
+ * value stats. The test for filtering the primary key and
partition already
* exists.
*/
},
@@ -98,8 +98,8 @@ public class ChangelogWithKeyFileMetaFilterTest extends
FileMetaFilterTestBase {
checkFilterRowCount(toDataFileMetas(splits), 12L);
/**
- * TODO ChangelogWithKeyFileStoreTable doesn't support
value predicate and can't
- * get value stats. The test for filtering the primary key
and partition already
+ * TODO PrimaryKeyFileStoreTable doesn't support value
predicate and can't get
+ * value stats. The test for filtering the primary key and
partition already
* exists.
*/
},
@@ -144,7 +144,7 @@ public class ChangelogWithKeyFileMetaFilterTest extends
FileMetaFilterTestBase {
@Override
protected FileStoreTable createFileStoreTable(Map<Long, TableSchema>
tableSchemas) {
SchemaManager schemaManager = new TestingSchemaManager(tablePath,
tableSchemas);
- return new ChangelogWithKeyFileStoreTable(fileIO, tablePath,
schemaManager.latest().get()) {
+ return new PrimaryKeyFileStoreTable(fileIO, tablePath,
schemaManager.latest().get()) {
@Override
protected SchemaManager schemaManager() {
return schemaManager;
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
similarity index 99%
rename from paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
rename to paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 5bbdfa220..a06af7b74 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -78,8 +78,8 @@ import static
org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** Tests for {@link ChangelogWithKeyFileStoreTable}. */
-public class ChangelogWithKeyFileStoreTableTest extends FileStoreTableTestBase
{
+/** Tests for {@link PrimaryKeyFileStoreTable}. */
+public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
protected static final RowType COMPATIBILITY_ROW_TYPE =
RowType.of(
@@ -1119,8 +1119,7 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
Arrays.asList("pk", "pt0", "pt1"),
conf.toMap(),
""));
- return new ChangelogWithKeyFileStoreTable(
- FileIOFinder.find(tablePath), tablePath, tableSchema);
+ return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, tableSchema);
}
private FileStoreTable createFileStoreTable(Consumer<Options> configure,
RowType rowType)
@@ -1137,7 +1136,6 @@ public class ChangelogWithKeyFileStoreTableTest extends
FileStoreTableTestBase {
Arrays.asList("pt", "a"),
conf.toMap(),
""));
- return new ChangelogWithKeyFileStoreTable(
- FileIOFinder.find(tablePath), tablePath, tableSchema);
+ return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, tableSchema);
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
similarity index 95%
rename from paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
rename to paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
index 3a91bfd4a..8b3e3726f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogWithKeyTableColumnTypeFileMetaTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java
@@ -37,8 +37,8 @@ import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
-/** File meta tests for column type evolution in {@link
ChangelogWithKeyFileStoreTable}. */
-public class ChangelogWithKeyTableColumnTypeFileMetaTest extends
ColumnTypeFileMetaTestBase {
+/** File meta tests for column type evolution in {@link
PrimaryKeyFileStoreTable}. */
+public class PrimaryKeyTableColumnTypeFileMetaTest extends
ColumnTypeFileMetaTestBase {
@BeforeEach
public void before() throws Exception {
@@ -48,7 +48,7 @@ public class ChangelogWithKeyTableColumnTypeFileMetaTest
extends ColumnTypeFileM
@Override
protected FileStoreTable createFileStoreTable(Map<Long, TableSchema>
tableSchemas) {
SchemaManager schemaManager = new TestingSchemaManager(tablePath,
tableSchemas);
- return new ChangelogWithKeyFileStoreTable(fileIO, tablePath,
schemaManager.latest().get()) {
+ return new PrimaryKeyFileStoreTable(fileIO, tablePath,
schemaManager.latest().get()) {
@Override
protected SchemaManager schemaManager() {
return schemaManager;
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
index 114777609..adb0c6b8b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/WritePreemptMemoryTest.java
@@ -44,7 +44,7 @@ import java.util.function.Consumer;
import static org.assertj.core.api.Assertions.assertThat;
-/** Tests for {@link ChangelogWithKeyFileStoreTable}. */
+/** Tests for {@link PrimaryKeyFileStoreTable}. */
public class WritePreemptMemoryTest extends FileStoreTableTestBase {
@Test
@@ -57,7 +57,7 @@ public class WritePreemptMemoryTest extends
FileStoreTableTestBase {
testWritePreemptMemory(true);
}
- @Override // this has been tested in ChangelogWithKeyFileStoreTableTest
+ @Override // this has been tested in PrimaryKeyFileStoreTableTest
@Test
public void testReadFilter() {}
@@ -106,7 +106,7 @@ public class WritePreemptMemoryTest extends
FileStoreTableTestBase {
Arrays.asList("pt", "a"),
conf.toMap(),
""));
- return new
ChangelogWithKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, schema);
+ return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, schema);
}
@Override
@@ -127,6 +127,6 @@ public class WritePreemptMemoryTest extends
FileStoreTableTestBase {
Arrays.asList("pk", "pt0", "pt1"),
conf.toMap(),
""));
- return new
ChangelogWithKeyFileStoreTable(FileIOFinder.find(tablePath), tablePath, schema);
+ return new PrimaryKeyFileStoreTable(FileIOFinder.find(tablePath),
tablePath, schema);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
index 9e6b861dd..4193ce145 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java
@@ -30,7 +30,7 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
-import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
+import org.apache.paimon.table.PrimaryKeyFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.TableUtils;
import org.apache.paimon.table.sink.BatchWriteBuilder;
@@ -91,9 +91,9 @@ public class FlinkTableSink extends FlinkTableSinkBase
// Since only UPDATE_AFTER type messages can be received at present,
// AppendOnlyFileStoreTable cannot correctly handle old data, so they
are marked as
// unsupported. Similarly, it is not allowed to update the primary key
column when updating
- // the column of ChangelogWithKeyFileStoreTable, because the old data
cannot be handled
+ // the column of PrimaryKeyFileStoreTable, because the old data cannot
be handled
// correctly.
- if (table instanceof ChangelogWithKeyFileStoreTable) {
+ if (table instanceof PrimaryKeyFileStoreTable) {
Options options = Options.fromMap(table.options());
Set<String> primaryKeys = new HashSet<>(table.primaryKeys());
updatedColumns.forEach(
@@ -184,7 +184,7 @@ public class FlinkTableSink extends FlinkTableSinkBase
}
private void validateDeletable() {
- if (table instanceof ChangelogWithKeyFileStoreTable) {
+ if (table instanceof PrimaryKeyFileStoreTable) {
Options options = Options.fromMap(table.options());
if (options.get(MERGE_ENGINE) == MergeEngine.DEDUPLICATE) {
return;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
index 042b736b7..be6628bde 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSinkBase.java
@@ -27,8 +27,8 @@ import org.apache.paimon.flink.log.LogSinkProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
-import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.PrimaryKeyFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -79,7 +79,7 @@ public abstract class FlinkTableSinkBase
// Don't check this, for example, only inserts are available from
the database, but the
// plan phase contains all changelogs
return requestedMode;
- } else if (table instanceof ChangelogWithKeyFileStoreTable) {
+ } else if (table instanceof PrimaryKeyFileStoreTable) {
Options options = Options.fromMap(table.options());
if (options.get(CHANGELOG_PRODUCER) == ChangelogProducer.INPUT) {
return requestedMode;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
index aab614f03..b113ab09c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java
@@ -28,7 +28,7 @@ import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.mergetree.SortBufferWriteBuffer;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.ChangelogWithKeyTableUtils;
+import org.apache.paimon.table.PrimaryKeyTableUtils;
import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -77,8 +77,7 @@ public class LocalMergeOperator extends
AbstractStreamOperator<InternalRow>
public void open() throws Exception {
super.open();
- RowType keyType =
-
ChangelogWithKeyTableUtils.addKeyNamePrefix(schema.logicalPrimaryKeysType());
+ RowType keyType =
PrimaryKeyTableUtils.addKeyNamePrefix(schema.logicalPrimaryKeysType());
RowType valueType = schema.logicalRowType();
CoreOptions options = new CoreOptions(schema.options());
@@ -89,7 +88,7 @@ public class LocalMergeOperator extends
AbstractStreamOperator<InternalRow>
recordCount = 0;
sequenceGenerator = SequenceGenerator.create(schema, options);
- mergeFunction =
ChangelogWithKeyTableUtils.createMergeFunctionFactory(schema).create();
+ mergeFunction =
PrimaryKeyTableUtils.createMergeFunctionFactory(schema).create();
buffer =
new SortBufferWriteBuffer(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
index 7c38d9a8c..dadd4c413 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sorter/OrderSorter.java
@@ -33,7 +33,7 @@ import org.apache.flink.table.data.RowData;
import java.util.List;
-import static
org.apache.paimon.table.ChangelogWithKeyTableUtils.addKeyNamePrefix;
+import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix;
/** Alphabetical order sorter to sort records by the given `orderColNames`. */
public class OrderSorter extends TableSorter {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
index ef9fb5a12..db5b10cae 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java
@@ -32,7 +32,7 @@ import
org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
-import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
+import org.apache.paimon.table.PrimaryKeyFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.utils.Projection;
@@ -140,7 +140,7 @@ public class DataTableSource extends FlinkTableSource {
if (table instanceof AppendOnlyFileStoreTable) {
return ChangelogMode.insertOnly();
- } else if (table instanceof ChangelogWithKeyFileStoreTable) {
+ } else if (table instanceof PrimaryKeyFileStoreTable) {
Options options = Options.fromMap(table.options());
if (new CoreOptions(options).mergeEngine() ==
CoreOptions.MergeEngine.FIRST_ROW) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
similarity index 99%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index e49c830d0..eba3fae70 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ChangelogWithKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -57,7 +57,7 @@ import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingO
import static org.assertj.core.api.Assertions.assertThat;
/** Tests for changelog table with primary keys. */
-public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
+public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
// ------------------------------------------------------------------------
// Test Utilities
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
index 0c5cb408b..2c079f379 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/SortCompactActionForDynamicBucketITCase.java
@@ -27,8 +27,8 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
-import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.PrimaryKeyFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
@@ -60,7 +60,7 @@ public class SortCompactActionForDynamicBucketITCase extends
ActionITCaseBase {
List<ManifestEntry> files = ((FileStoreTable)
getTable()).store().newScan().plan().files();
List<ManifestEntry> filesFilter =
- ((ChangelogWithKeyFileStoreTable) getTable())
+ ((PrimaryKeyFileStoreTable) getTable())
.store()
.newScan()
.withValueFilter(predicate)
@@ -72,7 +72,7 @@ public class SortCompactActionForDynamicBucketITCase extends
ActionITCaseBase {
List<ManifestEntry> filesZorder =
((FileStoreTable) getTable()).store().newScan().plan().files();
List<ManifestEntry> filesFilterZorder =
- ((ChangelogWithKeyFileStoreTable) getTable())
+ ((PrimaryKeyFileStoreTable) getTable())
.store()
.newScan()
.withValueFilter(predicate)
@@ -94,7 +94,7 @@ public class SortCompactActionForDynamicBucketITCase extends
ActionITCaseBase {
order(Arrays.asList("f2", "f1"));
List<ManifestEntry> files = ((FileStoreTable)
getTable()).store().newScan().plan().files();
List<ManifestEntry> filesFilter =
- ((ChangelogWithKeyFileStoreTable) getTable())
+ ((PrimaryKeyFileStoreTable) getTable())
.store()
.newScan()
.withValueFilter(predicate)
@@ -106,7 +106,7 @@ public class SortCompactActionForDynamicBucketITCase
extends ActionITCaseBase {
List<ManifestEntry> filesZorder =
((FileStoreTable) getTable()).store().newScan().plan().files();
List<ManifestEntry> filesFilterZorder =
- ((ChangelogWithKeyFileStoreTable) getTable())
+ ((PrimaryKeyFileStoreTable) getTable())
.store()
.newScan()
.withValueFilter(predicate)
@@ -131,7 +131,7 @@ public class SortCompactActionForDynamicBucketITCase
extends ActionITCaseBase {
List<ManifestEntry> files = ((FileStoreTable)
getTable()).store().newScan().plan().files();
List<ManifestEntry> filesFilter =
- ((ChangelogWithKeyFileStoreTable) getTable())
+ ((PrimaryKeyFileStoreTable) getTable())
.store()
.newScan()
.withValueFilter(predicate)
@@ -143,7 +143,7 @@ public class SortCompactActionForDynamicBucketITCase
extends ActionITCaseBase {
List<ManifestEntry> filesZorder =
((FileStoreTable) getTable()).store().newScan().plan().files();
List<ManifestEntry> filesFilterZorder =
- ((ChangelogWithKeyFileStoreTable) getTable())
+ ((PrimaryKeyFileStoreTable) getTable())
.store()
.newScan()
.withValueFilter(predicate)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/ChangelogWithKeyTableStatisticsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
similarity index 89%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/ChangelogWithKeyTableStatisticsTest.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
index 489a91d65..bb58ea823 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/ChangelogWithKeyTableStatisticsTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java
@@ -25,17 +25,17 @@ import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.ChangelogWithKeyFileStoreTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.PrimaryKeyFileStoreTable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
-/** Statistics tests for {@link ChangelogWithKeyFileStoreTable}. */
-public class ChangelogWithKeyTableStatisticsTest extends
FileStoreTableStatisticsTestBase {
+/** Statistics tests for {@link PrimaryKeyFileStoreTable}. */
+public class PrimaryKeyTableStatisticsTest extends
FileStoreTableStatisticsTestBase {
- /** {@link ChangelogWithKeyFileStoreTable} does not support filter value.
*/
+ /** {@link PrimaryKeyFileStoreTable} does not support filter value. */
@Test
public void testTableFilterValueStatistics() throws Exception {
FileStoreTable table = writeData();