This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a19973db [core][spark] Supports to push down limit (#2367)
9a19973db is described below

commit 9a19973db26f30339f246acdb503b3f682556b87
Author: Yann Byron <[email protected]>
AuthorDate: Wed Nov 29 19:41:18 2023 +0800

    [core][spark] Supports to push down limit (#2367)
---
 .../org/apache/paimon/table/source/DataSplit.java  |  13 ++
 .../apache/paimon/table/source/InnerTableScan.java |   4 +
 .../paimon/table/source/InnerTableScanImpl.java    |  72 +++++++++-
 .../org/apache/paimon/table/source/RawFile.java    |  17 ++-
 .../apache/paimon/table/source/ReadBuilder.java    |   3 +
 .../paimon/table/source/ReadBuilderImpl.java       |  16 ++-
 .../table/source/snapshot/SnapshotReaderImpl.java  |   3 +-
 .../apache/paimon/table/source/TableScanTest.java  |  32 +++++
 .../table/source/snapshot/ScannerTestBase.java     |  26 +++-
 .../table/source/snapshot/SnapshotReaderTest.java  |  15 ++-
 paimon-spark/paimon-spark-3.1/pom.xml              | 141 ++++++++++++++++++++
 .../apache/paimon/spark/PaimonScanBuilder.scala    |  23 +---
 paimon-spark/paimon-spark-3.2/pom.xml              | 148 +++++++++++++++++++++
 .../apache/paimon/spark/PaimonScanBuilder.scala    |  23 +---
 .../paimon/spark/SparkGenericCatalogTest.java      | 119 +++++++++++++++++
 .../src/test/resources/log4j2-test.properties      |  28 ++++
 .../org/apache/paimon/spark/SparkScanBuilder.java  | 106 ---------------
 .../java/org/apache/paimon/spark/SparkTable.java   |   2 +-
 .../paimon/spark/PaimonBaseScanBuilder.scala       | 107 +++++++++++++++
 .../apache/paimon/spark/PaimonScanBuilder.scala    |  33 +++--
 .../PaimonPushDownTest.scala}                      |  59 ++++++++
 21 files changed, 817 insertions(+), 173 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 5226739df..63110c0bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -207,6 +207,19 @@ public class DataSplit implements Split {
         return new Builder();
     }
 
+    public static Builder builder(DataSplit split) {
+        Builder builder = builder();
+        builder.withSnapshot(split.snapshotId);
+        builder.withPartition(split.partition);
+        builder.withBucket(split.bucket);
+        builder.withBeforeFiles(split.beforeFiles);
+        builder.withDataFiles(split.dataFiles);
+        builder.isStreaming(split.isStreaming);
+        builder.rawFiles(split.rawFiles);
+
+        return builder;
+    }
+
     /** Builder for {@link DataSplit}. */
     public static class Builder {
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 3433aad36..234d82e52 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -28,6 +28,10 @@ public interface InnerTableScan extends TableScan {
 
     InnerTableScan withFilter(Predicate predicate);
 
+    default InnerTableScan withLimit(int limit) {
+        return this;
+    }
+
     default InnerTableScan withPartitionFilter(Map<String, String> 
partitionSpec) {
         return this;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index 29f890a91..6dbc6ed32 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -25,6 +25,10 @@ import 
org.apache.paimon.table.source.snapshot.SnapshotReader;
 import org.apache.paimon.table.source.snapshot.StartingScanner;
 import org.apache.paimon.utils.SnapshotManager;
 
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Map;
 
 /** {@link TableScan} implementation for batch planning. */
@@ -36,6 +40,8 @@ public class InnerTableScanImpl extends 
AbstractInnerTableScan {
     private StartingScanner startingScanner;
     private boolean hasNext;
 
+    private Integer pushDownLimit;
+
     public InnerTableScanImpl(
             CoreOptions options,
             SnapshotReader snapshotReader,
@@ -59,6 +65,12 @@ public class InnerTableScanImpl extends 
AbstractInnerTableScan {
         return this;
     }
 
+    @Override
+    public InnerTableScan withLimit(int limit) {
+        this.pushDownLimit = limit;
+        return this;
+    }
+
     @Override
     public TableScan.Plan plan() {
         if (startingScanner == null) {
@@ -68,9 +80,67 @@ public class InnerTableScanImpl extends 
AbstractInnerTableScan {
         if (hasNext) {
             hasNext = false;
             StartingScanner.Result result = 
startingScanner.scan(snapshotReader);
-            return DataFilePlan.fromResult(result);
+            StartingScanner.Result limitedResult = applyPushDownLimit(result);
+            return DataFilePlan.fromResult(limitedResult);
         } else {
             throw new EndOfScanException();
         }
     }
+
+    private StartingScanner.Result applyPushDownLimit(StartingScanner.Result 
result) {
+        if (pushDownLimit != null && result instanceof 
StartingScanner.ScannedResult) {
+            long scannedRowCount = 0;
+            SnapshotReader.Plan plan = ((StartingScanner.ScannedResult) 
result).plan();
+            List<DataSplit> splits = plan.dataSplits();
+            List<DataSplit> limitedSplits = new ArrayList<>();
+            for (int i = 0; i < splits.size(); i++) {
+                if (scannedRowCount >= pushDownLimit) {
+                    break;
+                }
+
+                DataSplit split = splits.get(i);
+                long splitRowCount = getRowCountForSplit(split);
+                limitedSplits.add(split);
+                scannedRowCount += splitRowCount;
+            }
+
+            SnapshotReader.Plan newPlan =
+                    new SnapshotReader.Plan() {
+                        @Nullable
+                        @Override
+                        public Long watermark() {
+                            return plan.watermark();
+                        }
+
+                        @Nullable
+                        @Override
+                        public Long snapshotId() {
+                            return plan.snapshotId();
+                        }
+
+                        @Override
+                        public List<Split> splits() {
+                            return (List) limitedSplits;
+                        }
+                    };
+            return new StartingScanner.ScannedResult(newPlan);
+        } else {
+            return result;
+        }
+    }
+
+    /**
+     * Returns 0 when the row count of this split cannot be computed, because 
the split needs to be
+     * merged.
+     */
+    private long getRowCountForSplit(DataSplit split) {
+        if (split.convertToRawFiles().isPresent()) {
+            return split.convertToRawFiles().get().stream()
+                    .map(RawFile::rowCount)
+                    .reduce(Long::sum)
+                    .orElse(0L);
+        } else {
+            return 0L;
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
index 2923304e5..177b57bfa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
@@ -34,18 +34,24 @@ import java.util.Objects;
 @Public
 public class RawFile {
 
+    private static final long serialVersionUID = 2L;
+
     private final String path;
     private final long offset;
     private final long length;
     private final String format;
     private final long schemaId;
 
-    public RawFile(String path, long offset, long length, String format, long 
schemaId) {
+    private final long rowCount;
+
+    public RawFile(
+            String path, long offset, long length, String format, long 
schemaId, long rowCount) {
         this.path = path;
         this.offset = offset;
         this.length = length;
         this.format = format;
         this.schemaId = schemaId;
+        this.rowCount = rowCount;
     }
 
     /** Path of the file. */
@@ -76,12 +82,18 @@ public class RawFile {
         return schemaId;
     }
 
+    /** Row count of the file. */
+    public long rowCount() {
+        return rowCount;
+    }
+
     public void serialize(DataOutputView out) throws IOException {
         out.writeUTF(path);
         out.writeLong(offset);
         out.writeLong(length);
         out.writeUTF(format);
         out.writeLong(schemaId);
+        out.writeLong(rowCount);
     }
 
     public static RawFile deserialize(DataInputView in) throws IOException {
@@ -90,8 +102,9 @@ public class RawFile {
         long length = in.readLong();
         String format = in.readUTF();
         long schemaId = in.readLong();
+        long rowCount = in.readLong();
 
-        return new RawFile(path, offset, length, format, schemaId);
+        return new RawFile(path, offset, length, format, schemaId, rowCount);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index e83ae0bfa..fc7e41569 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -119,6 +119,9 @@ public interface ReadBuilder extends Serializable {
      */
     ReadBuilder withProjection(int[][] projection);
 
+    /** Push down a row-count limit to the scan. */
+    ReadBuilder withLimit(int limit);
+
     /** Create a {@link TableScan} to perform batch planning. */
     TableScan newScan();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 4fb0dec37..6805f30e4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -37,6 +37,9 @@ public class ReadBuilderImpl implements ReadBuilder {
 
     private Predicate filter;
     private int[][] projection;
+
+    private Integer limit = null;
+
     private Map<String, String> partitionSpec;
 
     public ReadBuilderImpl(InnerTable table) {
@@ -74,9 +77,20 @@ public class ReadBuilderImpl implements ReadBuilder {
         return this;
     }
 
+    @Override
+    public ReadBuilder withLimit(int limit) {
+        this.limit = limit;
+        return this;
+    }
+
     @Override
     public TableScan newScan() {
-        return 
table.newScan().withFilter(filter).withPartitionFilter(partitionSpec);
+        InnerTableScan tableScan =
+                
table.newScan().withFilter(filter).withPartitionFilter(partitionSpec);
+        if (limit != null) {
+            tableScan.withLimit(limit);
+        }
+        return tableScan;
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index d36eb76ce..38e996ddc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -449,6 +449,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
                                         .formatType()
                                         .toString()
                                         .toLowerCase()),
-                meta.schemaId());
+                meta.schemaId(),
+                meta.rowCount());
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
index 0109125f5..8152fe2ca 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
@@ -66,4 +66,36 @@ public class TableScanTest extends ScannerTestBase {
         write.close();
         commit.close();
     }
+
+    @Test
+    public void testPushDownLimit() throws Exception {
+        createAppenOnlyTable();
+
+        StreamTableWrite write = table.newWrite(commitUser);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(rowData(1, 10, 100L));
+        write.write(rowData(2, 20, 200L));
+        write.write(rowData(3, 30, 300L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(4, 40, 400L));
+        write.write(rowData(5, 50, 500L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        // no limit pushed down
+        TableScan.Plan plan1 = table.newScan().plan();
+        assertThat(plan1.splits().size()).isEqualTo(5);
+
+        // with limit 1
+        TableScan.Plan plan2 = table.newScan().withLimit(1).plan();
+        assertThat(plan2.splits().size()).isEqualTo(1);
+
+        // with limit 4
+        TableScan.Plan plan3 = table.newScan().withLimit(4).plan();
+        assertThat(plan3.splits().size()).isEqualTo(4);
+
+        write.close();
+        commit.close();
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
index df1d2b533..7fea1b42d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
@@ -48,6 +48,7 @@ import org.apache.paimon.utils.TraceableFileIO;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.io.TempDir;
 
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -83,6 +84,14 @@ public abstract class ScannerTestBase {
         snapshotReader = table.newSnapshotReader();
     }
 
+    protected void createAppenOnlyTable() throws Exception {
+        tempDir = Files.createTempDirectory("junit");
+        tablePath = new Path(TraceableFileIO.SCHEME + "://" + 
tempDir.toString());
+        fileIO = FileIOFinder.find(tablePath);
+        table = createFileStoreTable(false);
+        snapshotReader = table.newSnapshotReader();
+    }
+
     protected GenericRow rowData(Object... values) {
         return GenericRow.of(values);
     }
@@ -125,17 +134,30 @@ public abstract class ScannerTestBase {
     }
 
     protected FileStoreTable createFileStoreTable() throws Exception {
-        return createFileStoreTable(new Options());
+        return createFileStoreTable(true, new Options());
     }
 
     protected FileStoreTable createFileStoreTable(Options conf) throws 
Exception {
+        return createFileStoreTable(true, conf);
+    }
+
+    protected FileStoreTable createFileStoreTable(boolean withPrimaryKeys) 
throws Exception {
+        return createFileStoreTable(withPrimaryKeys, new Options());
+    }
+
+    protected FileStoreTable createFileStoreTable(boolean withPrimaryKeys, 
Options conf)
+            throws Exception {
         SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+        List<String> primaryKeys = new ArrayList<>();
+        if (withPrimaryKeys) {
+            primaryKeys = Arrays.asList("pt", "a");
+        }
         TableSchema tableSchema =
                 schemaManager.createTable(
                         new Schema(
                                 ROW_TYPE.getFields(),
                                 Collections.singletonList("pt"),
-                                Arrays.asList("pt", "a"),
+                                primaryKeys,
                                 conf.toMap(),
                                 ""));
         return FileStoreTableFactory.create(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
index a2b7faa9f..f38ce9064 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
@@ -110,7 +110,8 @@ public class SnapshotReaderTest {
                                             0,
                                             meta.fileSize(),
                                             "avro",
-                                            meta.schemaId())));
+                                            meta.schemaId(),
+                                            meta.rowCount())));
         }
 
         // write another file on level 0
@@ -158,7 +159,8 @@ public class SnapshotReaderTest {
                                             0,
                                             meta.fileSize(),
                                             meta.level() == 5 ? "orc" : "avro",
-                                            meta.schemaId())));
+                                            meta.schemaId(),
+                                            meta.rowCount())));
         }
 
         // write another file on level 0
@@ -215,7 +217,8 @@ public class SnapshotReaderTest {
                                         0,
                                         meta.fileSize(),
                                         "avro",
-                                        meta.schemaId())));
+                                        meta.schemaId(),
+                                        meta.rowCount())));
 
         // change file schema
 
@@ -248,14 +251,16 @@ public class SnapshotReaderTest {
                                         0,
                                         meta0.fileSize(),
                                         "avro",
-                                        meta0.schemaId()),
+                                        meta0.schemaId(),
+                                        meta0.rowCount()),
                                 new RawFile(
                                         String.format(
                                                 "%s/bucket-0/%s", tablePath, 
meta1.fileName()),
                                         0,
                                         meta1.fileSize(),
                                         "avro",
-                                        meta1.schemaId())));
+                                        meta1.schemaId(),
+                                        meta1.rowCount())));
 
         write.close();
         commit.close();
diff --git a/paimon-spark/paimon-spark-3.1/pom.xml 
b/paimon-spark/paimon-spark-3.1/pom.xml
index 93fac2e5e..d43ab9a15 100644
--- a/paimon-spark/paimon-spark-3.1/pom.xml
+++ b/paimon-spark/paimon-spark-3.1/pom.xml
@@ -42,6 +42,22 @@ under the License.
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.12</artifactId>
@@ -55,9 +71,89 @@ under the License.
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
+                <!-- SPARK-40511 upgrades SLF4J2, which is not compatible w/ 
SLF4J1 -->
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_2.12</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
+            <version>3.1.0</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
@@ -88,6 +184,51 @@ under the License.
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>${scala-maven-plugin.version}</version>
+                <executions>
+                    <!-- Run scala compiler in the process-resources phase, so 
that dependencies on
+                        scala classes can be resolved later in the (Java) 
compile phase -->
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+
+                    <!-- Run scala compiler in the process-test-resources 
phase, so that dependencies on
+                         scala classes can be resolved later in the (Java) 
test-compile phase -->
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+                <version>${spotless.version}</version>
+                <configuration>
+                    <scala>
+                        <scalafmt>
+                            <version>3.4.3</version>
+                            <!-- This file is in the root of the project to 
make sure IntelliJ picks it up automatically -->
+                            
<file>${project.basedir}/../../.scalafmt.conf</file>
+                        </scalafmt>
+                        <licenseHeader>
+                            <content>${spotless.license.header}</content>
+                            <delimiter>${spotless.delimiter}</delimiter>
+                        </licenseHeader>
+                    </scala>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 </project>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java 
b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
similarity index 57%
copy from 
paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
copy to 
paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 3433aad36..2b1128989 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ 
b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -15,25 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.paimon.spark
 
-package org.apache.paimon.table.source;
+import org.apache.paimon.table.Table
 
-import org.apache.paimon.metrics.MetricRegistry;
-import org.apache.paimon.predicate.Predicate;
-
-import java.util.Map;
-
-/** Inner {@link TableScan} contains filter push down. */
-public interface InnerTableScan extends TableScan {
-
-    InnerTableScan withFilter(Predicate predicate);
-
-    default InnerTableScan withPartitionFilter(Map<String, String> 
partitionSpec) {
-        return this;
-    }
-
-    default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) {
-        // do nothing, should implement this if need
-        return this;
-    }
-}
+class PaimonScanBuilder(table: Table) extends PaimonBaseScanBuilder(table)
diff --git a/paimon-spark/paimon-spark-3.2/pom.xml 
b/paimon-spark/paimon-spark-3.2/pom.xml
index 3e0853677..23ec79ee5 100644
--- a/paimon-spark/paimon-spark-3.2/pom.xml
+++ b/paimon-spark/paimon-spark-3.2/pom.xml
@@ -42,6 +42,22 @@ under the License.
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_2.12</artifactId>
@@ -55,8 +71,95 @@ under the License.
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-log4j12</artifactId>
                 </exclusion>
+                <!-- SPARK-40511 upgrades SLF4J2, which is not compatible w/ 
SLF4J1 -->
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.12</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.12</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j2-impl</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-catalyst_2.12</artifactId>
+            <version>${spark.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.binary.version}</artifactId>
+            <version>3.1.0</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+            <version>${avro.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -81,6 +184,51 @@ under the License.
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>${scala-maven-plugin.version}</version>
+                <executions>
+                    <!-- Run scala compiler in the process-resources phase, so 
that dependencies on
+                        scala classes can be resolved later in the (Java) 
compile phase -->
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+
+                    <!-- Run scala compiler in the process-test-resources 
phase, so that dependencies on
+                         scala classes can be resolved later in the (Java) 
test-compile phase -->
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>com.diffplug.spotless</groupId>
+                <artifactId>spotless-maven-plugin</artifactId>
+                <version>${spotless.version}</version>
+                <configuration>
+                    <scala>
+                        <scalafmt>
+                            <version>3.4.3</version>
+                            <!-- This file is in the root of the project to 
make sure IntelliJ picks it up automatically -->
+                            
<file>${project.basedir}/../../.scalafmt.conf</file>
+                        </scalafmt>
+                        <licenseHeader>
+                            <content>${spotless.license.header}</content>
+                            <delimiter>${spotless.delimiter}</delimiter>
+                        </licenseHeader>
+                    </scala>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 </project>
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
similarity index 57%
copy from 
paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
copy to 
paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 3433aad36..2b1128989 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ 
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -15,25 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.paimon.spark
 
-package org.apache.paimon.table.source;
+import org.apache.paimon.table.Table
 
-import org.apache.paimon.metrics.MetricRegistry;
-import org.apache.paimon.predicate.Predicate;
-
-import java.util.Map;
-
-/** Inner {@link TableScan} contains filter push down. */
-public interface InnerTableScan extends TableScan {
-
-    InnerTableScan withFilter(Predicate predicate);
-
-    default InnerTableScan withPartitionFilter(Map<String, String> 
partitionSpec) {
-        return this;
-    }
-
-    default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) {
-        // do nothing, should implement this if need
-        return this;
-    }
-}
+class PaimonScanBuilder(table: Table) extends PaimonBaseScanBuilder(table)
diff --git 
a/paimon-spark/paimon-spark-3.2/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
 
b/paimon-spark/paimon-spark-3.2/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
new file mode 100644
index 000000000..f632998c5
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-3.2/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link SparkGenericCatalog}. */
+public class SparkGenericCatalogTest {
+
+    protected static SparkSession spark = null;
+
+    protected static Path warehousePath = null;
+
+    @BeforeAll
+    public static void startMetastoreAndSpark(@TempDir java.nio.file.Path 
tempDir) {
+        warehousePath = new Path("file:" + tempDir.toString());
+        spark =
+                SparkSession.builder()
+                        .config("spark.sql.warehouse.dir", 
warehousePath.toString())
+                        .master("local[2]")
+                        .getOrCreate();
+        spark.conf().set("spark.sql.catalog.spark_catalog", 
SparkGenericCatalog.class.getName());
+    }
+
+    @AfterAll
+    public static void stopMetastoreAndSpark() {
+        if (spark != null) {
+            spark.stop();
+            spark = null;
+        }
+    }
+
+    @Test
+    public void testPaimonTable() throws Exception {
+        spark.sql(
+                "CREATE TABLE PT (a INT, b INT, c STRING) USING paimon 
TBLPROPERTIES"
+                        + " ('file.format'='avro')");
+        writeTable(
+                "PT",
+                GenericRow.of(1, 2, BinaryString.fromString("3")),
+                GenericRow.of(4, 5, BinaryString.fromString("6")));
+        assertThat(spark.sql("SELECT * FROM 
PT").collectAsList().stream().map(Object::toString))
+                .containsExactlyInAnyOrder("[1,2,3]", "[4,5,6]");
+
+        spark.sql("CREATE DATABASE my_db");
+        spark.sql(
+                "CREATE TABLE DB_PT (a INT, b INT, c STRING) USING paimon 
TBLPROPERTIES"
+                        + " ('file.format'='avro')");
+        writeTable(
+                "DB_PT",
+                GenericRow.of(1, 2, BinaryString.fromString("3")),
+                GenericRow.of(4, 5, BinaryString.fromString("6")));
+        assertThat(spark.sql("SELECT * FROM 
DB_PT").collectAsList().stream().map(Object::toString))
+                .containsExactlyInAnyOrder("[1,2,3]", "[4,5,6]");
+
+        assertThat(spark.sql("SHOW 
NAMESPACES").collectAsList().stream().map(Object::toString))
+                .containsExactlyInAnyOrder("[default]", "[my_db]");
+    }
+
+    @Test
+    public void testCsvTable() {
+        spark.sql("CREATE TABLE CT (a INT, b INT, c STRING) USING csv");
+        spark.sql("INSERT INTO CT VALUES (1, 2, '3'), (4, 5, 
'6')").collectAsList();
+        List<Row> rows = spark.sql("SELECT * FROM CT").collectAsList();
+        assertThat(rows.stream().map(Object::toString))
+                .containsExactlyInAnyOrder("[1,2,3]", "[4,5,6]");
+    }
+
+    private static void writeTable(String tableName, GenericRow... rows) 
throws Exception {
+        FileStoreTable fileStoreTable =
+                FileStoreTableFactory.create(
+                        LocalFileIO.create(),
+                        new Path(warehousePath, String.format("default.db/%s", 
tableName)));
+        BatchWriteBuilder writeBuilder = fileStoreTable.newBatchWriteBuilder();
+        BatchTableWrite writer = writeBuilder.newWrite();
+        BatchTableCommit commit = writeBuilder.newCommit();
+        for (GenericRow row : rows) {
+            writer.write(row);
+        }
+        commit.commit(writer.prepareCommit());
+        writer.close();
+        commit.close();
+    }
+}
diff --git 
a/paimon-spark/paimon-spark-3.2/src/test/resources/log4j2-test.properties 
b/paimon-spark/paimon-spark-3.2/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..1b3980d15
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.2/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
deleted file mode 100644
index 06652ff9f..000000000
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark;
-
-import org.apache.paimon.predicate.PartitionPredicateVisitor;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.Table;
-
-import org.apache.spark.sql.connector.read.Scan;
-import org.apache.spark.sql.connector.read.ScanBuilder;
-import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
-import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/** A Spark {@link ScanBuilder} for paimon. */
-public class SparkScanBuilder
-        implements ScanBuilder, SupportsPushDownFilters, 
SupportsPushDownRequiredColumns {
-    private static final Logger LOG = 
LoggerFactory.getLogger(SparkScanBuilder.class);
-    private final Table table;
-
-    private List<Predicate> predicates = new ArrayList<>();
-    private Filter[] pushedFilters;
-    private int[] projectedFields;
-
-    public SparkScanBuilder(Table table) {
-        this.table = table;
-    }
-
-    @Override
-    public Filter[] pushFilters(Filter[] filters) {
-        // There are 3 kinds of filters:
-        // (1) pushable filters which don't need to be evaluated again after 
scanning, e.g. filter
-        // partitions.
-        // (2) pushable filters which still need to be evaluated after 
scanning.
-        // (3) non-pushable filters.
-        // case 1 and 2 are considered as pushable filters and will be 
returned by pushedFilters().
-        // case 2 and 3 are considered as postScan filters and should be 
return by this method.
-        List<Filter> pushable = new ArrayList<>(filters.length);
-        List<Filter> postScan = new ArrayList<>(filters.length);
-        List<Predicate> predicates = new ArrayList<>(filters.length);
-
-        SparkFilterConverter converter = new 
SparkFilterConverter(table.rowType());
-        PartitionPredicateVisitor visitor = new 
PartitionPredicateVisitor(table.partitionKeys());
-        for (Filter filter : filters) {
-            try {
-                Predicate predicate = converter.convert(filter);
-                predicates.add(predicate);
-                pushable.add(filter);
-                if (!predicate.visit(visitor)) {
-                    postScan.add(filter);
-                }
-            } catch (UnsupportedOperationException e) {
-                LOG.warn(e.getMessage());
-                postScan.add(filter);
-            }
-        }
-        this.predicates = predicates;
-        this.pushedFilters = pushable.toArray(new Filter[0]);
-        return postScan.toArray(new Filter[0]);
-    }
-
-    @Override
-    public Filter[] pushedFilters() {
-        return pushedFilters;
-    }
-
-    @Override
-    public void pruneColumns(StructType requiredSchema) {
-        String[] pruneFields = requiredSchema.fieldNames();
-        List<String> fieldNames = table.rowType().getFieldNames();
-        int[] projected = new int[pruneFields.length];
-        for (int i = 0; i < projected.length; i++) {
-            projected[i] = fieldNames.indexOf(pruneFields[i]);
-        }
-        this.projectedFields = projected;
-    }
-
-    @Override
-    public Scan build() {
-        return new SparkScan(
-                table,
-                
table.newReadBuilder().withFilter(predicates).withProjection(projectedFields));
-    }
-}
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index 6b21fad0a..4a35f759b 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -66,7 +66,7 @@ public class SparkTable
     @Override
     public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
         Table newTable = table.copy(options.asCaseSensitiveMap());
-        return new SparkScanBuilder(newTable);
+        return new PaimonScanBuilder(newTable);
     }
 
     @Override
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
new file mode 100644
index 000000000..d7d4535ea
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate, 
PredicateBuilder}
+import org.apache.paimon.table.Table
+import org.apache.paimon.table.source.ReadBuilder
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, 
SupportsPushDownFilters, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.mutable
+
+abstract class PaimonBaseScanBuilder(table: Table)
+  extends ScanBuilder
+  with SupportsPushDownFilters
+  with SupportsPushDownRequiredColumns
+  with Logging {
+
+  protected var predicates: Option[Predicate] = None
+
+  protected var pushed: Option[Array[Filter]] = None
+
+  protected var projectedIndexes: Option[Array[Int]] = None
+
+  protected def getReadBuilder(): ReadBuilder = {
+    val readBuilder = table.newReadBuilder()
+    projectedIndexes.foreach(readBuilder.withProjection)
+    predicates.foreach(readBuilder.withFilter)
+
+    readBuilder
+  }
+
+  override def build(): Scan = {
+    new SparkScan(table, getReadBuilder());
+  }
+
+  /**
+   * Pushes down filters, and returns filters that need to be evaluated after 
scanning. <p> Rows
+   * should be returned from the data source if and only if all of the filters 
match. That is,
+   * filters must be interpreted as ANDed together.
+   */
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    val pushable = mutable.ArrayBuffer.empty[Filter]
+    val postScan = mutable.ArrayBuffer.empty[Filter]
+    val predicates = mutable.ArrayBuffer.empty[Predicate]
+
+    val converter = new SparkFilterConverter(table.rowType)
+    val visitor = new PartitionPredicateVisitor(table.partitionKeys())
+    filters.foreach {
+      filter =>
+        try {
+          val predicate = converter.convert(filter)
+          pushable.append(filter)
+          predicates.append(predicate)
+          if (!predicate.visit(visitor)) postScan.append(filter)
+        } catch {
+          case e: UnsupportedOperationException =>
+            logWarning(e.getMessage)
+            postScan.append(filter)
+        }
+    }
+
+    if (predicates.nonEmpty) {
+      this.predicates = Some(PredicateBuilder.and(predicates: _*))
+    }
+    this.pushed = Some(pushable.toArray)
+    postScan.toArray
+  }
+
+  /**
+   * Returns the filters that are pushed to the data source via 
{@link #pushFilters(Filter[])}.
+   * <p> There are 3 kinds of filters: <ol> <li>pushable filters which 
don't need to be
+   * evaluated again after scanning.</li> <li>pushable filters which still 
need to be evaluated
+   * after scanning, e.g. parquet row group filter.</li> <li>non-pushable 
filters.</li> </ol> <p>
+   * Both case 1 and 2 should be considered as pushed filters and should be 
returned by this method.
+   * <p> It's possible that there are no filters in the query and 
{@link #pushFilters(Filter[])}
+   * is never called; an empty array should be returned in this case.
+   */
+  override def pushedFilters(): Array[Filter] = {
+    pushed.getOrElse(Array.empty)
+  }
+
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    val pruneFields = requiredSchema.fieldNames
+    val fieldNames = table.rowType.getFieldNames
+    val projected = pruneFields.map(field => fieldNames.indexOf(field))
+    this.projectedIndexes = Some(projected)
+  }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
similarity index 52%
copy from 
paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
copy to 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 3433aad36..55f24d93a 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -15,25 +15,30 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.paimon.spark
 
-package org.apache.paimon.table.source;
+import org.apache.paimon.table.{AppendOnlyFileStoreTable, Table}
+import org.apache.paimon.table.source.ReadBuilder
 
-import org.apache.paimon.metrics.MetricRegistry;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownLimit}
 
-import java.util.Map;
+class PaimonScanBuilder(table: Table)
+  extends PaimonBaseScanBuilder(table)
+  with SupportsPushDownLimit {
 
-/** Inner {@link TableScan} contains filter push down. */
-public interface InnerTableScan extends TableScan {
+  private var pushDownLimit: Option[Int] = None
 
-    InnerTableScan withFilter(Predicate predicate);
+  override protected def getReadBuilder(): ReadBuilder = {
+    val readBuilder = super.getReadBuilder()
+    pushDownLimit.foreach(readBuilder.withLimit)
+    readBuilder
+  }
 
-    default InnerTableScan withPartitionFilter(Map<String, String> 
partitionSpec) {
-        return this;
-    }
-
-    default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) {
-        // do nothing, should implement this if need
-        return this;
+  override def pushLimit(limit: Int): Boolean = {
+    if (table.isInstanceOf[AppendOnlyFileStoreTable]) {
+      pushDownLimit = Some(limit)
     }
+    // just make a best effort to push down limit
+    false
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala
 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
similarity index 62%
rename from 
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala
rename to 
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
index 6d1360b95..9d3532adf 100644
--- 
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala
+++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
@@ -17,11 +17,17 @@
  */
 package org.apache.paimon.spark
 
+import org.apache.paimon.table.source.DataSplit
+
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, 
Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownLimit}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.junit.jupiter.api.Assertions
 
+import scala.collection.JavaConverters._
+
 class SparkPushDownTest extends PaimonSparkTestBase {
 
   test(s"Paimon push down: apply partition filter push down with 
non-partitioned table") {
@@ -75,6 +81,59 @@ class SparkPushDownTest extends PaimonSparkTestBase {
     checkAnswer(spark.sql(q), Row(1, "a", "p1") :: Row(2, "b", "p1") :: Row(3, 
"c", "p2") :: Nil)
   }
 
+  test("Paimon pushDown: limit for append-only tables") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING, c STRING)
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
+    spark.sql("INSERT INTO T VALUES (3, 'c', '11'), (4, 'd', '22')")
+
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY a"),
+      Row(1, "a", "11") :: Row(2, "b", "22") :: Row(3, "c", "11") :: Row(4, 
"d", "22") :: Nil)
+
+    val scanBuilder = getScanBuilder()
+    Assertions.assertTrue(scanBuilder.isInstanceOf[SupportsPushDownLimit])
+
+    val dataFilesWithoutLimit = 
scanBuilder.build().toBatch.planInputPartitions().flatMap {
+      case partition: SparkInputPartition =>
+        partition.split() match {
+          case dataSplit: DataSplit => dataSplit.dataFiles().asScala
+          case _ => Seq.empty
+        }
+    }
+    Assertions.assertTrue(dataFilesWithoutLimit.length >= 2)
+
+    // It still returns false even if it can push down the limit.
+    
Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
+    val partitions = scanBuilder.build().toBatch.planInputPartitions()
+    Assertions.assertEquals(1, partitions.length)
+
+    Assertions.assertEquals(1, spark.sql("SELECT * FROM T LIMIT 1").count())
+  }
+
+  test("Paimon pushDown: limit for change-log tables") {
+    spark.sql(s"""
+                 |CREATE TABLE T (a INT, b STRING, c STRING)
+                 |TBLPROPERTIES ('primary-key'='a')
+                 |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
+    spark.sql("INSERT INTO T VALUES (3, 'c', '11'), (4, 'd', '22')")
+
+    val scanBuilder = getScanBuilder()
+    Assertions.assertTrue(scanBuilder.isInstanceOf[SupportsPushDownLimit])
+
+    // Tables with primary keys can't support the push-down limit.
+    
Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
+  }
+
+  private def getScanBuilder(tableName: String = "T"): ScanBuilder = {
+    new SparkTable(loadTable(tableName))
+      .newScanBuilder(CaseInsensitiveStringMap.empty())
+  }
+
   private def checkFilterExists(sql: String): Boolean = {
     spark.sql(sql).queryExecution.optimizedPlan.exists {
       case Filter(_: Expression, _) => true

Reply via email to