This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new be2996efe [spark] refactor delete with deletion vector to parallel
write deletion file (#3744)
be2996efe is described below
commit be2996efe24b3757099261d2287a923771f08555
Author: Yann Byron <[email protected]>
AuthorDate: Mon Jul 29 10:57:17 2024 +0800
[spark] refactor delete with deletion vector to parallel write deletion
file (#3744)
---
.../DeletionVectorIndexFileMaintainer.java | 68 +++++++++++++++++-
.../deletionvectors/DeletionVectorsIndexFile.java | 13 ++++
.../org/apache/paimon/index/IndexFileHandler.java | 78 +++++++++++++++++++-
.../org/apache/paimon/TestAppendFileStore.java | 7 +-
.../DeletionVectorIndexFileMaintainerTest.java | 2 +-
.../commands/DeleteFromPaimonTableCommand.scala | 29 ++++----
.../paimon/spark/commands/PaimonCommand.scala | 77 ++------------------
.../paimon/spark/commands/PaimonSparkWriter.scala | 83 ++++++++++------------
.../paimon/spark/commands/SparkDataFileMeta.scala | 6 --
.../spark/commands/UpdatePaimonTableCommand.scala | 2 +-
.../org/apache/paimon/spark/util/SQLHelper.scala | 55 ++++++++++++++
11 files changed, 277 insertions(+), 143 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
index 50e921f0d..39a0c7592 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainer.java
@@ -18,6 +18,8 @@
package org.apache.paimon.deletionvectors;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
@@ -38,15 +40,41 @@ public class DeletionVectorIndexFileMaintainer {
private final IndexFileHandler indexFileHandler;
+ private final BinaryRow partition;
+ private final int bucket;
private final Map<String, IndexManifestEntry> indexNameToEntry = new
HashMap<>();
private final Map<String, Map<String, DeletionFile>>
indexFileToDeletionFiles = new HashMap<>();
+ private final Map<String, String> dataFileToIndexFile = new HashMap<>();
private final Set<String> touchedIndexFiles = new HashSet<>();
+ private final DeletionVectorsMaintainer maintainer;
+
+ // the key of dataFileToDeletionFiles is the relative path against the table's
location.
public DeletionVectorIndexFileMaintainer(
- IndexFileHandler indexFileHandler, Map<String, DeletionFile>
dataFileToDeletionFiles) {
+ IndexFileHandler indexFileHandler,
+ Long snapshotId,
+ BinaryRow partition,
+ int bucket,
+ boolean restore) {
this.indexFileHandler = indexFileHandler;
+ this.partition = partition;
+ this.bucket = bucket;
+ if (restore) {
+ this.maintainer =
+ new DeletionVectorsMaintainer.Factory(indexFileHandler)
+ .createOrRestore(snapshotId, partition, bucket);
+ } else {
+ this.maintainer = new
DeletionVectorsMaintainer.Factory(indexFileHandler).create();
+ }
+ Map<String, DeletionFile> dataFileToDeletionFiles =
+ indexFileHandler.scanDVIndex(snapshotId, partition, bucket);
+ init(dataFileToDeletionFiles);
+ }
+
+ @VisibleForTesting
+ public void init(Map<String, DeletionFile> dataFileToDeletionFiles) {
List<String> touchedIndexFileNames =
dataFileToDeletionFiles.values().stream()
.map(deletionFile -> new
Path(deletionFile.path()).getName())
@@ -66,7 +94,32 @@ public class DeletionVectorIndexFileMaintainer {
indexFileToDeletionFiles.put(indexFileName, new HashMap<>());
}
indexFileToDeletionFiles.get(indexFileName).put(dataFile,
deletionFile);
+ dataFileToIndexFile.put(dataFile, indexFileName);
+ }
+ }
+
+ public BinaryRow getPartition() {
+ return this.partition;
+ }
+
+ public int getBucket() {
+ return this.bucket;
+ }
+
+ public void notifyDeletionFiles(String dataFile, DeletionVector
deletionVector) {
+ DeletionVectorsIndexFile deletionVectorsIndexFile =
indexFileHandler.deletionVectorsIndex();
+ DeletionFile previous = null;
+ if (dataFileToIndexFile.containsKey(dataFile)) {
+ String indexFileName = dataFileToIndexFile.get(dataFile);
+ touchedIndexFiles.add(indexFileName);
+ if (indexFileToDeletionFiles.containsKey(indexFileName)) {
+ previous =
indexFileToDeletionFiles.get(indexFileName).remove(dataFile);
+ }
}
+ if (previous != null) {
+
deletionVector.merge(deletionVectorsIndexFile.readDeletionVector(dataFile,
previous));
+ }
+ maintainer.notifyNewDeletion(dataFile, deletionVector);
}
public void notifyDeletionFiles(Map<String, DeletionFile>
dataFileToDeletionFiles) {
@@ -80,6 +133,19 @@ public class DeletionVectorIndexFileMaintainer {
}
}
+ public List<IndexManifestEntry> persist() {
+ List<IndexManifestEntry> result = writeUnchangedDeletionVector();
+ List<IndexManifestEntry> newIndexFileEntries =
+ maintainer.writeDeletionVectorsIndex().stream()
+ .map(
+ fileMeta ->
+ new IndexManifestEntry(
+ FileKind.ADD, partition,
bucket, fileMeta))
+ .collect(Collectors.toList());
+ result.addAll(newIndexFileEntries);
+ return result;
+ }
+
public List<IndexManifestEntry> writeUnchangedDeletionVector() {
DeletionVectorsIndexFile deletionVectorsIndexFile =
indexFileHandler.deletionVectorsIndex();
List<IndexManifestEntry> newIndexEntries = new ArrayList<>();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index f21aeb5a7..ffb024734 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -121,6 +121,19 @@ public class DeletionVectorsIndexFile extends IndexFile {
return deletionVectors;
}
+ public DeletionVector readDeletionVector(String dataFile, DeletionFile
deletionFile) {
+ String indexFile = deletionFile.path();
+ try (SeekableInputStream inputStream = fileIO.newInputStream(new
Path(indexFile))) {
+ checkVersion(inputStream);
+ checkArgument(deletionFile.path().equals(indexFile));
+ inputStream.seek(deletionFile.offset());
+ DataInputStream dataInputStream = new DataInputStream(inputStream);
+ return readDeletionVector(dataInputStream, (int)
deletionFile.length());
+ } catch (Exception e) {
+ throw new RuntimeException("Unable to read deletion vector from
file: " + indexFile, e);
+ }
+ }
+
/**
* Write deletion vectors to a new file, the format of this file can be
referenced at: <a
* href="https://cwiki.apache.org/confluence/x/Tws4EQ">PIP-16</a>.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index afafb42ec..eed18af17 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -32,11 +32,14 @@ import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.SnapshotManager;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -45,6 +48,7 @@ import java.util.Set;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** Handle index files. */
public class IndexFileHandler {
@@ -81,6 +85,56 @@ public class IndexFileHandler {
return result.isEmpty() ? Optional.empty() :
Optional.of(result.get(0));
}
+ public Map<String, DeletionFile> scanDVIndex(
+ @Nullable Long snapshotId, BinaryRow partition, int bucket) {
+ if (snapshotId == null) {
+ return Collections.emptyMap();
+ }
+ Snapshot snapshot = snapshotManager.snapshot(snapshotId);
+ String indexManifest = snapshot.indexManifest();
+ if (indexManifest == null) {
+ return Collections.emptyMap();
+ }
+ Map<String, DeletionFile> result = new HashMap<>();
+ for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
+ IndexFileMeta meta = file.indexFile();
+ if (meta.indexType().equals(DELETION_VECTORS_INDEX)
+ && file.partition().equals(partition)
+ && file.bucket() == bucket) {
+ LinkedHashMap<String, Pair<Integer, Integer>> dvRanges =
+ meta.deletionVectorsRanges();
+ checkNotNull(dvRanges);
+ for (String dataFile : dvRanges.keySet()) {
+ Pair<Integer, Integer> pair = dvRanges.get(dataFile);
+ DeletionFile deletionFile =
+ new DeletionFile(
+ filePath(meta).toString(), pair.getLeft(),
pair.getRight());
+ result.put(dataFile, deletionFile);
+ }
+ }
+ }
+ return result;
+ }
+
+ public List<IndexManifestEntry> scan(String indexType) {
+ Snapshot snapshot = snapshotManager.latestSnapshot();
+ if (snapshot == null) {
+ return Collections.emptyList();
+ }
+ String indexManifest = snapshot.indexManifest();
+ if (indexManifest == null) {
+ return Collections.emptyList();
+ }
+
+ List<IndexManifestEntry> result = new ArrayList<>();
+ for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
+ if (file.indexFile().indexType().equals(indexType)) {
+ result.add(file);
+ }
+ }
+ return result;
+ }
+
public List<IndexFileMeta> scan(
long snapshotId, String indexType, BinaryRow partition, int
bucket) {
List<IndexFileMeta> result = new ArrayList<>();
@@ -143,6 +197,26 @@ public class IndexFileHandler {
return result;
}
+ public List<IndexManifestEntry> scanEntries(String indexType, BinaryRow
partition, int bucket) {
+ Snapshot snapshot = snapshotManager.latestSnapshot();
+ if (snapshot == null) {
+ return Collections.emptyList();
+ }
+ String indexManifest = snapshot.indexManifest();
+ if (indexManifest == null) {
+ return Collections.emptyList();
+ }
+ List<IndexManifestEntry> result = new ArrayList<>();
+ for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
+ if (file.indexFile().indexType().equals(indexType)
+ && file.partition().equals(partition)
+ && file.bucket() == bucket) {
+ result.add(file);
+ }
+ }
+ return result;
+ }
+
public Path filePath(IndexFileMeta file) {
return pathFactory.toPath(file.fileName());
}
@@ -218,8 +292,8 @@ public class IndexFileHandler {
}
public DeletionVectorIndexFileMaintainer createDVIndexFileMaintainer(
- Map<String, DeletionFile> dataFileToDeletionFiles) {
- return new DeletionVectorIndexFileMaintainer(this,
dataFileToDeletionFiles);
+ Long snapshotId, BinaryRow partition, int bucket, boolean restore)
{
+ return new DeletionVectorIndexFileMaintainer(this, snapshotId,
partition, bucket, restore);
}
public Map<String, DeletionVector>
readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index d01210e47..c86b1cb40 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -119,8 +119,11 @@ public class TestAppendFileStore extends
AppendOnlyFileStore {
}
public DeletionVectorIndexFileMaintainer createDVIFMaintainer(
- Map<String, DeletionFile> dataFileToDeletionFiles) {
- return new DeletionVectorIndexFileMaintainer(fileHandler,
dataFileToDeletionFiles);
+ BinaryRow partition, int bucket, Map<String, DeletionFile>
dataFileToDeletionFiles) {
+ DeletionVectorIndexFileMaintainer maintainer =
+ new DeletionVectorIndexFileMaintainer(fileHandler, null,
partition, bucket, false);
+ maintainer.init(dataFileToDeletionFiles);
+ return maintainer;
}
public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow
partition, int bucket) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
index 20cb47557..f78e39dfb 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileMaintainerTest.java
@@ -69,7 +69,7 @@ public class DeletionVectorIndexFileMaintainerTest {
indexPathFactory,
commitMessage2.indexIncrement().newIndexFiles()));
DeletionVectorIndexFileMaintainer dvIFMaintainer =
- store.createDVIFMaintainer(dataFileToDeletionFiles);
+ store.createDVIFMaintainer(BinaryRow.EMPTY_ROW, 1,
dataFileToDeletionFiles);
// no dv should be rewritten, because nothing is changed.
List<IndexManifestEntry> res =
dvIFMaintainer.writeUnchangedDeletionVector();
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 4bcd9dce6..2aef8e576 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -25,6 +25,7 @@ import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
+import org.apache.paimon.spark.util.SQLHelper
import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
import org.apache.paimon.types.RowKind
@@ -49,7 +50,8 @@ case class DeleteFromPaimonTableCommand(
extends PaimonLeafRunnableCommand
with PaimonCommand
with ExpressionHelper
- with SupportsSubquery {
+ with SupportsSubquery
+ with SQLHelper {
private lazy val writer = PaimonSparkWriter(table)
@@ -124,21 +126,18 @@ case class DeleteFromPaimonTableCommand(
val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
if (deletionVectorsEnabled) {
- // Step2: collect all the deletion vectors that marks the deleted rows.
- val deletionVectors = collectDeletionVectors(
- candidateDataSplits,
- dataFilePathToMeta,
- condition,
- relation,
- sparkSession)
-
- deletionVectors.cache()
- try {
- updateDeletionVector(deletionVectors, dataFilePathToMeta, writer)
- } finally {
- deletionVectors.unpersist()
- }
+ withSQLConf("spark.sql.adaptive.enabled" -> "false") {
+ // Step2: collect all the deletion vectors that mark the deleted rows.
+ val deletionVectors = collectDeletionVectors(
+ candidateDataSplits,
+ dataFilePathToMeta,
+ condition,
+ relation,
+ sparkSession)
+ // Step3: update the touched deletion vectors and index files
+ writer.persistDeletionVectors(deletionVectors)
+ }
} else {
// Step2: extract out the exactly files, which must have at least one
record to be updated.
val touchedFilePaths =
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index abd89b0b6..d5dd1e19e 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -22,14 +22,12 @@ import
org.apache.paimon.deletionvectors.{BitmapDeletionVector, DeletionVector}
import org.apache.paimon.fs.Path
import org.apache.paimon.index.IndexFileMeta
import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement,
IndexIncrement}
-import org.apache.paimon.manifest.IndexManifestEntry
import org.apache.paimon.spark.{PaimonSplitScan, SparkFilterConverter}
import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import
org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.spark.schema.PaimonMetadataColumn._
-import org.apache.paimon.table.BucketMode
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
import org.apache.paimon.table.source.DataSplit
import org.apache.paimon.types.RowType
@@ -148,62 +146,6 @@ trait PaimonCommand extends WithFileStoreTable with
ExpressionHelper {
.toMap
}
- protected def getDeletedIndexFiles(
- dataFilePathToMeta: Map[String, SparkDataFileMeta],
- newDeletionVectors: Dataset[SparkDeletionVectors]
- ): Seq[IndexManifestEntry] = {
- val deletionFiles = dataFilePathToMeta.flatMap {
- case (relativePath, sdf) =>
- sdf.deletionFile match {
- case Some(deletionFile) =>
- Some((relativePath, deletionFile))
- case None => None
- }
- }
- val dvIndexFileMaintainer = fileStore
- .newIndexFileHandler()
- .createDVIndexFileMaintainer(deletionFiles.asJava)
-
- val pathFactory = fileStore.pathFactory()
- val touchedDataFileAndDeletionFiles = newDeletionVectors
- .collect()
- .flatMap {
- sdv =>
- val relativePaths = sdv.relativePaths(pathFactory)
- relativePaths.flatMap {
- relativePath =>
- dataFilePathToMeta(relativePath).deletionFile match {
- case Some(deletionFile) => Some(relativePath, deletionFile)
- case _ => None
- }
- }
- }
- .toMap
-
-
dvIndexFileMaintainer.notifyDeletionFiles(touchedDataFileAndDeletionFiles.asJava)
-
- dvIndexFileMaintainer.writeUnchangedDeletionVector().asScala
- }
-
- protected def updateDeletionVector(
- deletionVectors: Dataset[SparkDeletionVectors],
- dataFilePathToMeta: Map[String, SparkDataFileMeta],
- writer: PaimonSparkWriter): Seq[CommitMessage] = {
- // Step1: write the new deletion vectors
- val newIndexCommitMsg = writer.persistDeletionVectors(deletionVectors)
-
- // Step2: write the unchanged deletion vectors where store in touched dv
index files, and mark these touched index files as DELETE if needed.
- val rewriteIndexCommitMsg = fileStore.bucketMode() match {
- case BucketMode.BUCKET_UNAWARE =>
- val indexEntries = getDeletedIndexFiles(dataFilePathToMeta,
deletionVectors)
- writer.buildCommitMessageFromIndexManifestEntry(indexEntries)
- case _ =>
- Seq.empty[CommitMessage]
- }
-
- newIndexCommitMsg ++ rewriteIndexCommitMsg
- }
-
protected def collectDeletionVectors(
candidateDataSplits: Seq[DataSplit],
dataFilePathToMeta: Map[String, SparkDataFileMeta],
@@ -212,12 +154,12 @@ trait PaimonCommand extends WithFileStoreTable with
ExpressionHelper {
sparkSession: SparkSession): Dataset[SparkDeletionVectors] = {
import sparkSession.implicits._
- val dataFileAndDeletionFile =
dataFilePathToMeta.mapValues(_.toSparkDeletionFile).toArray
+ val dataFileToPartitionAndBucket =
+ dataFilePathToMeta.mapValues(meta => (meta.partition,
meta.bucket)).toArray
val metadataCols = Seq(FILE_PATH, ROW_INDEX)
val filteredRelation = createNewScanPlan(candidateDataSplits, condition,
relation, metadataCols)
val store = table.store()
- val fileIO = table.fileIO()
val location = table.location
createDataset(sparkSession, filteredRelation)
.select(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
@@ -225,29 +167,22 @@ trait PaimonCommand extends WithFileStoreTable with
ExpressionHelper {
.groupByKey(_._1)
.mapGroups {
case (filePath, iter) =>
- val fileNameToDeletionFile = dataFileAndDeletionFile.toMap
val dv = new BitmapDeletionVector()
while (iter.hasNext) {
dv.delete(iter.next()._2)
}
val relativeFilePath = location.toUri.relativize(new
URI(filePath)).toString
- val sparkDeletionFile = fileNameToDeletionFile(relativeFilePath)
- sparkDeletionFile.deletionFile match {
- case Some(deletionFile) =>
- dv.merge(DeletionVector.read(fileIO, deletionFile))
- case None =>
- }
-
+ val (partition, bucket) =
dataFileToPartitionAndBucket.toMap.apply(relativeFilePath)
val pathFactory = store.pathFactory()
val partitionAndBucket = pathFactory
- .relativePartitionAndBucketPath(sparkDeletionFile.partition,
sparkDeletionFile.bucket)
+ .relativePartitionAndBucketPath(partition, bucket)
.toString
SparkDeletionVectors(
partitionAndBucket,
- SerializationUtils.serializeBinaryRow(sparkDeletionFile.partition),
- sparkDeletionFile.bucket,
+ SerializationUtils.serializeBinaryRow(partition),
+ bucket,
Seq((new Path(filePath).getName, dv.serializeToBytes()))
)
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 41f2db1a3..62fb4689d 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -20,7 +20,7 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions.WRITE_ONLY
import org.apache.paimon.data.BinaryRow
-import org.apache.paimon.deletionvectors.{DeletionVector,
DeletionVectorsMaintainer}
+import org.apache.paimon.deletionvectors.{DeletionVector,
DeletionVectorIndexFileMaintainer}
import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
import org.apache.paimon.manifest.{FileKind, IndexManifestEntry}
@@ -200,60 +200,55 @@ case class PaimonSparkWriter(table: FileStoreTable) {
/**
* Write all the deletion vectors to the index files. If it's in unaware
mode, one index file maps
- * one deletion vector; else, one index file will contains all deletion
vector with the same
+ * deletion vectors; else, one index file will contain all deletion vectors
with the same
* partition and bucket.
*/
def persistDeletionVectors(deletionVectors: Dataset[SparkDeletionVectors]):
Seq[CommitMessage] = {
val sparkSession = deletionVectors.sparkSession
import sparkSession.implicits._
-
+ val snapshotId = table.snapshotManager().latestSnapshotId();
val fileStore = table.store()
- val lastSnapshotId = table.snapshotManager().latestSnapshotId()
-
- def createOrRestoreDVMaintainer(
- partition: BinaryRow,
- bucket: Int): DeletionVectorsMaintainer = {
- val deletionVectorsMaintainerFactory =
- new DeletionVectorsMaintainer.Factory(fileStore.newIndexFileHandler())
- fileStore.bucketMode() match {
- case BucketMode.BUCKET_UNAWARE =>
- deletionVectorsMaintainerFactory.create()
- case _ =>
- deletionVectorsMaintainerFactory.createOrRestore(lastSnapshotId,
partition, bucket)
- }
- }
-
- def commitDeletionVector(
- sdv: SparkDeletionVectors,
- serializer: CommitMessageSerializer): Array[Byte] = {
- val partition = SerializationUtils.deserializeBinaryRow(sdv.partition)
- val maintainer = createOrRestoreDVMaintainer(partition, sdv.bucket)
- sdv.dataFileAndDeletionVector.foreach {
- case (dataFileName, dv) =>
- maintainer.notifyNewDeletion(dataFileName,
DeletionVector.deserializeFromBytes(dv))
- }
-
- val commitMessage = new CommitMessageImpl(
- partition,
- sdv.bucket,
- DataIncrement.emptyIncrement(),
- CompactIncrement.emptyIncrement(),
- new IndexIncrement(maintainer.writeDeletionVectorsIndex()))
-
- serializer.serialize(commitMessage)
- }
-
val serializedCommits = deletionVectors
.groupByKey(_.partitionAndBucket)
.mapGroups {
case (_, iter: Iterator[SparkDeletionVectors]) =>
- val serializer = new CommitMessageSerializer
- val grouped = iter.reduce {
- (sdv1, sdv2) =>
- sdv1.copy(dataFileAndDeletionVector =
- sdv1.dataFileAndDeletionVector ++
sdv2.dataFileAndDeletionVector)
+ val indexHandler = fileStore.newIndexFileHandler()
+ var dvIndexFileMaintainer: DeletionVectorIndexFileMaintainer = null
+ while (iter.hasNext) {
+ val sdv: SparkDeletionVectors = iter.next()
+ if (dvIndexFileMaintainer == null) {
+ val partition =
SerializationUtils.deserializeBinaryRow(sdv.partition)
+ dvIndexFileMaintainer = indexHandler
+ .createDVIndexFileMaintainer(
+ snapshotId,
+ partition,
+ sdv.bucket,
+ bucketMode != BucketMode.BUCKET_UNAWARE)
+ }
+ if (dvIndexFileMaintainer == null) {
+ throw new RuntimeException("can't create the dv maintainer.")
+ }
+
+ sdv.dataFileAndDeletionVector.foreach {
+ case (dataFileName, dv) =>
+ dvIndexFileMaintainer.notifyDeletionFiles(
+ dataFileName,
+ DeletionVector.deserializeFromBytes(dv))
+ }
}
- commitDeletionVector(grouped, serializer)
+ val indexEntries = dvIndexFileMaintainer.persist()
+
+ val (added, deleted) = indexEntries.asScala.partition(_.kind() ==
FileKind.ADD)
+
+ val commitMessage = new CommitMessageImpl(
+ dvIndexFileMaintainer.getPartition,
+ dvIndexFileMaintainer.getBucket,
+ DataIncrement.emptyIncrement(),
+ CompactIncrement.emptyIncrement(),
+ new IndexIncrement(added.map(_.indexFile).asJava,
deleted.map(_.indexFile).asJava)
+ )
+ val serializer = new CommitMessageSerializer
+ serializer.serialize(commitMessage)
}
serializedCommits
.collect()
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
index 1e0f1d6d3..b380d36c3 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
@@ -39,14 +39,8 @@ case class SparkDataFileMeta(
.toUri
.toString + "/" + dataFileMeta.fileName()
}
-
- def toSparkDeletionFile: SparkDeletionFile = {
- SparkDeletionFile(partition, bucket, deletionFile)
- }
}
-case class SparkDeletionFile(partition: BinaryRow, bucket: Int, deletionFile:
Option[DeletionFile])
-
object SparkDataFileMeta {
def convertToSparkDataFileMeta(
dataSplit: DataSplit,
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index ca12d6a1a..6c7d07bf5 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -104,7 +104,7 @@ case class UpdatePaimonTableCommand(
val addCommitMessage = writeOnlyUpdatedData(sparkSession,
touchedDataSplits)
// Step4: write these deletion vectors.
- val indexCommitMsg = updateDeletionVector(deletionVectors,
dataFilePathToMeta, writer)
+ val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
addCommitMessage ++ indexCommitMsg
} finally {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SQLHelper.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SQLHelper.scala
new file mode 100644
index 000000000..88243b36d
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/util/SQLHelper.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.util
+
+import org.apache.spark.sql.internal.SQLConf
+
+trait SQLHelper {
+
+ /**
+ * Sets all SQL configurations specified in `pairs`, calls `f`, and then
restores all SQL
+ * configurations.
+ */
+ protected def withSQLConf[T](pairs: (String, String)*)(f: => T): T = {
+ val conf = SQLConf.get
+ val (keys, values) = pairs.unzip
+ val currentValues = keys.map {
+ key =>
+ if (conf.contains(key)) {
+ Some(conf.getConfString(key))
+ } else {
+ None
+ }
+ }
+ (keys, values).zipped.foreach {
+ (k, v) =>
+ if (SQLConf.isStaticConfigKey(k)) {
+ throw new RuntimeException(s"Cannot modify the value of a static
config: $k")
+ }
+ conf.setConfString(k, v)
+ }
+ try f
+ finally {
+ keys.zip(currentValues).foreach {
+ case (key, Some(value)) => conf.setConfString(key, value)
+ case (key, None) => conf.unsetConf(key)
+ }
+ }
+ }
+}