wuchong commented on code in PR #2951:
URL: https://github.com/apache/fluss/pull/2951#discussion_r3048948898


##########
fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java:
##########
@@ -247,6 +270,17 @@ public boolean isProjectionPushDowned() {
         return projectionPushDowned;
     }
 
+    /** Get the target schema ID. */
+    public int getTargetSchemaId() {

Review Comment:
   unused, can be removed.



##########
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java:
##########
@@ -93,6 +95,9 @@ public class LogFetcher implements Closeable {
     //  bytes from remote file.
     private final LogRecordReadContext remoteReadContext;
     @Nullable private final Projection projection;
+    @Nullable private final Predicate recordBatchFilter;

Review Comment:
   unused, remove



##########
fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java:
##########
@@ -490,11 +503,81 @@ public FetchDataInfo read(
      * @param minOneMessage If this is true, the first message will be 
returned even if it exceeds
      *     `maxSize` (if one exists)
      * @param projection The column projection to apply to the log records
-     * @return The fetched data and the offset metadata of the first message 
whose offset is >=
-     *     startOffset, or null if the startOffset is larger than the largest 
offset in this log
+     * @param recordBatchFilter The filter to apply to the log records (must 
be null if readContext
+     *     is null)
+     * @param readContext The read context for batch filtering (must be null 
if recordBatchFilter is
+     *     null)
+     * @throws LogOffsetOutOfRangeException If startOffset is beyond the log 
start and end offset
+     * @return The fetch data information including fetch starting offset 
metadata and messages
+     *     read.
+     */
+    @VisibleForTesting
+    FetchDataInfo read(
+            long startOffset,
+            int maxSize,
+            long maxPosition,
+            boolean minOneMessage,
+            @Nullable FileLogProjection projection,
+            @Nullable Predicate recordBatchFilter,
+            @Nullable LogRecordReadContext readContext)
+            throws IOException {
+        FilterContext filterContext = null;
+        if (recordBatchFilter != null && readContext != null) {
+            filterContext = new FilterContext(recordBatchFilter, readContext, 
null);

Review Comment:
   In this scenario, we should be able to derive the `PredicateSchemaResolver` 
directly within the `FilterContext`. Since all necessary parameters for the 
`PredicateSchemaResolver` can be obtained from `recordBatchFilter`, 
`readContext`, and `filterSchemaId`, moving its construction into 
`FilterContext` is logical. Relying on a nullable `PredicateSchemaResolver` 
introduces unnecessary risk and potential errors, as it may lead to 
unpredictable behavior, such as using an outdated schema to parse statistics.



##########
fluss-common/src/main/java/org/apache/fluss/record/DefaultLogRecordBatchStatistics.java:
##########
@@ -147,6 +168,9 @@ public InternalRow getMaxValues() {
 
     @Override
     public Long[] getNullCounts() {

Review Comment:
   Will this be addressed in a follow-up PR? Could you create an issue for this?



##########
fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java:
##########
@@ -247,6 +270,17 @@ public boolean isProjectionPushDowned() {
         return projectionPushDowned;
     }
 
+    /** Get the target schema ID. */
+    public int getTargetSchemaId() {
+        return targetSchemaId;
+    }
+
+    /** Get the schema getter. */
+    @Nullable

Review Comment:
   It seems the `schemaGetter` is not-null?



##########
fluss-server/src/main/java/org/apache/fluss/server/log/FilterContext.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.log;
+
+import org.apache.fluss.predicate.Predicate;
+import org.apache.fluss.record.LogRecordReadContext;
+
+import javax.annotation.Nullable;
+
+/**
+ * Encapsulates the filter-related parameters for server-side filter pushdown 
during log reads.
+ *
+ * <p>All three parameters are logically coupled: the predicate defines what 
to filter, the read
+ * context provides batch statistics for filter evaluation, and the predicate 
resolver handles
+ * schema evolution. This class enforces that they are always passed together.
+ */
+public final class FilterContext {
+
+    private final Predicate recordBatchFilter;
+    private final LogRecordReadContext readContext;
+    @Nullable private final PredicateSchemaResolver predicateResolver;
+
+    public FilterContext(
+            Predicate recordBatchFilter,
+            LogRecordReadContext readContext,
+            @Nullable PredicateSchemaResolver predicateResolver) {
+        this.recordBatchFilter = recordBatchFilter;
+        this.readContext = readContext;
+        this.predicateResolver = predicateResolver;
+    }
+
+    public Predicate getRecordBatchFilter() {
+        return recordBatchFilter;
+    }
+
+    public LogRecordReadContext getReadContext() {
+        return readContext;
+    }
+
+    @Nullable
+    public PredicateSchemaResolver getPredicateResolver() {

Review Comment:
   Why and when the `PredicateSchemaResolver` can be nullable?



##########
fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java:
##########
@@ -1494,16 +1503,85 @@ private LogReadInfo readRecords(FetchParams 
fetchParams, LogTablet logTablet)
 
         // todo validate fetched epoch.
 
-        FetchDataInfo fetchDataInfo =
-                logTablet.read(
-                        readOffset,
-                        fetchParams.maxFetchBytes(),
-                        fetchParams.isolation(),
-                        fetchParams.minOneMessage(),
-                        fetchParams.projection());
+        FilterContext filterContext = createFilterContext(fetchParams);
+
+        FetchDataInfo fetchDataInfo;
+        try {
+            fetchDataInfo =
+                    logTablet.read(
+                            readOffset,
+                            fetchParams.maxFetchBytes(),
+                            fetchParams.isolation(),
+                            fetchParams.minOneMessage(),
+                            fetchParams.projection(),
+                            filterContext);
+        } finally {
+            // Close readContext eagerly — it is only used for statistics 
extraction during
+            // batch filtering and is NOT referenced by the returned 
FetchDataInfo records.
+            if (filterContext != null) {
+                IOUtils.closeQuietly(filterContext.getReadContext());
+            }
+        }
         return new LogReadInfo(fetchDataInfo, initialHighWatermark, 
initialLogEndOffset);
     }
 
+    /**
+     * Creates a {@link FilterContext} for batch filtering if a filter is 
configured for this table
+     * and the log format supports it. Returns null if no filter is applicable.
+     */
+    @Nullable
+    private FilterContext createFilterContext(FetchParams fetchParams) {
+        FilterInfo filterInfo = 
fetchParams.getFilterInfo(tableBucket.getTableId());
+        if (filterInfo == null || logFormat != LogFormat.ARROW) {
+            return null;
+        }
+
+        LogRecordReadContext readContext = null;
+        try {
+            int filterSchemaId = filterInfo.getSchemaId();
+            RowType rowType = null;
+            int schemaIdForContext = -1;
+            if (filterSchemaId >= 0) {
+                Schema filterSchema = schemaGetter.getSchema(filterSchemaId);
+                if (filterSchema == null) {
+                    LOG.warn(
+                            "Filter schema not found (schemaId={}) for {}, 
falling back to unfiltered read.",
+                            filterSchemaId,
+                            tableBucket);
+                } else {
+                    rowType = filterSchema.getRowType();
+                    schemaIdForContext = filterSchemaId;
+                }
+            } else {
+                rowType = tableInfo.getSchema().getRowType();
+                schemaIdForContext = tableInfo.getSchemaId();
+            }

Review Comment:
   I think we don't allow `filterSchemaId` to be `< 0`, using latest schema id 
is not safe here, maybe throw an exception here? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to