YannByron commented on code in PR #2987:
URL: https://github.com/apache/fluss/pull/2987#discussion_r3042807422
##########
fluss-spark/fluss-spark-common/src/main/scala/org/apache/fluss/spark/read/FlussUpsertPartitionReader.scala:
##########
@@ -99,9 +99,13 @@ class FlussUpsertPartitionReader(
private def createSortMergeReader(): SortMergeReader = {
// Create key encoder for primary keys
+ val pkIndexes = tableInfo.getSchema.getPrimaryKeyIndexes
+ val pkFields = new java.util.ArrayList[DataField]()
+ pkIndexes.foreach(i => pkFields.add(rowType.getFields.get(i)))
+ val pkRowType = new RowType(pkFields)
val keyEncoder =
encode.KeyEncoder.ofPrimaryKeyEncoder(
- rowType,
Review Comment:
I think what is needed here is the row type of input data, not just the row
type of primary key.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]