This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9a19973db [core][spark] Supports to push down limit (#2367)
9a19973db is described below
commit 9a19973db26f30339f246acdb503b3f682556b87
Author: Yann Byron <[email protected]>
AuthorDate: Wed Nov 29 19:41:18 2023 +0800
[core][spark] Supports to push down limit (#2367)
---
.../org/apache/paimon/table/source/DataSplit.java | 13 ++
.../apache/paimon/table/source/InnerTableScan.java | 4 +
.../paimon/table/source/InnerTableScanImpl.java | 72 +++++++++-
.../org/apache/paimon/table/source/RawFile.java | 17 ++-
.../apache/paimon/table/source/ReadBuilder.java | 3 +
.../paimon/table/source/ReadBuilderImpl.java | 16 ++-
.../table/source/snapshot/SnapshotReaderImpl.java | 3 +-
.../apache/paimon/table/source/TableScanTest.java | 32 +++++
.../table/source/snapshot/ScannerTestBase.java | 26 +++-
.../table/source/snapshot/SnapshotReaderTest.java | 15 ++-
paimon-spark/paimon-spark-3.1/pom.xml | 141 ++++++++++++++++++++
.../apache/paimon/spark/PaimonScanBuilder.scala | 23 +---
paimon-spark/paimon-spark-3.2/pom.xml | 148 +++++++++++++++++++++
.../apache/paimon/spark/PaimonScanBuilder.scala | 23 +---
.../paimon/spark/SparkGenericCatalogTest.java | 119 +++++++++++++++++
.../src/test/resources/log4j2-test.properties | 28 ++++
.../org/apache/paimon/spark/SparkScanBuilder.java | 106 ---------------
.../java/org/apache/paimon/spark/SparkTable.java | 2 +-
.../paimon/spark/PaimonBaseScanBuilder.scala | 107 +++++++++++++++
.../apache/paimon/spark/PaimonScanBuilder.scala | 33 +++--
.../PaimonPushDownTest.scala} | 59 ++++++++
21 files changed, 817 insertions(+), 173 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 5226739df..63110c0bd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -207,6 +207,19 @@ public class DataSplit implements Split {
return new Builder();
}
+ /**
+ * Creates a {@link Builder} pre-populated with the snapshot id, partition, bucket, before files,
+ * data files, streaming flag and raw files of {@code split}, so that a modified copy of the split
+ * can be derived from it.
+ */
+ public static Builder builder(DataSplit split) {
+ Builder copy = builder();
+ copy.withSnapshot(split.snapshotId);
+ copy.withPartition(split.partition);
+ copy.withBucket(split.bucket);
+ copy.withBeforeFiles(split.beforeFiles);
+ copy.withDataFiles(split.dataFiles);
+ copy.isStreaming(split.isStreaming);
+ copy.rawFiles(split.rawFiles);
+ return copy;
+ }
+
+
/** Builder for {@link DataSplit}. */
public static class Builder {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 3433aad36..234d82e52 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -28,6 +28,10 @@ public interface InnerTableScan extends TableScan {
InnerTableScan withFilter(Predicate predicate);
+ /**
+ * Pushes down a row limit to this scan. The default implementation ignores the limit and
+ * returns this scan unchanged; implementations may use it only as a hint to prune splits, so
+ * callers should still enforce the limit on the rows they read.
+ */
+ default InnerTableScan withLimit(int limit) {
+ return this;
+ }
+
default InnerTableScan withPartitionFilter(Map<String, String>
partitionSpec) {
return this;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
index 29f890a91..6dbc6ed32 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScanImpl.java
@@ -25,6 +25,10 @@ import
org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.table.source.snapshot.StartingScanner;
import org.apache.paimon.utils.SnapshotManager;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
/** {@link TableScan} implementation for batch planning. */
@@ -36,6 +40,8 @@ public class InnerTableScanImpl extends
AbstractInnerTableScan {
private StartingScanner startingScanner;
private boolean hasNext;
+ /** Row limit pushed down via {@link #withLimit(int)}; {@code null} means no limit. */
+ private Integer pushDownLimit;
+
public InnerTableScanImpl(
CoreOptions options,
SnapshotReader snapshotReader,
@@ -59,6 +65,12 @@ public class InnerTableScanImpl extends
AbstractInnerTableScan {
return this;
}
+ /**
+ * Records the row limit. {@link #plan()} uses it to stop adding splits once the splits already
+ * taken are known to contain at least {@code limit} rows.
+ */
+ @Override
+ public InnerTableScan withLimit(int limit) {
+ this.pushDownLimit = limit;
+ return this;
+ }
+
@Override
public TableScan.Plan plan() {
if (startingScanner == null) {
@@ -68,9 +80,67 @@ public class InnerTableScanImpl extends
AbstractInnerTableScan {
if (hasNext) {
hasNext = false;
StartingScanner.Result result =
startingScanner.scan(snapshotReader);
- return DataFilePlan.fromResult(result);
+ StartingScanner.Result limitedResult = applyPushDownLimit(result);
+ return DataFilePlan.fromResult(limitedResult);
} else {
throw new EndOfScanException();
}
}
+
+ /**
+ * Trims the planned splits once the splits already taken are known to contain at least
+ * {@code pushDownLimit} rows. Splits whose row count cannot be computed count as zero rows, so
+ * the trimmed plan may still produce more rows than the limit; the engine must enforce the limit
+ * itself. Results that are not a {@link StartingScanner.ScannedResult} are returned unchanged.
+ */
+ private StartingScanner.Result applyPushDownLimit(StartingScanner.Result result) {
+ // A negative limit is meaningless: treat it as "no limit" instead of dropping every split.
+ if (pushDownLimit == null
+ || pushDownLimit < 0
+ || !(result instanceof StartingScanner.ScannedResult)) {
+ return result;
+ }
+
+ SnapshotReader.Plan plan = ((StartingScanner.ScannedResult) result).plan();
+ List<DataSplit> splits = plan.dataSplits();
+ // Typed as List<Split> so splits() can return it without a raw/unchecked cast.
+ List<Split> limitedSplits = new ArrayList<>();
+ long scannedRowCount = 0;
+ for (DataSplit split : splits) {
+ if (scannedRowCount >= pushDownLimit) {
+ break;
+ }
+ limitedSplits.add(split);
+ scannedRowCount += getRowCountForSplit(split);
+ }
+
+ if (limitedSplits.size() == splits.size()) {
+ // Nothing was trimmed: keep the original plan instead of wrapping it.
+ return result;
+ }
+
+ SnapshotReader.Plan newPlan =
+ new SnapshotReader.Plan() {
+ @Nullable
+ @Override
+ public Long watermark() {
+ return plan.watermark();
+ }
+
+ @Nullable
+ @Override
+ public Long snapshotId() {
+ return plan.snapshotId();
+ }
+
+ @Override
+ public List<Split> splits() {
+ return limitedSplits;
+ }
+ };
+ return new StartingScanner.ScannedResult(newPlan);
+ }
+
+ /**
+ * Returns the number of rows in {@code split}, summed over its raw files. Returns 0 when the
+ * split cannot be converted to raw files (it needs to be merged), because its row count is then
+ * unknown before reading.
+ */
+ private long getRowCountForSplit(DataSplit split) {
+ // Evaluate convertToRawFiles() once rather than once for the check and once for the value.
+ return split.convertToRawFiles()
+ .map(files -> files.stream().mapToLong(RawFile::rowCount).sum())
+ .orElse(0L);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
index 2923304e5..177b57bfa 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/RawFile.java
@@ -34,18 +34,24 @@ import java.util.Objects;
@Public
public class RawFile {
private final String path;
private final long offset;
private final long length;
private final String format;
private final long schemaId;
- public RawFile(String path, long offset, long length, String format, long schemaId) {
+ /** Number of rows in the file; used to estimate how many rows a split covers. */
+ private final long rowCount;
+
+ /**
+ * Creates a raw file description. {@code rowCount} is the number of rows stored in the file.
+ */
+ public RawFile(
+ String path, long offset, long length, String format, long schemaId, long rowCount) {
this.path = path;
this.offset = offset;
this.length = length;
this.format = format;
this.schemaId = schemaId;
+ this.rowCount = rowCount;
}
/** Path of the file. */
@@ -76,12 +82,18 @@ public class RawFile {
return schemaId;
}
+ /** Number of rows in the file. */
+ public long rowCount() {
+ return rowCount;
+ }
+
public void serialize(DataOutputView out) throws IOException {
out.writeUTF(path);
out.writeLong(offset);
out.writeLong(length);
out.writeUTF(format);
out.writeLong(schemaId);
+ // Must stay in the same position as the readLong() for rowCount in deserialize.
+ // NOTE(review): appending rowCount changes the binary layout; bytes written by a version
+ // without this field cannot be read back. Confirm no persisted state relies on the old
+ // layout, or bump the version of the enclosing serializer.
+ out.writeLong(rowCount);
}
public static RawFile deserialize(DataInputView in) throws IOException {
@@ -90,8 +102,9 @@ public class RawFile {
long length = in.readLong();
String format = in.readUTF();
long schemaId = in.readLong();
+ long rowCount = in.readLong();
- return new RawFile(path, offset, length, format, schemaId);
+ return new RawFile(path, offset, length, format, schemaId, rowCount);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index e83ae0bfa..fc7e41569 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -119,6 +119,9 @@ public interface ReadBuilder extends Serializable {
*/
ReadBuilder withProjection(int[][] projection);
+ /**
+ * Push down a row limit. The scan created by {@link #newScan()} may use it to skip splits that
+ * are not needed to produce {@code limit} rows; the result can still contain more rows than
+ * {@code limit}, so the caller must apply the limit itself.
+ */
+ ReadBuilder withLimit(int limit);
+
/** Create a {@link TableScan} to perform batch planning. */
TableScan newScan();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 4fb0dec37..6805f30e4 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -37,6 +37,9 @@ public class ReadBuilderImpl implements ReadBuilder {
private Predicate filter;
private int[][] projection;
+
+ /** Row limit forwarded to the scan in {@link #newScan()}; {@code null} when none was pushed. */
+ private Integer limit = null;
+
private Map<String, String> partitionSpec;
public ReadBuilderImpl(InnerTable table) {
@@ -74,9 +77,20 @@ public class ReadBuilderImpl implements ReadBuilder {
return this;
}
+ /** Stores the limit so that {@link #newScan()} can forward it to the created scan. */
+ @Override
+ public ReadBuilder withLimit(int limit) {
+ this.limit = limit;
+ return this;
+ }
+
@Override
public TableScan newScan() {
- return table.newScan().withFilter(filter).withPartitionFilter(partitionSpec);
+ InnerTableScan tableScan =
+ table.newScan().withFilter(filter).withPartitionFilter(partitionSpec);
+ if (limit != null) {
+ // withLimit is a fluent call: keep its result, since an implementation is free to
+ // return a different scan instance than the receiver.
+ tableScan = tableScan.withLimit(limit);
+ }
+ return tableScan;
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index d36eb76ce..38e996ddc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -449,6 +449,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
.formatType()
.toString()
.toLowerCase()),
- meta.schemaId());
+ meta.schemaId(),
+ meta.rowCount());
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
index 0109125f5..8152fe2ca 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/TableScanTest.java
@@ -66,4 +66,36 @@ public class TableScanTest extends ScannerTestBase {
write.close();
commit.close();
}
+
+ @Test
+ public void testPushDownLimit() throws Exception {
+ createAppenOnlyTable();
+
+ StreamTableWrite write = table.newWrite(commitUser);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(rowData(1, 10, 100L));
+ write.write(rowData(2, 20, 200L));
+ write.write(rowData(3, 30, 300L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(rowData(4, 40, 400L));
+ write.write(rowData(5, 50, 500L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ // no limit pushed down
+ assertThat(table.newScan().plan().splits()).hasSize(5);
+
+ // with limit 1
+ assertThat(table.newScan().withLimit(1).plan().splits()).hasSize(1);
+
+ // with limit 4
+ assertThat(table.newScan().withLimit(4).plan().splits()).hasSize(4);
+
+ // a limit above the total row count keeps every split
+ assertThat(table.newScan().withLimit(10).plan().splits()).hasSize(5);
+
+ write.close();
+ commit.close();
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
index df1d2b533..7fea1b42d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ScannerTestBase.java
@@ -48,6 +48,7 @@ import org.apache.paimon.utils.TraceableFileIO;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -83,6 +84,14 @@ public abstract class ScannerTestBase {
snapshotReader = table.newSnapshotReader();
}
+ /**
+ * Replaces the current table with an append-only table (no primary keys) stored in a fresh
+ * directory, and refreshes {@code snapshotReader} for it.
+ */
+ protected void createAppenOnlyTable() throws Exception {
+ // Create the new directory inside the JUnit-managed temp dir so it is deleted after the test;
+ // a top-level Files.createTempDirectory(...) would be left behind on disk.
+ tempDir = Files.createTempDirectory(tempDir, "junit");
+ tablePath = new Path(TraceableFileIO.SCHEME + "://" + tempDir.toString());
+ fileIO = FileIOFinder.find(tablePath);
+ table = createFileStoreTable(false);
+ snapshotReader = table.newSnapshotReader();
+ }
+
protected GenericRow rowData(Object... values) {
return GenericRow.of(values);
}
@@ -125,17 +134,30 @@ public abstract class ScannerTestBase {
}
+ /** Creates a table with primary keys ("pt", "a") and default options. */
protected FileStoreTable createFileStoreTable() throws Exception {
- return createFileStoreTable(new Options());
+ return createFileStoreTable(true, new Options());
}
+ /** Creates a table with primary keys ("pt", "a") and the given options. */
protected FileStoreTable createFileStoreTable(Options conf) throws
Exception {
+ return createFileStoreTable(true, conf);
+ }
+
+ /** Creates a table with default options, with or without primary keys ("pt", "a"). */
+ protected FileStoreTable createFileStoreTable(boolean withPrimaryKeys)
throws Exception {
+ return createFileStoreTable(withPrimaryKeys, new Options());
+ }
+
+ protected FileStoreTable createFileStoreTable(boolean withPrimaryKeys,
Options conf)
+ throws Exception {
SchemaManager schemaManager = new SchemaManager(fileIO, tablePath);
+ List<String> primaryKeys = new ArrayList<>();
+ if (withPrimaryKeys) {
+ primaryKeys = Arrays.asList("pt", "a");
+ }
TableSchema tableSchema =
schemaManager.createTable(
new Schema(
ROW_TYPE.getFields(),
Collections.singletonList("pt"),
- Arrays.asList("pt", "a"),
+ primaryKeys,
conf.toMap(),
""));
return FileStoreTableFactory.create(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
index a2b7faa9f..f38ce9064 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/SnapshotReaderTest.java
@@ -110,7 +110,8 @@ public class SnapshotReaderTest {
0,
meta.fileSize(),
"avro",
- meta.schemaId())));
+ meta.schemaId(),
+ meta.rowCount())));
}
// write another file on level 0
@@ -158,7 +159,8 @@ public class SnapshotReaderTest {
0,
meta.fileSize(),
meta.level() == 5 ? "orc" : "avro",
- meta.schemaId())));
+ meta.schemaId(),
+ meta.rowCount())));
}
// write another file on level 0
@@ -215,7 +217,8 @@ public class SnapshotReaderTest {
0,
meta.fileSize(),
"avro",
- meta.schemaId())));
+ meta.schemaId(),
+ meta.rowCount())));
// change file schema
@@ -248,14 +251,16 @@ public class SnapshotReaderTest {
0,
meta0.fileSize(),
"avro",
- meta0.schemaId()),
+ meta0.schemaId(),
+ meta0.rowCount()),
new RawFile(
String.format(
"%s/bucket-0/%s", tablePath,
meta1.fileName()),
0,
meta1.fileSize(),
"avro",
- meta1.schemaId())));
+ meta1.schemaId(),
+ meta1.rowCount())));
write.close();
commit.close();
diff --git a/paimon-spark/paimon-spark-3.1/pom.xml
b/paimon-spark/paimon-spark-3.1/pom.xml
index 93fac2e5e..d43ab9a15 100644
--- a/paimon-spark/paimon-spark-3.1/pom.xml
+++ b/paimon-spark/paimon-spark-3.1/pom.xml
@@ -42,6 +42,22 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
@@ -55,9 +71,89 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <!-- SPARK-40511 upgrades SLF4J2, which is not compatible w/
SLF4J1 -->
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j2-impl</artifactId>
+ </exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.12</artifactId>
+ <version>${spark.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.12</artifactId>
+ <version>${spark.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j2-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_2.12</artifactId>
+ <version>${spark.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <version>3.1.0</version>
+ <scope>test</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
@@ -88,6 +184,51 @@ under the License.
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>${scala-maven-plugin.version}</version>
+ <executions>
+ <!-- Run scala compiler in the process-resources phase, so
that dependencies on
+ scala classes can be resolved later in the (Java)
compile phase -->
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>add-source</goal>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+
+ <!-- Run scala compiler in the process-test-resources
phase, so that dependencies on
+ scala classes can be resolved later in the (Java)
test-compile phase -->
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>${spotless.version}</version>
+ <configuration>
+ <scala>
+ <scalafmt>
+ <version>3.4.3</version>
+ <!-- This file is in the root of the project to
make sure IntelliJ picks it up automatically -->
+
<file>${project.basedir}/../../.scalafmt.conf</file>
+ </scalafmt>
+ <licenseHeader>
+ <content>${spotless.license.header}</content>
+ <delimiter>${spotless.delimiter}</delimiter>
+ </licenseHeader>
+ </scala>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
similarity index 57%
copy from
paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
copy to
paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 3433aad36..2b1128989 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++
b/paimon-spark/paimon-spark-3.1/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -15,25 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.paimon.spark
-package org.apache.paimon.table.source;
+import org.apache.paimon.table.Table
-import org.apache.paimon.metrics.MetricRegistry;
-import org.apache.paimon.predicate.Predicate;
-
-import java.util.Map;
-
-/** Inner {@link TableScan} contains filter push down. */
-public interface InnerTableScan extends TableScan {
-
- InnerTableScan withFilter(Predicate predicate);
-
- default InnerTableScan withPartitionFilter(Map<String, String>
partitionSpec) {
- return this;
- }
-
- default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) {
- // do nothing, should implement this if need
- return this;
- }
-}
+/** Scan builder for Spark 3.1; all behavior is inherited from [[PaimonBaseScanBuilder]]. */
+class PaimonScanBuilder(table: Table) extends PaimonBaseScanBuilder(table)
diff --git a/paimon-spark/paimon-spark-3.2/pom.xml
b/paimon-spark/paimon-spark-3.2/pom.xml
index 3e0853677..23ec79ee5 100644
--- a/paimon-spark/paimon-spark-3.2/pom.xml
+++ b/paimon-spark/paimon-spark-3.2/pom.xml
@@ -42,6 +42,22 @@ under the License.
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
@@ -55,8 +71,95 @@ under the License.
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
+ <!-- SPARK-40511 upgrades SLF4J2, which is not compatible w/
SLF4J1 -->
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j2-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.12</artifactId>
+ <version>${spark.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.12</artifactId>
+ <version>${spark.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j2-impl</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_2.12</artifactId>
+ <version>${spark.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.orc</groupId>
+ <artifactId>orc-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <version>3.1.0</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ <version>${avro.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
@@ -81,6 +184,51 @@ under the License.
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ <version>${scala-maven-plugin.version}</version>
+ <executions>
+ <!-- Run scala compiler in the process-resources phase, so
that dependencies on
+ scala classes can be resolved later in the (Java)
compile phase -->
+ <execution>
+ <id>scala-compile-first</id>
+ <phase>process-resources</phase>
+ <goals>
+ <goal>add-source</goal>
+ <goal>compile</goal>
+ </goals>
+ </execution>
+
+ <!-- Run scala compiler in the process-test-resources
phase, so that dependencies on
+ scala classes can be resolved later in the (Java)
test-compile phase -->
+ <execution>
+ <id>scala-test-compile</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ <version>${spotless.version}</version>
+ <configuration>
+ <scala>
+ <scalafmt>
+ <version>3.4.3</version>
+ <!-- This file is in the root of the project to
make sure IntelliJ picks it up automatically -->
+
<file>${project.basedir}/../../.scalafmt.conf</file>
+ </scalafmt>
+ <licenseHeader>
+ <content>${spotless.license.header}</content>
+ <delimiter>${spotless.delimiter}</delimiter>
+ </licenseHeader>
+ </scala>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
similarity index 57%
copy from
paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
copy to
paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 3433aad36..2b1128989 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -15,25 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.paimon.spark
-package org.apache.paimon.table.source;
+import org.apache.paimon.table.Table
-import org.apache.paimon.metrics.MetricRegistry;
-import org.apache.paimon.predicate.Predicate;
-
-import java.util.Map;
-
-/** Inner {@link TableScan} contains filter push down. */
-public interface InnerTableScan extends TableScan {
-
- InnerTableScan withFilter(Predicate predicate);
-
- default InnerTableScan withPartitionFilter(Map<String, String>
partitionSpec) {
- return this;
- }
-
- default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) {
- // do nothing, should implement this if need
- return this;
- }
-}
+/** Scan builder for Spark 3.2; all behavior is inherited from [[PaimonBaseScanBuilder]]. */
+class PaimonScanBuilder(table: Table) extends PaimonBaseScanBuilder(table)
diff --git
a/paimon-spark/paimon-spark-3.2/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
b/paimon-spark/paimon-spark-3.2/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
new file mode 100644
index 000000000..f632998c5
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.2/src/test/java/org/apache/paimon/spark/SparkGenericCatalogTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Base tests for spark read. */
+public class SparkGenericCatalogTest {
+
+ protected static SparkSession spark = null;
+
+ protected static Path warehousePath = null;
+
+ @BeforeAll
+ public static void startMetastoreAndSpark(@TempDir java.nio.file.Path
tempDir) {
+ warehousePath = new Path("file:" + tempDir.toString());
+ spark =
+ SparkSession.builder()
+ .config("spark.sql.warehouse.dir",
warehousePath.toString())
+ .master("local[2]")
+ .getOrCreate();
+ spark.conf().set("spark.sql.catalog.spark_catalog",
SparkGenericCatalog.class.getName());
+ }
+
+ @AfterAll
+ public static void stopMetastoreAndSpark() {
+ if (spark != null) {
+ spark.stop();
+ spark = null;
+ }
+ }
+
+ @Test
+ public void testPaimonTable() throws Exception {
+ spark.sql(
+ "CREATE TABLE PT (a INT, b INT, c STRING) USING paimon
TBLPROPERTIES"
+ + " ('file.format'='avro')");
+ writeTable(
+ "PT",
+ GenericRow.of(1, 2, BinaryString.fromString("3")),
+ GenericRow.of(4, 5, BinaryString.fromString("6")));
+ assertThat(spark.sql("SELECT * FROM
PT").collectAsList().stream().map(Object::toString))
+ .containsExactlyInAnyOrder("[1,2,3]", "[4,5,6]");
+
+ spark.sql("CREATE DATABASE my_db");
+ spark.sql(
+ "CREATE TABLE DB_PT (a INT, b INT, c STRING) USING paimon
TBLPROPERTIES"
+ + " ('file.format'='avro')");
+ writeTable(
+ "DB_PT",
+ GenericRow.of(1, 2, BinaryString.fromString("3")),
+ GenericRow.of(4, 5, BinaryString.fromString("6")));
+ assertThat(spark.sql("SELECT * FROM
DB_PT").collectAsList().stream().map(Object::toString))
+ .containsExactlyInAnyOrder("[1,2,3]", "[4,5,6]");
+
+ assertThat(spark.sql("SHOW
NAMESPACES").collectAsList().stream().map(Object::toString))
+ .containsExactlyInAnyOrder("[default]", "[my_db]");
+ }
+
+ @Test
+ public void testCsvTable() {
+ spark.sql("CREATE TABLE CT (a INT, b INT, c STRING) USING csv");
+ spark.sql("INSERT INTO CT VALUES (1, 2, '3'), (4, 5,
'6')").collectAsList();
+ List<Row> rows = spark.sql("SELECT * FROM CT").collectAsList();
+ assertThat(rows.stream().map(Object::toString))
+ .containsExactlyInAnyOrder("[1,2,3]", "[4,5,6]");
+ }
+
+ private static void writeTable(String tableName, GenericRow... rows)
throws Exception {
+ FileStoreTable fileStoreTable =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(warehousePath, String.format("default.db/%s",
tableName)));
+ BatchWriteBuilder writeBuilder = fileStoreTable.newBatchWriteBuilder();
+ BatchTableWrite writer = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit();
+ for (GenericRow row : rows) {
+ writer.write(row);
+ }
+ commit.commit(writer.prepareCommit());
+ writer.close();
+ commit.close();
+ }
+}
diff --git
a/paimon-spark/paimon-spark-3.2/src/test/resources/log4j2-test.properties
b/paimon-spark/paimon-spark-3.2/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..1b3980d15
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.2/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set the root logger level to OFF to avoid flooding the build logs;
+# set it manually to INFO for debugging.
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
deleted file mode 100644
index 06652ff9f..000000000
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkScanBuilder.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark;
-
-import org.apache.paimon.predicate.PartitionPredicateVisitor;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.table.Table;
-
-import org.apache.spark.sql.connector.read.Scan;
-import org.apache.spark.sql.connector.read.ScanBuilder;
-import org.apache.spark.sql.connector.read.SupportsPushDownFilters;
-import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
-import org.apache.spark.sql.sources.Filter;
-import org.apache.spark.sql.types.StructType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/** A Spark {@link ScanBuilder} for paimon supporting filter push-down and column pruning. */
-public class SparkScanBuilder
- implements ScanBuilder, SupportsPushDownFilters,
SupportsPushDownRequiredColumns {
- private static final Logger LOG =
LoggerFactory.getLogger(SparkScanBuilder.class);
- private final Table table;
-
- // Predicates converted in pushFilters; handed to ReadBuilder.withFilter in build().
- private List<Predicate> predicates = new ArrayList<>();
- // Stays null if pushFilters is never called, in which case pushedFilters() returns null.
- private Filter[] pushedFilters;
- // Stays null if pruneColumns is never called; build() then passes null to withProjection.
- private int[] projectedFields;
-
- public SparkScanBuilder(Table table) {
- this.table = table;
- }
-
- @Override
- public Filter[] pushFilters(Filter[] filters) {
- // There are 3 kinds of filters:
- // (1) pushable filters which don't need to be evaluated again after
scanning, e.g. filter
- // partitions.
- // (2) pushable filters which still need to be evaluated after
scanning.
- // (3) non-pushable filters.
- // case 1 and 2 are considered as pushable filters and will be
returned by pushedFilters().
- // case 2 and 3 are considered as postScan filters and should be
returned by this method.
- List<Filter> pushable = new ArrayList<>(filters.length);
- List<Filter> postScan = new ArrayList<>(filters.length);
- List<Predicate> predicates = new ArrayList<>(filters.length);
-
- SparkFilterConverter converter = new
SparkFilterConverter(table.rowType());
- PartitionPredicateVisitor visitor = new
PartitionPredicateVisitor(table.partitionKeys());
- for (Filter filter : filters) {
- try {
- Predicate predicate = converter.convert(filter);
- predicates.add(predicate);
- pushable.add(filter);
- if (!predicate.visit(visitor)) {
- postScan.add(filter);
- }
- } catch (UnsupportedOperationException e) {
- LOG.warn(e.getMessage());
- postScan.add(filter);
- }
- }
- this.predicates = predicates;
- this.pushedFilters = pushable.toArray(new Filter[0]);
- return postScan.toArray(new Filter[0]);
- }
-
- @Override
- public Filter[] pushedFilters() {
- return pushedFilters;
- }
-
- @Override
- public void pruneColumns(StructType requiredSchema) {
- // NOTE(review): indexOf returns -1 for a name absent from the table row type.
- String[] pruneFields = requiredSchema.fieldNames();
- List<String> fieldNames = table.rowType().getFieldNames();
- int[] projected = new int[pruneFields.length];
- for (int i = 0; i < projected.length; i++) {
- projected[i] = fieldNames.indexOf(pruneFields[i]);
- }
- this.projectedFields = projected;
- }
-
- @Override
- public Scan build() {
- return new SparkScan(
- table,
-
table.newReadBuilder().withFilter(predicates).withProjection(projectedFields));
- }
-}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
index 6b21fad0a..4a35f759b 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkTable.java
@@ -66,7 +66,7 @@ public class SparkTable
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
Table newTable = table.copy(options.asCaseSensitiveMap());
- return new SparkScanBuilder(newTable);
+ // PaimonScanBuilder (Scala) replaces SparkScanBuilder and additionally supports LIMIT push-down.
+ return new PaimonScanBuilder(newTable);
}
@Override
diff --git
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
new file mode 100644
index 000000000..d7d4535ea
--- /dev/null
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonBaseScanBuilder.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.paimon.spark
+
+import org.apache.paimon.predicate.{PartitionPredicateVisitor, Predicate,
PredicateBuilder}
+import org.apache.paimon.table.Table
+import org.apache.paimon.table.source.ReadBuilder
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder,
SupportsPushDownFilters, SupportsPushDownRequiredColumns}
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.StructType
+
+import scala.collection.mutable
+
+/**
+ * Base Spark [[ScanBuilder]] for Paimon tables.
+ *
+ * Supports filter push-down, with filters converted to a Paimon [[Predicate]], and column pruning,
+ * with the required columns converted to a projection. Subclasses can customize the
+ * [[ReadBuilder]] used by [[build]] by overriding [[getReadBuilder]].
+ */
+abstract class PaimonBaseScanBuilder(table: Table)
+  extends ScanBuilder
+  with SupportsPushDownFilters
+  with SupportsPushDownRequiredColumns
+  with Logging {
+
+  /** AND of all converted filters from the last [[pushFilters]] call; `None` if none converted. */
+  protected var predicates: Option[Predicate] = None
+
+  /** Filters accepted by the last [[pushFilters]] call; `None` until it has been called. */
+  protected var pushed: Option[Array[Filter]] = None
+
+  /** Field indexes from [[pruneColumns]]; `None` means no projection is applied. */
+  protected var projectedIndexes: Option[Array[Int]] = None
+
+  /** Creates a [[ReadBuilder]] with the pushed projection and filter applied, when present. */
+  protected def getReadBuilder(): ReadBuilder = {
+    val readBuilder = table.newReadBuilder()
+    projectedIndexes.foreach(readBuilder.withProjection)
+    predicates.foreach(readBuilder.withFilter)
+
+    readBuilder
+  }
+
+  override def build(): Scan = {
+    new SparkScan(table, getReadBuilder())
+  }
+
+  /**
+   * Pushes down filters and returns the filters that Spark must still evaluate after scanning.
+   *
+   * Rows should be returned from the data source if and only if all of the filters match, i.e.
+   * the filters are interpreted as ANDed together. Every filter that converts to a Paimon
+   * [[Predicate]] is pushed. Filters rejected by the partition-key visitor, and filters that
+   * cannot be converted, are also returned for post-scan evaluation.
+   */
+  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
+    val pushable = mutable.ArrayBuffer.empty[Filter]
+    val postScan = mutable.ArrayBuffer.empty[Filter]
+    val converted = mutable.ArrayBuffer.empty[Predicate]
+
+    val converter = new SparkFilterConverter(table.rowType)
+    val visitor = new PartitionPredicateVisitor(table.partitionKeys())
+    filters.foreach {
+      filter =>
+        try {
+          val predicate = converter.convert(filter)
+          pushable.append(filter)
+          converted.append(predicate)
+          // Only predicates accepted by the partition-key visitor (presumably those that reference
+          // partition keys only) are skipped from post-scan re-evaluation.
+          if (!predicate.visit(visitor)) postScan.append(filter)
+        } catch {
+          case e: UnsupportedOperationException =>
+            logWarning(e.getMessage)
+            postScan.append(filter)
+        }
+    }
+
+    // Reset on every call so predicates from an earlier call never outlive their `pushed` filters.
+    this.predicates =
+      if (converted.nonEmpty) Some(PredicateBuilder.and(converted: _*)) else None
+    this.pushed = Some(pushable.toArray)
+    postScan.toArray
+  }
+
+  /**
+   * Returns the filters pushed to the data source via [[pushFilters]].
+   *
+   * There are three kinds of filters: (1) pushable filters that don't need to be evaluated again
+   * after scanning; (2) pushable filters that still need to be evaluated after scanning, e.g.
+   * parquet row group filters; (3) non-pushable filters. Kinds (1) and (2) are both pushed filters
+   * and are returned by this method.
+   *
+   * If the query has no filters, [[pushFilters]] may never be called. An empty array is returned
+   * in that case.
+   */
+  override def pushedFilters(): Array[Filter] = {
+    pushed.getOrElse(Array.empty)
+  }
+
+  /** Records the indexes of `requiredSchema`'s fields in the table row type as the projection. */
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    val pruneFields = requiredSchema.fieldNames
+    val fieldNames = table.rowType.getFieldNames
+    // NOTE(review): indexOf yields -1 for a name missing from the table row type.
+    val projected = pruneFields.map(field => fieldNames.indexOf(field))
+    this.projectedIndexes = Some(projected)
+  }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
similarity index 52%
copy from
paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
copy to
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
index 3433aad36..55f24d93a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonScanBuilder.scala
@@ -15,25 +15,30 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.paimon.spark
-package org.apache.paimon.table.source;
+import org.apache.paimon.table.{AppendOnlyFileStoreTable, Table}
+import org.apache.paimon.table.source.ReadBuilder
-import org.apache.paimon.metrics.MetricRegistry;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.spark.sql.connector.read.{Scan, SupportsPushDownLimit}
-import java.util.Map;
+/**
+ * Spark scan builder for Paimon that adds LIMIT push-down ([[SupportsPushDownLimit]]) on top of
+ * the filter push-down and column pruning inherited from [[PaimonBaseScanBuilder]].
+ */
+class PaimonScanBuilder(table: Table)
+ extends PaimonBaseScanBuilder(table)
+ with SupportsPushDownLimit {
-/** Inner {@link TableScan} contains filter push down. */
-public interface InnerTableScan extends TableScan {
+ // Limit recorded by pushLimit for append-only tables only; None means no limit is forwarded.
+ private var pushDownLimit: Option[Int] = None
- InnerTableScan withFilter(Predicate predicate);
+ /** Extends the base read builder with the pushed-down limit, if one was recorded. */
+ override protected def getReadBuilder(): ReadBuilder = {
+ val readBuilder = super.getReadBuilder()
+ pushDownLimit.foreach(readBuilder.withLimit)
+ readBuilder
+ }
- default InnerTableScan withPartitionFilter(Map<String, String>
partitionSpec) {
- return this;
- }
-
- default InnerTableScan withMetricsRegistry(MetricRegistry metricRegistry) {
- // do nothing, should implement this if need
- return this;
+ /**
+ * Records `limit` when the table is an AppendOnlyFileStoreTable, so that getReadBuilder forwards
+ * it via `ReadBuilder.withLimit`. Other tables, e.g. those with primary keys, ignore it.
+ */
+ override def pushLimit(limit: Int): Boolean = {
+ if (table.isInstanceOf[AppendOnlyFileStoreTable]) {
+ pushDownLimit = Some(limit)
}
+ // Best effort only: returning false reports the limit as not pushed, so Spark keeps its own LIMIT.
+ false
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
similarity index 62%
rename from
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala
rename to
paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
index 6d1360b95..9d3532adf 100644
---
a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/SparkPushDownTest.scala
+++
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/PaimonPushDownTest.scala
@@ -17,11 +17,17 @@
*/
package org.apache.paimon.spark
+import org.apache.paimon.table.source.DataSplit
+
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo,
Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Filter
+import org.apache.spark.sql.connector.read.{ScanBuilder, SupportsPushDownLimit}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.junit.jupiter.api.Assertions
+import scala.collection.JavaConverters._
+
class SparkPushDownTest extends PaimonSparkTestBase {
test(s"Paimon push down: apply partition filter push down with
non-partitioned table") {
@@ -75,6 +81,59 @@ class SparkPushDownTest extends PaimonSparkTestBase {
checkAnswer(spark.sql(q), Row(1, "a", "p1") :: Row(2, "b", "p1") :: Row(3,
"c", "p2") :: Nil)
}
+  test("Paimon pushDown: limit for append-only tables") {
+    spark.sql("""
+                |CREATE TABLE T (a INT, b STRING, c STRING)
+                |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
+    spark.sql("INSERT INTO T VALUES (3, 'c', '11'), (4, 'd', '22')")
+
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY a"),
+      Row(1, "a", "11") :: Row(2, "b", "22") :: Row(3, "c", "11") :: Row(4, "d", "22") :: Nil)
+
+    val scanBuilder = getScanBuilder()
+    Assertions.assertTrue(scanBuilder.isInstanceOf[SupportsPushDownLimit])
+
+    // The two INSERTs above must yield at least two data files when no limit is pushed.
+    val dataFilesWithoutLimit = scanBuilder.build().toBatch.planInputPartitions().flatMap {
+      case partition: SparkInputPartition =>
+        partition.split() match {
+          case dataSplit: DataSplit => dataSplit.dataFiles().asScala
+          case _ => Seq.empty
+        }
+    }
+    Assertions.assertTrue(dataFilesWithoutLimit.length >= 2)
+
+    // pushLimit still returns false even though the limit is pushed down (best effort only).
+    Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
+    val partitions = scanBuilder.build().toBatch.planInputPartitions()
+    Assertions.assertEquals(1, partitions.length)
+
+    Assertions.assertEquals(1, spark.sql("SELECT * FROM T LIMIT 1").count())
+  }
+
+  test("Paimon pushDown: limit for change-log tables") {
+    spark.sql("""
+                |CREATE TABLE T (a INT, b STRING, c STRING)
+                |TBLPROPERTIES ('primary-key'='a')
+                |""".stripMargin)
+
+    spark.sql("INSERT INTO T VALUES (1, 'a', '11'), (2, 'b', '22')")
+    spark.sql("INSERT INTO T VALUES (3, 'c', '11'), (4, 'd', '22')")
+
+    val scanBuilder = getScanBuilder()
+    Assertions.assertTrue(scanBuilder.isInstanceOf[SupportsPushDownLimit])
+
+    // Tables with primary keys can't support the push-down limit.
+    Assertions.assertFalse(scanBuilder.asInstanceOf[SupportsPushDownLimit].pushLimit(1))
+
+    // pushLimit returns false for every table type, so additionally check end to end that the
+    // table contents are intact and that a LIMIT query still returns exactly one row.
+    checkAnswer(
+      spark.sql("SELECT * FROM T ORDER BY a"),
+      Row(1, "a", "11") :: Row(2, "b", "22") :: Row(3, "c", "11") :: Row(4, "d", "22") :: Nil)
+    Assertions.assertEquals(1, spark.sql("SELECT * FROM T LIMIT 1").count())
+  }
+
+  /** Returns a fresh scan builder for `tableName`, created with no extra scan options. */
+  private def getScanBuilder(tableName: String = "T"): ScanBuilder = {
+    val sparkTable = new SparkTable(loadTable(tableName))
+    sparkTable.newScanBuilder(CaseInsensitiveStringMap.empty())
+  }
+
private def checkFilterExists(sql: String): Boolean = {
spark.sql(sql).queryExecution.optimizedPlan.exists {
case Filter(_: Expression, _) => true