SML0127 commented on code in PR #1549:
URL: https://github.com/apache/fluss/pull/1549#discussion_r2285575170
##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java:
##########
@@ -449,11 +455,57 @@ && hasPrimaryKey()
getPartitionKeyTypes(),
acceptedFilters,
remainingFilters,
- FLINK_INTERNAL_VALUE);
+ FLUSS_INTERNAL_VALUE);
+
// partitions are filtered by string representations, convert the equals to string first
- fieldEquals = stringifyFieldEquals(fieldEquals);
+ partitionFilters = stringifyFieldEquals(fieldEquals);
+
+ // lake source is not null
+ if (lakeSource != null) {
+ // and exist field equals, push down to lake source
+ if (!fieldEquals.isEmpty()) {
+
+ // convert flink row type to fluss row type
+ RowType flussRowType = FlinkConversions.toFlussRowType(tableOutputType);
+ List<Predicate> lakePredicates = new ArrayList<>();
+
+ for (FieldEqual fieldEqual : fieldEquals) {
+ int idx = fieldEqual.fieldIndex;
+ String fieldName = tableOutputType.getFieldNames().get(idx);
+ com.alibaba.fluss.types.DataType flussDataType =
+ flussRowType.getTypeAt(idx);
+
+ // convert to Paimon literal for partition
+ Object literal =
+ toPaimonLiteralForPartition(flussDataType, fieldEqual.equalValue);
+
+ if (literal == null) {
+ continue;
+ }
+
+ lakePredicates.add(
+ new LeafPredicate(
+ Equal.INSTANCE,
+ flussDataType,
+ idx,
+ fieldName,
+ Collections.singletonList(literal)));
+ }
+
+ if (!lakePredicates.isEmpty()) {
+ final LakeSource.FilterPushDownResult filterPushDownResult =
+ lakeSource.withFilters(lakePredicates);
+ if (filterPushDownResult.acceptedPredicates().size()
+ != lakePredicates.size()) {
+ LOG.warn("Some partition filters are not accepted by the lake source");
Review Comment:
Thank you! It’s a point I hadn’t considered at all. I’ve updated it to pass
no accepted filters back to Flink as you suggested.
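
For readers following the thread, here is a minimal sketch of one reading of that change: if the lake source accepts fewer predicates than were requested, report nothing as accepted so that Flink keeps applying every filter itself. The class `AllOrNothingPushDown`, the holder `PushDownResult`, and the method `reportToPlanner` are hypothetical names made up for illustration; they are not the Fluss or Flink API, and the actual code in the PR may differ.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Illustrative only: all names here are hypothetical, not Fluss or Flink classes. */
final class AllOrNothingPushDown {

    /** Hypothetical holder for what gets reported back to the planner. */
    static final class PushDownResult<T> {
        final List<T> accepted;   // filters the source promises to apply fully
        final List<T> remaining;  // filters the planner must still evaluate

        PushDownResult(List<T> accepted, List<T> remaining) {
            this.accepted = accepted;
            this.remaining = remaining;
        }
    }

    /**
     * Reports the requested filters as accepted only when the lake source
     * accepted all of them. On partial acceptance, nothing is reported as
     * accepted, so the planner re-applies every filter on the returned rows.
     */
    static <T> PushDownResult<T> reportToPlanner(List<T> requested, List<T> acceptedByLake) {
        if (acceptedByLake.size() == requested.size()) {
            return new PushDownResult<>(new ArrayList<>(requested), Collections.<T>emptyList());
        }
        // Partial acceptance: claim nothing, keep everything as remaining.
        return new PushDownResult<>(Collections.<T>emptyList(), new ArrayList<>(requested));
    }

    public static void main(String[] args) {
        List<String> requested = new ArrayList<>();
        requested.add("dt = '2025-01-01'");
        requested.add("region = 'eu'");

        // The (hypothetical) lake source accepted only the first predicate.
        PushDownResult<String> result = reportToPlanner(requested, requested.subList(0, 1));
        System.out.println("accepted=" + result.accepted + " remaining=" + result.remaining);
    }
}
```

The design choice is conservative: reporting a filter as accepted tells the planner it may drop that filter, so claiming less than was truly pushed down costs some redundant evaluation but cannot produce wrong results.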