luoyuxia commented on code in PR #1549:
URL: https://github.com/apache/fluss/pull/1549#discussion_r2281175629


##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/lake/LakeSplitGenerator.java:
##########
@@ -73,13 +79,16 @@ public LakeSplitGenerator(
             LakeSource<LakeSplit> lakeSource,
             OffsetsInitializer.BucketOffsetsRetriever bucketOffsetsRetriever,
             OffsetsInitializer stoppingOffsetInitializer,
-            int bucketCount) {
+            int bucketCount,
+            List<FieldEqual> partitionFilters) {

Review Comment:
   We don't need to modify `LakeSplitGenerator` if the filter is already pushed down to `LakeSource`.
   The splits generated by `LakeSource` should then contain only the splits that match the partition filter.
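
A minimal sketch of that idea, using hypothetical stand-ins (`ToyLakeSource`, `generateSplits`, string-valued partitions and splits) rather than the real `LakeSource` or `LakeSplitGenerator`: once the source holds the filter, the generator consumes the planned splits unchanged and needs no filter parameter of its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-ins for illustration; not the actual Fluss classes.
final class SplitPlanningSketch {

    /** A toy lake source that remembers one partition filter and plans splits accordingly. */
    static final class ToyLakeSource {
        private final Map<String, Integer> splitsPerPartition = new TreeMap<>();
        private String partitionFilter; // null means no filter was pushed down

        ToyLakeSource addPartition(String partition, int splitCount) {
            splitsPerPartition.put(partition, splitCount);
            return this;
        }

        void withFilter(String partition) {
            this.partitionFilter = partition;
        }

        /** Plans splits only for partitions that pass the pushed-down filter. */
        List<String> planSplits() {
            List<String> splits = new ArrayList<>();
            for (Map.Entry<String, Integer> e : splitsPerPartition.entrySet()) {
                if (partitionFilter != null && !partitionFilter.equals(e.getKey())) {
                    continue; // pruned at the source
                }
                for (int i = 0; i < e.getValue(); i++) {
                    splits.add(e.getKey() + "/split-" + i);
                }
            }
            return splits;
        }
    }

    /** The generator takes whatever the source planned; it has no filter parameter. */
    static List<String> generateSplits(ToyLakeSource source) {
        return source.planSplits();
    }
}
```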



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java:
##########
@@ -449,11 +455,57 @@ && hasPrimaryKey()
                             getPartitionKeyTypes(),
                             acceptedFilters,
                             remainingFilters,
-                            FLINK_INTERNAL_VALUE);
+                            FLUSS_INTERNAL_VALUE);
+
             // partitions are filtered by string representations, convert the equals to string first
-            fieldEquals = stringifyFieldEquals(fieldEquals);
+            partitionFilters = stringifyFieldEquals(fieldEquals);
+
+            // lake source is not null
+            if (lakeSource != null) {
+                // and exist field equals, push down to lake source
+                if (!fieldEquals.isEmpty()) {
+
+                    // convert flink row type to fluss row type
+                    RowType flussRowType = FlinkConversions.toFlussRowType(tableOutputType);
+                    List<Predicate> lakePredicates = new ArrayList<>();
+
+                    for (FieldEqual fieldEqual : fieldEquals) {
+                        int idx = fieldEqual.fieldIndex;
+                        String fieldName = tableOutputType.getFieldNames().get(idx);
+                        com.alibaba.fluss.types.DataType flussDataType =
+                                flussRowType.getTypeAt(idx);
+
+                        // convert to Paimon literal for partition
+                        Object literal =
+                                toPaimonLiteralForPartition(flussDataType, fieldEqual.equalValue);
+
+                        if (literal == null) {
+                            continue;
+                        }
+
+                        lakePredicates.add(
+                                new LeafPredicate(
+                                        Equal.INSTANCE,
+                                        flussDataType,
+                                        idx,
+                                        fieldName,
+                                        Collections.singletonList(literal)));
+                    }
+
+                    if (!lakePredicates.isEmpty()) {
+                        final LakeSource.FilterPushDownResult filterPushDownResult =
+                                lakeSource.withFilters(lakePredicates);
+                        if (filterPushDownResult.acceptedPredicates().size()
+                                != lakePredicates.size()) {
+                            LOG.warn("Some partition filters are not accepted by the lake source");

Review Comment:
   If the lake source can't accept a predicate, we shouldn't report the filter as accepted by Fluss; otherwise the query returns wrong data.
   For example, the source will still read rows that don't match the predicate, but Flink assumes the filter was pushed down and won't apply it on the Flink side, so the result is wrong. In that case, fall back to accepting nothing:
   ```
   return Result.of(Collections.emptyList(), filters);
   ```
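
The fallback could look roughly like the sketch below. It is self-contained and uses hypothetical stand-ins (`Result`, `pushDownAllOrNothing`, plain strings for predicates, a `Function` in place of the lake source) rather than the real Flink or Fluss types. The point is only that partial acceptance by the lake source is reported as nothing accepted, so Flink keeps evaluating every filter itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; these are not the Flink or Fluss classes.
final class PushDownSketch {

    /** Mirrors the accepted/remaining split of a filter push-down result. */
    static final class Result {
        final List<String> accepted;
        final List<String> remaining;

        Result(List<String> accepted, List<String> remaining) {
            this.accepted = accepted;
            this.remaining = remaining;
        }
    }

    /**
     * Reports filters as accepted only when the lake source accepted all of them.
     * Otherwise everything is handed back as remaining so the engine filters the rows itself.
     */
    static Result pushDownAllOrNothing(
            List<String> filters, Function<List<String>, List<String>> lakeSourceAccepts) {
        if (filters.isEmpty()) {
            // Nothing to push down, so the lake source is not consulted at all.
            return new Result(Collections.emptyList(), Collections.emptyList());
        }
        List<String> acceptedByLake = lakeSourceAccepts.apply(filters);
        if (acceptedByLake.size() != filters.size()) {
            // Partial acceptance: claim nothing, keep every filter for the engine.
            return new Result(Collections.emptyList(), new ArrayList<>(filters));
        }
        return new Result(new ArrayList<>(filters), Collections.emptyList());
    }
}
```

Any pruning the lake source does for the predicates it did accept is still harmless here, because the engine re-applies all filters to the rows it receives.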



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java:
##########
@@ -449,11 +455,57 @@ && hasPrimaryKey()
                             getPartitionKeyTypes(),
                             acceptedFilters,
                             remainingFilters,
-                            FLINK_INTERNAL_VALUE);
+                            FLUSS_INTERNAL_VALUE);
+
             // partitions are filtered by string representations, convert the equals to string first
-            fieldEquals = stringifyFieldEquals(fieldEquals);
+            partitionFilters = stringifyFieldEquals(fieldEquals);
+
+            // lake source is not null
+            if (lakeSource != null) {
+                // and exist field equals, push down to lake source
+                if (!fieldEquals.isEmpty()) {
+
+                    // convert flink row type to fluss row type
+                    RowType flussRowType = FlinkConversions.toFlussRowType(tableOutputType);
+                    List<Predicate> lakePredicates = new ArrayList<>();
+
+                    for (FieldEqual fieldEqual : fieldEquals) {
+                        int idx = fieldEqual.fieldIndex;
+                        String fieldName = tableOutputType.getFieldNames().get(idx);
+                        com.alibaba.fluss.types.DataType flussDataType =
+                                flussRowType.getTypeAt(idx);
+
+                        // convert to Paimon literal for partition
+                        Object literal =

Review Comment:
   We don't need to convert to a Paimon literal here, since the conversion from Fluss literal to Paimon literal is already done in `FlussToPaimonPredicateConverter`.



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java:
##########
@@ -540,6 +592,18 @@ private int[] getKeyRowProjection() {
         return projection;
     }
 
+    @Nullable
+    private Object toPaimonLiteralForPartition(

Review Comment:
   We can remove this method.



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java:
##########
@@ -449,11 +455,57 @@ && hasPrimaryKey()
                             getPartitionKeyTypes(),
                             acceptedFilters,
                             remainingFilters,
-                            FLINK_INTERNAL_VALUE);
+                            FLUSS_INTERNAL_VALUE);
+
             // partitions are filtered by string representations, convert the equals to string first
-            fieldEquals = stringifyFieldEquals(fieldEquals);
+            partitionFilters = stringifyFieldEquals(fieldEquals);
+
+            // lake source is not null
+            if (lakeSource != null) {
+                // and exist field equals, push down to lake source
+                if (!fieldEquals.isEmpty()) {
+
+                    // convert flink row type to fluss row type
+                    RowType flussRowType = FlinkConversions.toFlussRowType(tableOutputType);
+                    List<Predicate> lakePredicates = new ArrayList<>();
+
+                    for (FieldEqual fieldEqual : fieldEquals) {

Review Comment:
   nit: this piece of code can be simplified to
   ```
   PredicateBuilder predicateBuilder = new PredicateBuilder(flussRowType);
   for (FieldEqual fieldEqual : fieldEquals) {
       lakePredicates.add(
               predicateBuilder.equal(fieldEqual.fieldIndex, fieldEqual.equalValue));
   }
   ```



##########
fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java:
##########
@@ -449,11 +455,57 @@ && hasPrimaryKey()
                             getPartitionKeyTypes(),
                             acceptedFilters,
                             remainingFilters,
-                            FLINK_INTERNAL_VALUE);
+                            FLUSS_INTERNAL_VALUE);
+
             // partitions are filtered by string representations, convert the equals to string first
-            fieldEquals = stringifyFieldEquals(fieldEquals);
+            partitionFilters = stringifyFieldEquals(fieldEquals);
+
+            // lake source is not null
+            if (lakeSource != null) {
+                // and exist field equals, push down to lake source
+                if (!fieldEquals.isEmpty()) {
+
+                    // convert flink row type to fluss row type
+                    RowType flussRowType = FlinkConversions.toFlussRowType(tableOutputType);
+                    List<Predicate> lakePredicates = new ArrayList<>();
+
+                    for (FieldEqual fieldEqual : fieldEquals) {
+                        int idx = fieldEqual.fieldIndex;
+                        String fieldName = tableOutputType.getFieldNames().get(idx);
+                        com.alibaba.fluss.types.DataType flussDataType =
+                                flussRowType.getTypeAt(idx);
+
+                        // convert to Paimon literal for partition
+                        Object literal =
+                                toPaimonLiteralForPartition(flussDataType, fieldEqual.equalValue);
+
+                        if (literal == null) {
+                            continue;
+                        }
+
+                        lakePredicates.add(
+                                new LeafPredicate(
+                                        Equal.INSTANCE,
+                                        flussDataType,
+                                        idx,
+                                        fieldName,
+                                        Collections.singletonList(literal)));
+                    }
+
+                    if (!lakePredicates.isEmpty()) {
+                        final LakeSource.FilterPushDownResult filterPushDownResult =
+                                lakeSource.withFilters(lakePredicates);
+                        if (filterPushDownResult.acceptedPredicates().size()
+                                != lakePredicates.size()) {
+                            LOG.warn("Some partition filters are not accepted by the lake source");
+                        }
+                    }
+                } else {
+                    // fill with empty list explicitly, when field equal is not exist
+                    lakeSource.withFilters(Collections.emptyList());

Review Comment:
   nit: there is no need to call this method when there are no predicates.



##########
fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/source/FlinkTableSourceITCase.java:
##########
@@ -1066,6 +1066,52 @@ void testStreamingReadMultiPartitionPushDown() throws Exception {
         assertResultsIgnoreOrder(rowIter, expectedRowValues, true);
     }
 
+    @Test
+    void testStreamingReadMultiPartitionPushDownReverseOrderCondition() throws Exception {

Review Comment:
   What we actually want to verify is that partition push-down still works when a table is lake-enabled, so you need to add a test for a lake-enabled table.
   I think you can add a case to the test method `FlinkUnionReadLogTableITCase#testReadLogTableFullType`: when `isPartitioned` is true, run `select * from xx where p = xx`, where `p` is the partition key.


