This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 01bc12a0b [core] Rename emptyWriter to ignorePreviousFiles (#1381)
01bc12a0b is described below
commit 01bc12a0b40287f41bfa25e0ff45ed84cc476d78
Author: dohongdayi <[email protected]>
AuthorDate: Thu Jun 15 13:58:34 2023 +0800
[core] Rename emptyWriter to ignorePreviousFiles (#1381)
---
.../org/apache/paimon/operation/AbstractFileStoreWrite.java | 12 ++++++------
.../java/org/apache/paimon/operation/FileStoreWrite.java | 6 +++---
.../org/apache/paimon/table/AppendOnlyFileStoreTable.java | 2 +-
.../org/apache/paimon/table/sink/BatchWriteBuilderImpl.java | 2 +-
.../java/org/apache/paimon/table/sink/InnerTableWrite.java | 2 +-
.../java/org/apache/paimon/table/sink/TableWriteImpl.java | 4 ++--
.../src/test/java/org/apache/paimon/TestFileStore.java | 7 ++++---
.../java/org/apache/paimon/table/FileStoreTableTestBase.java | 4 ++--
.../snapshot/ContinuousCompactorFollowUpScannerTest.java | 2 +-
.../main/java/org/apache/paimon/flink/sink/FlinkSink.java | 10 +++++-----
.../paimon/flink/sink/GlobalFullCompactionSinkWrite.java | 4 ++--
.../org/apache/paimon/flink/sink/StoreSinkWriteImpl.java | 8 ++++----
12 files changed, 32 insertions(+), 31 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 093b96fe7..c66a1d1ae 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -70,7 +70,7 @@ public abstract class AbstractFileStoreWrite<T>
protected final Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers;
private ExecutorService lazyCompactExecutor;
- private boolean emptyWriter = false;
+ private boolean ignorePreviousFiles = false;
protected AbstractFileStoreWrite(
String commitUser,
@@ -97,8 +97,8 @@ public abstract class AbstractFileStoreWrite<T>
}
@Override
- public void fromEmptyWriter(boolean emptyWriter) {
- this.emptyWriter = emptyWriter;
+ public void withIgnorePreviousFiles(boolean ignorePreviousFiles) {
+ this.ignorePreviousFiles = ignorePreviousFiles;
}
@Override
@@ -303,19 +303,19 @@ public abstract class AbstractFileStoreWrite<T>
writers.put(partition.copy(), buckets);
}
return buckets.computeIfAbsent(
- bucket, k -> createWriterContainer(partition.copy(), bucket,
emptyWriter));
+ bucket, k -> createWriterContainer(partition.copy(), bucket,
ignorePreviousFiles));
}
@VisibleForTesting
public WriterContainer<T> createWriterContainer(
- BinaryRow partition, int bucket, boolean emptyWriter) {
+ BinaryRow partition, int bucket, boolean ignorePreviousFiles) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating writer for partition {}, bucket {}",
partition, bucket);
}
Long latestSnapshotId = snapshotManager.latestSnapshotId();
List<DataFileMeta> restoreFiles = new ArrayList<>();
- if (!emptyWriter && latestSnapshotId != null) {
+ if (!ignorePreviousFiles && latestSnapshotId != null) {
restoreFiles = scanExistingFileMetas(latestSnapshotId, partition,
bucket);
}
IndexMaintainer<T> indexMaintainer =
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index 1e921186c..d7cd64250 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -47,11 +47,11 @@ public interface FileStoreWrite<T> {
FileStoreWrite<T> withMemoryPool(MemorySegmentPool memoryPool);
/**
- * Set writer to be empty, the writer will not search restored files.
+ * Set whether the write operation should ignore previously stored files.
*
- * @param emptyWriter set flag to tag the writer is empty.
+ * @param ignorePreviousFiles whether the write operation should ignore previously stored files.
*/
- void fromEmptyWriter(boolean emptyWriter);
+ void withIgnorePreviousFiles(boolean ignorePreviousFiles);
/**
* Write the data to the store according to the partition and bucket.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index cd342db5f..c5ceb8eab 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -134,7 +134,7 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
// if this table is non-bucket table, we skip compaction and restored files searching
if (bucketMode() == BucketMode.UNAWARE) {
writer.skipCompaction();
- writer.fromEmptyWriter(true);
+ writer.withIgnorePreviousFiles(true);
}
return new TableWriteImpl<>(
writer,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index 437a6561d..4acd6be33 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -58,7 +58,7 @@ public class BatchWriteBuilderImpl implements
BatchWriteBuilder {
@Override
public BatchTableWrite newWrite() {
- return table.newWrite(commitUser).fromEmptyWriter(staticPartition !=
null);
+ return
table.newWrite(commitUser).withIgnorePreviousFiles(staticPartition != null);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
index 87aca40c1..69eb61f97 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableWrite.java
@@ -21,5 +21,5 @@ package org.apache.paimon.table.sink;
/** Inner {@link TableWrite} contains overwrite setter. */
public interface InnerTableWrite extends StreamTableWrite, BatchTableWrite {
- InnerTableWrite fromEmptyWriter(boolean emptyWriter);
+ InnerTableWrite withIgnorePreviousFiles(boolean ignorePreviousFiles);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index e3b524b9b..1103509b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -56,8 +56,8 @@ public class TableWriteImpl<T>
}
@Override
- public TableWriteImpl<T> fromEmptyWriter(boolean emptyWriter) {
- write.fromEmptyWriter(emptyWriter);
+ public TableWriteImpl<T> withIgnorePreviousFiles(boolean
ignorePreviousFiles) {
+ write.withIgnorePreviousFiles(ignorePreviousFiles);
return this;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 68ce6e190..3ab8227c3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -239,7 +239,7 @@ public class TestFileStore extends KeyValueFileStore {
List<KeyValue> kvs,
Function<KeyValue, BinaryRow> partitionCalculator,
Function<KeyValue, Integer> bucketCalculator,
- boolean emptyWriter,
+ boolean ignorePreviousFiles,
Long identifier,
Long watermark,
List<IndexFileMeta> indexFiles,
@@ -257,7 +257,7 @@ public class TestFileStore extends KeyValueFileStore {
if (w == null) {
RecordWriter<KeyValue> writer =
write.createWriterContainer(
- partition, bucket,
emptyWriter)
+ partition, bucket,
ignorePreviousFiles)
.writer;
((MemoryOwner) writer)
.setMemoryPool(
@@ -280,7 +280,8 @@ public class TestFileStore extends KeyValueFileStore {
writers.entrySet()) {
for (Map.Entry<Integer, RecordWriter<KeyValue>> entryWithBucket :
entryWithPartition.getValue().entrySet()) {
- CommitIncrement increment =
entryWithBucket.getValue().prepareCommit(emptyWriter);
+ CommitIncrement increment =
+
entryWithBucket.getValue().prepareCommit(ignorePreviousFiles);
committable.addFileCommittable(
new CommitMessageImpl(
entryWithPartition.getKey(),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index f180e7e79..610f8cc6b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -282,7 +282,7 @@ public abstract class FileStoreTableTestBase {
}
// overwrite data
- try (StreamTableWrite write =
table.newWrite(commitUser).fromEmptyWriter(true);
+ try (StreamTableWrite write =
table.newWrite(commitUser).withIgnorePreviousFiles(true);
InnerTableCommit commit = table.newCommit(commitUser)) {
for (InternalRow row : overwriteData) {
write.write(row);
@@ -314,7 +314,7 @@ public abstract class FileStoreTableTestBase {
commit.commit(0, write.prepareCommit(true, 0));
write.close();
- write = table.newWrite(commitUser).fromEmptyWriter(true);
+ write = table.newWrite(commitUser).withIgnorePreviousFiles(true);
commit = table.newCommit(commitUser);
write.write(rowData(2, 21, 201L));
Map<String, String> overwritePartition = new HashMap<>();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
index 58928b243..9e4ac18f4 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
@@ -67,7 +67,7 @@ public class ContinuousCompactorFollowUpScannerTest extends
ScannerTestBase {
Map<String, String> overwritePartition = new HashMap<>();
overwritePartition.put("pt", "1");
- write = table.newWrite(commitUser).fromEmptyWriter(true);
+ write = table.newWrite(commitUser).withIgnorePreviousFiles(true);
commit = table.newCommit(commitUser).withOverwrite(overwritePartition);
write.write(rowData(1, 10, 101L));
write.write(rowData(1, 20, 201L));
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index a88523219..37b1f16ed 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -59,11 +59,11 @@ public abstract class FlinkSink<T> implements Serializable {
private static final String GLOBAL_COMMITTER_NAME = "Global Committer";
protected final FileStoreTable table;
- private final boolean emptyWriter;
+ private final boolean ignorePreviousFiles;
- public FlinkSink(FileStoreTable table, boolean emptyWriter) {
+ public FlinkSink(FileStoreTable table, boolean ignorePreviousFiles) {
this.table = table;
- this.emptyWriter = emptyWriter;
+ this.ignorePreviousFiles = ignorePreviousFiles;
}
private StoreSinkWrite.Provider createWriteProvider(CheckpointConfig
checkpointConfig) {
@@ -97,7 +97,7 @@ public abstract class FlinkSink<T> implements Serializable {
commitUser,
state,
ioManager,
- emptyWriter,
+ ignorePreviousFiles,
waitCompaction,
finalDeltaCommits,
memoryPool);
@@ -110,7 +110,7 @@ public abstract class FlinkSink<T> implements Serializable {
commitUser,
state,
ioManager,
- emptyWriter,
+ ignorePreviousFiles,
waitCompaction,
memoryPool);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index 8ed101b8f..fd568ae05 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -69,11 +69,11 @@ public class GlobalFullCompactionSinkWrite extends
StoreSinkWriteImpl {
String commitUser,
StoreSinkWriteState state,
IOManager ioManager,
- boolean emptyWriter,
+ boolean ignorePreviousFiles,
boolean waitCompaction,
int deltaCommits,
@Nullable MemorySegmentPool memoryPool) {
- super(table, commitUser, state, ioManager, emptyWriter,
waitCompaction, memoryPool);
+ super(table, commitUser, state, ioManager, ignorePreviousFiles,
waitCompaction, memoryPool);
this.deltaCommits = deltaCommits;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 3b2baf0b3..3b376d707 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -48,7 +48,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
protected final String commitUser;
protected final StoreSinkWriteState state;
private final IOManager ioManager;
- private final boolean emptyWriter;
+ private final boolean ignorePreviousFiles;
private final boolean waitCompaction;
@Nullable private final MemorySegmentPool memoryPool;
@@ -59,13 +59,13 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
String commitUser,
StoreSinkWriteState state,
IOManager ioManager,
- boolean emptyWriter,
+ boolean ignorePreviousFiles,
boolean waitCompaction,
@Nullable MemorySegmentPool memoryPool) {
this.commitUser = commitUser;
this.state = state;
this.ioManager = ioManager;
- this.emptyWriter = emptyWriter;
+ this.ignorePreviousFiles = ignorePreviousFiles;
this.waitCompaction = waitCompaction;
this.memoryPool = memoryPool;
this.write = newTableWrite(table);
@@ -83,7 +83,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
: new HeapMemorySegmentPool(
table.coreOptions().writeBufferSize(),
table.coreOptions().pageSize()))
- .fromEmptyWriter(emptyWriter);
+ .withIgnorePreviousFiles(ignorePreviousFiles);
}
@Override