This is an automated email from the ASF dual-hosted git repository.
biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c4762ccdd [core] Refactor AppendDeletionFileMaintainer to separate
bucketed and unaware (#3950)
c4762ccdd is described below
commit c4762ccddd198da0b355a69cd7c083ab18b116fa
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 13 18:08:09 2024 +0800
[core] Refactor AppendDeletionFileMaintainer to separate bucketed and
unaware (#3950)
---
.../deletionvectors/DeletionVectorsMaintainer.java | 16 +++++
.../append/AppendDeletionFileMaintainer.java | 72 ++++++++++++++++++++++
.../BucketedAppendDeletionFileMaintainer.java | 65 +++++++++++++++++++
.../UnawareAppendDeletionFileMaintainer.java} | 71 +++++++++------------
.../org/apache/paimon/index/IndexFileHandler.java | 6 --
.../java/org/apache/paimon/table/BucketMode.java | 4 +-
.../table/sink/UnawareBucketRowKeyExtractor.java | 3 +-
.../org/apache/paimon/TestAppendFileStore.java | 12 ++--
.../AppendDeletionFileMaintainerTest.java} | 17 ++---
.../paimon/spark/commands/PaimonSparkWriter.scala | 19 +++---
10 files changed, 216 insertions(+), 69 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
index 8079d977c..8dd6af1de 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -75,6 +75,22 @@ public class DeletionVectorsMaintainer {
modified = true;
}
+ /**
+ * Merges a new deletion vector for the data file with the given name; if a
deletion vector
+ * already exists for that file, the old one is merged into the new one.
+ *
+ * @param fileName The name of the file where the deletion occurred.
+ * @param deletionVector The deletion vector
+ */
+ public void mergeNewDeletion(String fileName, DeletionVector
deletionVector) {
+ DeletionVector old = deletionVectors.get(fileName);
+ if (old != null) {
+ deletionVector.merge(old);
+ }
+ deletionVectors.put(fileName, deletionVector);
+ modified = true;
+ }
+
/**
* Removes the specified file's deletion vector, this method is typically
used for remove before
* files' deletion vector in compaction.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java
new file mode 100644
index 000000000..4922e45a6
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors.append;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.source.DeletionFile;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
+
+/**
+ * Maintains deletion files for an append table. The core methods are:
+ *
+ * <ul>
+ * <li>{@link #notifyDeletionFiles}: Marks deletions in data files and creates
new deletion vectors.
+ * <li>{@link #persist}: Persists the deletion files to commit.
+ * </ul>
+ */
+public interface AppendDeletionFileMaintainer {
+
+ BinaryRow getPartition();
+
+ int getBucket();
+
+ void notifyDeletionFiles(String dataFile, DeletionVector deletionVector);
+
+ List<IndexManifestEntry> persist();
+
+ static AppendDeletionFileMaintainer forBucketedAppend(
+ IndexFileHandler indexFileHandler,
+ @Nullable Long snapshotId,
+ BinaryRow partition,
+ int bucket) {
+ // A bucket should have only one deletion file, so we read the
old deletion vectors here
+ // and overwrite the entire deletion file of the bucket when writing
deletes.
+ DeletionVectorsMaintainer maintainer =
+ new DeletionVectorsMaintainer.Factory(indexFileHandler)
+ .createOrRestore(snapshotId, partition, bucket);
+ return new BucketedAppendDeletionFileMaintainer(partition, bucket,
maintainer);
+ }
+
+ static AppendDeletionFileMaintainer forUnawareAppend(
+ IndexFileHandler indexFileHandler, @Nullable Long snapshotId,
BinaryRow partition) {
+ Map<String, DeletionFile> deletionFiles =
+ indexFileHandler.scanDVIndex(snapshotId, partition,
UNAWARE_BUCKET);
+ return new UnawareAppendDeletionFileMaintainer(indexFileHandler,
partition, deletionFiles);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java
new file mode 100644
index 000000000..1b839575f
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeletionFileMaintainer.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors.append;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** A {@link AppendDeletionFileMaintainer} of bucketed append table. */
+public class BucketedAppendDeletionFileMaintainer implements
AppendDeletionFileMaintainer {
+
+ private final BinaryRow partition;
+ private final int bucket;
+ private final DeletionVectorsMaintainer maintainer;
+
+ BucketedAppendDeletionFileMaintainer(
+ BinaryRow partition, int bucket, DeletionVectorsMaintainer
maintainer) {
+ this.partition = partition;
+ this.bucket = bucket;
+ this.maintainer = maintainer;
+ }
+
+ @Override
+ public BinaryRow getPartition() {
+ return this.partition;
+ }
+
+ @Override
+ public int getBucket() {
+ return this.bucket;
+ }
+
+ @Override
+ public void notifyDeletionFiles(String dataFile, DeletionVector
deletionVector) {
+ maintainer.mergeNewDeletion(dataFile, deletionVector);
+ }
+
+ @Override
+ public List<IndexManifestEntry> persist() {
+ return maintainer.writeDeletionVectorsIndex().stream()
+ .map(fileMeta -> new IndexManifestEntry(FileKind.ADD,
partition, bucket, fileMeta))
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
similarity index 74%
rename from
paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
rename to
paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
index 39a0c7592..de6baac9f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/UnawareAppendDeletionFileMaintainer.java
@@ -16,10 +16,13 @@
* limitations under the License.
*/
-package org.apache.paimon.deletionvectors;
+package org.apache.paimon.deletionvectors.append;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
+import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
@@ -35,13 +38,14 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-/** DeletionVectorIndexFileMaintainer. */
-public class DeletionVectorIndexFileMaintainer {
+import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
+
+/** A {@link AppendDeletionFileMaintainer} of unaware bucket append table. */
+public class UnawareAppendDeletionFileMaintainer implements
AppendDeletionFileMaintainer {
private final IndexFileHandler indexFileHandler;
private final BinaryRow partition;
- private final int bucket;
private final Map<String, IndexManifestEntry> indexNameToEntry = new
HashMap<>();
private final Map<String, Map<String, DeletionFile>>
indexFileToDeletionFiles = new HashMap<>();
@@ -51,26 +55,16 @@ public class DeletionVectorIndexFileMaintainer {
private final DeletionVectorsMaintainer maintainer;
- // the key of dataFileToDeletionFiles is the relative path again table's
location.
- public DeletionVectorIndexFileMaintainer(
+ UnawareAppendDeletionFileMaintainer(
IndexFileHandler indexFileHandler,
- Long snapshotId,
BinaryRow partition,
- int bucket,
- boolean restore) {
+ Map<String, DeletionFile> deletionFiles) {
this.indexFileHandler = indexFileHandler;
this.partition = partition;
- this.bucket = bucket;
- if (restore) {
- this.maintainer =
- new DeletionVectorsMaintainer.Factory(indexFileHandler)
- .createOrRestore(snapshotId, partition, bucket);
- } else {
- this.maintainer = new
DeletionVectorsMaintainer.Factory(indexFileHandler).create();
- }
- Map<String, DeletionFile> dataFileToDeletionFiles =
- indexFileHandler.scanDVIndex(snapshotId, partition, bucket);
- init(dataFileToDeletionFiles);
+ // the deletion of data files is independent
+ // just create an empty maintainer
+ this.maintainer = new
DeletionVectorsMaintainer.Factory(indexFileHandler).create();
+ init(deletionFiles);
}
@VisibleForTesting
@@ -98,14 +92,17 @@ public class DeletionVectorIndexFileMaintainer {
}
}
+ @Override
public BinaryRow getPartition() {
return this.partition;
}
+ @Override
public int getBucket() {
- return this.bucket;
+ return UNAWARE_BUCKET;
}
+ @Override
public void notifyDeletionFiles(String dataFile, DeletionVector
deletionVector) {
DeletionVectorsIndexFile deletionVectorsIndexFile =
indexFileHandler.deletionVectorsIndex();
DeletionFile previous = null;
@@ -122,17 +119,7 @@ public class DeletionVectorIndexFileMaintainer {
maintainer.notifyNewDeletion(dataFile, deletionVector);
}
- public void notifyDeletionFiles(Map<String, DeletionFile>
dataFileToDeletionFiles) {
- for (String dataFile : dataFileToDeletionFiles.keySet()) {
- DeletionFile deletionFile = dataFileToDeletionFiles.get(dataFile);
- String indexFileName = new Path(deletionFile.path()).getName();
- touchedIndexFiles.add(indexFileName);
- if (indexFileToDeletionFiles.containsKey(indexFileName)) {
- indexFileToDeletionFiles.get(indexFileName).remove(dataFile);
- }
- }
- }
-
+ @Override
public List<IndexManifestEntry> persist() {
List<IndexManifestEntry> result = writeUnchangedDeletionVector();
List<IndexManifestEntry> newIndexFileEntries =
@@ -140,13 +127,14 @@ public class DeletionVectorIndexFileMaintainer {
.map(
fileMeta ->
new IndexManifestEntry(
- FileKind.ADD, partition,
bucket, fileMeta))
+ FileKind.ADD, partition,
UNAWARE_BUCKET, fileMeta))
.collect(Collectors.toList());
result.addAll(newIndexFileEntries);
return result;
}
- public List<IndexManifestEntry> writeUnchangedDeletionVector() {
+ @VisibleForTesting
+ List<IndexManifestEntry> writeUnchangedDeletionVector() {
DeletionVectorsIndexFile deletionVectorsIndexFile =
indexFileHandler.deletionVectorsIndex();
List<IndexManifestEntry> newIndexEntries = new ArrayList<>();
for (String indexFile : indexFileToDeletionFiles.keySet()) {
@@ -162,14 +150,13 @@ public class DeletionVectorIndexFileMaintainer {
deletionVectorsIndexFile.readDeletionVector(
dataFileToDeletionFiles));
newIndexFiles.forEach(
- newIndexFile -> {
- newIndexEntries.add(
- new IndexManifestEntry(
- FileKind.ADD,
- oldEntry.partition(),
- oldEntry.bucket(),
- newIndexFile));
- });
+ newIndexFile ->
+ newIndexEntries.add(
+ new IndexManifestEntry(
+ FileKind.ADD,
+ oldEntry.partition(),
+ oldEntry.bucket(),
+ newIndexFile)));
}
// mark the touched index file as removed.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index ee6cbe769..bdf47b16a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -21,7 +21,6 @@ package org.apache.paimon.index;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
-import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
@@ -271,11 +270,6 @@ public class IndexFileHandler {
indexManifestFile.delete(indexManifest);
}
- public DeletionVectorIndexFileMaintainer createDVIndexFileMaintainer(
- Long snapshotId, BinaryRow partition, int bucket, boolean restore)
{
- return new DeletionVectorIndexFileMaintainer(this, snapshotId,
partition, bucket, restore);
- }
-
public Map<String, DeletionVector>
readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
for (IndexFileMeta indexFile : fileMetas) {
checkArgument(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
b/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
index 2fcc8822a..c3b7ca1ab 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/BucketMode.java
@@ -57,5 +57,7 @@ public enum BucketMode {
* Ignoring bucket concept, although all data is written to bucket-0, the
parallelism of reads
* and writes is unrestricted. This mode only works for append-only table.
*/
- BUCKET_UNAWARE
+ BUCKET_UNAWARE;
+
+ public static final int UNAWARE_BUCKET = 0;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/UnawareBucketRowKeyExtractor.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/UnawareBucketRowKeyExtractor.java
index e99133899..ad8a3cd83 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/sink/UnawareBucketRowKeyExtractor.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/UnawareBucketRowKeyExtractor.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -37,6 +38,6 @@ public class UnawareBucketRowKeyExtractor extends
RowKeyExtractor {
@Override
public int bucket() {
- return 0;
+ return BucketMode.UNAWARE_BUCKET;
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index c86b1cb40..a68779226 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -19,8 +19,9 @@
package org.apache.paimon;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer;
+import
org.apache.paimon.deletionvectors.append.UnawareAppendDeletionFileMaintainer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
@@ -118,10 +119,11 @@ public class TestAppendFileStore extends
AppendOnlyFileStore {
return fileHandler.scan(lastSnapshotId, DELETION_VECTORS_INDEX,
partition, bucket);
}
- public DeletionVectorIndexFileMaintainer createDVIFMaintainer(
- BinaryRow partition, int bucket, Map<String, DeletionFile>
dataFileToDeletionFiles) {
- DeletionVectorIndexFileMaintainer maintainer =
- new DeletionVectorIndexFileMaintainer(fileHandler, null,
partition, bucket, false);
+ public UnawareAppendDeletionFileMaintainer createDVIFMaintainer(
+ BinaryRow partition, Map<String, DeletionFile>
dataFileToDeletionFiles) {
+ UnawareAppendDeletionFileMaintainer maintainer =
+ (UnawareAppendDeletionFileMaintainer)
+
AppendDeletionFileMaintainer.forUnawareAppend(fileHandler, null, partition);
maintainer.init(dataFileToDeletionFiles);
return maintainer;
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
similarity index 91%
rename from
paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
index f78e39dfb..2ebc30cf9 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerTest.java
@@ -16,10 +16,13 @@
* limitations under the License.
*/
-package org.apache.paimon.deletionvectors;
+package org.apache.paimon.deletionvectors.append;
import org.apache.paimon.TestAppendFileStore;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
@@ -39,8 +42,7 @@ import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for DeletionVectorIndexFileMaintainer. */
-public class DeletionVectorIndexFileMaintainerTest {
+class AppendDeletionFileMaintainerTest {
@TempDir java.nio.file.Path tempDir;
@@ -68,8 +70,8 @@ public class DeletionVectorIndexFileMaintainerTest {
createDeletionFileMapFromIndexFileMetas(
indexPathFactory,
commitMessage2.indexIncrement().newIndexFiles()));
- DeletionVectorIndexFileMaintainer dvIFMaintainer =
- store.createDVIFMaintainer(BinaryRow.EMPTY_ROW, 1,
dataFileToDeletionFiles);
+ UnawareAppendDeletionFileMaintainer dvIFMaintainer =
+ store.createDVIFMaintainer(BinaryRow.EMPTY_ROW,
dataFileToDeletionFiles);
// no dv should be rewritten, because nothing is changed.
List<IndexManifestEntry> res =
dvIFMaintainer.writeUnchangedDeletionVector();
@@ -77,8 +79,9 @@ public class DeletionVectorIndexFileMaintainerTest {
// the dv of f3 is updated, and the index file that contains the dv of
f3 should be marked
// as REMOVE.
+ FileIO fileIO = LocalFileIO.create();
dvIFMaintainer.notifyDeletionFiles(
- Collections.singletonMap("f3",
dataFileToDeletionFiles.get("f3")));
+ "f3", DeletionVector.read(fileIO,
dataFileToDeletionFiles.get("f3")));
res = dvIFMaintainer.writeUnchangedDeletionVector();
assertThat(res.size()).isEqualTo(1);
assertThat(res.get(0).kind()).isEqualTo(FileKind.DELETE);
@@ -86,7 +89,7 @@ public class DeletionVectorIndexFileMaintainerTest {
// the dv of f1 and f2 are in one index file, and the dv of f1 is
updated.
// the dv of f2 need to be rewritten, and this index file should be
marked as REMOVE.
dvIFMaintainer.notifyDeletionFiles(
- Collections.singletonMap("f1",
dataFileToDeletionFiles.get("f1")));
+ "f1", DeletionVector.read(fileIO,
dataFileToDeletionFiles.get("f1")));
res = dvIFMaintainer.writeUnchangedDeletionVector();
assertThat(res.size()).isEqualTo(3);
IndexManifestEntry entry =
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index d12909b8a..7bdd0ce60 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -19,7 +19,8 @@
package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions.WRITE_ONLY
-import org.apache.paimon.deletionvectors.{DeletionVector,
DeletionVectorIndexFileMaintainer}
+import org.apache.paimon.deletionvectors.DeletionVector
+import org.apache.paimon.deletionvectors.append.{AppendDeletionFileMaintainer,
UnawareAppendDeletionFileMaintainer}
import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
import org.apache.paimon.manifest.{FileKind, IndexManifestEntry}
@@ -27,6 +28,7 @@ import org.apache.paimon.spark.{SparkRow, SparkTableWrite}
import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL,
ROW_KIND_COL}
import org.apache.paimon.spark.util.SparkRowUtils
import org.apache.paimon.table.{BucketMode, FileStoreTable}
+import org.apache.paimon.table.BucketMode.BUCKET_UNAWARE
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage,
CommitMessageImpl, CommitMessageSerializer, RowPartitionKeyExtractor}
import org.apache.paimon.utils.SerializationUtils
@@ -179,7 +181,7 @@ case class PaimonSparkWriter(table: FileStoreTable) {
numAssigners,
encoderGroupWithBucketCol))
}
- case BucketMode.BUCKET_UNAWARE =>
+ case BUCKET_UNAWARE =>
// Topology: input ->
writeWithoutBucket()
case BucketMode.HASH_FIXED =>
@@ -211,17 +213,20 @@ case class PaimonSparkWriter(table: FileStoreTable) {
.mapGroups {
case (_, iter: Iterator[SparkDeletionVectors]) =>
val indexHandler = table.store().newIndexFileHandler()
- var dvIndexFileMaintainer: DeletionVectorIndexFileMaintainer = null
+ var dvIndexFileMaintainer: AppendDeletionFileMaintainer = null
while (iter.hasNext) {
val sdv: SparkDeletionVectors = iter.next()
if (dvIndexFileMaintainer == null) {
val partition =
SerializationUtils.deserializeBinaryRow(sdv.partition)
- dvIndexFileMaintainer = indexHandler
- .createDVIndexFileMaintainer(
+ dvIndexFileMaintainer = if (bucketMode == BUCKET_UNAWARE) {
+ AppendDeletionFileMaintainer.forUnawareAppend(indexHandler,
snapshotId, partition)
+ } else {
+ AppendDeletionFileMaintainer.forBucketedAppend(
+ indexHandler,
snapshotId,
partition,
- sdv.bucket,
- bucketMode != BucketMode.BUCKET_UNAWARE)
+ sdv.bucket)
+ }
}
if (dvIndexFileMaintainer == null) {
throw new RuntimeException("can't create the dv maintainer.")