This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a74615423 [spark][core] spark compact with deletion vector (#3971)
a74615423 is described below
commit a7461542392130bbb00aefe1aeb74bcf2edf992c
Author: Yann Byron <[email protected]>
AuthorDate: Tue Aug 20 18:51:31 2024 +0800
[spark][core] spark compact with deletion vector (#3971)
---
.../org/apache/paimon/AppendOnlyFileStore.java | 8 ++
.../org/apache/paimon/append/AppendOnlyWriter.java | 26 +++-
.../append/BucketedAppendCompactManager.java | 137 ++++++++++++++++-----
.../paimon/append/UnawareAppendCompactionTask.java | 55 ++++++++-
.../UnawareAppendTableCompactionCoordinator.java | 74 +++++++++--
.../org/apache/paimon/compact/CompactResult.java | 20 +++
.../DeletionVectorIndexFileWriter.java | 4 +
.../deletionvectors/DeletionVectorsIndexFile.java | 2 +-
.../append/AppendDeletionFileMaintainer.java | 17 ++-
.../BucketedAppendDeletionFileMaintainer.java | 26 +++-
.../UnawareAppendDeletionFileMaintainer.java | 54 ++++++--
.../paimon/manifest/IndexManifestFileHandler.java | 21 +++-
.../apache/paimon/mergetree/MergeTreeWriter.java | 2 +-
.../paimon/operation/AbstractFileStoreWrite.java | 10 +-
.../paimon/operation/AppendOnlyFileStoreWrite.java | 83 +++++++++----
.../paimon/operation/KeyValueFileStoreWrite.java | 1 +
.../paimon/table/AppendOnlyFileStoreTable.java | 4 +-
.../org/apache/paimon/utils/CommitIncrement.java | 15 +++
.../append/AppendOnlyTableCompactionITTest.java | 2 +-
.../apache/paimon/append/AppendOnlyWriterTest.java | 1 +
.../append/BucketedAppendCompactManagerTest.java | 1 +
.../apache/paimon/append/FullCompactTaskTest.java | 2 +-
.../append/AppendDeletionFileMaintainerTest.java | 6 +-
.../apache/paimon/format/FileFormatSuffixTest.java | 2 +-
.../flink/compact/UnawareBucketCompactor.java | 2 +-
.../paimon/spark/procedure/CompactProcedure.java | 2 +-
.../paimon/spark/commands/PaimonSparkWriter.scala | 2 +-
.../paimon/spark/sql/DeletionVectorTest.scala | 90 ++++++++++----
28 files changed, 531 insertions(+), 138 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 3cd7bb3b6..1e50da660 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -19,6 +19,7 @@
package org.apache.paimon;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.manifest.ManifestCacheFilter;
@@ -93,6 +94,11 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
@Override
public AppendOnlyFileStoreWrite newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
+ DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory =
null;
+ if (options.deletionVectorsEnabled()) {
+ deletionVectorsMaintainerFactory =
+ new
DeletionVectorsMaintainer.Factory(newIndexFileHandler());
+ }
return new AppendOnlyFileStoreWrite(
fileIO,
newRead(),
@@ -103,6 +109,8 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
snapshotManager(),
newScan(true).withManifestCacheFilter(manifestFilter),
options,
+ bucketMode(),
+ deletionVectorsMaintainerFactory,
tableName);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 136cfd246..aeb55d497 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -27,10 +27,12 @@ import org.apache.paimon.disk.RowBuffer;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.memory.MemoryOwner;
@@ -75,6 +77,8 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
private final List<DataFileMeta> deletedFiles;
private final List<DataFileMeta> compactBefore;
private final List<DataFileMeta> compactAfter;
+ private final List<IndexFileMeta> indexFilesBefore;
+ private final List<IndexFileMeta> indexFilesAfter;
private final LongCounter seqNumCounter;
private final String fileCompression;
private final String spillCompression;
@@ -121,6 +125,8 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
this.deletedFiles = new ArrayList<>();
this.compactBefore = new ArrayList<>();
this.compactAfter = new ArrayList<>();
+ this.indexFilesBefore = new ArrayList<>();
+ this.indexFilesAfter = new ArrayList<>();
this.seqNumCounter = new LongCounter(maxSequenceNumber + 1);
this.fileCompression = fileCompression;
this.spillCompression = spillCompression;
@@ -139,6 +145,10 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
deletedFiles.addAll(increment.newFilesIncrement().deletedFiles());
compactBefore.addAll(increment.compactIncrement().compactBefore());
compactAfter.addAll(increment.compactIncrement().compactAfter());
+ if (increment.indexIncrement() != null) {
+
indexFilesBefore.addAll(increment.indexIncrement().deletedIndexFiles());
+
indexFilesAfter.addAll(increment.indexIncrement().newIndexFiles());
+ }
}
}
@@ -276,6 +286,11 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
result -> {
compactBefore.addAll(result.before());
compactAfter.addAll(result.after());
+ if (result.indexIncrement() != null) {
+ indexFilesBefore.addAll(
+
result.indexIncrement().deletedIndexFiles());
+
indexFilesAfter.addAll(result.indexIncrement().newIndexFiles());
+ }
});
}
@@ -291,12 +306,21 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
new ArrayList<>(compactAfter),
Collections.emptyList());
+ IndexIncrement indexIncrement = null;
+ if (!indexFilesBefore.isEmpty() || !indexFilesAfter.isEmpty()) {
+ indexIncrement =
+ new IndexIncrement(
+ new ArrayList<>(indexFilesAfter), new
ArrayList<>(indexFilesBefore));
+ }
+
newFiles.clear();
deletedFiles.clear();
compactBefore.clear();
compactAfter.clear();
+ indexFilesBefore.clear();
+ indexFilesAfter.clear();
- return new CommitIncrement(dataIncrement, compactIncrement, null);
+ return new CommitIncrement(dataIncrement, compactIncrement,
indexIncrement, null);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
index d44793613..ea48e608e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
@@ -23,9 +23,15 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactFutureManager;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compact.CompactTask;
+import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
+import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.operation.metrics.MetricUtils;
+import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
@@ -36,6 +42,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
@@ -52,6 +59,7 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
private static final int FULL_COMPACT_MIN_FILE = 3;
private final ExecutorService executor;
+ private final AppendDeletionFileMaintainer dvIndexFileMaintainer;
private final TreeSet<DataFileMeta> toCompact;
private final int minFileNum;
private final int maxFileNum;
@@ -65,12 +73,14 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
public BucketedAppendCompactManager(
ExecutorService executor,
List<DataFileMeta> restored,
+ @Nullable AppendDeletionFileMaintainer dvIndexFileMaintainer,
int minFileNum,
int maxFileNum,
long targetFileSize,
CompactRewriter rewriter,
@Nullable CompactionMetrics.Reporter metricsReporter) {
this.executor = executor;
+ this.dvIndexFileMaintainer = dvIndexFileMaintainer;
this.toCompact = new TreeSet<>(fileComparator(false));
this.toCompact.addAll(restored);
this.minFileNum = minFileNum;
@@ -94,13 +104,20 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
taskFuture == null,
"A compaction task is still running while the user "
+ "forces a new compaction. This is unexpected.");
- if (toCompact.size() < FULL_COMPACT_MIN_FILE) {
+        // if deletion vectors are enabled, always trigger compaction.
+ if (toCompact.isEmpty()
+ || (dvIndexFileMaintainer == null && toCompact.size() <
FULL_COMPACT_MIN_FILE)) {
return;
}
taskFuture =
executor.submit(
- new FullCompactTask(toCompact, targetFileSize,
rewriter, metricsReporter));
+ new FullCompactTask(
+ dvIndexFileMaintainer,
+ toCompact,
+ targetFileSize,
+ rewriter,
+ metricsReporter));
compacting = new ArrayList<>(toCompact);
toCompact.clear();
}
@@ -113,7 +130,9 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
if (picked.isPresent()) {
compacting = picked.get();
taskFuture =
- executor.submit(new AutoCompactTask(compacting, rewriter,
metricsReporter));
+ executor.submit(
+ new AutoCompactTask(
+ dvIndexFileMaintainer, compacting,
rewriter, metricsReporter));
}
}
@@ -207,17 +226,20 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
/** A {@link CompactTask} impl for full compaction of append-only table. */
public static class FullCompactTask extends CompactTask {
- private final LinkedList<DataFileMeta> inputs;
+ private final AppendDeletionFileMaintainer dvIndexFileMaintainer;
+ private final LinkedList<DataFileMeta> toCompact;
private final long targetFileSize;
private final CompactRewriter rewriter;
public FullCompactTask(
+ AppendDeletionFileMaintainer dvIndexFileMaintainer,
Collection<DataFileMeta> inputs,
long targetFileSize,
CompactRewriter rewriter,
@Nullable CompactionMetrics.Reporter metricsReporter) {
super(metricsReporter);
- this.inputs = new LinkedList<>(inputs);
+ this.dvIndexFileMaintainer = dvIndexFileMaintainer;
+ this.toCompact = new LinkedList<>(inputs);
this.targetFileSize = targetFileSize;
this.rewriter = rewriter;
}
@@ -225,34 +247,42 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
@Override
protected CompactResult doCompact() throws Exception {
// remove large files
- while (!inputs.isEmpty()) {
- DataFileMeta file = inputs.peekFirst();
- if (file.fileSize() >= targetFileSize) {
- inputs.poll();
+ while (!toCompact.isEmpty()) {
+ DataFileMeta file = toCompact.peekFirst();
+                // a data file with a deletion file always needs to be
compacted.
+ if (file.fileSize() >= targetFileSize &&
!hasDeletionFile(file)) {
+ toCompact.poll();
continue;
}
break;
}
- // compute small files
- int big = 0;
- int small = 0;
- for (DataFileMeta file : inputs) {
- if (file.fileSize() >= targetFileSize) {
- big++;
+ // do compaction
+ if (dvIndexFileMaintainer != null) {
+                // if deletion vectors are enabled, always trigger compaction.
+ return compact(dvIndexFileMaintainer, toCompact, rewriter);
+ } else {
+ // compute small files
+ int big = 0;
+ int small = 0;
+ for (DataFileMeta file : toCompact) {
+ if (file.fileSize() >= targetFileSize) {
+ big++;
+ } else {
+ small++;
+ }
+ }
+ if (small > big && toCompact.size() >= FULL_COMPACT_MIN_FILE) {
+ return compact(dvIndexFileMaintainer, toCompact, rewriter);
} else {
- small++;
+ return result(Collections.emptyList(),
Collections.emptyList());
}
}
+ }
- // do compaction
- List<DataFileMeta> compactBefore = new ArrayList<>();
- List<DataFileMeta> compactAfter = new ArrayList<>();
- if (small > big && inputs.size() >= FULL_COMPACT_MIN_FILE) {
- compactBefore = new ArrayList<>(inputs);
- compactAfter = rewriter.rewrite(inputs);
- }
- return result(new ArrayList<>(compactBefore), compactAfter);
+ private boolean hasDeletionFile(DataFileMeta file) {
+ return dvIndexFileMaintainer != null
+                && dvIndexFileMaintainer.getDeletionFile(file.fileName())
!= null;
}
}
@@ -265,36 +295,75 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
*/
public static class AutoCompactTask extends CompactTask {
+ private final AppendDeletionFileMaintainer dvIndexFileMaintainer;
private final List<DataFileMeta> toCompact;
private final CompactRewriter rewriter;
public AutoCompactTask(
+ AppendDeletionFileMaintainer dvIndexFileMaintainer,
List<DataFileMeta> toCompact,
CompactRewriter rewriter,
@Nullable CompactionMetrics.Reporter metricsReporter) {
super(metricsReporter);
+ this.dvIndexFileMaintainer = dvIndexFileMaintainer;
this.toCompact = toCompact;
this.rewriter = rewriter;
}
@Override
protected CompactResult doCompact() throws Exception {
+ return compact(dvIndexFileMaintainer, toCompact, rewriter);
+ }
+ }
+
+ private static CompactResult compact(
+ AppendDeletionFileMaintainer dvIndexFileMaintainer,
+ List<DataFileMeta> toCompact,
+ CompactRewriter rewriter)
+ throws Exception {
+ if (dvIndexFileMaintainer == null) {
return result(toCompact, rewriter.rewrite(toCompact));
+ } else {
+ List<DeletionFile> deletionFiles = new ArrayList<>();
+ for (DataFileMeta dataFile : toCompact) {
+
deletionFiles.add(dvIndexFileMaintainer.getDeletionFile(dataFile.fileName()));
+ }
+ List<DataFileMeta> compactAfter = rewriter.rewrite(toCompact);
+ toCompact.forEach(f ->
dvIndexFileMaintainer.notifyRemovedDeletionVector(f.fileName()));
+
+ List<IndexManifestEntry> indexManifestEntries =
dvIndexFileMaintainer.persist();
+ if (indexManifestEntries.isEmpty()) {
+ return result(toCompact, compactAfter);
+ } else {
+ List<IndexFileMeta> indexFilesBefore = new ArrayList<>();
+ List<IndexFileMeta> indexFilesAfter = new ArrayList<>();
+ for (IndexManifestEntry entry : indexManifestEntries) {
+ if (entry.kind() == FileKind.ADD) {
+ indexFilesAfter.add(entry.indexFile());
+ } else {
+ indexFilesBefore.add(entry.indexFile());
+ }
+ }
+ return result(toCompact, indexFilesBefore, compactAfter,
indexFilesAfter);
+ }
}
}
private static CompactResult result(List<DataFileMeta> before,
List<DataFileMeta> after) {
- return new CompactResult() {
- @Override
- public List<DataFileMeta> before() {
- return before;
- }
+ return new CompactResult(before, after);
+ }
- @Override
- public List<DataFileMeta> after() {
- return after;
- }
- };
+ private static CompactResult result(
+ List<DataFileMeta> before,
+ @Nullable List<IndexFileMeta> indexFilesBefore,
+ List<DataFileMeta> after,
+ @Nullable List<IndexFileMeta> indexFilesAfter) {
+ CompactResult result = new CompactResult(before, after);
+ if (indexFilesBefore != null || indexFilesAfter != null) {
+ IndexIncrement indexIncrement = new
IndexIncrement(indexFilesAfter, indexFilesBefore);
+ result.setIndexIncrement(indexIncrement);
+ }
+ return result;
}
/** Compact rewriter for append-only table. */
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
index 3d81cdbf6..5b949cb25 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendCompactionTask.java
@@ -19,10 +19,17 @@
package org.apache.paimon.append;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
+import
org.apache.paimon.deletionvectors.append.UnawareAppendDeletionFileMaintainer;
+import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.Preconditions;
@@ -31,6 +38,9 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
/** Compaction task generated by {@link
UnawareAppendTableCompactionCoordinator}. */
public class UnawareAppendCompactionTask {
@@ -40,9 +50,7 @@ public class UnawareAppendCompactionTask {
private final List<DataFileMeta> compactAfter;
public UnawareAppendCompactionTask(BinaryRow partition, List<DataFileMeta>
files) {
- Preconditions.checkArgument(
- files != null && files.size() > 1,
- "AppendOnlyCompactionTask need more than one file input.");
+ Preconditions.checkArgument(files != null);
this.partition = partition;
compactBefore = new ArrayList<>(files);
compactAfter = new ArrayList<>();
@@ -60,8 +68,42 @@ public class UnawareAppendCompactionTask {
return compactAfter;
}
- public CommitMessage doCompact(AppendOnlyFileStoreWrite write) throws
Exception {
- compactAfter.addAll(write.compactRewrite(partition, 0, compactBefore));
+ public CommitMessage doCompact(FileStoreTable table,
AppendOnlyFileStoreWrite write)
+ throws Exception {
+ boolean dvEnabled = table.coreOptions().deletionVectorsEnabled();
+ Preconditions.checkArgument(
+ dvEnabled || compactBefore.size() > 1,
+ "AppendOnlyCompactionTask need more than one file input.");
+ IndexIncrement indexIncrement;
+ if (dvEnabled) {
+ UnawareAppendDeletionFileMaintainer dvIndexFileMaintainer =
+ (UnawareAppendDeletionFileMaintainer)
+ AppendDeletionFileMaintainer.forUnawareAppend(
+ table.store().newIndexFileHandler(),
+ table.snapshotManager().latestSnapshotId(),
+ partition);
+ compactAfter.addAll(
+ write.compactRewrite(
+ partition, UNAWARE_BUCKET, dvIndexFileMaintainer,
compactBefore));
+
+ compactBefore.forEach(
+ f -> {
+
dvIndexFileMaintainer.notifyRemovedDeletionVector(f.fileName());
+ });
+ List<IndexManifestEntry> indexEntries =
dvIndexFileMaintainer.persist();
+ Preconditions.checkArgument(
+ indexEntries.stream().noneMatch(i -> i.kind() ==
FileKind.ADD));
+ List<IndexFileMeta> removed =
+ indexEntries.stream()
+ .map(IndexManifestEntry::indexFile)
+ .collect(Collectors.toList());
+ indexIncrement = new IndexIncrement(Collections.emptyList(),
removed);
+ } else {
+ compactAfter.addAll(
+ write.compactRewrite(partition, UNAWARE_BUCKET, null,
compactBefore));
+ indexIncrement = new IndexIncrement(Collections.emptyList());
+ }
+
CompactIncrement compactIncrement =
new CompactIncrement(compactBefore, compactAfter,
Collections.emptyList());
return new CommitMessageImpl(
@@ -69,7 +111,8 @@ public class UnawareAppendCompactionTask {
0, // bucket 0 is bucket for unaware-bucket table for
compatibility with the old
// design
DataIncrement.emptyIncrement(),
- compactIncrement);
+ compactIncrement,
+ indexIncrement);
}
public int hashCode() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
index 732300bd3..48239f6f3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/UnawareAppendTableCompactionCoordinator.java
@@ -21,6 +21,10 @@ package org.apache.paimon.append;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
+import
org.apache.paimon.deletionvectors.append.UnawareAppendDeletionFileMaintainer;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
@@ -38,6 +42,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -60,12 +65,15 @@ public class UnawareAppendTableCompactionCoordinator {
protected static final int REMOVE_AGE = 10;
protected static final int COMPACT_AGE = 5;
+ @Nullable private final Long snapshotId;
private final InnerTableScan scan;
private final long targetFileSize;
private final long compactionFileSize;
private final int minFileNum;
private final int maxFileNum;
private final boolean streamingMode;
+ private final IndexFileHandler indexFileHandler;
+ private final boolean deletionVectorEnabled;
final Map<BinaryRow, PartitionCompactCoordinator>
partitionCompactCoordinators =
new HashMap<>();
@@ -90,6 +98,7 @@ public class UnawareAppendTableCompactionCoordinator {
if (filter != null) {
scan.withFilter(filter);
}
+ this.snapshotId = table.snapshotManager().latestSnapshotId();
this.streamingMode = isStreaming;
CoreOptions coreOptions = table.coreOptions();
this.targetFileSize = coreOptions.targetFileSize(false);
@@ -97,6 +106,8 @@ public class UnawareAppendTableCompactionCoordinator {
this.minFileNum = coreOptions.compactionMinFileNum();
// this is global compaction, avoid too many compaction tasks
this.maxFileNum = coreOptions.compactionMaxFileNum().orElse(50);
+ this.indexFileHandler = table.store().newIndexFileHandler();
+ this.deletionVectorEnabled = coreOptions.deletionVectorsEnabled();
}
public List<UnawareAppendCompactionTask> run() {
@@ -130,12 +141,30 @@ public class UnawareAppendTableCompactionCoordinator {
@VisibleForTesting
void notifyNewFiles(BinaryRow partition, List<DataFileMeta> files) {
+ UnawareAppendDeletionFileMaintainer dvIndexFileMaintainer;
+ if (deletionVectorEnabled) {
+ dvIndexFileMaintainer =
+ (UnawareAppendDeletionFileMaintainer)
+ AppendDeletionFileMaintainer.forUnawareAppend(
+ indexFileHandler, snapshotId, partition);
+ } else {
+ dvIndexFileMaintainer = null;
+ }
+ java.util.function.Predicate<DataFileMeta> filter =
+ file -> {
+ if (dvIndexFileMaintainer == null
+ ||
dvIndexFileMaintainer.getDeletionFile(file.fileName()) == null) {
+ return file.fileSize() < compactionFileSize;
+ }
+                    // if a data file has a deletion file, it always needs
to be compacted.
+ return true;
+ };
+ List<DataFileMeta> toCompact =
files.stream().filter(filter).collect(Collectors.toList());
partitionCompactCoordinators
- .computeIfAbsent(partition, PartitionCompactCoordinator::new)
- .addFiles(
- files.stream()
- .filter(file -> file.fileSize() <
compactionFileSize)
- .collect(Collectors.toList()));
+ .computeIfAbsent(
+ partition,
+ pp -> new
PartitionCompactCoordinator(dvIndexFileMaintainer, partition))
+ .addFiles(toCompact);
}
@VisibleForTesting
@@ -181,11 +210,14 @@ public class UnawareAppendTableCompactionCoordinator {
/** Coordinator for a single partition. */
class PartitionCompactCoordinator {
+ private final UnawareAppendDeletionFileMaintainer
dvIndexFileMaintainer;
private final BinaryRow partition;
private final HashSet<DataFileMeta> toCompact = new HashSet<>();
int age = 0;
- public PartitionCompactCoordinator(BinaryRow partition) {
+ public PartitionCompactCoordinator(
+ UnawareAppendDeletionFileMaintainer dvIndexFileMaintainer,
BinaryRow partition) {
+ this.dvIndexFileMaintainer = dvIndexFileMaintainer;
this.partition = partition;
}
@@ -216,7 +248,12 @@ public class UnawareAppendTableCompactionCoordinator {
}
private List<List<DataFileMeta>> agePack() {
- List<List<DataFileMeta>> packed = pack();
+ List<List<DataFileMeta>> packed;
+ if (dvIndexFileMaintainer == null) {
+ packed = pack(toCompact);
+ } else {
+ packed = packInDeletionVectorVMode(toCompact);
+ }
if (packed.isEmpty()) {
// non-packed, we need to grow up age, and check whether to
compact once
if (++age > COMPACT_AGE && toCompact.size() > 1) {
@@ -230,7 +267,7 @@ public class UnawareAppendTableCompactionCoordinator {
return packed;
}
- private List<List<DataFileMeta>> pack() {
+ private List<List<DataFileMeta>> pack(Set<DataFileMeta> toCompact) {
// we compact smaller files first
// step 1, sort files by file size, pick the smaller first
ArrayList<DataFileMeta> files = new ArrayList<>(toCompact);
@@ -252,6 +289,27 @@ public class UnawareAppendTableCompactionCoordinator {
return result;
}
+ private List<List<DataFileMeta>>
packInDeletionVectorVMode(Set<DataFileMeta> toCompact) {
+ // we group the data files by their related index files.
+ Map<IndexFileMeta, List<DataFileMeta>> filesWithDV = new
HashMap<>();
+ Set<DataFileMeta> rest = new HashSet<>();
+ for (DataFileMeta dataFile : toCompact) {
+ IndexFileMeta indexFile =
dvIndexFileMaintainer.getIndexFile(dataFile.fileName());
+ if (indexFile == null) {
+ rest.add(dataFile);
+ } else {
+ filesWithDV.computeIfAbsent(indexFile, f -> new
ArrayList<>()).add(dataFile);
+ }
+ }
+
+ List<List<DataFileMeta>> result = new ArrayList<>();
+ result.addAll(filesWithDV.values());
+ if (rest.size() > 1) {
+ result.addAll(pack(rest));
+ }
+ return result;
+ }
+
/**
* A file bin for {@link PartitionCompactCoordinator} determine
whether ready to compact.
*/
diff --git
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java
index 08d7de5da..4de5d7315 100644
--- a/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java
+++ b/paimon-core/src/main/java/org/apache/paimon/compact/CompactResult.java
@@ -19,6 +19,8 @@
package org.apache.paimon.compact;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.IndexIncrement;
+import org.apache.paimon.utils.Preconditions;
import javax.annotation.Nullable;
@@ -32,6 +34,9 @@ public class CompactResult {
private final List<DataFileMeta> before;
private final List<DataFileMeta> after;
private final List<DataFileMeta> changelog;
+ // TODO: unify IndexIncrement and CompactDeletionFile for both primary-key
table and append-only
+ // table.
+ @Nullable private IndexIncrement indexIncrement;
@Nullable private CompactDeletionFile deletionFile;
@@ -66,10 +71,25 @@ public class CompactResult {
return changelog;
}
+ public void setIndexIncrement(@Nullable IndexIncrement indexIncrement) {
+ Preconditions.checkArgument(
+ deletionFile == null,
+ "indexIncrement and deletionFile can't be set at the same
time");
+ this.indexIncrement = indexIncrement;
+ }
+
public void setDeletionFile(@Nullable CompactDeletionFile deletionFile) {
+ Preconditions.checkArgument(
+ indexIncrement == null,
+ "indexIncrement and deletionFile can't be set at the same
time");
this.deletionFile = deletionFile;
}
+ @Nullable
+ public IndexIncrement indexIncrement() {
+ return indexIncrement;
+ }
+
@Nullable
public CompactDeletionFile deletionFile() {
return deletionFile;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
index a1d85b5ba..f8c8330f1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
@@ -54,6 +54,10 @@ public class DeletionVectorIndexFileWriter {
this.targetSizeInBytes = targetSizePerIndexFile.getBytes();
}
+ /**
+ * For unaware-bucket mode, this method will write out multiple index
files, else, it will write
+ * out only one index file.
+ */
public List<IndexFileMeta> write(Map<String, DeletionVector> input) throws
IOException {
if (input.isEmpty()) {
return emptyIndexFile();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index ffb024734..798404e00 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -121,7 +121,7 @@ public class DeletionVectorsIndexFile extends IndexFile {
return deletionVectors;
}
- public DeletionVector readDeletionVector(String dataFile, DeletionFile
deletionFile) {
+ public DeletionVector readDeletionVector(DeletionFile deletionFile) {
String indexFile = deletionFile.path();
try (SeekableInputStream inputStream = fileIO.newInputStream(new
Path(indexFile))) {
checkVersion(inputStream);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java
index 4922e45a6..c89eb4b54 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java
@@ -36,7 +36,8 @@ import static
org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
* A maintainer to maintain deletion files for append table, the core methods:
*
* <ul>
- * <li>{@link #notifyDeletionFiles}: Mark the deletion of data files, create
new deletion vectors.
+ * <li>{@link #notifyNewDeletionVector}: Mark the deletion of data files,
create new deletion
+ * vectors.
* <li>{@link #persist}: persist deletion files to commit.
* </ul>
*/
@@ -46,7 +47,14 @@ public interface AppendDeletionFileMaintainer {
int getBucket();
- void notifyDeletionFiles(String dataFile, DeletionVector deletionVector);
+ DeletionFile getDeletionFile(String dataFile);
+
+ DeletionVector getDeletionVector(String dataFile);
+
+ void notifyNewDeletionVector(String dataFile, DeletionVector
deletionVector);
+
+ /** In compaction operation, notify that a deletion file of a data file is
dropped. */
+ void notifyRemovedDeletionVector(String dataFile);
List<IndexManifestEntry> persist();
@@ -60,7 +68,10 @@ public interface AppendDeletionFileMaintainer {
DeletionVectorsMaintainer maintainer =
new DeletionVectorsMaintainer.Factory(indexFileHandler)
.createOrRestore(snapshotId, partition, bucket);
- return new BucketedAppendDeletionFileMaintainer(partition, bucket,
maintainer);
+ Map<String, DeletionFile> deletionFiles =
+ indexFileHandler.scanDVIndex(snapshotId, partition, bucket);
+ return new BucketedAppendDeletionFileMaintainer(
+ partition, bucket, deletionFiles, maintainer);
}
static AppendDeletionFileMaintainer forUnawareAppend(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java
index 1b839575f..f8fcb218a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java
@@ -23,8 +23,10 @@ import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.source.DeletionFile;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
/** A {@link AppendDeletionFileMaintainer} of bucketed append table. */
@@ -32,12 +34,17 @@ public class BucketedAppendDeletionFileMaintainer
implements AppendDeletionFileM
private final BinaryRow partition;
private final int bucket;
+ private final Map<String, DeletionFile> dataFileToDeletionFile;
private final DeletionVectorsMaintainer maintainer;
BucketedAppendDeletionFileMaintainer(
- BinaryRow partition, int bucket, DeletionVectorsMaintainer
maintainer) {
+ BinaryRow partition,
+ int bucket,
+ Map<String, DeletionFile> deletionFiles,
+ DeletionVectorsMaintainer maintainer) {
this.partition = partition;
this.bucket = bucket;
+ this.dataFileToDeletionFile = deletionFiles;
this.maintainer = maintainer;
}
@@ -52,10 +59,25 @@ public class BucketedAppendDeletionFileMaintainer
implements AppendDeletionFileM
}
@Override
- public void notifyDeletionFiles(String dataFile, DeletionVector
deletionVector) {
+ public DeletionFile getDeletionFile(String dataFile) {
+ return dataFileToDeletionFile.get(dataFile);
+ }
+
+ @Override
+ public DeletionVector getDeletionVector(String dataFile) {
+ return this.maintainer.deletionVectorOf(dataFile).orElse(null);
+ }
+
+ @Override
+ public void notifyNewDeletionVector(String dataFile, DeletionVector
deletionVector) {
maintainer.mergeNewDeletion(dataFile, deletionVector);
}
+ @Override
+ public void notifyRemovedDeletionVector(String dataFile) {
+ maintainer.removeDeletionVectorOf(dataFile);
+ }
+
@Override
public List<IndexManifestEntry> persist() {
return maintainer.writeDeletionVectorsIndex().stream()
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
index de6baac9f..0fe0074d7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
@@ -46,6 +46,7 @@ public class UnawareAppendDeletionFileMaintainer implements
AppendDeletionFileMa
private final IndexFileHandler indexFileHandler;
private final BinaryRow partition;
+ private final Map<String, DeletionFile> dataFileToDeletionFile;
private final Map<String, IndexManifestEntry> indexNameToEntry = new
HashMap<>();
private final Map<String, Map<String, DeletionFile>>
indexFileToDeletionFiles = new HashMap<>();
@@ -61,6 +62,7 @@ public class UnawareAppendDeletionFileMaintainer implements
AppendDeletionFileMa
Map<String, DeletionFile> deletionFiles) {
this.indexFileHandler = indexFileHandler;
this.partition = partition;
+ this.dataFileToDeletionFile = deletionFiles;
// the deletion of data files is independent
// just create an empty maintainer
this.maintainer = new
DeletionVectorsMaintainer.Factory(indexFileHandler).create();
@@ -102,19 +104,29 @@ public class UnawareAppendDeletionFileMaintainer
implements AppendDeletionFileMa
return UNAWARE_BUCKET;
}
+ public DeletionFile getDeletionFile(String dataFile) {
+ return this.dataFileToDeletionFile.get(dataFile);
+ }
+
@Override
- public void notifyDeletionFiles(String dataFile, DeletionVector
deletionVector) {
- DeletionVectorsIndexFile deletionVectorsIndexFile =
indexFileHandler.deletionVectorsIndex();
- DeletionFile previous = null;
- if (dataFileToIndexFile.containsKey(dataFile)) {
- String indexFileName = dataFileToIndexFile.get(dataFile);
- touchedIndexFiles.add(indexFileName);
- if (indexFileToDeletionFiles.containsKey(indexFileName)) {
- previous =
indexFileToDeletionFiles.get(indexFileName).remove(dataFile);
- }
+ public DeletionVector getDeletionVector(String dataFile) {
+ DeletionFile deletionFile = getDeletionFile(dataFile);
+ if (deletionFile != null) {
+ return
indexFileHandler.deletionVectorsIndex().readDeletionVector(deletionFile);
}
+ return null;
+ }
+
+ public void notifyRemovedDeletionVector(String dataFile) {
+ getRemovedDeletionFile(dataFile);
+ }
+
+ @Override
+ public void notifyNewDeletionVector(String dataFile, DeletionVector
deletionVector) {
+ DeletionVectorsIndexFile deletionVectorsIndexFile =
indexFileHandler.deletionVectorsIndex();
+ DeletionFile previous = getRemovedDeletionFile(dataFile);
if (previous != null) {
-
deletionVector.merge(deletionVectorsIndexFile.readDeletionVector(dataFile,
previous));
+
deletionVector.merge(deletionVectorsIndexFile.readDeletionVector(previous));
}
maintainer.notifyNewDeletion(dataFile, deletionVector);
}
@@ -133,6 +145,28 @@ public class UnawareAppendDeletionFileMaintainer
implements AppendDeletionFileMa
return result;
}
+ private DeletionFile getRemovedDeletionFile(String dataFile) {
+ if (dataFileToIndexFile.containsKey(dataFile)) {
+ String indexFileName = dataFileToIndexFile.get(dataFile);
+ touchedIndexFiles.add(indexFileName);
+ if (indexFileToDeletionFiles.containsKey(indexFileName)) {
+ return
indexFileToDeletionFiles.get(indexFileName).remove(dataFile);
+ }
+ }
+ return null;
+ }
+
+ public IndexFileMeta getIndexFile(String dataFile) {
+ DeletionFile deletionFile = getDeletionFile(dataFile);
+ if (deletionFile == null) {
+ return null;
+ } else {
+ IndexManifestEntry entry =
+ this.indexNameToEntry.get(new
Path(deletionFile.path()).getName());
+ return entry == null ? null : entry.indexFile();
+ }
+ }
+
@VisibleForTesting
List<IndexManifestEntry> writeUnchangedDeletionVector() {
DeletionVectorsIndexFile deletionVectorsIndexFile =
indexFileHandler.deletionVectorsIndex();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
index 89fbb6080..b5719555f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
+++
b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
@@ -30,6 +30,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.stream.Collectors;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
@@ -140,12 +141,20 @@ public class IndexManifestFileHandler {
indexEntries.put(identifier(entry), entry);
}
- for (IndexManifestEntry entry : newIndexFiles) {
- if (entry.kind() == FileKind.ADD) {
- indexEntries.put(identifier(entry), entry);
- } else {
- indexEntries.remove(identifier(entry));
- }
+ // Deleted entries are processed first to avoid overwriting newly added
entries.
+ List<IndexManifestEntry> removed =
+ newIndexFiles.stream()
+ .filter(f -> f.kind() == FileKind.DELETE)
+ .collect(Collectors.toList());
+ List<IndexManifestEntry> added =
+ newIndexFiles.stream()
+ .filter(f -> f.kind() == FileKind.ADD)
+ .collect(Collectors.toList());
+ for (IndexManifestEntry entry : removed) {
+ indexEntries.remove(identifier(entry));
+ }
+ for (IndexManifestEntry entry : added) {
+ indexEntries.put(identifier(entry), entry);
}
return new ArrayList<>(indexEntries.values());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index aa60c8846..66ba6743f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -317,7 +317,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
compactChangelog.clear();
this.compactDeletionFile = null;
- return new CommitIncrement(dataIncrement, compactIncrement,
drainDeletionFile);
+ return new CommitIncrement(dataIncrement, compactIncrement, null,
drainDeletionFile);
}
private void trySyncLatestCompaction(boolean blocking) throws Exception {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 459fec027..5e68e10cb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -208,6 +208,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
WriterContainer<T> writerContainer = entry.getValue();
CommitIncrement increment =
writerContainer.writer.prepareCommit(waitCompaction);
+ List<IndexFileMeta> deletedIndexFiles = new ArrayList<>();
List<IndexFileMeta> newIndexFiles = new ArrayList<>();
if (writerContainer.indexMaintainer != null) {
newIndexFiles.addAll(writerContainer.indexMaintainer.prepareCommit());
@@ -216,13 +217,17 @@ public abstract class AbstractFileStoreWrite<T>
implements FileStoreWrite<T> {
if (compactDeletionFile != null) {
compactDeletionFile.getOrCompute().ifPresent(newIndexFiles::add);
}
+ if (increment.indexIncrement() != null) {
+
newIndexFiles.addAll(increment.indexIncrement().newIndexFiles());
+
deletedIndexFiles.addAll(increment.indexIncrement().deletedIndexFiles());
+ }
CommitMessageImpl committable =
new CommitMessageImpl(
partition,
bucket,
increment.newFilesIncrement(),
increment.compactIncrement(),
- new IndexIncrement(newIndexFiles));
+ new IndexIncrement(newIndexFiles,
deletedIndexFiles));
result.add(committable);
if (committable.isEmpty()) {
@@ -333,6 +338,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
for (State<T> state : states) {
RecordWriter<T> writer =
createWriter(
+ state.baseSnapshotId,
state.partition,
state.bucket,
state.dataFiles,
@@ -408,6 +414,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
ignorePreviousFiles ? null : latestSnapshotId,
partition, bucket);
RecordWriter<T> writer =
createWriter(
+ latestSnapshotId,
partition.copy(),
bucket,
restoreFiles,
@@ -460,6 +467,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
protected void notifyNewWriter(RecordWriter<T> writer) {}
protected abstract RecordWriter<T> createWriter(
+ @Nullable Long snapshotId,
BinaryRow partition,
int bucket,
List<DataFileMeta> restoreFiles,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index f4df43107..0dcf2c1bf 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -28,10 +28,12 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.options.MemorySize;
@@ -58,6 +60,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
/** {@link FileStoreWrite} for {@link AppendOnlyFileStore}. */
public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow> {
@@ -81,10 +84,9 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
private final MemorySize maxDiskSize;
private final SimpleColStatsCollector.Factory[] statsCollectors;
private final FileIndexOptions fileIndexOptions;
-
+ private final BucketMode bucketMode;
private boolean forceBufferSpill = false;
private boolean skipCompaction;
- private BucketMode bucketMode = BucketMode.HASH_FIXED;
public AppendOnlyFileStoreWrite(
FileIO fileIO,
@@ -96,19 +98,36 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
SnapshotManager snapshotManager,
FileStoreScan scan,
CoreOptions options,
+ BucketMode bucketMode,
+ @Nullable DeletionVectorsMaintainer.Factory
deletionVectorsMaintainerFactory,
String tableName) {
- super(commitUser, snapshotManager, scan, options, null, null,
tableName);
+ super(
+ commitUser,
+ snapshotManager,
+ scan,
+ options,
+ null,
+ deletionVectorsMaintainerFactory,
+ tableName);
this.fileIO = fileIO;
this.read = read;
this.schemaId = schemaId;
this.rowType = rowType;
this.fileFormat = options.fileFormat();
this.pathFactory = pathFactory;
+ this.bucketMode = bucketMode;
this.targetFileSize = options.targetFileSize(false);
this.compactionMinFileNum = options.compactionMinFileNum();
this.compactionMaxFileNum = options.compactionMaxFileNum().orElse(5);
this.commitForceCompact = options.commitForceCompact();
- this.skipCompaction = options.writeOnly();
+ // AppendOnlyFileStoreWrite is sensitive to bucket mode. It behaves
differently in
+ // unaware-bucket mode (no compaction and a forced empty writer).
+ if (bucketMode == BucketMode.BUCKET_UNAWARE) {
+ super.withIgnorePreviousFiles(true);
+ skipCompaction = true;
+ } else {
+ this.skipCompaction = options.writeOnly();
+ }
this.fileCompression = options.fileCompression();
this.spillCompression = options.spillCompression();
this.useWriteBuffer = options.useWriteBufferForAppend();
@@ -121,23 +140,41 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
@Override
protected RecordWriter<InternalRow> createWriter(
+ @Nullable Long snapshotId,
BinaryRow partition,
int bucket,
List<DataFileMeta> restoredFiles,
long restoredMaxSeqNumber,
@Nullable CommitIncrement restoreIncrement,
ExecutorService compactExecutor,
- @Nullable DeletionVectorsMaintainer ignore) {
+ @Nullable DeletionVectorsMaintainer dvMaintainer) {
+ AppendDeletionFileMaintainer dvIndexFileMaintainer;
+ if (!skipCompaction && dvMaintainer != null) {
+ dvIndexFileMaintainer =
+ AppendDeletionFileMaintainer.forBucketedAppend(
+ dvMaintainer.indexFileHandler(), snapshotId,
partition, bucket);
+ } else {
+ dvIndexFileMaintainer = null;
+ }
+ // Let the writer and the compact manager hold the same reference,
+ // and keep the restored files mutable so they can be updated.
+ DataFilePathFactory factory =
pathFactory.createDataFilePathFactory(partition, bucket);
CompactManager compactManager =
skipCompaction
? new NoopCompactManager()
: new BucketedAppendCompactManager(
compactExecutor,
restoredFiles,
+ dvIndexFileMaintainer,
compactionMinFileNum,
compactionMaxFileNum,
targetFileSize,
- toCompact -> compactRewrite(partition, bucket,
toCompact),
+ toCompact ->
+ compactRewrite(
+ partition,
+ bucket,
+ dvIndexFileMaintainer,
+ toCompact),
compactionMetrics == null
? null
:
compactionMetrics.createReporter(partition, bucket));
@@ -166,17 +203,11 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
options.asyncFileWrite());
}
- /** TODO remove this, and pass deletion vectors. */
- public List<DataFileMeta> compactRewrite(
- BinaryRow partition, int bucket, List<DataFileMeta> toCompact)
throws Exception {
- return compactRewrite(partition, bucket, toCompact, null);
- }
-
public List<DataFileMeta> compactRewrite(
BinaryRow partition,
int bucket,
- List<DataFileMeta> toCompact,
- @Nullable List<IOExceptionSupplier<DeletionVector>> dvFactories)
+ @Nullable AppendDeletionFileMaintainer dvIndexFileMaintainer,
+ List<DataFileMeta> toCompact)
throws Exception {
if (toCompact.isEmpty()) {
return Collections.emptyList();
@@ -188,6 +219,18 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
bucket,
new LongCounter(toCompact.get(0).minSequenceNumber()),
FileSource.COMPACT);
+ List<IOExceptionSupplier<DeletionVector>> dvFactories =
+ dvIndexFileMaintainer == null
+ ? null
+ : toCompact.stream()
+ .map(
+ f ->
+
(IOExceptionSupplier<DeletionVector>)
+ () ->
+
dvIndexFileMaintainer
+
.getDeletionVector(
+
f.fileName()))
+ .collect(Collectors.toList());
try {
rewriter.write(createFilesIterator(partition, bucket, toCompact,
dvFactories));
} catch (Exception e) {
@@ -199,7 +242,6 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
collectedExceptions = ExceptionUtils.firstOrSuppressed(e,
collectedExceptions);
}
}
-
if (collectedExceptions != null) {
throw collectedExceptions;
}
@@ -232,17 +274,6 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
return new RecordReaderIterator<>(read.createReader(partition, bucket,
files, dvFactories));
}
- public AppendOnlyFileStoreWrite withBucketMode(BucketMode bucketMode) {
- // AppendOnlyFileStoreWrite is sensitive with bucket mode. It will act
difference in
- // unaware-bucket mode (no compaction and force empty-writer).
- this.bucketMode = bucketMode;
- if (bucketMode == BucketMode.BUCKET_UNAWARE) {
- super.withIgnorePreviousFiles(true);
- skipCompaction = true;
- }
- return this;
- }
-
@Override
public void withIgnorePreviousFiles(boolean ignorePrevious) {
// in unaware bucket mode, we need all writers to be empty
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 5f21d3c1a..9f4570cf2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -172,6 +172,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
@Override
protected MergeTreeWriter createWriter(
+ @Nullable Long snapshotId,
BinaryRow partition,
int bucket,
List<DataFileMeta> restoreFiles,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 4b6dd9b08..c9a32a251 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -138,9 +138,7 @@ class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
@Override
public TableWriteImpl<InternalRow> newWrite(
String commitUser, ManifestCacheFilter manifestFilter) {
- // if this table is unaware-bucket table, we skip compaction and
restored files searching
- AppendOnlyFileStoreWrite writer =
- store().newWrite(commitUser,
manifestFilter).withBucketMode(bucketMode());
+ AppendOnlyFileStoreWrite writer = store().newWrite(commitUser,
manifestFilter);
return new TableWriteImpl<>(
rowType(),
writer,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java
b/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java
index 3c16378f8..18c7d5675 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/CommitIncrement.java
@@ -21,6 +21,7 @@ package org.apache.paimon.utils;
import org.apache.paimon.compact.CompactDeletionFile;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.io.IndexIncrement;
import javax.annotation.Nullable;
@@ -29,14 +30,23 @@ public class CommitIncrement {
private final DataIncrement dataIncrement;
private final CompactIncrement compactIncrement;
+
+ // TODO: unify IndexIncrement and CompactDeletionFile for both primary-key
table and append-only
+ // table.
+ @Nullable private final IndexIncrement indexIncrement;
@Nullable private final CompactDeletionFile compactDeletionFile;
public CommitIncrement(
DataIncrement dataIncrement,
CompactIncrement compactIncrement,
+ @Nullable IndexIncrement indexIncrement,
@Nullable CompactDeletionFile compactDeletionFile) {
+ Preconditions.checkArgument(
+ indexIncrement == null || compactDeletionFile == null,
+ "indexIncrement and compactDeletionFile can't be set at the
same time");
this.dataIncrement = dataIncrement;
this.compactIncrement = compactIncrement;
+ this.indexIncrement = indexIncrement;
this.compactDeletionFile = compactDeletionFile;
}
@@ -48,6 +58,11 @@ public class CommitIncrement {
return compactIncrement;
}
+ @Nullable
+ public IndexIncrement indexIncrement() {
+ return indexIncrement;
+ }
+
@Nullable
public CompactDeletionFile compactDeletionFile() {
return compactDeletionFile;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java
index d23528e3f..82ce2e45a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionITTest.java
@@ -166,7 +166,7 @@ public class AppendOnlyTableCompactionITTest {
throws Exception {
List<CommitMessage> result = new ArrayList<>();
for (UnawareAppendCompactionTask task : tasks) {
- result.add(task.doCompact(write));
+ result.add(task.doCompact(appendOnlyFileStoreTable, write));
}
return result;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index d5c98d667..85385280e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -595,6 +595,7 @@ public class AppendOnlyWriterTest {
Executors.newSingleThreadScheduledExecutor(
new
ExecutorThreadFactory("compaction-thread")),
toCompact,
+ null,
MIN_FILE_NUM,
MAX_FILE_NUM,
targetFileSize,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
index fd374cc17..b9f4f7f4a 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/BucketedAppendCompactManagerTest.java
@@ -203,6 +203,7 @@ public class BucketedAppendCompactManagerTest {
new BucketedAppendCompactManager(
null, // not used
toCompactBeforePick,
+ null,
minFileNum,
maxFileNum,
targetFileSize,
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
index b2af4138c..e7c3cce01 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/FullCompactTaskTest.java
@@ -123,7 +123,7 @@ public class FullCompactTaskTest {
Collection<DataFileMeta> inputs,
long targetFileSize,
BucketedAppendCompactManager.CompactRewriter rewriter) {
- super(inputs, targetFileSize, rewriter, null);
+ super(null, inputs, targetFileSize, rewriter, null);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
index 2ebc30cf9..6c674352b 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
@@ -74,13 +74,13 @@ class AppendDeletionFileMaintainerTest {
store.createDVIFMaintainer(BinaryRow.EMPTY_ROW,
dataFileToDeletionFiles);
// no dv should be rewritten, because nothing is changed.
- List<IndexManifestEntry> res =
dvIFMaintainer.writeUnchangedDeletionVector();
+ List<IndexManifestEntry> res = dvIFMaintainer.persist();
assertThat(res.size()).isEqualTo(0);
// the dv of f3 is updated, and the index file that contains the dv of
f3 should be marked
// as REMOVE.
FileIO fileIO = LocalFileIO.create();
- dvIFMaintainer.notifyDeletionFiles(
+ dvIFMaintainer.notifyNewDeletionVector(
"f3", DeletionVector.read(fileIO,
dataFileToDeletionFiles.get("f3")));
res = dvIFMaintainer.writeUnchangedDeletionVector();
assertThat(res.size()).isEqualTo(1);
@@ -88,7 +88,7 @@ class AppendDeletionFileMaintainerTest {
// the dv of f1 and f2 are in one index file, and the dv of f1 is
updated.
// the dv of f2 need to be rewritten, and this index file should be
marked as REMOVE.
- dvIFMaintainer.notifyDeletionFiles(
+ dvIFMaintainer.notifyNewDeletionVector(
"f1", DeletionVector.read(fileIO,
dataFileToDeletionFiles.get("f1")));
res = dvIFMaintainer.writeUnchangedDeletionVector();
assertThat(res.size()).isEqualTo(3);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index cb02b4343..13ac9736e 100644
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -79,7 +79,7 @@ public class FileFormatSuffixTest extends
KeyValueFileReadWriteTest {
SCHEMA,
0,
new BucketedAppendCompactManager(
- null, toCompact, 4, 10, 10, null, null), //
not used
+ null, toCompact, null, 4, 10, 10, null, null),
// not used
null,
false,
dataFilePathFactory,
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java
index ab37ff3b2..8c2b56777 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java
@@ -60,7 +60,7 @@ public class UnawareBucketCompactor {
}
public void processElement(UnawareAppendCompactionTask task) throws
Exception {
- result.add(compactExecutorsupplier.get().submit(() ->
task.doCompact(write)));
+ result.add(compactExecutorsupplier.get().submit(() ->
task.doCompact(table, write)));
}
public void close() throws Exception {
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 378418b2d..a6b11e7db 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -396,7 +396,7 @@ public class CompactProcedure extends BaseProcedure {
taskIterator.next());
messages.add(
messageSer.serialize(
-
task.doCompact(write)));
+
task.doCompact(table, write)));
}
return messages.iterator();
} finally {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index d0b2e86ea..dddbaf24b 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -273,7 +273,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
sdv.dataFileAndDeletionVector.foreach {
case (dataFileName, dv) =>
- dvIndexFileMaintainer.notifyDeletionFiles(
+ dvIndexFileMaintainer.notifyNewDeletionVector(
dataFileName,
DeletionVector.deserializeFromBytes(dv))
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index 26d07ce06..719117bcc 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -58,7 +58,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
new
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')")
- val deletionVectors1 = getLatestDeletionVectors(table,
dvMaintainerFactory)
+ val deletionVectors1 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
Assertions.assertEquals(0, deletionVectors1.size)
val cond1 = "id = 2"
@@ -67,7 +67,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
checkAnswer(
spark.sql(s"SELECT * from T ORDER BY id"),
Row(1, "a") :: Row(2, "b_2") :: Row(3, "c") :: Nil)
- val deletionVectors2 = getLatestDeletionVectors(table,
dvMaintainerFactory)
+ val deletionVectors2 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
Assertions.assertEquals(1, deletionVectors2.size)
deletionVectors2
.foreach {
@@ -79,7 +79,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
checkAnswer(
spark.sql(s"SELECT * from T ORDER BY id"),
Row(1, "a") :: Row(2, "b_2") :: Row(3, "c") :: Row(4, "d") ::
Row(5, "e") :: Nil)
- val deletionVectors3 = getLatestDeletionVectors(table,
dvMaintainerFactory)
+ val deletionVectors3 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
Assertions.assertTrue(deletionVectors2 == deletionVectors3)
val cond2 = "id % 2 = 1"
@@ -94,6 +94,16 @@ class DeletionVectorTest extends PaimonSparkTestBase {
Row(1, "_all") :: Row(2, "_all") :: Row(3, "_all") :: Row(4,
"_all") :: Row(
5,
"_all") :: Nil)
+
+ spark.sql("CALL sys.compact('T')")
+ val deletionVectors4 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
+ // After compaction, deletionVectors should be empty
+ Assertions.assertTrue(deletionVectors4.isEmpty)
+ checkAnswer(
+ spark.sql(s"SELECT * from T ORDER BY id"),
+ Row(1, "_all") :: Row(2, "_all") :: Row(3, "_all") :: Row(4,
"_all") :: Row(
+ 5,
+ "_all") :: Nil)
}
}
}
@@ -120,7 +130,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
spark.sql(
"INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c',
'2025'), (4, 'd', '2025')")
- val deletionVectors1 = getLatestDeletionVectors(table,
dvMaintainerFactory)
+ val deletionVectors1 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
Assertions.assertEquals(0, deletionVectors1.size)
val cond1 = "id = 2"
@@ -169,6 +179,17 @@ class DeletionVectorTest extends PaimonSparkTestBase {
case (filePath, dv) =>
rowMetaInfo2(filePath).foreach(index =>
Assertions.assertTrue(dv.isDeleted(index)))
}
+
+ spark.sql("CALL sys.compact('T')")
+ val deletionVectors5 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
+ // After compaction, deletionVectors should be empty
+ Assertions.assertTrue(deletionVectors5.isEmpty)
+ checkAnswer(
+ spark.sql(s"SELECT * from T ORDER BY id"),
+ Row(1, "a", "2024") :: Row(2, "b_2", "2024") :: Row(3, "c_2",
"2025") :: Row(
+ 4,
+ "d_2",
+ "2025") :: Nil)
}
}
}
@@ -194,14 +215,14 @@ class DeletionVectorTest extends PaimonSparkTestBase {
new
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b')")
- val deletionVectors1 = getLatestDeletionVectors(table,
dvMaintainerFactory)
+ val deletionVectors1 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
Assertions.assertEquals(0, deletionVectors1.size)
val cond1 = "id = 2"
val rowMetaInfo1 = getFilePathAndRowIndex(cond1)
spark.sql(s"DELETE FROM T WHERE $cond1")
checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(1, "a")
:: Nil)
- val deletionVectors2 = getLatestDeletionVectors(table,
dvMaintainerFactory)
+ val deletionVectors2 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
Assertions.assertEquals(1, deletionVectors2.size)
deletionVectors2
.foreach {
@@ -213,12 +234,18 @@ class DeletionVectorTest extends PaimonSparkTestBase {
checkAnswer(
spark.sql(s"SELECT * from T ORDER BY id"),
Row(1, "a") :: Row(2, "bb") :: Row(3, "c") :: Row(4, "d") :: Nil)
- val deletionVectors3 = getLatestDeletionVectors(table,
dvMaintainerFactory)
+ val deletionVectors3 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
Assertions.assertTrue(deletionVectors2 == deletionVectors3)
val cond2 = "id % 2 = 1"
spark.sql(s"DELETE FROM T WHERE $cond2")
checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(2, "bb")
:: Row(4, "d") :: Nil)
+
+ spark.sql("CALL sys.compact('T')")
+ val deletionVectors4 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
+ // After compaction, deletionVectors should be empty
+ Assertions.assertTrue(deletionVectors4.isEmpty)
+ checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(2, "bb")
:: Row(4, "d") :: Nil)
}
}
}
@@ -243,9 +270,16 @@ class DeletionVectorTest extends PaimonSparkTestBase {
val dvMaintainerFactory =
new
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+ def getDeletionVectors(ptValues: Seq[String]): Map[String,
DeletionVector] = {
+ getLatestDeletionVectors(
+ table,
+ dvMaintainerFactory,
+ ptValues.map(BinaryRow.singleColumn))
+ }
+
spark.sql(
"INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c',
'2025'), (4, 'd', '2025')")
- val deletionVectors1 = getLatestDeletionVectors(table,
dvMaintainerFactory)
+ val deletionVectors1 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
Assertions.assertEquals(0, deletionVectors1.size)
val cond1 = "id = 2"
@@ -254,11 +288,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
checkAnswer(
spark.sql(s"SELECT * from T ORDER BY id"),
Row(1, "a", "2024") :: Row(3, "c", "2025") :: Row(4, "d", "2025")
:: Nil)
- val deletionVectors2 =
- getLatestDeletionVectors(
- table,
- dvMaintainerFactory,
- Seq(BinaryRow.singleColumn("2024"),
BinaryRow.singleColumn("2025")))
+ val deletionVectors2 = getDeletionVectors(Seq("2024", "2025"))
Assertions.assertEquals(1, deletionVectors2.size)
deletionVectors2
.foreach {
@@ -272,22 +302,28 @@ class DeletionVectorTest extends PaimonSparkTestBase {
checkAnswer(
spark.sql(s"SELECT * from T ORDER BY id"),
Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
- val deletionVectors3 =
- getLatestDeletionVectors(
- table,
- dvMaintainerFactory,
- Seq(BinaryRow.singleColumn("2024")))
+ val deletionVectors3 = getDeletionVectors(Seq("2024"))
Assertions.assertTrue(deletionVectors2 == deletionVectors3)
- val deletionVectors4 =
- getLatestDeletionVectors(
- table,
- dvMaintainerFactory,
- Seq(BinaryRow.singleColumn("2024"),
BinaryRow.singleColumn("2025")))
+ val deletionVectors4 = getDeletionVectors(Seq("2024", "2025"))
deletionVectors4
.foreach {
case (filePath, dv) =>
rowMetaInfo2(filePath).foreach(index =>
Assertions.assertTrue(dv.isDeleted(index)))
}
+
+ spark.sql("""CALL sys.compact(table => 'T', partitions => "pt =
'2024'")""")
+ Assertions.assertTrue(getDeletionVectors(Seq("2024")).isEmpty)
+ Assertions.assertTrue(getDeletionVectors(Seq("2025")).nonEmpty)
+ checkAnswer(
+ spark.sql(s"SELECT * from T ORDER BY id"),
+ Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
+
+ spark.sql("""CALL sys.compact(table => 'T', where => "pt =
'2025'")""")
+ Assertions.assertTrue(getDeletionVectors(Seq("2025")).isEmpty)
+ Assertions.assertTrue(getDeletionVectors(Seq("2024")).isEmpty)
+ checkAnswer(
+ spark.sql(s"SELECT * from T ORDER BY id"),
+ Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
}
}
}
@@ -317,7 +353,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
val dvMaintainerFactory =
new
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
- val deletionVectors1 = getLatestDeletionVectors(table,
dvMaintainerFactory)
+ val deletionVectors1 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
// 1, 3 deleted, their row positions are 0, 2
Assertions.assertEquals(1, deletionVectors1.size)
deletionVectors1
@@ -330,7 +366,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
// Compact
// f3 (1, 2, 3), no deletion
spark.sql("CALL sys.compact('T')")
- val deletionVectors2 = getLatestDeletionVectors(table,
dvMaintainerFactory)
+ val deletionVectors2 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
// After compaction, deletionVectors should be empty
Assertions.assertTrue(deletionVectors2.isEmpty)
@@ -342,7 +378,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
spark.sql(s"SELECT * from T ORDER BY id"),
Row(1, "a_new1") :: Row(2, "b_new2") :: Row(3, "c_new1") :: Nil)
- val deletionVectors3 = getLatestDeletionVectors(table,
dvMaintainerFactory)
+ val deletionVectors3 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
// 2 deleted, row positions is 1
Assertions.assertEquals(1, deletionVectors3.size)
deletionVectors3
@@ -488,7 +524,7 @@ class DeletionVectorTest extends PaimonSparkTestBase {
new Path(path).getName
}
- private def getLatestDeletionVectors(
+ private def getAllLatestDeletionVectors(
table: FileStoreTable,
dvMaintainerFactory: DeletionVectorsMaintainer.Factory): Map[String,
DeletionVector] = {
getLatestDeletionVectors(table, dvMaintainerFactory,
Seq(BinaryRow.EMPTY_ROW))