steFaiz commented on code in PR #8175:
URL: https://github.com/apache/paimon/pull/8175#discussion_r3393206852


##########
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala:
##########
@@ -351,19 +352,123 @@ case class MergeIntoPaimonDataEvolutionTable(
       .map { case (_, attrs) => attrs.head }
       .toSeq
 
-    val assignments = metadataColumns.map(column => Assignment(column, column))
-    val output = updateColumnsSorted ++ metadataColumns
+    // Find raw blob update columns and avoid reading them from target table
+    val blobInlineFields = table.coreOptions().blobInlineField().asScala.toSet
+    val rawBlobFieldNames = table
+      .rowType()
+      .getFields
+      .asScala
+      .filter(
+        field =>
+          field.`type`().is(BLOB) &&
+            !blobInlineFields.exists(inlineField => resolver(inlineField, 
field.name())))
+      .map(_.name())
+      .toSet
+
+    def isRawBlobUpdateColumn(attr: AttributeReference): Boolean = {
+      rawBlobFieldNames.exists(rawBlobFieldName => resolver(rawBlobFieldName, 
attr.name))
+    }
+
+    // The final output is composed by updated columns, metadata columns and 
blob marker columns.
+    // Marker columns are used to mark whether a blob field should be written 
with placeholder
+    val rawBlobUpdateColumns = 
updateColumnsSorted.filter(isRawBlobUpdateColumn)
+    val rawBlobMarkerNamesByColumn = rawBlobUpdateColumns.zipWithIndex.map {

Review Comment:
   Thanks! Fixed, now picking new names will loop and increment the index util 
find some non-existing columns



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to