Zouxxyy commented on code in PR #4001:
URL: https://github.com/apache/paimon/pull/4001#discussion_r1723556265


##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala:
##########
@@ -81,61 +87,116 @@ case class MergeIntoPaimonTable(
 
   private def performMergeForPkTable(sparkSession: SparkSession): 
Seq[CommitMessage] = {
     writer.write(
-      constructChangedRows(sparkSession, createDataset(sparkSession, 
filteredTargetPlan)))
+      constructChangedRows(
+        sparkSession,
+        createDataset(sparkSession, filteredTargetPlan),
+        remainDeletedRow = true))
   }
 
   private def performMergeForNonPkTable(sparkSession: SparkSession): 
Seq[CommitMessage] = {
     val targetDS = createDataset(sparkSession, filteredTargetPlan)
     val sourceDS = createDataset(sparkSession, sourceTable)
 
-    val targetFilePaths: Array[String] = findTouchedFiles(targetDS, 
sparkSession)
-
-    val touchedFilePathsSet = mutable.Set.empty[String]
-    def hasUpdate(actions: Seq[MergeAction]): Boolean = {
-      actions.exists {
-        case _: UpdateAction | _: DeleteAction => true
-        case _ => false
+    // Step1: get the candidate data splits which are filtered by Paimon 
Predicate.
+    val candidateDataSplits =
+      findCandidateDataSplits(targetOnlyCondition.getOrElse(TrueLiteral), 
relation.output)
+    val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
+
+    if (deletionVectorsEnabled) {
+      // Step2: generate dataset that should contains ROW_KIND, FILE_PATH, 
ROW_INDEX columns
+      val metadataCols = Seq(FILE_PATH, ROW_INDEX)
+      val filteredRelation = createDataset(
+        sparkSession,
+        createNewScanPlan(
+          candidateDataSplits,
+          targetOnlyCondition.getOrElse(TrueLiteral),
+          relation,
+          metadataCols))
+      val ds = constructChangedRows(
+        sparkSession,
+        filteredRelation,
+        remainDeletedRow = true,
+        metadataCols = metadataCols)
+
+      ds.cache()
+      try {
+        val rowKindAttribute = ds.queryExecution.analyzed.output
+          .find(attr => sparkSession.sessionState.conf.resolver(attr.name, 
ROW_KIND_COL))
+          .getOrElse(throw new RuntimeException("Can not find _row_kind_ 
column."))
+
+        // Step3: filter rows that should be marked as DELETED in Deletion 
Vector mode.
+        val toDeleteRowsFilter = Or(
+          EqualTo(rowKindAttribute, Literal(RowKind.DELETE.toByteValue)),
+          EqualTo(rowKindAttribute, Literal(RowKind.UPDATE_AFTER.toByteValue)))
+        val dvDS = ds.where(new Column(toDeleteRowsFilter))

Review Comment:
   This can be simplified to:
   
   ```scala
   val dvDS = ds.where(s"$ROW_KIND_COL = ${RowKind.DELETE.toByteValue} or $ROW_KIND_COL = ${RowKind.UPDATE_AFTER.toByteValue}")
   ```



##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala:
##########
@@ -261,19 +330,20 @@ object MergeIntoPaimonTable {
     }
 
     private def fromTouchedFile(row: InternalRow): Boolean = {
-      file_touched_col_index != -1 && row.getBoolean(file_touched_col_index)
+      fileTouchedColumnIndex != -1 && row.getBoolean(fileTouchedColumnIndex)
     }
 
     private def unusedRow(row: InternalRow): Boolean = {
-      row.getByte(row_kind_col_index) == NOOP_ROW_KIND_VALUE
+      row.getByte(rowKindColumnIndex) == NOOP_ROW_KIND_VALUE
     }
 
     def processPartition(rowIterator: Iterator[Row]): Iterator[Row] = {
       val targetRowHasNoMatchPred = generatePredicate(targetRowHasNoMatch)
       val sourceRowHasNoMatchPred = generatePredicate(sourceRowHasNoMatch)
       val matchedPreds = matchedConditions.map(generatePredicate)
       val matchedProjs = matchedOutputs.map(generateProjection)
-      val notMatchedBySourcePreds = 
notMatchedBySourceConditions.map(generatePredicate)
+      val notMatchedBySourcePreds: Seq[BasePredicate] =

Review Comment:
   This change can be reverted.



##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala:
##########
@@ -81,61 +87,116 @@ case class MergeIntoPaimonTable(
 
   private def performMergeForPkTable(sparkSession: SparkSession): 
Seq[CommitMessage] = {
     writer.write(
-      constructChangedRows(sparkSession, createDataset(sparkSession, 
filteredTargetPlan)))
+      constructChangedRows(
+        sparkSession,
+        createDataset(sparkSession, filteredTargetPlan),
+        remainDeletedRow = true))
   }
 
   private def performMergeForNonPkTable(sparkSession: SparkSession): 
Seq[CommitMessage] = {
     val targetDS = createDataset(sparkSession, filteredTargetPlan)
     val sourceDS = createDataset(sparkSession, sourceTable)
 
-    val targetFilePaths: Array[String] = findTouchedFiles(targetDS, 
sparkSession)
-
-    val touchedFilePathsSet = mutable.Set.empty[String]
-    def hasUpdate(actions: Seq[MergeAction]): Boolean = {
-      actions.exists {
-        case _: UpdateAction | _: DeleteAction => true
-        case _ => false
+    // Step1: get the candidate data splits which are filtered by Paimon 
Predicate.
+    val candidateDataSplits =
+      findCandidateDataSplits(targetOnlyCondition.getOrElse(TrueLiteral), 
relation.output)
+    val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
+
+    if (deletionVectorsEnabled) {
+      // Step2: generate dataset that should contains ROW_KIND, FILE_PATH, 
ROW_INDEX columns
+      val metadataCols = Seq(FILE_PATH, ROW_INDEX)
+      val filteredRelation = createDataset(
+        sparkSession,
+        createNewScanPlan(
+          candidateDataSplits,
+          targetOnlyCondition.getOrElse(TrueLiteral),
+          relation,
+          metadataCols))
+      val ds = constructChangedRows(
+        sparkSession,
+        filteredRelation,
+        remainDeletedRow = true,
+        metadataCols = metadataCols)
+
+      ds.cache()
+      try {
+        val rowKindAttribute = ds.queryExecution.analyzed.output
+          .find(attr => sparkSession.sessionState.conf.resolver(attr.name, 
ROW_KIND_COL))
+          .getOrElse(throw new RuntimeException("Can not find _row_kind_ 
column."))
+
+        // Step3: filter rows that should be marked as DELETED in Deletion 
Vector mode.
+        val toDeleteRowsFilter = Or(
+          EqualTo(rowKindAttribute, Literal(RowKind.DELETE.toByteValue)),
+          EqualTo(rowKindAttribute, Literal(RowKind.UPDATE_AFTER.toByteValue)))
+        val dvDS = ds.where(new Column(toDeleteRowsFilter))
+        val deletionVectors = collectDeletionVectors(dataFilePathToMeta, dvDS, 
sparkSession)
+        val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
+
+        // Step4: filter rows that should be written as the inserted/updated 
data.
+        val toWriteRowsFilter = Or(
+          EqualTo(rowKindAttribute, Literal(RowKind.INSERT.toByteValue)),
+          EqualTo(rowKindAttribute, Literal(RowKind.UPDATE_AFTER.toByteValue)))
+        val toWriteDS = ds

Review Comment:
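   This can be simplified too, in the same way as suggested for the `dvDS` filter (a SQL-expression string passed to `ds.where`).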



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to