This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4625ce945e [core] Refactor row filter and column masking to AbstractDataTableRead
4625ce945e is described below

commit 4625ce945e3b469d4d1804eb53e84d2b4ab4605b
Author: JingsongLi <[email protected]>
AuthorDate: Wed Jan 14 21:39:07 2026 +0800

    [core] Refactor row filter and column masking to AbstractDataTableRead
---
 .../rest/responses/AuthTableQueryResponse.java     |   8 +-
 .../org/apache/paimon/predicate/LeafFunction.java  |   1 +
 .../org/apache/paimon/predicate/Predicate.java     |   2 +
 .../org/apache/paimon/predicate/Transform.java     |   1 +
 .../paimon/catalog/TableQueryAuthResult.java       | 231 ++++++++++++++-
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  51 +---
 .../apache/paimon/table/CatalogEnvironment.java    |   3 +-
 .../paimon/table/source/AbstractDataTableRead.java |  11 +
 .../paimon/table/source/AbstractDataTableScan.java |  40 +--
 .../paimon/table/source/AppendTableRead.java       |   2 +-
 .../paimon/table/source/AuthAwareTableRead.java    | 311 ---------------------
 .../paimon/table/source/DataTableBatchScan.java    |   8 +-
 .../paimon/table/source/DataTableStreamScan.java   |   8 +-
 .../apache/paimon/table/source/PushDownUtils.java  |  11 +-
 .../apache/paimon/table/source/QueryAuthSplit.java | 199 +------------
 .../paimon/table/source/ReadBuilderImpl.java       |   7 -
 .../apache/paimon/table/source/TableQueryAuth.java |   1 +
 .../source/TableQueryAuthResultSerializer.java     |  75 -----
 .../apache/paimon/rest/MockRESTCatalogTest.java    |   5 +-
 .../paimon/flink/lookup/LookupCompactDiffRead.java |   3 +-
 .../org/apache/paimon/flink/RESTCatalogITCase.java |  15 +-
 .../spark/aggregate/AggregatePushDownUtils.scala   |  16 +-
 .../paimon/spark/scan/BinPackingSplits.scala       |  29 +-
 .../paimon/spark/SparkCatalogWithRestTest.java     |  14 +-
 24 files changed, 328 insertions(+), 724 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java b/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java
index 3aab9ea14e..7410631aea 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/responses/AuthTableQueryResponse.java
@@ -40,6 +40,10 @@ public class AuthTableQueryResponse implements RESTResponse {
     @JsonProperty(FIELD_FILTER)
     private final List<String> filter;
 
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    @JsonProperty(FIELD_COLUMN_MASKING)
+    private final Map<String, String> columnMasking;
+
     @JsonCreator
     public AuthTableQueryResponse(
             @JsonProperty(FIELD_FILTER) List<String> filter,
@@ -53,10 +57,6 @@ public class AuthTableQueryResponse implements RESTResponse {
         return filter;
     }
 
-    @JsonInclude(JsonInclude.Include.NON_NULL)
-    @JsonProperty(FIELD_COLUMN_MASKING)
-    private final Map<String, String> columnMasking;
-
     @JsonGetter(FIELD_COLUMN_MASKING)
     public Map<String, String> columnMasking() {
         return columnMasking;
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java
index 974537f526..d3d0ba9eb9 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/LeafFunction.java
@@ -30,6 +30,7 @@ import java.util.Optional;
 
 /** Function to test a field with literals. */
 public abstract class LeafFunction implements Serializable {
+
     @JsonCreator
     public static LeafFunction fromJson(String name) throws IOException {
         switch (name) {
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java b/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java
index c8ae3dd9d2..6ff977fd27 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/Predicate.java
@@ -44,7 +44,9 @@ import java.util.Optional;
     @JsonSubTypes.Type(value = CompoundPredicate.class, name = CompoundPredicate.NAME)
 })
 public interface Predicate extends Serializable {
+
     String FIELD_KIND = "kind";
+
     /**
      * Test based on the specific input row.
      *
diff --git a/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java b/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
index b90ab3c9a7..79f15699b5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
+++ b/paimon-common/src/main/java/org/apache/paimon/predicate/Transform.java
@@ -40,6 +40,7 @@ import java.util.List;
     @JsonSubTypes.Type(value = UpperTransform.class, name = UpperTransform.NAME)
 })
 public interface Transform extends Serializable {
+
     String FIELD_NAME = "name";
 
     String name();
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/TableQueryAuthResult.java b/paimon-core/src/main/java/org/apache/paimon/catalog/TableQueryAuthResult.java
index c4c33f241a..87910b5cc6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/TableQueryAuthResult.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/TableQueryAuthResult.java
@@ -18,40 +18,243 @@
 
 package org.apache.paimon.catalog;
 
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.predicate.And;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateVisitor;
 import org.apache.paimon.predicate.Transform;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.source.DataFilePlan;
+import org.apache.paimon.table.source.QueryAuthSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InternalRowUtils;
+import org.apache.paimon.utils.JsonSerdeUtil;
+
+import org.apache.paimon.shade.org.apache.commons.lang3.StringUtils;
 
 import javax.annotation.Nullable;
 
-import java.util.Collections;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
 
 /** Auth result for table query, including row level filter and optional column masking rules. */
-public class TableQueryAuthResult {
+public class TableQueryAuthResult implements Serializable {
+
+    private static final long serialVersionUID = 1L;
 
-    @Nullable private final Predicate rowFilter;
-    private final Map<String, Transform> columnMasking;
+    private final @Nullable List<String> filter;
+    private final @Nullable Map<String, String> columnMasking;
 
     public TableQueryAuthResult(
-            @Nullable Predicate rowFilter, Map<String, Transform> columnMasking) {
-        this.rowFilter = rowFilter;
-        this.columnMasking = columnMasking == null ? Collections.emptyMap() : columnMasking;
+            @Nullable List<String> filter, @Nullable Map<String, String> columnMasking) {
+        this.filter = filter;
+        this.columnMasking = columnMasking;
     }
 
-    public static TableQueryAuthResult empty() {
-        return new TableQueryAuthResult(null, Collections.emptyMap());
+    @Nullable
+    public List<String> filter() {
+        return filter;
     }
 
     @Nullable
-    public Predicate rowFilter() {
+    public Map<String, String> columnMasking() {
+        return columnMasking;
+    }
+
+    public TableScan.Plan convertPlan(TableScan.Plan plan) {
+        if (filter == null && (columnMasking == null || columnMasking.isEmpty())) {
+            return plan;
+        }
+        List<Split> authSplits =
+                plan.splits().stream()
+                        .map(split -> new QueryAuthSplit(split, this))
+                        .collect(Collectors.toList());
+        return new DataFilePlan<>(authSplits);
+    }
+
+    @Nullable
+    public Predicate extractPredicate() {
+        Predicate rowFilter = null;
+        if (filter != null && !filter.isEmpty()) {
+            List<Predicate> predicates = new ArrayList<>();
+            for (String json : filter) {
+                if (StringUtils.isEmpty(json)) {
+                    continue;
+                }
+                Predicate predicate = JsonSerdeUtil.fromJson(json, Predicate.class);
+                if (predicate != null) {
+                    predicates.add(predicate);
+                }
+            }
+            if (predicates.size() == 1) {
+                rowFilter = predicates.get(0);
+            } else if (!predicates.isEmpty()) {
+                rowFilter = new CompoundPredicate(And.INSTANCE, predicates);
+            }
+        }
         return rowFilter;
     }
 
-    public Map<String, Transform> columnMasking() {
-        return columnMasking;
+    public Map<String, Transform> extractColumnMasking() {
+        Map<String, Transform> result = new TreeMap<>();
+        if (columnMasking != null && !columnMasking.isEmpty()) {
+            for (Map.Entry<String, String> e : columnMasking.entrySet()) {
+                String column = e.getKey();
+                String json = e.getValue();
+                if (StringUtils.isEmpty(column) || StringUtils.isEmpty(json)) {
+                    continue;
+                }
+                Transform transform = JsonSerdeUtil.fromJson(json, Transform.class);
+                if (transform == null) {
+                    continue;
+                }
+                result.put(column, transform);
+            }
+        }
+        return result;
     }
 
-    public boolean isEmpty() {
-        return rowFilter == null && (columnMasking == null || columnMasking.isEmpty());
+    public RecordReader<InternalRow> doAuth(
+            RecordReader<InternalRow> reader, RowType outputRowType) {
+        Predicate rowFilter = extractPredicate();
+        if (rowFilter != null) {
+            Predicate remappedFilter = rowFilter.visit(new PredicateRemapper(outputRowType));
+            if (remappedFilter != null) {
+                reader = reader.filter(remappedFilter::test);
+            }
+        }
+
+        Map<String, Transform> columnMasking = extractColumnMasking();
+        if (columnMasking != null && !columnMasking.isEmpty()) {
+            Map<Integer, Transform> remappedMasking =
+                    transformRemapping(outputRowType, columnMasking);
+            if (!remappedMasking.isEmpty()) {
+                reader = reader.transform(row -> transform(outputRowType, remappedMasking, row));
+            }
+        }
+
+        return reader;
+    }
+
+    private static InternalRow transform(
+            RowType outputRowType, Map<Integer, Transform> remappedMasking, InternalRow row) {
+        int arity = outputRowType.getFieldCount();
+        GenericRow out = new GenericRow(row.getRowKind(), arity);
+        for (int i = 0; i < arity; i++) {
+            DataType type = outputRowType.getTypeAt(i);
+            out.setField(i, InternalRowUtils.get(row, i, type));
+        }
+        for (Map.Entry<Integer, Transform> e : remappedMasking.entrySet()) {
+            int targetIndex = e.getKey();
+            Transform transform = e.getValue();
+            Object masked = transform.transform(row);
+            out.setField(targetIndex, masked);
+        }
+        return out;
+    }
+
+    private static Map<Integer, Transform> transformRemapping(
+            RowType outputRowType, Map<String, Transform> masking) {
+        Map<Integer, Transform> out = new HashMap<>();
+        if (masking == null || masking.isEmpty()) {
+            return out;
+        }
+
+        for (Map.Entry<String, Transform> e : masking.entrySet()) {
+            String targetColumn = e.getKey();
+            Transform transform = e.getValue();
+            if (targetColumn == null || transform == null) {
+                continue;
+            }
+
+            int targetIndex = outputRowType.getFieldIndex(targetColumn);
+            if (targetIndex < 0) {
+                continue;
+            }
+
+            List<Object> newInputs = new ArrayList<>();
+            for (Object input : transform.inputs()) {
+                if (input instanceof FieldRef) {
+                    FieldRef ref = (FieldRef) input;
+                    int newIndex = outputRowType.getFieldIndex(ref.name());
+                    if (newIndex < 0) {
+                        throw new IllegalArgumentException(
+                                "Column masking refers to field '"
+                                        + ref.name()
+                                        + "' which is not present in output row type "
+                                        + outputRowType);
+                    }
+                    DataType type = outputRowType.getTypeAt(newIndex);
+                    newInputs.add(new FieldRef(newIndex, ref.name(), type));
+                } else {
+                    newInputs.add(input);
+                }
+            }
+            out.put(targetIndex, transform.copyWithNewInputs(newInputs));
+        }
+        return out;
+    }
+
+    private static class PredicateRemapper implements PredicateVisitor<Predicate> {
+
+        private final RowType outputRowType;
+
+        private PredicateRemapper(RowType outputRowType) {
+            this.outputRowType = outputRowType;
+        }
+
+        @Override
+        public Predicate visit(LeafPredicate predicate) {
+            Transform transform = predicate.transform();
+            List<Object> newInputs = new ArrayList<>();
+            for (Object input : transform.inputs()) {
+                if (input instanceof FieldRef) {
+                    FieldRef ref = (FieldRef) input;
+                    String fieldName = ref.name();
+                    int newIndex = outputRowType.getFieldIndex(fieldName);
+                    if (newIndex < 0) {
+                        throw new RuntimeException(
+                                String.format(
+                                        "Unable to read data without column %s when row filter enabled.",
+                                        fieldName));
+                    }
+                    DataType type = outputRowType.getTypeAt(newIndex);
+                    newInputs.add(new FieldRef(newIndex, fieldName, type));
+                } else {
+                    newInputs.add(input);
+                }
+            }
+            return predicate.copyWithNewInputs(newInputs);
+        }
+
+        @Override
+        public Predicate visit(CompoundPredicate predicate) {
+            List<Predicate> remappedChildren = new ArrayList<>();
+            for (Predicate child : predicate.children()) {
+                Predicate remapped = child.visit(this);
+                if (remapped != null) {
+                    remappedChildren.add(remapped);
+                }
+            }
+            if (remappedChildren.isEmpty()) {
+                return null;
+            }
+            if (remappedChildren.size() == 1) {
+                return remappedChildren.get(0);
+            }
+            return new CompoundPredicate(predicate.function(), remappedChildren);
+        }
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index c41810c0fd..16d2f599bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -38,10 +38,6 @@ import org.apache.paimon.function.FunctionChange;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.partition.PartitionStatistics;
-import org.apache.paimon.predicate.And;
-import org.apache.paimon.predicate.CompoundPredicate;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.Transform;
 import org.apache.paimon.rest.exceptions.AlreadyExistsException;
 import org.apache.paimon.rest.exceptions.BadRequestException;
 import org.apache.paimon.rest.exceptions.ForbiddenException;
@@ -64,7 +60,6 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableSnapshot;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.system.SystemTableLoader;
-import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotNotExistException;
 import org.apache.paimon.view.View;
@@ -74,8 +69,6 @@ import org.apache.paimon.view.ViewSchema;
 
 import org.apache.paimon.shade.org.apache.commons.lang3.StringUtils;
 
-import org.jetbrains.annotations.NotNull;
-
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -88,7 +81,6 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.TreeMap;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.BRANCH;
@@ -539,7 +531,7 @@ public class RESTCatalog implements Catalog {
         checkNotSystemTable(identifier, "authTable");
         try {
             AuthTableQueryResponse response = api.authTableQuery(identifier, select);
-            return getTableQueryAuthResult(response);
+            return new TableQueryAuthResult(response.filter(), response.columnMasking());
         } catch (NoSuchResourceException e) {
             throw new TableNotExistException(identifier);
         } catch (ForbiddenException e) {
@@ -1165,45 +1157,4 @@ public class RESTCatalog implements Catalog {
         }
         return schema;
     }
-
-    private static @NotNull TableQueryAuthResult getTableQueryAuthResult(
-            AuthTableQueryResponse response) {
-        List<String> predicateJsons = response == null ? null : response.filter();
-        Predicate rowFilter = null;
-        if (predicateJsons != null && !predicateJsons.isEmpty()) {
-            List<Predicate> predicates = new ArrayList<>();
-            for (String json : predicateJsons) {
-                if (StringUtils.isEmpty(json)) {
-                    continue;
-                }
-                Predicate predicate = JsonSerdeUtil.fromJson(json, Predicate.class);
-                if (predicate != null) {
-                    predicates.add(predicate);
-                }
-            }
-            if (predicates.size() == 1) {
-                rowFilter = predicates.get(0);
-            } else if (!predicates.isEmpty()) {
-                rowFilter = new CompoundPredicate(And.INSTANCE, predicates);
-            }
-        }
-
-        Map<String, Transform> columnMasking = new TreeMap<>();
-        Map<String, String> maskingJsons = response == null ? null : response.columnMasking();
-        if (maskingJsons != null && !maskingJsons.isEmpty()) {
-            for (Map.Entry<String, String> e : maskingJsons.entrySet()) {
-                String column = e.getKey();
-                String json = e.getValue();
-                if (StringUtils.isEmpty(column) || StringUtils.isEmpty(json)) {
-                    continue;
-                }
-                Transform transform = JsonSerdeUtil.fromJson(json, Transform.class);
-                if (transform == null) {
-                    continue;
-                }
-                columnMasking.put(column, transform);
-            }
-        }
-        return new TableQueryAuthResult(rowFilter, columnMasking);
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index d07e009214..a0a23d8ca4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -28,7 +28,6 @@ import org.apache.paimon.catalog.CatalogSnapshotCommit;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.RenamingSnapshotCommit;
 import org.apache.paimon.catalog.SnapshotCommit;
-import org.apache.paimon.catalog.TableQueryAuthResult;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.table.source.TableQueryAuth;
 import org.apache.paimon.tag.SnapshotLoaderImpl;
@@ -154,7 +153,7 @@ public class CatalogEnvironment implements Serializable {
 
     public TableQueryAuth tableQueryAuth(CoreOptions options) {
         if (!options.queryAuthEnabled() || catalogLoader == null) {
-            return select -> TableQueryAuthResult.empty();
+            return select -> null;
         }
         return select -> {
             try (Catalog catalog = catalogLoader.load()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
index bb4a3cce2b..b1dab4db56 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableRead.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.table.source;
 
+import org.apache.paimon.catalog.TableQueryAuthResult;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.variant.VariantAccessInfo;
 import org.apache.paimon.disk.IOManager;
@@ -90,7 +91,17 @@ public abstract class AbstractDataTableRead implements InnerTableRead {
 
     @Override
     public final RecordReader<InternalRow> createReader(Split split) throws IOException {
+        TableQueryAuthResult queryAuthResult = null;
+        if (split instanceof QueryAuthSplit) {
+            QueryAuthSplit authSplit = (QueryAuthSplit) split;
+            split = authSplit.split();
+            queryAuthResult = authSplit.authResult();
+        }
         RecordReader<InternalRow> reader = reader(split);
+        if (queryAuthResult != null) {
+            RowType type = readType == null ? schema.logicalRowType() : readType;
+            reader = queryAuthResult.doAuth(reader, type);
+        }
         if (executeFilter) {
             reader = executeFilter(reader);
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index c64b75557a..dbc88937a4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -65,7 +65,6 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -87,7 +86,6 @@ abstract class AbstractDataTableScan implements DataTableScan {
     private final TableQueryAuth queryAuth;
 
     @Nullable private RowType readType;
-    @Nullable private TableQueryAuthResult authResult;
 
     protected AbstractDataTableScan(
             TableSchema schema,
@@ -100,6 +98,18 @@ abstract class AbstractDataTableScan implements DataTableScan {
         this.queryAuth = queryAuth;
     }
 
+    @Override
+    public final TableScan.Plan plan() {
+        TableQueryAuthResult queryAuthResult = authQuery();
+        Plan plan = planWithoutAuth();
+        if (queryAuthResult != null) {
+            plan = queryAuthResult.convertPlan(plan);
+        }
+        return plan;
+    }
+
+    protected abstract TableScan.Plan planWithoutAuth();
+
     @Override
     public InnerTableScan withFilter(Predicate predicate) {
         snapshotReader.withFilter(predicate);
@@ -166,34 +176,12 @@ abstract class AbstractDataTableScan implements DataTableScan {
         return this;
     }
 
+    @Nullable
     protected TableQueryAuthResult authQuery() {
         if (!options.queryAuthEnabled()) {
             return null;
         }
-        if (authResult == null) {
-            authResult = queryAuth.auth(readType == null ? null : readType.getFieldNames());
-        }
-        return authResult;
-    }
-
-    protected TableScan.Plan applyAuthToSplits(Plan plan) {
-        TableQueryAuthResult authResult = authQuery();
-        if (authResult == null || authResult.isEmpty()) {
-            return plan;
-        }
-
-        List<Split> splits = plan.splits();
-        List<Split> authSplits = new ArrayList<>(splits.size());
-        for (Split split : splits) {
-            if (split instanceof DataSplit) {
-                DataSplit dataSplit = (DataSplit) split;
-                authSplits.add(QueryAuthSplit.wrap(dataSplit, authResult));
-            } else {
-                authSplits.add(split);
-            }
-        }
-
-        return new DataFilePlan<>(authSplits);
+        return queryAuth.auth(readType == null ? null : readType.getFieldNames());
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
index e47ffcc2ba..c727fb63be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AppendTableRead.java
@@ -122,6 +122,6 @@ public final class AppendTableRead extends AbstractDataTableRead {
             }
         }
 
-        throw new RuntimeException("Should not happen.");
+        throw new RuntimeException("Unsupported split: " + split.getClass());
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AuthAwareTableRead.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AuthAwareTableRead.java
deleted file mode 100644
index a7772b3c0a..0000000000
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AuthAwareTableRead.java
+++ /dev/null
@@ -1,311 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source;
-
-import org.apache.paimon.catalog.TableQueryAuthResult;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.metrics.MetricRegistry;
-import org.apache.paimon.predicate.CompoundPredicate;
-import org.apache.paimon.predicate.FieldRef;
-import org.apache.paimon.predicate.LeafPredicate;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateVisitor;
-import org.apache.paimon.predicate.Transform;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.RowType;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.paimon.utils.InternalRowUtils.get;
-
-/** A {@link TableRead} wrapper that checks splits for authorization information. */
-public class AuthAwareTableRead implements TableRead {
-
-    private final TableRead wrapped;
-    private final RowType outputRowType;
-
-    public AuthAwareTableRead(TableRead wrapped, RowType outputRowType) {
-        this.wrapped = wrapped;
-        this.outputRowType = outputRowType;
-    }
-
-    @Override
-    public TableRead withMetricRegistry(MetricRegistry registry) {
-        return new AuthAwareTableRead(wrapped.withMetricRegistry(registry), outputRowType);
-    }
-
-    @Override
-    public TableRead executeFilter() {
-        return new AuthAwareTableRead(wrapped.executeFilter(), outputRowType);
-    }
-
-    @Override
-    public TableRead withIOManager(IOManager ioManager) {
-        return new AuthAwareTableRead(wrapped.withIOManager(ioManager), outputRowType);
-    }
-
-    @Override
-    public RecordReader<InternalRow> createReader(Split split) throws IOException {
-        if (split instanceof QueryAuthSplit) {
-            TableQueryAuthResult authResult = ((QueryAuthSplit) split).authResult();
-            if (authResult != null) {
-                RecordReader<InternalRow> reader =
-                        wrapped.createReader(((QueryAuthSplit) split).dataSplit());
-                // Apply row-level filter if present
-                Predicate rowFilter = authResult.rowFilter();
-                if (rowFilter != null) {
-                    Predicate remappedFilter = remapPredicateToOutputRow(outputRowType, rowFilter);
-                    if (remappedFilter != null) {
-                        reader = new FilterRecordReader(reader, remappedFilter);
-                    }
-                }
-
-                // Apply column masking if present
-                Map<String, Transform> columnMasking = authResult.columnMasking();
-                if (columnMasking != null && !columnMasking.isEmpty()) {
-                    MaskingApplier applier = new MaskingApplier(outputRowType, columnMasking);
-                    reader = new MaskingRecordReader(reader, applier);
-                }
-
-                return reader;
-            }
-        }
-        return wrapped.createReader(split);
-    }
-
-    private static class FilterRecordReader implements RecordReader<InternalRow> {
-
-        private final RecordReader<InternalRow> wrapped;
-        private final Predicate predicate;
-
-        private FilterRecordReader(RecordReader<InternalRow> wrapped, Predicate predicate) {
-            this.wrapped = wrapped;
-            this.predicate = predicate;
-        }
-
-        @Nullable
-        @Override
-        public RecordIterator<InternalRow> readBatch() throws IOException {
-            RecordIterator<InternalRow> batch = wrapped.readBatch();
-            if (batch == null) {
-                return null;
-            }
-            return new FilterRecordIterator(batch, predicate);
-        }
-
-        @Override
-        public void close() throws IOException {
-            wrapped.close();
-        }
-    }
-
-    private static class FilterRecordIterator implements RecordReader.RecordIterator<InternalRow> {
-
-        private final RecordReader.RecordIterator<InternalRow> wrapped;
-        private final Predicate predicate;
-
-        private FilterRecordIterator(
-                RecordReader.RecordIterator<InternalRow> wrapped, Predicate predicate) {
-            this.wrapped = wrapped;
-            this.predicate = predicate;
-        }
-
-        @Nullable
-        @Override
-        public InternalRow next() throws IOException {
-            while (true) {
-                InternalRow row = wrapped.next();
-                if (row == null) {
-                    return null;
-                }
-                if (predicate.test(row)) {
-                    return row;
-                }
-            }
-        }
-
-        @Override
-        public void releaseBatch() {
-            wrapped.releaseBatch();
-        }
-    }
-
-    private static class MaskingRecordReader implements RecordReader<InternalRow> {
-
-        private final RecordReader<InternalRow> wrapped;
-        private final MaskingApplier applier;
-
-        private MaskingRecordReader(RecordReader<InternalRow> wrapped, MaskingApplier applier) {
-            this.wrapped = wrapped;
-            this.applier = applier;
-        }
-
-        @Nullable
-        @Override
-        public RecordIterator<InternalRow> readBatch() throws IOException {
-            RecordIterator<InternalRow> batch = wrapped.readBatch();
-            if (batch == null) {
-                return null;
-            }
-            return batch.transform(applier::apply);
-        }
-
-        @Override
-        public void close() throws IOException {
-            wrapped.close();
-        }
-    }
-
-    private static class MaskingApplier {
-
-        private final RowType outputRowType;
-        private final Map<Integer, Transform> remapped;
-
-        private MaskingApplier(RowType outputRowType, Map<String, Transform> masking) {
-            this.outputRowType = outputRowType;
-            this.remapped = remapToOutputRow(outputRowType, masking);
-        }
-
-        private InternalRow apply(InternalRow row) {
-            if (remapped.isEmpty()) {
-                return row;
-            }
-            int arity = outputRowType.getFieldCount();
-            GenericRow out = new GenericRow(row.getRowKind(), arity);
-            for (int i = 0; i < arity; i++) {
-                DataType type = outputRowType.getTypeAt(i);
-                out.setField(i, get(row, i, type));
-            }
-            for (Map.Entry<Integer, Transform> e : remapped.entrySet()) {
-                int targetIndex = e.getKey();
-                Transform transform = e.getValue();
-                Object masked = transform.transform(row);
-                out.setField(targetIndex, masked);
-            }
-            return out;
-        }
-
-        private static Map<Integer, Transform> remapToOutputRow(
-                RowType outputRowType, Map<String, Transform> masking) {
-            Map<Integer, Transform> out = new HashMap<>();
-            if (masking == null || masking.isEmpty()) {
-                return out;
-            }
-
-            for (Map.Entry<String, Transform> e : masking.entrySet()) {
-                String targetColumn = e.getKey();
-                Transform transform = e.getValue();
-                if (targetColumn == null || transform == null) {
-                    continue;
-                }
-
-                int targetIndex = outputRowType.getFieldIndex(targetColumn);
-                if (targetIndex < 0) {
-                    continue;
-                }
-
-                List<Object> newInputs = new ArrayList<>();
-                for (Object input : transform.inputs()) {
-                    if (input instanceof FieldRef) {
-                        FieldRef ref = (FieldRef) input;
-                        int newIndex = outputRowType.getFieldIndex(ref.name());
-                        if (newIndex < 0) {
-                            throw new IllegalArgumentException(
-                                    "Column masking refers to field '"
-                                            + ref.name()
-                                            + "' which is not present in output row type "
-                                            + outputRowType);
-                        }
-                        DataType type = outputRowType.getTypeAt(newIndex);
-                        newInputs.add(new FieldRef(newIndex, ref.name(), type));
-                    } else {
-                        newInputs.add(input);
-                    }
-                }
-                out.put(targetIndex, transform.copyWithNewInputs(newInputs));
-            }
-            return out;
-        }
-    }
-
-    private static Predicate remapPredicateToOutputRow(RowType outputRowType, Predicate predicate) {
-        return predicate.visit(new PredicateRemapper(outputRowType));
-    }
-
-    private static class PredicateRemapper implements PredicateVisitor<Predicate> {
-        private final RowType outputRowType;
-
-        private PredicateRemapper(RowType outputRowType) {
-            this.outputRowType = outputRowType;
-        }
-
-        @Override
-        public Predicate visit(LeafPredicate predicate) {
-            Transform transform = predicate.transform();
-            List<Object> newInputs = new ArrayList<>();
-            boolean hasUnmappedField = false;
-            for (Object input : transform.inputs()) {
-                if (input instanceof FieldRef) {
-                    FieldRef ref = (FieldRef) input;
-                    String fieldName = ref.name();
-                    int newIndex = outputRowType.getFieldIndex(fieldName);
-                    if (newIndex < 0) {
-                        hasUnmappedField = true;
-                        break;
-                    }
-                    DataType type = outputRowType.getTypeAt(newIndex);
-                    newInputs.add(new FieldRef(newIndex, fieldName, type));
-                } else {
-                    newInputs.add(input);
-                }
-            }
-            if (hasUnmappedField) {
-                return null;
-            }
-            return predicate.copyWithNewInputs(newInputs);
-        }
-
-        @Override
-        public Predicate visit(CompoundPredicate predicate) {
-            List<Predicate> remappedChildren = new ArrayList<>();
-            for (Predicate child : predicate.children()) {
-                Predicate remapped = child.visit(this);
-                if (remapped != null) {
-                    remappedChildren.add(remapped);
-                }
-            }
-            if (remappedChildren.isEmpty()) {
-                return null;
-            }
-            if (remappedChildren.size() == 1) {
-                return remappedChildren.get(0);
-            }
-            return new CompoundPredicate(predicate.function(), remappedChildren);
-        }
-    }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
index 9fe11c44fd..67c8acbf62 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableBatchScan.java
@@ -90,7 +90,7 @@ public class DataTableBatchScan extends AbstractDataTableScan {
     }
 
     @Override
-    public TableScan.Plan plan() {
+    protected TableScan.Plan planWithoutAuth() {
         if (startingScanner == null) {
             startingScanner = createStartingScanner(false);
         }
@@ -99,13 +99,13 @@ public class DataTableBatchScan extends AbstractDataTableScan {
             hasNext = false;
             Optional<StartingScanner.Result> pushed = applyPushDownLimit();
             if (pushed.isPresent()) {
-                return applyAuthToSplits(DataFilePlan.fromResult(pushed.get()));
+                return DataFilePlan.fromResult(pushed.get());
             }
             pushed = applyPushDownTopN();
             if (pushed.isPresent()) {
-                return applyAuthToSplits(DataFilePlan.fromResult(pushed.get()));
+                return DataFilePlan.fromResult(pushed.get());
             }
-            return applyAuthToSplits(DataFilePlan.fromResult(startingScanner.scan(snapshotReader)));
+            return DataFilePlan.fromResult(startingScanner.scan(snapshotReader));
         } else {
             throw new EndOfScanException();
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
index 678cf0f4df..8b5031de4c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataTableStreamScan.java
@@ -117,7 +117,7 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
     }
 
     @Override
-    public Plan plan() {
+    protected TableScan.Plan planWithoutAuth() {
         if (!initialized) {
             initScanner();
         }
@@ -180,7 +180,7 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
                     "Starting snapshot is {}, next snapshot will be {}.",
                     scannedResult.plan().snapshotId(),
                     nextSnapshotId);
-            return applyAuthToSplits(scannedResult.plan());
+            return scannedResult.plan();
         } else if (result instanceof StartingScanner.NextSnapshot) {
             nextSnapshotId = ((StartingScanner.NextSnapshot) result).nextSnapshotId();
             isFullPhaseEnd =
@@ -221,7 +221,7 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
                     if (overwritePlan.splits().isEmpty()) {
                         continue;
                     }
-                    return applyAuthToSplits(overwritePlan);
+                    return overwritePlan;
                 }
             }
 
@@ -233,7 +233,7 @@ public class DataTableStreamScan extends AbstractDataTableScan implements Stream
                 if (plan.splits().isEmpty()) {
                     continue;
                 }
-                return applyAuthToSplits(plan);
+                return plan;
             } else {
                 nextSnapshotId++;
             }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/PushDownUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/source/PushDownUtils.java
index 833d6422fb..65793014f1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/PushDownUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/PushDownUtils.java
@@ -55,16 +55,21 @@ public class PushDownUtils {
                 || type instanceof DateType;
     }
 
-    public static boolean minmaxAvailable(DataSplit split, Set<String> columns) {
+    public static boolean minmaxAvailable(Split split, Set<String> columns) {
+        if (!(split instanceof DataSplit)) {
+            return false;
+        }
+
+        DataSplit dataSplit = (DataSplit) split;
         if (isNullOrEmpty(columns)) {
             return false;
         }
 
-        if (!split.rawConvertible()) {
+        if (!dataSplit.rawConvertible()) {
             return false;
         }
 
-        return split.dataFiles().stream()
+        return dataSplit.dataFiles().stream()
                 .map(DataFileMeta::valueStatsCols)
                 .allMatch(
                         valueStatsCols ->
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java
index 9daa9fe9ea..75da5bda7f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/QueryAuthSplit.java
@@ -19,42 +19,24 @@
 package org.apache.paimon.table.source;
 
 import org.apache.paimon.catalog.TableQueryAuthResult;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.io.DataInputView;
-import org.apache.paimon.io.DataInputViewStreamWrapper;
-import org.apache.paimon.io.DataOutputView;
-import org.apache.paimon.stats.SimpleStatsEvolutions;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.OptionalLong;
-
-/**
- * A wrapper class for {@link DataSplit} that adds query authorization information. This class
- * delegates all Split interface methods to the wrapped DataSplit, while providing additional auth
- * result functionality.
- */
-public class QueryAuthSplit extends DataSplit {
+/** A wrapper class for {@link Split} that adds query authorization information. */
+public class QueryAuthSplit implements Split {
 
     private static final long serialVersionUID = 1L;
 
-    private DataSplit dataSplit;
-    @Nullable private TableQueryAuthResult authResult;
+    private final Split split;
+    private final TableQueryAuthResult authResult;
 
-    public QueryAuthSplit(DataSplit dataSplit, @Nullable TableQueryAuthResult authResult) {
-        this.dataSplit = dataSplit;
+    public QueryAuthSplit(Split split, TableQueryAuthResult authResult) {
+        this.split = split;
         this.authResult = authResult;
     }
 
-    public DataSplit dataSplit() {
-        return dataSplit;
+    public Split split() {
+        return split;
     }
 
     @Nullable
@@ -62,171 +44,8 @@ public class QueryAuthSplit extends DataSplit {
         return authResult;
     }
 
-    // Delegate all DataSplit methods to the wrapped instance
-
-    public long snapshotId() {
-        return dataSplit.snapshotId();
-    }
-
-    public BinaryRow partition() {
-        return dataSplit.partition();
-    }
-
-    public int bucket() {
-        return dataSplit.bucket();
-    }
-
-    public String bucketPath() {
-        return dataSplit.bucketPath();
-    }
-
-    @Nullable
-    public Integer totalBuckets() {
-        return dataSplit.totalBuckets();
-    }
-
-    public List<DataFileMeta> beforeFiles() {
-        return dataSplit.beforeFiles();
-    }
-
-    public Optional<List<DeletionFile>> beforeDeletionFiles() {
-        return dataSplit.beforeDeletionFiles();
-    }
-
-    public List<DataFileMeta> dataFiles() {
-        return dataSplit.dataFiles();
-    }
-
     @Override
-    public Optional<List<DeletionFile>> deletionFiles() {
-        return dataSplit.deletionFiles();
-    }
-
-    public boolean isStreaming() {
-        return dataSplit.isStreaming();
-    }
-
-    public boolean rawConvertible() {
-        return dataSplit.rawConvertible();
-    }
-
-    public OptionalLong latestFileCreationEpochMillis() {
-        return dataSplit.latestFileCreationEpochMillis();
-    }
-
-    public OptionalLong earliestFileCreationEpochMillis() {
-        return dataSplit.earliestFileCreationEpochMillis();
-    }
-
     public long rowCount() {
-        return dataSplit.rowCount();
-    }
-
-    public boolean mergedRowCountAvailable() {
-        return dataSplit.mergedRowCountAvailable();
-    }
-
-    public long mergedRowCount() {
-        return dataSplit.mergedRowCount();
-    }
-
-    public Object minValue(
-            int fieldIndex,
-            org.apache.paimon.types.DataField dataField,
-            SimpleStatsEvolutions evolutions) {
-        return dataSplit.minValue(fieldIndex, dataField, evolutions);
-    }
-
-    public Object maxValue(
-            int fieldIndex,
-            org.apache.paimon.types.DataField dataField,
-            SimpleStatsEvolutions evolutions) {
-        return dataSplit.maxValue(fieldIndex, dataField, evolutions);
-    }
-
-    public Long nullCount(int fieldIndex, SimpleStatsEvolutions evolutions) {
-        return dataSplit.nullCount(fieldIndex, evolutions);
-    }
-
-    public long partialMergedRowCount() {
-        return dataSplit.partialMergedRowCount();
-    }
-
-    @Override
-    public Optional<List<RawFile>> convertToRawFiles() {
-        return dataSplit.convertToRawFiles();
-    }
-
-    @Override
-    @Nullable
-    public Optional<List<IndexFile>> indexFiles() {
-        return dataSplit.indexFiles();
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        QueryAuthSplit that = (QueryAuthSplit) o;
-        return Objects.equals(dataSplit, that.dataSplit)
-                && Objects.equals(authResult, that.authResult);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hash(dataSplit, authResult);
-    }
-
-    @Override
-    public String toString() {
-        return "QueryAuthSplit{" + "dataSplit=" + dataSplit + ", authResult=" + authResult + '}';
-    }
-
-    private void writeObject(ObjectOutputStream out) throws IOException {
-        serialize(new org.apache.paimon.io.DataOutputViewStreamWrapper(out));
-    }
-
-    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-        QueryAuthSplit other = deserialize(new DataInputViewStreamWrapper(in));
-        this.dataSplit = other.dataSplit;
-        this.authResult = other.authResult;
-    }
-
-    public void serialize(DataOutputView out) throws IOException {
-        // Serialize the wrapped DataSplit
-        dataSplit.serialize(out);
-
-        // Serialize authResult
-        if (authResult != null) {
-            out.writeBoolean(true);
-            TableQueryAuthResultSerializer.serialize(authResult, out);
-        } else {
-            out.writeBoolean(false);
-        }
-    }
-
-    public static QueryAuthSplit deserialize(DataInputView in) throws IOException {
-        // Deserialize the wrapped DataSplit
-        DataSplit dataSplit = DataSplit.deserialize(in);
-
-        // Deserialize authResult
-        TableQueryAuthResult authResult = null;
-        if (in.readBoolean()) {
-            authResult = TableQueryAuthResultSerializer.deserialize(in);
-        }
-
-        return new QueryAuthSplit(dataSplit, authResult);
-    }
-
-    public static QueryAuthSplit wrap(
-            DataSplit dataSplit, @Nullable TableQueryAuthResult authResult) {
-        if (authResult == null || authResult.isEmpty()) {
-            return new QueryAuthSplit(dataSplit, null);
-        }
-        return new QueryAuthSplit(dataSplit, authResult);
+        return split.rowCount();
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 8da04edae0..c81dfd8e01 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -26,7 +26,6 @@ import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.predicate.TopN;
 import org.apache.paimon.predicate.VectorSearch;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.InnerTable;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
@@ -257,12 +256,6 @@ public class ReadBuilderImpl implements ReadBuilder {
         if (variantAccessInfo != null) {
             read.withVariantAccess(variantAccessInfo);
         }
-        if (table instanceof FileStoreTable) {
-            CoreOptions options = new CoreOptions(table.options());
-            if (options.queryAuthEnabled()) {
-                return new AuthAwareTableRead(read, readType());
-            }
-        }
         return read;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java
index 3d45ec2f33..b4e98576ad 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuth.java
@@ -27,5 +27,6 @@ import java.util.List;
 /** Table query auth. */
 public interface TableQueryAuth {
 
+    @Nullable
     TableQueryAuthResult auth(@Nullable List<String> select);
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuthResultSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuthResultSerializer.java
deleted file mode 100644
index c557ee08e0..0000000000
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/TableQueryAuthResultSerializer.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table.source;
-
-import org.apache.paimon.catalog.TableQueryAuthResult;
-import org.apache.paimon.io.DataInputView;
-import org.apache.paimon.io.DataOutputView;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.Transform;
-import org.apache.paimon.rest.RESTApi;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/** Serializer for {@link TableQueryAuthResult}. */
-public class TableQueryAuthResultSerializer {
-    public static void serialize(TableQueryAuthResult authResult, DataOutputView out)
-            throws IOException {
-        // Serialize row filter
-        if (authResult.rowFilter() != null) {
-            out.writeBoolean(true);
-            String predicateJson = RESTApi.toJson(authResult.rowFilter());
-            out.writeUTF(predicateJson);
-        } else {
-            out.writeBoolean(false);
-        }
-
-        // Serialize column masking
-        Map<String, Transform> columnMasking = authResult.columnMasking();
-        out.writeInt(columnMasking.size());
-        for (Map.Entry<String, Transform> entry : columnMasking.entrySet()) {
-            out.writeUTF(entry.getKey());
-            String transformJson = RESTApi.toJson(entry.getValue());
-            out.writeUTF(transformJson);
-        }
-    }
-
-    public static TableQueryAuthResult deserialize(DataInputView in) throws IOException {
-        // Deserialize row filter
-        Predicate rowFilter = null;
-        if (in.readBoolean()) {
-            String predicateJson = in.readUTF();
-            rowFilter = RESTApi.fromJson(predicateJson, Predicate.class);
-        }
-
-        // Deserialize column masking
-        int maskingSize = in.readInt();
-        Map<String, Transform> columnMasking = new HashMap<>(maskingSize);
-        for (int i = 0; i < maskingSize; i++) {
-            String columnName = in.readUTF();
-            String transformJson = in.readUTF();
-            Transform transform = RESTApi.fromJson(transformJson, Transform.class);
-            columnMasking.put(columnName, transform);
-        }
-
-        return new TableQueryAuthResult(rowFilter, columnMasking);
-    }
-}
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
index f628a68ea0..a5a84b2512 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
@@ -296,6 +296,7 @@ class MockRESTCatalogTest extends RESTCatalogTest {
         Transform transform =
                 new UpperTransform(
                        Collections.singletonList(new FieldRef(1, "col2", DataTypes.STRING())));
+        String transformJson = JsonSerdeUtil.toFlatJson(transform);
 
         // Set up mock response with filter and columnMasking
         List<Predicate> rowFilters = Collections.singletonList(predicate);
@@ -305,10 +306,10 @@ class MockRESTCatalogTest extends RESTCatalogTest {
         restCatalogServer.setColumnMaskingAuth(identifier, columnMasking);
 
         TableQueryAuthResult result = catalog.authTableQuery(identifier, null);
-        assertThat(result.rowFilter()).isEqualTo(predicate);
+        assertThat(result.filter()).containsOnly(predicateJson);
         assertThat(result.columnMasking()).isNotEmpty();
         assertThat(result.columnMasking()).containsKey("col2");
-        assertThat(result.columnMasking().get("col2")).isEqualTo(transform);
+        assertThat(result.columnMasking().get("col2")).isEqualTo(transformJson);
 
         catalog.dropTable(identifier, true);
         catalog.dropDatabase(identifier.getDatabaseName(), true, true);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
index f955418b6d..33d7344f6a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/LookupCompactDiffRead.java
@@ -39,6 +39,7 @@ import static org.apache.paimon.table.source.KeyValueTableRead.unwrap;
 
 /** An {@link InnerTableRead} that reads the data changed before and after compaction. */
 public class LookupCompactDiffRead extends AbstractDataTableRead {
+
     private final SplitRead<InternalRow> fullPhaseMergeRead;
     private final SplitRead<InternalRow> incrementalDiffRead;
 
@@ -67,7 +68,7 @@ public class LookupCompactDiffRead extends AbstractDataTableRead {
         if (dataSplit.beforeFiles().isEmpty()) {
             return fullPhaseMergeRead.createReader(dataSplit); // full reading phase
         } else {
-            return incrementalDiffRead.createReader((DataSplit) split);
+            return incrementalDiffRead.createReader(split);
         }
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index 48fefa8e24..5fa01a6bb7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -58,6 +58,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** ITCase for REST catalog. */
@@ -549,13 +550,23 @@ class RESTCatalogITCase extends RESTCatalogITCaseBase {
         assertThat(combinedResult.get(0).getField(3)).isEqualTo(25); // age not masked
         assertThat(combinedResult.get(0).getField(4)).isEqualTo("IT"); // department not masked
 
+        // Test must read with row filter columns
+        assertThatThrownBy(
+                        () ->
+                                batchSql(
+                                        String.format(
+                                                "SELECT id, name FROM %s.%s WHERE age > 30 ORDER BY id",
+                                                DATABASE_NAME, combinedTable)))
+                .rootCause()
+                .hasMessageContaining("Unable to read data without column department");
+
         // Test WHERE clause with both features
         assertThat(
                         batchSql(
                                 String.format(
-                                        "SELECT id, name FROM %s.%s WHERE age > 30 ORDER BY id",
+                                        "SELECT id, name, department FROM %s.%s WHERE age > 30 ORDER BY id",
                                         DATABASE_NAME, combinedTable)))
-                .containsExactlyInAnyOrder(Row.of(3, "***"));
+                .containsExactlyInAnyOrder(Row.of(3, "***", "IT"));
 
         // Clear both column masking and row filter
         restCatalogServer.setColumnMaskingAuth(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
index e07db9515e..df6af4d483 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/aggregate/AggregatePushDownUtils.scala
@@ -19,7 +19,7 @@
 package org.apache.paimon.spark.aggregate
 
 import org.apache.paimon.table.FileStoreTable
-import org.apache.paimon.table.source.{DataSplit, ReadBuilder}
+import org.apache.paimon.table.source.{DataSplit, ReadBuilder, Split}
 import org.apache.paimon.table.source.PushDownUtils.minmaxAvailable
 import org.apache.paimon.types._
 
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.datasources.v2.V2ColumnUtils.extractV2Colu
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.language.postfixOps
 
 object AggregatePushDownUtils {
 
@@ -64,18 +65,23 @@ object AggregatePushDownUtils {
       case None => return None
     }
 
-    if (!splits.forall(_.mergedRowCountAvailable())) {
+    if (!splits.forall(_.isInstanceOf[DataSplit])) {
+      return None
+    }
+    val dataSplits = splits.map(_.asInstanceOf[DataSplit])
+
+    if (!dataSplits.forall(_.mergedRowCountAvailable())) {
       return None
     }
 
     val aggregator = new LocalAggregator(table)
     aggregator.initialize(aggregation)
-    splits.foreach(aggregator.update)
+    dataSplits.foreach(aggregator.update)
     Option(aggregator)
   }
 
-  private def generateSplits(readBuilder: ReadBuilder): mutable.Seq[DataSplit] = {
-    readBuilder.newScan().plan().splits().asScala.map(_.asInstanceOf[DataSplit])
+  private def generateSplits(readBuilder: ReadBuilder): mutable.Seq[Split] = {
+    readBuilder.newScan().plan().splits().asScala
   }
 
   private def extractMinMaxColumns(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
index 8e635d404c..679234bc74 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/scan/BinPackingSplits.scala
@@ -25,7 +25,7 @@ import org.apache.paimon.spark.PaimonInputPartition
 import org.apache.paimon.spark.util.SplitUtils
 import org.apache.paimon.table.FallbackReadFileStoreTable.FallbackSplit
 import org.apache.paimon.table.format.FormatDataSplit
-import org.apache.paimon.table.source.{DataSplit, DeletionFile, QueryAuthSplit, Split}
+import org.apache.paimon.table.source.{DataSplit, DeletionFile, Split}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.PaimonSparkSession
@@ -160,32 +160,19 @@ case class BinPackingSplits(coreOptions: CoreOptions, readRowSizeRatio: Double =
       split: DataSplit,
       dataFiles: Seq[DataFileMeta],
       deletionFiles: Seq[DeletionFile]): DataSplit = {
-    val (actualSplit, authResult) = split match {
-      case queryAuthSplit: QueryAuthSplit =>
-        (queryAuthSplit.dataSplit(), queryAuthSplit.authResult())
-      case _ =>
-        (split, null)
-    }
-
     val builder = DataSplit
       .builder()
-      .withSnapshot(actualSplit.snapshotId())
-      .withPartition(actualSplit.partition())
-      .withBucket(actualSplit.bucket())
-      .withTotalBuckets(actualSplit.totalBuckets())
+      .withSnapshot(split.snapshotId())
+      .withPartition(split.partition())
+      .withBucket(split.bucket())
+      .withTotalBuckets(split.totalBuckets())
       .withDataFiles(dataFiles.toList.asJava)
-      .rawConvertible(actualSplit.rawConvertible)
-      .withBucketPath(actualSplit.bucketPath)
+      .rawConvertible(split.rawConvertible)
+      .withBucketPath(split.bucketPath)
     if (deletionVectors) {
       builder.withDataDeletionFiles(deletionFiles.toList.asJava)
     }
-    val newDataSplit = builder.build()
-
-    if (authResult != null) {
-      new QueryAuthSplit(newDataSplit, authResult)
-    } else {
-      newDataSplit
-    }
+    builder.build()
   }
 
   private def withSamePartitionAndBucket(split1: DataSplit, split2: DataSplit): Boolean = {
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
index bbae01af95..cdf6384da3 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
@@ -71,6 +71,7 @@ import java.util.Map;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for spark read from Rest catalog. */
 public class SparkCatalogWithRestTest {
@@ -511,12 +512,21 @@ public class SparkCatalogWithRestTest {
         assertThat(combinedResult.get(0).getInt(3)).isEqualTo(25); // age not masked
         assertThat(combinedResult.get(0).getString(4)).isEqualTo("IT"); // department not masked
 
+        // Test must read with row filter columns
+        assertThatThrownBy(
+                        () ->
+                                spark.sql(
+                                                "SELECT id, name FROM t_combined WHERE age > 30 ORDER BY id")
+                                        .collectAsList())
+                .hasMessageContaining("Unable to read data without column department");
+
         // Test WHERE clause with both features
         assertThat(
-                        spark.sql("SELECT id, name FROM t_combined WHERE age > 30 ORDER BY id")
+                        spark.sql(
+                                        "SELECT id, name, department FROM t_combined WHERE age > 30 ORDER BY id")
                                 .collectAsList()
                                 .toString())
-                .isEqualTo("[[3,***]]");
+                .isEqualTo("[[3,***,IT]]");
 
         // Clear both column masking and row filter
         restCatalogServer.setColumnMaskingAuth(
