This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a2363e621 [spark] spark delete with deletion vector (#3390)
a2363e621 is described below
commit a2363e6210d4c1a289f895ee4ed96cb9188c757b
Author: Yann Byron <[email protected]>
AuthorDate: Mon May 27 16:38:52 2024 +0800
[spark] spark delete with deletion vector (#3390)
---
.../org/apache/paimon/utils/RoaringBitmap32.java | 13 ++
.../deletionvectors/BitmapDeletionVector.java | 13 ++
.../deletionvectors/DeletionVectorsMaintainer.java | 16 ++
.../org/apache/paimon/index/IndexFileHandler.java | 7 +
.../org/apache/paimon/schema/SchemaValidation.java | 8 +-
.../apache/paimon/table/source/DeletionFile.java | 5 +-
.../org/apache/paimon/spark/PaimonSplitScan.scala | 10 +-
.../commands/DeleteFromPaimonTableCommand.scala | 88 ++++++----
.../paimon/spark/commands/PaimonCommand.scala | 112 +++++++++++-
.../paimon/spark/commands/PaimonSparkWriter.scala | 98 ++++++++++-
.../paimon/spark/commands/SparkDataFileMeta.scala | 30 +++-
...StoreTable.scala => SparkDeletionVectors.scala} | 27 ++-
.../paimon/spark/commands/WithFileStoreTable.scala | 7 +
.../paimon/spark/schema/PaimonMetadataColumn.scala | 5 +
.../paimon/spark/sql/DeletionVectorTest.scala | 192 ++++++++++++++++++---
15 files changed, 547 insertions(+), 84 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
index 6458c825a..ee83d1232 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
@@ -23,6 +23,7 @@ import org.roaringbitmap.RoaringBitmap;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.Objects;
/** A compressed bitmap for 32-bit integer. */
public class RoaringBitmap32 {
@@ -67,4 +68,16 @@ public class RoaringBitmap32 {
public void deserialize(DataInput in) throws IOException {
roaringBitmap.deserialize(in);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RoaringBitmap32 that = (RoaringBitmap32) o;
+ return Objects.equals(this.roaringBitmap, that.roaringBitmap);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
index 2948c10d6..a2c592596 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java
@@ -24,6 +24,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutputStream;
import java.io.IOException;
+import java.util.Objects;
/**
* A {@link DeletionVector} based on {@link RoaringBitmap32}, it only supports
files with row count
@@ -104,4 +105,16 @@ public class BitmapDeletionVector implements
DeletionVector {
"The file has too many rows, RoaringBitmap32 only supports
files with row count not exceeding 2147483647.");
}
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ BitmapDeletionVector that = (BitmapDeletionVector) o;
+ return Objects.equals(this.roaringBitmap, that.roaringBitmap);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
index ce9ad40a1..461e092e0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.IndexManifestEntry;
import javax.annotation.Nullable;
@@ -30,6 +31,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
@@ -138,6 +140,20 @@ public class DeletionVectorsMaintainer {
return createOrRestore(deletionVectors);
}
+ @VisibleForTesting
+ public DeletionVectorsMaintainer createOrRestore(
+ @Nullable Long snapshotId, BinaryRow partition) {
+ List<IndexFileMeta> indexFiles =
+ snapshotId == null
+ ? Collections.emptyList()
+ : handler.scan(snapshotId, DELETION_VECTORS_INDEX,
partition).stream()
+ .map(IndexManifestEntry::indexFile)
+ .collect(Collectors.toList());
+ Map<String, DeletionVector> deletionVectors =
+ new HashMap<>(handler.readAllDeletionVectors(indexFiles));
+ return createOrRestore(deletionVectors);
+ }
+
public DeletionVectorsMaintainer create() {
return createOrRestore(new HashMap<>());
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 2d3dee20e..cdb10ae13 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -21,10 +21,12 @@ package org.apache.paimon.index;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
+import org.apache.paimon.deletionvectors.DeletionVectorIndexFileMaintainer;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
+import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.utils.IntIterator;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
@@ -185,6 +187,11 @@ public class IndexFileHandler {
indexManifestFile.delete(indexManifest);
}
+ public DeletionVectorIndexFileMaintainer createDVIndexFileMaintainer(
+ Map<String, DeletionFile> dataFileToDeletionFiles) {
+ return new DeletionVectorIndexFileMaintainer(this,
dataFileToDeletionFiles);
+ }
+
public Map<String, DeletionVector>
readAllDeletionVectors(List<IndexFileMeta> fileMetas) {
Map<String, DeletionVector> deletionVectors = new HashMap<>();
for (IndexFileMeta indexFile : fileMetas) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 25b320515..03fbcfae8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -184,7 +184,7 @@ public class SchemaValidation {
field));
if (options.deletionVectorsEnabled()) {
- validateForDeletionVectors(schema, options);
+ validateForDeletionVectors(options);
}
}
@@ -461,11 +461,7 @@ public class SchemaValidation {
}
}
- private static void validateForDeletionVectors(TableSchema schema,
CoreOptions options) {
- checkArgument(
- !schema.primaryKeys().isEmpty(),
- "Deletion vectors mode is only supported for tables with
primary keys.");
-
+ private static void validateForDeletionVectors(CoreOptions options) {
checkArgument(
options.changelogProducer() == ChangelogProducer.NONE
|| options.changelogProducer() ==
ChangelogProducer.LOOKUP,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java
index 13089bb02..94dfc6157 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DeletionFile.java
@@ -26,6 +26,7 @@ import org.apache.paimon.io.DataOutputView;
import javax.annotation.Nullable;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -44,7 +45,9 @@ import java.util.Optional;
* </ul>
*/
@Public
-public class DeletionFile {
+public class DeletionFile implements Serializable {
+
+ private static final long serialVersionUID = 1L;
private final String path;
private final long offset;
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
index e86f4caf6..37991d0be 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonSplitScan.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark
+import org.apache.paimon.spark.schema.PaimonMetadataColumn
import org.apache.paimon.table.Table
import org.apache.paimon.table.source.{DataSplit, Split}
@@ -25,12 +26,15 @@ import org.apache.spark.sql.connector.read.{Batch, Scan}
import org.apache.spark.sql.types.StructType
/** For internal use only. */
-case class PaimonSplitScan(table: Table, dataSplits: Array[DataSplit]) extends
Scan {
-
+case class PaimonSplitScan(
+ table: Table,
+ dataSplits: Array[DataSplit],
+ metadataColumns: Seq[PaimonMetadataColumn] = Seq.empty)
+ extends Scan {
override def readSchema(): StructType =
SparkTypeUtils.fromPaimonRowType(table.rowType())
override def toBatch: Batch = {
- PaimonBatch(dataSplits.asInstanceOf[Array[Split]], table.newReadBuilder)
+ PaimonBatch(dataSplits.asInstanceOf[Array[Split]], table.newReadBuilder,
metadataColumns)
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 7ce78e79c..86d2aee8b 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -20,13 +20,12 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions
import org.apache.paimon.CoreOptions.MergeEngine
-import org.apache.paimon.spark.{InsertInto, SparkTable}
import org.apache.paimon.spark.PaimonSplitScan
import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import org.apache.paimon.spark.leafnode.PaimonLeafRunnableCommand
import org.apache.paimon.spark.schema.SparkSystemColumns.ROW_KIND_COL
-import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.{BucketMode, FileStoreTable}
import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage}
import org.apache.paimon.types.RowKind
import org.apache.paimon.utils.RowDataPartitionComputer
@@ -56,7 +55,7 @@ case class DeleteFromPaimonTableCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
- val commit = table.store.newCommit(UUID.randomUUID.toString)
+ val commit = fileStore.newCommit(UUID.randomUUID.toString)
if (condition == null || condition == TrueLiteral) {
commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
} else {
@@ -100,7 +99,7 @@ case class DeleteFromPaimonTableCommand(
val commitMessages = if (usePrimaryKeyDelete()) {
performPrimaryKeyDelete(sparkSession)
} else {
- performDeleteCopyOnWrite(sparkSession)
+ performNonPrimaryKeyDelete(sparkSession)
}
writer.commit(commitMessages)
}
@@ -119,39 +118,70 @@ case class DeleteFromPaimonTableCommand(
writer.write(df)
}
- def performDeleteCopyOnWrite(sparkSession: SparkSession): Seq[CommitMessage]
= {
+ def performNonPrimaryKeyDelete(sparkSession: SparkSession):
Seq[CommitMessage] = {
+ val pathFactory = fileStore.pathFactory()
// Step1: the candidate data splits which are filtered by Paimon Predicate.
val candidateDataSplits = findCandidateDataSplits(condition,
relation.output)
- val fileNameToMeta = candidateFileMap(candidateDataSplits)
+ val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
- // Step2: extract out the exactly files, which must have at least one
record to be updated.
- val touchedFilePaths = findTouchedFiles(candidateDataSplits, condition,
relation, sparkSession)
+ if (deletionVectorsEnabled) {
+ // Step2: collect all the deletion vectors that mark the deleted rows.
+ val deletionVectors = collectDeletionVectors(
+ candidateDataSplits,
+ dataFilePathToMeta,
+ condition,
+ relation,
+ sparkSession)
+
+ deletionVectors.cache()
+ try {
+ // Step3: write these deletion vectors.
+ val newIndexCommitMsg = writer.persistDeletionVectors(deletionVectors)
+
+ // Step4: mark the touched index files as DELETE if needed.
+ val rewriteIndexCommitMsg = fileStore.bucketMode() match {
+ case BucketMode.BUCKET_UNAWARE =>
+ val indexEntries = getDeletedIndexFiles(dataFilePathToMeta,
deletionVectors)
+ writer.buildCommitMessageFromIndexManifestEntry(indexEntries)
+ case _ =>
+ Seq.empty[CommitMessage]
+ }
- // Step3: the smallest range of data files that need to be rewritten.
- val touchedFiles = touchedFilePaths.map {
- file => fileNameToMeta.getOrElse(file, throw new
RuntimeException(s"Missing file: $file"))
- }
+ newIndexCommitMsg ++ rewriteIndexCommitMsg
+ } finally {
+ deletionVectors.unpersist()
+ }
- // Step4: build a dataframe that contains the unchanged data, and write
out them.
- val touchedDataSplits = SparkDataFileMeta.convertToDataSplits(
- touchedFiles,
- rawConvertible = true,
- table.store().pathFactory())
- val toRewriteScanRelation = Filter(
- Not(condition),
- Compatibility.createDataSourceV2ScanRelation(
- relation,
- PaimonSplitScan(table, touchedDataSplits),
- relation.output))
- val data = createDataset(sparkSession, toRewriteScanRelation)
+ } else {
+ // Step2: extract the exact files that contain at least one record to be deleted.
+ val touchedFilePaths =
+ findTouchedFiles(candidateDataSplits, condition, relation,
sparkSession)
+
+ // Step3: the smallest range of data files that need to be rewritten.
+ val touchedFiles = touchedFilePaths.map {
+ file =>
+ dataFilePathToMeta.getOrElse(file, throw new
RuntimeException(s"Missing file: $file"))
+ }
- // only write new files, should have no compaction
- val addCommitMessage = writer.writeOnly().write(data)
+ // Step4: build a dataframe that contains the unchanged data, and write it out.
+ val touchedDataSplits =
+ SparkDataFileMeta.convertToDataSplits(touchedFiles, rawConvertible =
true, pathFactory)
+ val toRewriteScanRelation = Filter(
+ Not(condition),
+ Compatibility.createDataSourceV2ScanRelation(
+ relation,
+ PaimonSplitScan(table, touchedDataSplits),
+ relation.output))
+ val data = createDataset(sparkSession, toRewriteScanRelation)
- // Step5: convert the deleted files that need to be wrote to commit
message.
- val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
+ // only write new files, should have no compaction
+ val addCommitMessage = writer.writeOnly().write(data)
- addCommitMessage ++ deletedCommitMessage
+ // Step5: convert the deleted files that need to be written into commit messages.
+ val deletedCommitMessage = buildDeletedCommitMessage(touchedFiles)
+
+ addCommitMessage ++ deletedCommitMessage
+ }
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
index 863310e55..915def57d 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonCommand.scala
@@ -18,23 +18,27 @@
package org.apache.paimon.spark.commands
+import org.apache.paimon.deletionvectors.{BitmapDeletionVector, DeletionVector}
+import org.apache.paimon.fs.Path
import org.apache.paimon.index.IndexFileMeta
import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement,
IndexIncrement}
+import org.apache.paimon.manifest.IndexManifestEntry
import org.apache.paimon.spark.{PaimonSplitScan, SparkFilterConverter}
import org.apache.paimon.spark.catalyst.Compatibility
import org.apache.paimon.spark.catalyst.analysis.expressions.ExpressionHelper
import
org.apache.paimon.spark.commands.SparkDataFileMeta.convertToSparkDataFileMeta
+import org.apache.paimon.spark.schema.PaimonMetadataColumn._
import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl}
import org.apache.paimon.table.source.DataSplit
import org.apache.paimon.types.RowType
-import org.apache.paimon.utils.Preconditions
+import org.apache.paimon.utils.SerializationUtils
+import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.PaimonUtils.createDataset
-import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
-import org.apache.spark.sql.catalyst.plans.logical.{Filter =>
FilterLogicalNode}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation,
DataSourceV2ScanRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter =>
FilterLogicalNode, Project}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.sources.{AlwaysTrue, And, EqualNullSafe, Filter}
@@ -135,17 +139,113 @@ trait PaimonCommand extends WithFileStoreTable with
ExpressionHelper {
.map(relativePath)
}
+ /** Note that the key is a relative path, not just the file name. */
protected def candidateFileMap(
candidateDataSplits: Seq[DataSplit]): Map[String, SparkDataFileMeta] = {
- val totalBuckets = table.coreOptions().bucket()
+ val totalBuckets = coreOptions.bucket()
val candidateDataFiles = candidateDataSplits
.flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit,
totalBuckets))
- val fileStorePathFactory = table.store().pathFactory()
+ val fileStorePathFactory = fileStore.pathFactory()
candidateDataFiles
.map(file => (file.relativePath(fileStorePathFactory), file))
.toMap
}
+ protected def getDeletedIndexFiles(
+ dataFilePathToMeta: Map[String, SparkDataFileMeta],
+ newDeletionVectors: Dataset[SparkDeletionVectors]
+ ): Seq[IndexManifestEntry] = {
+ val deletionFiles = dataFilePathToMeta.flatMap {
+ case (relativePath, sdf) =>
+ sdf.deletionFile match {
+ case Some(deletionFile) =>
+ Some((relativePath, deletionFile))
+ case None => None
+ }
+ }
+ val dvIndexFileMaintainer = fileStore
+ .newIndexFileHandler()
+ .createDVIndexFileMaintainer(deletionFiles.asJava)
+
+ val pathFactory = fileStore.pathFactory()
+ val touchedDataFileAndDeletionFiles = newDeletionVectors
+ .collect()
+ .flatMap {
+ sdv =>
+ val relativePaths = sdv.relativePaths(pathFactory)
+ relativePaths.flatMap {
+ relativePath =>
+ dataFilePathToMeta(relativePath).deletionFile match {
+ case Some(deletionFile) => Some(relativePath, deletionFile)
+ case _ => None
+ }
+ }
+ }
+ .toMap
+
+
dvIndexFileMaintainer.notifyDeletionFiles(touchedDataFileAndDeletionFiles.asJava)
+ dvIndexFileMaintainer.writeUnchangedDeletionVector().asScala
+ }
+ protected def collectDeletionVectors(
+ candidateDataSplits: Seq[DataSplit],
+ dataFilePathToMeta: Map[String, SparkDataFileMeta],
+ condition: Expression,
+ relation: DataSourceV2Relation,
+ sparkSession: SparkSession): Dataset[SparkDeletionVectors] = {
+ import sparkSession.implicits._
+
+ val dataFileAndDeletionFile =
dataFilePathToMeta.mapValues(_.toSparkDeletionFile).toArray
+ val metadataCols = Seq(FILE_PATH, ROW_INDEX)
+ val metadataProj = metadataCols.map(_.toAttribute)
+ val scan = PaimonSplitScan(table, candidateDataSplits.toArray,
metadataCols)
+ val filteredRelation = {
+ Project(
+ metadataProj,
+ FilterLogicalNode(
+ condition,
+ Compatibility.createDataSourceV2ScanRelation(
+ relation,
+ scan,
+ relation.output ++ metadataProj)))
+ }
+
+ val store = table.store()
+ val fileIO = table.fileIO()
+ val location = table.location
+ createDataset(sparkSession, filteredRelation)
+ .select(FILE_PATH_COLUMN, ROW_INDEX_COLUMN)
+ .as[(String, Long)]
+ .groupByKey(_._1)
+ .mapGroups {
+ case (filePath, iter) =>
+ val fileNameToDeletionFile = dataFileAndDeletionFile.toMap
+ val dv = new BitmapDeletionVector()
+ while (iter.hasNext) {
+ dv.delete(iter.next()._2)
+ }
+
+ val relativeFilePath = location.toUri.relativize(new
URI(filePath)).toString
+ val sparkDeletionFile = fileNameToDeletionFile(relativeFilePath)
+ sparkDeletionFile.deletionFile match {
+ case Some(deletionFile) =>
+ dv.merge(DeletionVector.read(fileIO, deletionFile))
+ case None =>
+ }
+
+ val pathFactory = store.pathFactory()
+ val partitionAndBucket = pathFactory
+ .relativePartitionAndBucketPath(sparkDeletionFile.partition,
sparkDeletionFile.bucket)
+ .toString
+
+ SparkDeletionVectors(
+ partitionAndBucket,
+ SerializationUtils.serializeBinaryRow(sparkDeletionFile.partition),
+ sparkDeletionFile.bucket,
+ Seq((new Path(filePath).getName, dv.serializeToBytes()))
+ )
+ }
+ }
+
protected def buildDeletedCommitMessage(
deletedFiles: Array[SparkDataFileMeta]): Seq[CommitMessage] = {
deletedFiles
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index b9abbb02d..e97d88a0a 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -19,15 +19,17 @@
package org.apache.paimon.spark.commands
import org.apache.paimon.CoreOptions.WRITE_ONLY
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.deletionvectors.{DeletionVector,
DeletionVectorsMaintainer}
import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
+import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement}
+import org.apache.paimon.manifest.{FileKind, IndexManifestEntry}
import org.apache.paimon.spark.{SparkRow, SparkTableWrite}
-import org.apache.paimon.spark.schema.SparkSystemColumns
import org.apache.paimon.spark.schema.SparkSystemColumns.{BUCKET_COL,
ROW_KIND_COL}
import org.apache.paimon.spark.util.SparkRowUtils
import org.apache.paimon.table.{BucketMode, FileStoreTable}
-import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage,
CommitMessageSerializer, RowPartitionKeyExtractor}
-import org.apache.paimon.utils.Preconditions
-import org.apache.paimon.utils.Preconditions.checkArgument
+import org.apache.paimon.table.sink.{BatchWriteBuilder, CommitMessage,
CommitMessageImpl, CommitMessageSerializer, RowPartitionKeyExtractor}
+import org.apache.paimon.utils.SerializationUtils
import org.apache.spark.{Partitioner, TaskContext}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
@@ -196,6 +198,94 @@ case class PaimonSparkWriter(table: FileStoreTable) {
.toSeq
}
+ /**
+ * Write all the deletion vectors to the index files. In bucket-unaware mode, each index file maps
+ * to one deletion vector; otherwise, one index file contains all the deletion vectors with the
+ * same partition and bucket.
+ */
+ def persistDeletionVectors(deletionVectors: Dataset[SparkDeletionVectors]):
Seq[CommitMessage] = {
+ val sparkSession = deletionVectors.sparkSession
+ import sparkSession.implicits._
+
+ val fileStore = table.store()
+ val lastSnapshotId = table.snapshotManager().latestSnapshotId()
+
+ def createOrRestoreDVMaintainer(
+ partition: BinaryRow,
+ bucket: Int): DeletionVectorsMaintainer = {
+ val deletionVectorsMaintainerFactory =
+ new DeletionVectorsMaintainer.Factory(fileStore.newIndexFileHandler())
+ fileStore.bucketMode() match {
+ case BucketMode.BUCKET_UNAWARE =>
+ deletionVectorsMaintainerFactory.create()
+ case _ =>
+ deletionVectorsMaintainerFactory.createOrRestore(lastSnapshotId,
partition, bucket)
+ }
+ }
+
+ def commitDeletionVector(
+ sdv: SparkDeletionVectors,
+ serializer: CommitMessageSerializer): Array[Byte] = {
+ val partition = SerializationUtils.deserializeBinaryRow(sdv.partition)
+ val maintainer = createOrRestoreDVMaintainer(partition, sdv.bucket)
+ sdv.dataFileAndDeletionVector.foreach {
+ case (dataFileName, dv) =>
+ maintainer.notifyNewDeletion(dataFileName,
DeletionVector.deserializeFromBytes(dv))
+ }
+
+ val commitMessage = new CommitMessageImpl(
+ partition,
+ sdv.bucket,
+ DataIncrement.emptyIncrement(),
+ CompactIncrement.emptyIncrement(),
+ new IndexIncrement(maintainer.prepareCommit()))
+
+ serializer.serialize(commitMessage)
+ }
+
+ val serializedCommits = fileStore.bucketMode() match {
+ case BucketMode.BUCKET_UNAWARE =>
+ deletionVectors.mapPartitions {
+ iter: Iterator[SparkDeletionVectors] =>
+ val serializer = new CommitMessageSerializer
+ iter.map(commitDeletionVector(_, serializer))
+ }
+ case _ =>
+ deletionVectors
+ .groupByKey(_.partitionAndBucket)
+ .mapGroups {
+ case (_, iter: Iterator[SparkDeletionVectors]) =>
+ val serializer = new CommitMessageSerializer
+ val grouped = iter
+ .reduce(
+ (sdv1, sdv2) =>
+ sdv1.copy(dataFileAndDeletionVector =
+ sdv1.dataFileAndDeletionVector ++
sdv2.dataFileAndDeletionVector))
+ commitDeletionVector(grouped, serializer)
+ }
+ }
+ serializedCommits
+ .collect()
+ .map(deserializeCommitMessage(serializer, _))
+ }
+
+ def buildCommitMessageFromIndexManifestEntry(
+ indexManifestEntries: Seq[IndexManifestEntry]): Seq[CommitMessage] = {
+ indexManifestEntries
+ .groupBy(entry => (entry.partition(), entry.bucket()))
+ .map {
+ case ((partition, bucket), entries) =>
+ val (added, removed) = entries.partition(_.kind() == FileKind.ADD)
+ new CommitMessageImpl(
+ partition,
+ bucket,
+ DataIncrement.emptyIncrement(),
+ CompactIncrement.emptyIncrement(),
+ new IndexIncrement(added.map(_.indexFile()).asJava,
removed.map(_.indexFile()).asJava))
+ }
+ .toSeq
+ }
+
def commit(commitMessages: Seq[CommitMessage]): Unit = {
val tableCommit = writeBuilder.newCommit()
try {
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
index 8299f6b8c..1e0f1d6d3 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
@@ -20,7 +20,8 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.io.DataFileMeta
-import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.spark.PaimonImplicits
+import org.apache.paimon.table.source.{DataSplit, DeletionFile}
import org.apache.paimon.utils.FileStorePathFactory
import scala.collection.JavaConverters._
@@ -29,7 +30,8 @@ case class SparkDataFileMeta(
partition: BinaryRow,
bucket: Int,
totalBuckets: Int,
- dataFileMeta: DataFileMeta) {
+ dataFileMeta: DataFileMeta,
+ deletionFile: Option[DeletionFile] = None) {
def relativePath(fileStorePathFactory: FileStorePathFactory): String = {
fileStorePathFactory
@@ -37,14 +39,30 @@ case class SparkDataFileMeta(
.toUri
.toString + "/" + dataFileMeta.fileName()
}
+
+ def toSparkDeletionFile: SparkDeletionFile = {
+ SparkDeletionFile(partition, bucket, deletionFile)
+ }
}
+case class SparkDeletionFile(partition: BinaryRow, bucket: Int, deletionFile:
Option[DeletionFile])
+
object SparkDataFileMeta {
def convertToSparkDataFileMeta(
dataSplit: DataSplit,
totalBuckets: Int): Seq[SparkDataFileMeta] = {
+ import PaimonImplicits._
+
+ val dvFactory =
+ DeletionFile.factory(dataSplit.dataFiles(),
dataSplit.deletionFiles().orElse(null))
dataSplit.dataFiles().asScala.map {
- file => SparkDataFileMeta(dataSplit.partition, dataSplit.bucket,
totalBuckets, file)
+ file =>
+ SparkDataFileMeta(
+ dataSplit.partition,
+ dataSplit.bucket,
+ totalBuckets,
+ file,
+ dvFactory.create(file.fileName()))
}
}
@@ -56,10 +74,14 @@ object SparkDataFileMeta {
.groupBy(file => (file.partition, file.bucket))
.map {
case ((partition, bucket), files) =>
+ val (dataFiles, deletionFiles) = files.map {
+ file => (file.dataFileMeta, file.deletionFile.orNull)
+ }.unzip
new DataSplit.Builder()
.withPartition(partition)
.withBucket(bucket)
- .withDataFiles(files.map(_.dataFileMeta).toList.asJava)
+ .withDataFiles(dataFiles.toList.asJava)
+ .withDataDeletionFiles(deletionFiles.toList.asJava)
.rawConvertible(rawConvertible)
.withBucketPath(pathFactory.bucketPath(partition, bucket).toString)
.build()
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
similarity index 55%
copy from
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
copy to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
index 1d447281e..5f6c8c347 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVectors.scala
@@ -18,14 +18,23 @@
package org.apache.paimon.spark.commands
-import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.types.RowType
+import org.apache.paimon.utils.{FileStorePathFactory, SerializationUtils}
-private[spark] trait WithFileStoreTable {
-
- def table: FileStoreTable
-
- def withPrimaryKeys: Boolean = !table.primaryKeys().isEmpty
-
- def rowType: RowType = table.rowType()
+/**
+ * This class is used as the element type of a Dataset, so it uses Array[Byte] instead of
+ * BinaryRow or DeletionVector.
+ */
+case class SparkDeletionVectors(
+ partitionAndBucket: String,
+ partition: Array[Byte],
+ bucket: Int,
+ dataFileAndDeletionVector: Seq[(String, Array[Byte])]
+) {
+ def relativePaths(fileStorePathFactory: FileStorePathFactory): Seq[String] =
{
+ val prefix = fileStorePathFactory
+
.relativePartitionAndBucketPath(SerializationUtils.deserializeBinaryRow(partition),
bucket)
+ .toUri
+ .toString + "/"
+ dataFileAndDeletionVector.map(prefix + _._1)
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
index 1d447281e..5b2e704da 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WithFileStoreTable.scala
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.commands
+import org.apache.paimon.{CoreOptions, FileStore}
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.types.RowType
@@ -28,4 +29,10 @@ private[spark] trait WithFileStoreTable {
def withPrimaryKeys: Boolean = !table.primaryKeys().isEmpty
def rowType: RowType = table.rowType()
+
+ def coreOptions: CoreOptions = table.coreOptions()
+
+ def fileStore: FileStore[_] = table.store()
+
+ def deletionVectorsEnabled: Boolean = coreOptions.deletionVectorsEnabled()
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
index 74ce7090d..a82454d98 100644
---
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/schema/PaimonMetadataColumn.scala
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.schema
import org.apache.paimon.spark.SparkTypeUtils
import org.apache.paimon.types.DataField
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.connector.catalog.MetadataColumn
import org.apache.spark.sql.types.{DataType, LongType, StringType}
@@ -30,6 +31,10 @@ case class PaimonMetadataColumn(id: Int, override val name:
String, override val
def toPaimonDataField: DataField = {
new DataField(id, name, SparkTypeUtils.toPaimonType(dataType));
}
+
+ def toAttribute: AttributeReference = { // builds a Catalyst AttributeReference from this column's name and dataType
+ AttributeReference(name, dataType)() // empty second argument list: the remaining constructor parameters take their defaults
+ }
}
object PaimonMetadataColumn {
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index f45a22cae..02d898f97 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -20,15 +20,139 @@ package org.apache.paimon.spark.sql
import org.apache.paimon.data.BinaryRow
import org.apache.paimon.deletionvectors.{DeletionVector,
DeletionVectorsMaintainer}
+import org.apache.paimon.fs.Path
import org.apache.paimon.spark.PaimonSparkTestBase
+import org.apache.paimon.table.FileStoreTable
import org.apache.spark.sql.Row
import org.junit.jupiter.api.Assertions
+import scala.collection.JavaConverters._
import scala.util.Random
class DeletionVectorTest extends PaimonSparkTestBase {
+ import testImplicits._
+
+ bucketModes.foreach { // registers one test per entry of bucketModes (defined outside this chunk)
+ bucket =>
+ test(
+ s"Paimon DeletionVector: delete for append non-partitioned table with
bucket = $bucket") {
+ withTable("T") {
+ val bucketKey = if (bucket > 1) { // the 'bucket-key' property is only appended when bucket > 1
+ ", 'bucket-key' = 'id'"
+ } else {
+ ""
+ }
+ spark.sql(
+ s"""
+ |CREATE TABLE T (id INT, name STRING)
+ |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'bucket' =
'$bucket' $bucketKey)
+ |""".stripMargin)
+ // Step 1: a plain INSERT is expected to produce no deletion vectors.
+ val table = loadTable("T")
+ val dvMaintainerFactory =
+ new
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+
+ spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b')")
+ val deletionVectors1 = getLatestDeletionVectors(table,
dvMaintainerFactory)
+ Assertions.assertEquals(0, deletionVectors1.size)
+ // Step 2: delete one row and check it is marked in exactly one deletion vector.
+ val cond1 = "id = 2"
+ val rowMetaInfo1 = getFilePathAndRowIndex(cond1) // captured BEFORE the DELETE: file name -> row indexes of the matching rows
+ spark.sql(s"DELETE FROM T WHERE $cond1")
+ checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(1, "a")
:: Nil)
+ val deletionVectors2 = getLatestDeletionVectors(table,
dvMaintainerFactory)
+ Assertions.assertEquals(1, deletionVectors2.size)
+ deletionVectors2
+ .foreach {
+ case (filePath, dv) =>
+ rowMetaInfo1(filePath).foreach(index =>
Assertions.assertTrue(dv.isDeleted(index))) // Map.apply throws if the deletion vector's key was not seen in the pre-delete query
+ }
+ // Step 3: a further INSERT must leave the existing deletion vectors unchanged.
+ spark.sql("INSERT INTO T VALUES (2, 'bb'), (3, 'c'), (4, 'd')")
+ checkAnswer(
+ spark.sql(s"SELECT * from T ORDER BY id"),
+ Row(1, "a") :: Row(2, "bb") :: Row(3, "c") :: Row(4, "d") :: Nil)
+ val deletionVectors3 = getLatestDeletionVectors(table,
dvMaintainerFactory)
+ Assertions.assertTrue(deletionVectors2 == deletionVectors3)
+ // Step 4: a second DELETE is verified only through the query result, not through the deletion vectors.
+ val cond2 = "id % 2 = 1"
+ spark.sql(s"DELETE FROM T WHERE $cond2")
+ checkAnswer(spark.sql(s"SELECT * from T ORDER BY id"), Row(2, "bb")
:: Row(4, "d") :: Nil)
+ }
+ }
+ }
+
+ bucketModes.foreach { // registers one test per entry of bucketModes (defined outside this chunk)
+ bucket =>
+ test(s"Paimon DeletionVector: delete for append partitioned table with
bucket = $bucket") {
+ withTable("T") {
+ val bucketKey = if (bucket > 1) { // the 'bucket-key' property is only appended when bucket > 1
+ ", 'bucket-key' = 'id'"
+ } else {
+ ""
+ }
+ spark.sql(
+ s"""
+ |CREATE TABLE T (id INT, name STRING, pt STRING)
+ |PARTITIONED BY(pt)
+ |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'bucket' =
'$bucket' $bucketKey)
+ |""".stripMargin)
+ // Step 1: insert two rows into each of the partitions '2024' and '2025'; no deletion vectors expected yet.
+ val table = loadTable("T")
+ val dvMaintainerFactory =
+ new
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+
+ spark.sql(
+ "INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c',
'2025'), (4, 'd', '2025')")
+ val deletionVectors1 = getLatestDeletionVectors(table,
dvMaintainerFactory) // two-argument overload: looks up BinaryRow.EMPTY_ROW only
+ Assertions.assertEquals(0, deletionVectors1.size)
+ // Step 2: delete id = 2 (inserted with pt = '2024'); exactly one deletion vector is expected across both partitions.
+ val cond1 = "id = 2"
+ val rowMetaInfo1 = getFilePathAndRowIndex(cond1) // captured BEFORE the DELETE: file name -> row indexes of the matching rows
+ spark.sql(s"DELETE FROM T WHERE $cond1")
+ checkAnswer(
+ spark.sql(s"SELECT * from T ORDER BY id"),
+ Row(1, "a", "2024") :: Row(3, "c", "2025") :: Row(4, "d", "2025")
:: Nil)
+ val deletionVectors2 =
+ getLatestDeletionVectors(
+ table,
+ dvMaintainerFactory,
+ Seq(BinaryRow.singleColumn("2024"),
BinaryRow.singleColumn("2025")))
+ Assertions.assertEquals(1, deletionVectors2.size)
+ deletionVectors2
+ .foreach {
+ case (filePath, dv) =>
+ rowMetaInfo1(filePath).foreach(index =>
Assertions.assertTrue(dv.isDeleted(index)))
+ }
+ // Step 3: delete id = 3 (inserted with pt = '2025'); the deletion vectors seen for '2024' must stay the same.
+ val cond2 = "id = 3"
+ val rowMetaInfo2 = rowMetaInfo1 ++ getFilePathAndRowIndex(cond2) // merged expectations; on a duplicate key the right-hand map's entry wins
+ spark.sql(s"DELETE FROM T WHERE $cond2")
+ checkAnswer(
+ spark.sql(s"SELECT * from T ORDER BY id"),
+ Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
+ val deletionVectors3 =
+ getLatestDeletionVectors(
+ table,
+ dvMaintainerFactory,
+ Seq(BinaryRow.singleColumn("2024")))
+ Assertions.assertTrue(deletionVectors2 == deletionVectors3) // deletionVectors2 covered both partitions, deletionVectors3 only '2024'
+ val deletionVectors4 =
+ getLatestDeletionVectors(
+ table,
+ dvMaintainerFactory,
+ Seq(BinaryRow.singleColumn("2024"),
BinaryRow.singleColumn("2025")))
+ deletionVectors4
+ .foreach {
+ case (filePath, dv) =>
+ rowMetaInfo2(filePath).foreach(index =>
Assertions.assertTrue(dv.isDeleted(index))) // every row index recorded for either condition must be marked deleted
+ }
+ }
+ }
+ }
+
test("Paimon deletionVector: deletion vector write verification") {
withTable("T") {
spark.sql(s"""
@@ -54,27 +178,20 @@ class DeletionVectorTest extends PaimonSparkTestBase {
val dvMaintainerFactory =
new
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
- def restoreDeletionVectors(): java.util.Map[String, DeletionVector] = {
- dvMaintainerFactory
- .createOrRestore(table.snapshotManager().latestSnapshotId(),
BinaryRow.EMPTY_ROW, 0)
- .deletionVectors()
- }
-
- val deletionVectors1 = restoreDeletionVectors()
+ val deletionVectors1 = getLatestDeletionVectors(table,
dvMaintainerFactory)
// 1, 3 deleted, their row positions are 0, 2
- Assertions.assertEquals(1, deletionVectors1.size())
+ Assertions.assertEquals(1, deletionVectors1.size)
deletionVectors1
- .entrySet()
- .forEach(
- e => {
- Assertions.assertTrue(e.getValue.isDeleted(0))
- Assertions.assertTrue(e.getValue.isDeleted(2))
- })
+ .foreach {
+ case (_, dv) =>
+ Assertions.assertTrue(dv.isDeleted(0))
+ Assertions.assertTrue(dv.isDeleted(2))
+ }
// Compact
// f3 (1, 2, 3), no deletion
spark.sql("CALL sys.compact('T')")
- val deletionVectors2 = restoreDeletionVectors()
+ val deletionVectors2 = getLatestDeletionVectors(table,
dvMaintainerFactory)
// After compaction, deletionVectors should be empty
Assertions.assertTrue(deletionVectors2.isEmpty)
@@ -86,15 +203,14 @@ class DeletionVectorTest extends PaimonSparkTestBase {
spark.sql(s"SELECT * from T ORDER BY id"),
Row(1, "a_new1") :: Row(2, "b_new2") :: Row(3, "c_new1") :: Nil)
- val deletionVectors3 = restoreDeletionVectors()
+ val deletionVectors3 = getLatestDeletionVectors(table,
dvMaintainerFactory)
// 2 deleted, row positions is 1
- Assertions.assertEquals(1, deletionVectors3.size())
+ Assertions.assertEquals(1, deletionVectors3.size)
deletionVectors3
- .entrySet()
- .forEach(
- e => {
- Assertions.assertTrue(e.getValue.isDeleted(1))
- })
+ .foreach {
+ case (_, dv) =>
+ Assertions.assertTrue(dv.isDeleted(1))
+ }
}
}
@@ -186,4 +302,36 @@ class DeletionVectorTest extends PaimonSparkTestBase {
checkResult(dvTbl, resultTbl)
}
}
+
+ private def getPathName(path: String): String = { // wraps the string in a Paimon fs Path and returns its getName
+ new Path(path).getName
+ }
+
+ private def getLatestDeletionVectors(
+ table: FileStoreTable,
+ dvMaintainerFactory: DeletionVectorsMaintainer.Factory): Map[String,
DeletionVector] = { // convenience overload: delegates with BinaryRow.EMPTY_ROW as the only partition
+ getLatestDeletionVectors(table, dvMaintainerFactory,
Seq(BinaryRow.EMPTY_ROW))
+ }
+
+ private def getLatestDeletionVectors(
+ table: FileStoreTable,
+ dvMaintainerFactory: DeletionVectorsMaintainer.Factory,
+ partitions: Seq[BinaryRow]): Map[String, DeletionVector] = { // collects the deletion vectors of every given partition into one immutable Scala Map
+ partitions.flatMap {
+ partition =>
+ dvMaintainerFactory
+ .createOrRestore(table.snapshotManager().latestSnapshotId(),
partition) // NOTE(review): this overload takes no bucket argument; presumably it covers all buckets of the partition - confirm in DeletionVectorsMaintainer.Factory
+ .deletionVectors()
+ .asScala // Java map viewed as a Scala collection of (key, DeletionVector) pairs
+ }.toMap // if the same key occurs for several partitions, the later entry wins
+ }
+
+ private def getFilePathAndRowIndex(condition: String): Map[String,
Array[Long]] = { // maps file name -> row indexes of the rows of table T (hard-coded) that match condition
+ spark
+ .sql(s"SELECT __paimon_file_path, __paimon_row_index FROM T WHERE
$condition ORDER BY id")
+ .as[(String, Long)] // typed as (file path, row index); relies on the testImplicits._ import for the encoder
+ .collect()
+ .groupBy(_._1) // group the collected rows by full file path
+ .map(kv => (getPathName(kv._1), kv._2.map(_._2))) // key becomes the file name only; value keeps just the row indexes
+ }
}