Zouxxyy commented on code in PR #6697:
URL: https://github.com/apache/paimon/pull/6697#discussion_r2634021745
##########
paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala:
##########
@@ -71,7 +77,28 @@ abstract class PaimonBaseScanBuilder
filter =>
val predicate = converter.convertIgnoreFailure(filter)
if (predicate == null) {
- postScan.append(filter)
+ val rowTypeWithRowId = new RowType(
+ false,
+ Collections.singletonList(new DataField(-1, ROW_ID.name(), DataTypes.BIGINT())))
+ val converterWithRowId = new SparkFilterConverter(rowTypeWithRowId)
+ val newPredicate = converterWithRowId.convertIgnoreFailure(filter)
Review Comment:
1. It should pass the filter containing `_ROW_ID` to Paimon as part of
`pushedDataFilters`, while still adding it to the post-scan filters for Spark to
evaluate. Therefore, almost no changes are needed in Paimon's Spark connector,
except possibly updating `new SparkFilterConverter(rowType)` to use
`requiredSchema` (or a `rowType` that includes the row id) and ensuring that
`splitPartitionPredicatesAndDataPredicates` correctly recognizes this filter.
2. Yes, that’s exactly where I intend to put it. CC @JingsongLi
```scala
scan.withFilter(filter)
.withReadType(readType)
.withPartitionFilter(partitionFilter)
// calculate rowRanges from filter
.withRowRanges(rowRanges);
```
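As a rough illustration of the "calculate rowRanges from filter" step, here is a minimal Scala sketch that turns a predicate on `_ROW_ID` into sorted, non-overlapping row ranges. The types `RowIdPredicate`, `Equal`, `In`, `Between`, `Or`, and `RowRange` are hypothetical stand-ins, not Paimon's `Predicate` or `Range` classes, and only equality, IN, half-open bounds, and OR are modeled; other predicate shapes are left out.

```scala
// Hypothetical model only: not Paimon's Predicate / Range API.
object RowRangeSketch {
  /** Inclusive range of row ids: [from, to]. */
  final case class RowRange(from: Long, to: Long)

  /** Minimal predicate model over the _ROW_ID column only. */
  sealed trait RowIdPredicate
  final case class Equal(value: Long) extends RowIdPredicate
  final case class In(values: Seq[Long]) extends RowIdPredicate
  /** _ROW_ID >= lowerInclusive AND _ROW_ID < upperExclusive */
  final case class Between(lowerInclusive: Long, upperExclusive: Long) extends RowIdPredicate
  final case class Or(left: RowIdPredicate, right: RowIdPredicate) extends RowIdPredicate

  /** Raw, possibly overlapping ranges covered by a predicate. */
  private def rawRanges(p: RowIdPredicate): Seq[RowRange] = p match {
    case Equal(v)        => Seq(RowRange(v, v))
    case In(vs)          => vs.map(v => RowRange(v, v))
    case Between(lo, hi) => if (lo < hi) Seq(RowRange(lo, hi - 1)) else Seq.empty
    case Or(l, r)        => rawRanges(l) ++ rawRanges(r)
  }

  /** Sorted, non-overlapping ranges; overlapping or adjacent ranges are merged. */
  def toRowRanges(p: RowIdPredicate): List[RowRange] =
    rawRanges(p)
      .sortBy(_.from)
      .foldLeft(List.empty[RowRange]) {
        // Merge into the previous range when r overlaps it or starts right after it
        // (the MaxValue check avoids overflow in last.to + 1).
        case (last :: rest, r) if last.to == Long.MaxValue || r.from <= last.to + 1 =>
          RowRange(last.from, math.max(last.to, r.to)) :: rest
        case (acc, r) => r :: acc
      }
      .reverse
}
```

For example, `_ROW_ID IN (7, 1, 2, 3)` becomes `RowRange(1, 3)` and `RowRange(7, 7)`, which is the kind of `List<Range>` a user would otherwise have to build by hand.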
I believe users would much prefer writing filters that reference `_ROW_ID` when
using the ReadBuilder API, rather than having to understand what
`List<Range> rowRanges` is and how to convert their conditions into it.
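Point 1 can be modeled with a minimal sketch: a filter whose only reference is `_ROW_ID` is both handed to Paimon and returned to Spark as a post-scan filter, so Paimon may prune by row id while Spark still guarantees exact results. `Filter`, `PushResult`, and `pushFilters` here are hypothetical simplifications, not Spark's filter classes or the actual `PaimonBaseScanBuilder` code, and the sketch does not model how ordinary data filters are re-evaluated by Spark.

```scala
// Hypothetical model only: not Spark's Filter API or PaimonBaseScanBuilder.
object PushDownSketch {
  val RowIdColumn = "_ROW_ID"

  /** Stand-in for a Spark source filter: a label plus the columns it references. */
  final case class Filter(description: String, references: Set[String])

  final case class PushResult(
      pushedToPaimon: Seq[Filter],  // handed to Paimon, e.g. to derive row ranges
      postScanFilters: Seq[Filter]  // returned to Spark and evaluated after the scan
  )

  def pushFilters(filters: Seq[Filter], dataColumns: Set[String]): PushResult = {
    val pushed = Seq.newBuilder[Filter]
    val postScan = Seq.newBuilder[Filter]
    filters.foreach { f =>
      if (f.references == Set(RowIdColumn)) {
        // Row-id-only filter: Paimon may use it for pruning, and Spark
        // still re-applies it, so correctness does not depend on Paimon.
        pushed += f
        postScan += f
      } else if (f.references.subsetOf(dataColumns)) {
        // Simplification: treated as an ordinary pushed data filter.
        pushed += f
      } else {
        // Not convertible (e.g. mixes _ROW_ID with data columns): Spark only.
        postScan += f
      }
    }
    PushResult(pushed.result(), postScan.result())
  }
}
```

This mirrors the contract of Spark's `SupportsPushDownFilters.pushFilters`, whose return value is the set of filters Spark must still evaluate after scanning; returning the `_ROW_ID` filter there is what keeps the result correct even if Paimon prunes only coarsely.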