amogh-jahagirdar commented on code in PR #7558:
URL: https://github.com/apache/iceberg/pull/7558#discussion_r1195535191
##########
spark/v3.4/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala:
##########
@@ -148,24 +138,27 @@ case class MergeRowsExec(
}
}
- var lastMatchedRowId: InternalRow = null
+ val rowIdAttrOrdinal = if (performCardinalityCheck) {
+ child.output.indexWhere(attr => conf.resolver(attr.name, ROW_ID))
+ } else {
+ -1
+ }
+ val matchedRowIds = new Roaring64Bitmap()
def processRowWithCardinalityCheck(inputRow: InternalRow): InternalRow = {
val isSourceRowPresent = isSourceRowPresentPred.eval(inputRow)
val isTargetRowPresent = isTargetRowPresentPred.eval(inputRow)
if (isSourceRowPresent && isTargetRowPresent) {
- val currentRowId = rowIdProj.apply(inputRow)
- if (currentRowId == lastMatchedRowId) {
+ val currentRowId = inputRow.getLong(rowIdAttrOrdinal)
+ if (matchedRowIds.contains(currentRowId)) {
Review Comment:
Should we add a sanity check that `rowIdAttrOrdinal` is not -1 at this
point? It shouldn't be, since we only reach this code when we want to perform
the cardinality check, but the check would confirm there is a valid row ID
ordinal here and, if there isn't, surface a clear error message.
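
For illustration, a minimal, self-contained sketch of what such a guard could look like. This is not the actual `MergeRowsExec` change: the value of `ROW_ID`, the case-insensitive matching used in place of `conf.resolver`, and the `IllegalStateException` wording are all stand-ins for this example.

```scala
// Illustrative sketch of the suggested -1 guard, not the actual MergeRowsExec code.
object RowIdOrdinalCheck {
  // Stand-in for the ROW_ID constant referenced in the diff.
  val ROW_ID = "_row_id"

  // outputAttrNames stands in for child.output; equalsIgnoreCase stands in for
  // conf.resolver (which is case-insensitive by default).
  def rowIdOrdinal(outputAttrNames: Seq[String], performCardinalityCheck: Boolean): Int = {
    if (!performCardinalityCheck) {
      -1
    } else {
      val ordinal = outputAttrNames.indexWhere(_.equalsIgnoreCase(ROW_ID))
      // Suggested sanity check: fail fast with a clear message if the row ID
      // attribute is missing, instead of failing later with an obscure error.
      if (ordinal == -1) {
        throw new IllegalStateException(
          s"Cannot perform cardinality check: '$ROW_ID' not found in " +
            outputAttrNames.mkString("[", ", ", "]"))
      }
      ordinal
    }
  }

  def main(args: Array[String]): Unit = {
    println(rowIdOrdinal(Seq("id", "data", "_row_id"), performCardinalityCheck = true)) // prints 2
  }
}
```

Without such a guard, a missing row ID attribute would only show up later, when `inputRow.getLong(rowIdAttrOrdinal)` is evaluated with -1 inside `processRowWithCardinalityCheck`, which is harder to diagnose.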
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]