This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 5625e47e48dd6a26002c7b20d1dad4a26f97a7fb
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Oct 29 10:26:11 2025 +0800

    [spark] Fix write non-pk dv table with external paths (#6487)
---
 .../spark/commands/PaimonRowLevelCommand.scala     | 31 ++++++--------------
 .../paimon/spark/commands/PaimonSparkWriter.scala  |  5 ++--
 .../paimon/spark/commands/SparkDataFileMeta.scala  | 13 +++++----
 .../spark/commands/SparkDeletionVector.scala       | 23 ++++-----------
 .../spark/commands/UpdatePaimonTableCommand.scala  |  7 ++---
 .../paimon/spark/sql/DeletionVectorTest.scala      | 33 +++++++++++++++++++++-
 6 files changed, 60 insertions(+), 52 deletions(-)

diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
index 300ca20ff8..11dd947009 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonRowLevelCommand.scala
@@ -71,12 +71,6 @@ trait PaimonRowLevelCommand
     }
   }
 
-  /** Gets a relative path against the table path. */
-  protected def relativePath(absolutePath: String): String = {
-    val location = table.location().toUri
-    location.relativize(new URI(absolutePath)).toString
-  }
-
   protected def findCandidateDataSplits(
       condition: Expression,
       output: Seq[Attribute]): Seq[DataSplit] = {
@@ -121,7 +115,6 @@ trait PaimonRowLevelCommand
       .distinct()
       .as[String]
       .collect()
-      .map(relativePath)
   }
 
   protected def extractFilesAndCreateNewScan(
@@ -136,15 +129,14 @@ trait PaimonRowLevelCommand
     (files, newRelation)
   }
 
-  /** Notice that, the key is a relative path, not just the file name. */
+  /** Notice that, the key is a file path, not just the file name. */
   protected def candidateFileMap(
       candidateDataSplits: Seq[DataSplit]): Map[String, SparkDataFileMeta] = {
     val totalBuckets = coreOptions.bucket()
     val candidateDataFiles = candidateDataSplits
       .flatMap(dataSplit => convertToSparkDataFileMeta(dataSplit, totalBuckets))
-    val fileStorePathFactory = fileStore.pathFactory()
     candidateDataFiles
-      .map(file => (file.relativePath(fileStorePathFactory), file))
+      .map(file => (file.filePath(), file))
       .toMap
   }
 
@@ -165,11 +157,12 @@ trait PaimonRowLevelCommand
       dataset: Dataset[Row],
      sparkSession: SparkSession): Dataset[SparkDeletionVector] = {
     import sparkSession.implicits._
-    val dataFileToPartitionAndBucket =
-      dataFilePathToMeta.mapValues(meta => (meta.partition, meta.bucket)).toArray
+    // convert to a serializable map
+    val dataFileToPartitionAndBucket = dataFilePathToMeta.map {
+      case (k, v) => k -> (v.bucketPath, v.partition, v.bucket)
+    }
 
     val my_table = table
-    val location = my_table.location
     val dvBitmap64 = my_table.coreOptions().deletionVectorBitmap64()
     dataset
       .select(DV_META_COLUMNS.map(col): _*)
@@ -183,18 +176,12 @@ trait PaimonRowLevelCommand
             dv.delete(iter.next()._2)
           }
 
-          val relativeFilePath = location.toUri.relativize(new URI(filePath)).toString
-          val (partition, bucket) = dataFileToPartitionAndBucket.toMap.apply(relativeFilePath)
-          val pathFactory = my_table.store().pathFactory()
-          val relativeBucketPath = pathFactory
-            .relativeBucketPath(partition, bucket)
-            .toString
-
+          val (bucketPath, partition, bucket) = dataFileToPartitionAndBucket.apply(filePath)
           SparkDeletionVector(
-            relativeBucketPath,
+            bucketPath,
             SerializationUtils.serializeBinaryRow(partition),
             bucket,
-            new Path(filePath).getName,
+            filePath,
             DeletionVector.serializeToBytes(dv)
           )
         }
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 6d0563b364..ee3104e9b3 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -25,6 +25,7 @@ import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow}
 import org.apache.paimon.data.serializer.InternalSerializers
 import org.apache.paimon.deletionvectors.DeletionVector
 import org.apache.paimon.deletionvectors.append.BaseAppendDeleteFileMaintainer
+import org.apache.paimon.fs.Path
 import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner}
 import org.apache.paimon.io.{CompactIncrement, DataIncrement}
 import org.apache.paimon.manifest.FileKind
@@ -310,7 +311,7 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
     val sparkSession = deletionVectors.sparkSession
     import sparkSession.implicits._
     val serializedCommits = deletionVectors
-      .groupByKey(_.partitionAndBucket)
+      .groupByKey(_.bucketPath)
       .mapGroups {
         (_, iter: Iterator[SparkDeletionVector]) =>
           val indexHandler = table.store().newIndexFileHandler()
@@ -334,7 +335,7 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
             }
 
             dvIndexFileMaintainer.notifyNewDeletionVector(
-              sdv.dataFileName,
+              new Path(sdv.dataFilePath).getName,
               DeletionVector.deserializeFromBytes(sdv.deletionVector))
           }
           val indexEntries = dvIndexFileMaintainer.persist()
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
index 569a84a74c..921d2e4735 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDataFileMeta.scala
@@ -27,17 +27,19 @@ import org.apache.paimon.utils.FileStorePathFactory
 
 import scala.collection.JavaConverters._
 
 case class SparkDataFileMeta(
+    bucketPath: String,
     partition: BinaryRow,
     bucket: Int,
     totalBuckets: Int,
     dataFileMeta: DataFileMeta,
     deletionFile: Option[DeletionFile] = None) {
 
-  def relativePath(fileStorePathFactory: FileStorePathFactory): String = {
-    fileStorePathFactory
-      .relativeBucketPath(partition, bucket)
-      .toUri
-      .toString + "/" + dataFileMeta.fileName()
+  def filePath(): String = {
+    if (dataFileMeta.externalPath().isPresent) {
+      dataFileMeta.externalPath().get()
+    } else {
+      bucketPath + "/" + dataFileMeta.fileName()
+    }
   }
 }
@@ -52,6 +54,7 @@ object SparkDataFileMeta {
     dataSplit.dataFiles().asScala.map {
       file =>
         SparkDataFileMeta(
+          dataSplit.bucketPath(),
           dataSplit.partition,
           dataSplit.bucket,
           totalBuckets,
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVector.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVector.scala
index 9fc7fdadcb..53453af913 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVector.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/SparkDeletionVector.scala
@@ -18,9 +18,8 @@
 
 package org.apache.paimon.spark.commands
 
-import org.apache.paimon.fs.Path
 import org.apache.paimon.table.source.DataSplit
-import org.apache.paimon.utils.{FileStorePathFactory, SerializationUtils}
+import org.apache.paimon.utils.SerializationUtils
 
 import scala.collection.JavaConverters._
 
@@ -29,31 +28,21 @@ import scala.collection.JavaConverters._
  * or DeletionVector.
  */
 case class SparkDeletionVector(
-    partitionAndBucket: String,
+    bucketPath: String,
     partition: Array[Byte],
     bucket: Int,
-    dataFileName: String,
+    dataFilePath: String,
     deletionVector: Array[Byte]
-) {
-  def relativePath(pathFactory: FileStorePathFactory): String = {
-    val prefix = pathFactory
-      .relativeBucketPath(SerializationUtils.deserializeBinaryRow(partition), bucket)
-      .toUri
-      .toString + "/"
-    prefix + dataFileName
-  }
-}
+)
 
 object SparkDeletionVector {
   def toDataSplit(
       deletionVector: SparkDeletionVector,
-      root: Path,
-      pathFactory: FileStorePathFactory,
       dataFilePathToMeta: Map[String, SparkDataFileMeta]): DataSplit = {
-    val meta = dataFilePathToMeta(deletionVector.relativePath(pathFactory))
+    val meta = dataFilePathToMeta(deletionVector.dataFilePath)
     DataSplit
       .builder()
-      .withBucketPath(root + "/" + deletionVector.partitionAndBucket)
+      .withBucketPath(deletionVector.bucketPath)
       .withPartition(SerializationUtils.deserializeBinaryRow(deletionVector.partition))
       .withBucket(deletionVector.bucket)
       .withTotalBuckets(meta.totalBuckets)
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 8839d5c8ac..1258ebc449 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -81,8 +81,6 @@ case class UpdatePaimonTableCommand(
       logDebug("No file need to rewrote. It's an empty Commit.")
       Seq.empty[CommitMessage]
     } else {
-      val pathFactory = fileStore.pathFactory()
-
      if (deletionVectorsEnabled) {
         // Step2: collect all the deletion vectors that marks the deleted rows.
         val deletionVectors = collectDeletionVectors(
@@ -95,9 +93,8 @@ case class UpdatePaimonTableCommand(
         deletionVectors.cache()
         try {
           // Step3: write these updated data
-          val touchedDataSplits = deletionVectors.collect().map {
-            SparkDeletionVector.toDataSplit(_, root, pathFactory, dataFilePathToMeta)
-          }
+          val touchedDataSplits =
+            deletionVectors.collect().map(SparkDeletionVector.toDataSplit(_, dataFilePathToMeta))
           val addCommitMessage = writeOnlyUpdatedData(sparkSession, touchedDataSplits)
 
           // Step4: write these deletion vectors.
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index 68e741fb13..e3a5896ab1 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -660,7 +660,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe
     assert(dvMeta.cardinality() == 334)
   }
 
-  test("Paimon deletionVector: delete from non-pk table with data file path") {
+  test("Paimon deletionVector: delete from non-pk table with data file directory") {
     sql(s"""
            |CREATE TABLE T (id INT)
            |TBLPROPERTIES (
@@ -677,6 +677,37 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelpe
     checkAnswer(sql("SELECT count(*) FROM T"), Row(49665))
   }
 
+  test("Paimon deletionVector: delete from non-pk table with data file external paths") {
+    withTempDir {
+      tmpDir =>
+        {
+          sql(s"""
+                 |CREATE TABLE T (id INT, v INT)
+                 |TBLPROPERTIES (
+                 | 'deletion-vectors.enabled' = 'true',
+                 | 'deletion-vectors.bitmap64' = '${Random.nextBoolean()}',
+                 | 'bucket-key' = 'id',
+                 | 'bucket' = '1',
+                 | 'data-file.external-paths' = 'file://${tmpDir.getCanonicalPath}',
+                 | 'data-file.external-paths.strategy' = 'round-robin'
+                 |)
+                 |""".stripMargin)
+          sql("INSERT INTO T SELECT /*+ REPARTITION(1) */ id, id FROM range (1, 50000)")
+          sql("DELETE FROM T WHERE id >= 111 and id <= 444")
+          checkAnswer(sql("SELECT count(*) FROM T"), Row(49665))
+          checkAnswer(sql("SELECT sum(v) FROM T"), Row(1249882315L))
+
+          sql("UPDATE T SET v = v + 1 WHERE id >= 555 and id <= 666")
+          checkAnswer(sql("SELECT count(*) FROM T"), Row(49665))
+          checkAnswer(sql("SELECT sum(v) FROM T"), Row(1249882427L))
+
+          sql("UPDATE T SET v = v + 1 WHERE id >= 600 and id <= 800")
+          checkAnswer(sql("SELECT count(*) FROM T"), Row(49665))
+          checkAnswer(sql("SELECT sum(v) FROM T"), Row(1249882628L))
+        }
+    }
+  }
+
   test("Paimon deletionVector: work v1 with v2") {
     sql(s"""
            |CREATE TABLE T (id INT)
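Context on the fix (editor's illustration, not part of the commit): the removed relativePath helper keyed candidate files by table-relative paths built with java.net.URI.relativize, which only yields a usable relative path when the data file actually lives under the table location. With 'data-file.external-paths' the file sits outside the table directory, relativize returns the absolute URI unchanged, and a map keyed by relative paths can no longer find it, which is why the commit switches to keying by the full file path. A standalone Scala sketch of that relativize behavior, with made-up paths:

import java.net.URI

object RelativizeBehavior {
  def main(args: Array[String]): Unit = {
    val tableLocation = new URI("file:/warehouse/db.db/T/")

    // A file under the table path relativizes to a proper relative path.
    val internalFile = new URI("file:/warehouse/db.db/T/bucket-0/data-0.parquet")
    println(tableLocation.relativize(internalFile)) // bucket-0/data-0.parquet

    // A file under an external path is not a child of the table location, so
    // relativize() returns the URI unchanged and a lookup keyed by relative
    // paths never matches it.
    val externalFile = new URI("file:/external/store/bucket-0/data-1.parquet")
    println(tableLocation.relativize(externalFile)) // file:/external/store/bucket-0/data-1.parquet
  }
}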
