This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 8cd233fb7b [GLUTEN-9337][VL] Support read Paimon non-PK table (#10186)
8cd233fb7b is described below
commit 8cd233fb7bee89c421db099581ef36c29995ade6
Author: Joey <[email protected]>
AuthorDate: Sun Aug 17 15:30:44 2025 +0800
[GLUTEN-9337][VL] Support read Paimon non-PK table (#10186)
---
.github/workflows/velox_backend_arm.yml | 3 +-
.github/workflows/velox_backend_x86.yml | 23 +--
.github/workflows/velox_nightly.yml | 4 +-
backends-velox/pom.xml | 26 +++
...rg.apache.gluten.component.VeloxPaimonComponent | 0
.../gluten/component/VeloxPaimonComponent.scala | 53 ++++++
.../apache/gluten/execution/VeloxPaimonSuite.scala | 19 ++
docs/get-started/Velox.md | 14 ++
gluten-paimon/pom.xml | 133 +++++++++++++
.../substrait/rel/PaimonLocalFilesBuilder.java | 42 +++++
.../gluten/substrait/rel/PaimonLocalFilesNode.java | 48 +++++
.../gluten/execution/OffloadPaimonScan.scala | 30 +++
.../gluten/execution/PaimonScanTransformer.scala | 210 +++++++++++++++++++++
.../org/apache/gluten/execution/PaimonSuite.scala | 97 ++++++++++
package/pom.xml | 10 +
pom.xml | 74 ++++++++
16 files changed, 772 insertions(+), 14 deletions(-)
diff --git a/.github/workflows/velox_backend_arm.yml
b/.github/workflows/velox_backend_arm.yml
index e1f5b643b7..701d87fcbb 100644
--- a/.github/workflows/velox_backend_arm.yml
+++ b/.github/workflows/velox_backend_arm.yml
@@ -30,6 +30,7 @@ on:
- 'gluten-delta/**'
- 'gluten-iceberg/**'
- 'gluten-hudi/**'
+ - 'gluten-paimon/**'
- 'gluten-ut/**'
- 'shims/**'
- 'tools/gluten-it/**'
@@ -181,7 +182,7 @@ jobs:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$JAVA_HOME/bin:$PATH
java -version
- $MVN_CMD test -Pspark-3.5 -Pbackends-velox -Pjava-17 -Piceberg -Pdelta -DtagsToExclude=None \
+ $MVN_CMD test -Pspark-3.5 -Pbackends-velox -Pjava-17 -Piceberg -Pdelta -Ppaimon -DtagsToExclude=None \
-DtagsToInclude=org.apache.gluten.tags.UDFTest
- name: Upload test report
uses: actions/upload-artifact@v4
diff --git a/.github/workflows/velox_backend_x86.yml
b/.github/workflows/velox_backend_x86.yml
index 369aab6200..f15114d933 100644
--- a/.github/workflows/velox_backend_x86.yml
+++ b/.github/workflows/velox_backend_x86.yml
@@ -30,6 +30,7 @@ on:
- 'gluten-delta/**'
- 'gluten-iceberg/**'
- 'gluten-hudi/**'
+ - 'gluten-paimon/**'
- 'gluten-ut/**'
- 'shims/**'
- 'tools/gluten-it/**'
@@ -737,7 +738,7 @@ jobs:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$JAVA_HOME/bin:$PATH
java -version
- $MVN_CMD clean test -Pspark-3.3 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Phudi -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.3 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Phudi -Ppaimon -Pspark-ut \
-DargLine="-Dspark.test.home=/opt/shims/spark33/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
- name: Upload test report
@@ -786,7 +787,7 @@ jobs:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$JAVA_HOME/bin:$PATH
java -version
- $MVN_CMD clean test -Pspark-3.3 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Phudi -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.3 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Phudi -Ppaimon -Pspark-ut \
-DargLine="-Dspark.test.home=/opt/shims/spark33/spark_home/" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
- name: Upload test report
@@ -838,7 +839,7 @@ jobs:
java -version
export SPARK_HOME=/opt/shims/spark34/spark_home/
ls -l $SPARK_HOME
- $MVN_CMD clean test -Pspark-3.4 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Phudi -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.4 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Phudi -Ppaimon -Pspark-ut \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
\
-DargLine="-Dspark.test.home=$SPARK_HOME"
- name: Upload test report
@@ -888,7 +889,7 @@ jobs:
java -version
export SPARK_HOME=/opt/shims/spark34/spark_home/
ls -l $SPARK_HOME
- $MVN_CMD clean test -Pspark-3.4 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Pspark-ut -Phudi \
+ $MVN_CMD clean test -Pspark-3.4 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Phudi -Ppaimon -Pspark-ut \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest \
-DargLine="-Dspark.test.home=$SPARK_HOME"
- name: Upload test report
@@ -938,7 +939,7 @@ jobs:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$JAVA_HOME/bin:$PATH
java -version
- $MVN_CMD clean test -Pspark-3.5 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Phudi -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.5 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Phudi -Ppaimon -Pspark-ut \
-DargLine="-Dspark.test.home=/opt/shims/spark35/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
- name: Upload test report
@@ -1040,7 +1041,7 @@ jobs:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$JAVA_HOME/bin:$PATH
java -version
- $MVN_CMD clean test -Pspark-3.5 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Phudi -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.5 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Phudi -Ppaimon -Pspark-ut \
-DargLine="-Dspark.test.home=/opt/shims/spark35/spark_home/" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
- name: Upload test report
@@ -1094,7 +1095,7 @@ jobs:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$JAVA_HOME/bin:$PATH
java -version
- $MVN_CMD clean test -Pspark-3.5 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.5 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Ppaimon -Pspark-ut \
-DargLine="-Dspark.test.home=/opt/shims/spark35/spark_home/
-Dspark.gluten.ras.enabled=true" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
- name: Upload test report
@@ -1139,7 +1140,7 @@ jobs:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$JAVA_HOME/bin:$PATH
java -version
- $MVN_CMD clean test -Pspark-3.5 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.5 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Ppaimon -Pspark-ut \
-DargLine="-Dspark.test.home=/opt/shims/spark35/spark_home/
-Dspark.gluten.ras.enabled=true" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
- name: Upload test report
@@ -1192,7 +1193,7 @@ jobs:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$JAVA_HOME/bin:$PATH
java -version
- $MVN_CMD clean test -Pspark-3.5 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.5 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Ppaimon -Pspark-ut \
-DargLine="-Dspark.test.home=/opt/shims/spark35/spark_home/
-Dspark.gluten.sql.columnar.forceShuffledHashJoin=false" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
- name: Upload test report
@@ -1237,7 +1238,7 @@ jobs:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$JAVA_HOME/bin:$PATH
java -version
- $MVN_CMD clean test -Pspark-3.5 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Pspark-ut \
+ $MVN_CMD clean test -Pspark-3.5 -Pjava-17 -Pbackends-velox -Piceberg
-Pdelta -Ppaimon -Pspark-ut \
-DargLine="-Dspark.test.home=/opt/shims/spark35/spark_home/
-Dspark.gluten.sql.columnar.forceShuffledHashJoin=false" \
-DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
- name: Upload test report
@@ -1291,7 +1292,7 @@ jobs:
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk
export PATH=$JAVA_HOME/bin:$PATH
java -version
- $MVN_CMD test -Pspark-3.5 -Pbackends-velox -Pjava-17 -Piceberg
-Pdelta -DtagsToExclude=org.apache.gluten.tags.EnhancedFeaturesTest \
+ $MVN_CMD test -Pspark-3.5 -Pbackends-velox -Pjava-17 -Piceberg
-Pdelta -Ppaimon -DtagsToExclude=org.apache.gluten.tags.EnhancedFeaturesTest \
-DtagsToInclude=org.apache.gluten.tags.UDFTest \
-DargLine="-Dspark.test.home=/opt/shims/spark35/spark_home/"
- name: Upload test report
diff --git a/.github/workflows/velox_nightly.yml
b/.github/workflows/velox_nightly.yml
index 5ea395bb03..268c7e050d 100644
--- a/.github/workflows/velox_nightly.yml
+++ b/.github/workflows/velox_nightly.yml
@@ -148,8 +148,8 @@ jobs:
cd $GITHUB_WORKSPACE/ && \
export MAVEN_HOME=/usr/lib/maven && \
export PATH=${PATH}:${MAVEN_HOME}/bin && \
- mvn clean install -Pspark-3.4 -Pjava-17 -Pbackends-velox -Pceleborn -Puniffle -Piceberg -Phudi -Pdelta -DskipTests -Dmaven.source.skip
- mvn clean install -Pspark-3.5 -Pjava-17 -Pbackends-velox -Pceleborn -Puniffle -Piceberg -Phudi -Pdelta -DskipTests -Dmaven.source.skip
+ mvn clean install -Pspark-3.4 -Pjava-17 -Pbackends-velox -Pceleborn -Puniffle -Piceberg -Phudi -Pdelta -Ppaimon -DskipTests -Dmaven.source.skip
+ mvn clean install -Pspark-3.5 -Pjava-17 -Pbackends-velox -Pceleborn -Puniffle -Piceberg -Phudi -Pdelta -Ppaimon -DskipTests -Dmaven.source.skip
- name: Upload bundle package
uses: actions/upload-artifact@v4
with:
diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml
index 9751391e16..2f56439973 100755
--- a/backends-velox/pom.xml
+++ b/backends-velox/pom.xml
@@ -231,6 +231,32 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>paimon</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-paimon</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-paimon</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-spark-${sparkbundle.version}</artifactId>
+ <version>${paimon.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
<dependencies>
diff --git
a/backends-velox/src-paimon/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxPaimonComponent
b/backends-velox/src-paimon/main/resources/META-INF/gluten-components/org.apache.gluten.component.VeloxPaimonComponent
new file mode 100644
index 0000000000..e69de29bb2
diff --git
a/backends-velox/src-paimon/main/scala/org/apache/gluten/component/VeloxPaimonComponent.scala
b/backends-velox/src-paimon/main/scala/org/apache/gluten/component/VeloxPaimonComponent.scala
new file mode 100644
index 0000000000..7dd4f805f3
--- /dev/null
+++
b/backends-velox/src-paimon/main/scala/org/apache/gluten/component/VeloxPaimonComponent.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.component
+
+import org.apache.gluten.backendsapi.velox.VeloxBackend
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution.OffloadPaimonScan
+import org.apache.gluten.extension.columnar.enumerated.RasOffload
+import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform
+import org.apache.gluten.extension.columnar.validator.Validators
+import org.apache.gluten.extension.injector.Injector
+
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+
+class VeloxPaimonComponent extends Component {
+ override def name(): String = "velox-paimon"
+ override def buildInfo(): Component.BuildInfo =
+ Component.BuildInfo("VeloxPaimon", "N/A", "N/A", "N/A")
+ override def dependencies(): Seq[Class[_ <: Component]] =
classOf[VeloxBackend] :: Nil
+ override def injectRules(injector: Injector): Unit = {
+ injector.gluten.legacy.injectTransform {
+ c =>
+ val offload = Seq(OffloadPaimonScan())
+ HeuristicTransform.Simple(
+ Validators.newValidator(new GlutenConfig(c.sqlConf), offload),
+ offload
+ )
+ }
+
+ // Inject RAS rule.
+ injector.gluten.ras.injectRasRule {
+ c =>
+ RasOffload.Rule(
+ RasOffload.from[BatchScanExec](OffloadPaimonScan()),
+ Validators.newValidator(new GlutenConfig(c.sqlConf)),
+ Nil)
+ }
+ }
+}
diff --git
a/backends-velox/src-paimon/test/scala/org/apache/gluten/execution/VeloxPaimonSuite.scala
b/backends-velox/src-paimon/test/scala/org/apache/gluten/execution/VeloxPaimonSuite.scala
new file mode 100644
index 0000000000..3e55a93a0b
--- /dev/null
+++
b/backends-velox/src-paimon/test/scala/org/apache/gluten/execution/VeloxPaimonSuite.scala
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+class VeloxPaimonSuite extends PaimonSuite
diff --git a/docs/get-started/Velox.md b/docs/get-started/Velox.md
index 8d4ab79336..0fda15ef87 100644
--- a/docs/get-started/Velox.md
+++ b/docs/get-started/Velox.md
@@ -367,6 +367,20 @@ Once built successfully, iceberg features will be included
in gluten-velox-bundl
Gluten with velox backend supports [Hudi](https://hudi.apache.org/) table.
Currently, only reading COW (Copy-On-Write) tables is supported.
+## Paimon Support
+
+Gluten with velox backend supports [Paimon](https://paimon.apache.org/) table. Currently, only non-pk table is supported, and the Spark version needs to be >= 3.3.
+
+### How to use
+
+Compile gluten-paimon module by a `paimon` profile, as follows:
+
+```
+mvn clean package -Pbackends-velox -Pspark-3.5 -Ppaimon -DskipTests
+```
+
+Once built successfully, paimon features will be included in gluten-velox-bundle-X jar. Then you can query paimon non-pk table by gluten/velox without scan's fallback.
+
### How to use
First of all, compile gluten-hudi module by a `hudi` profile, as follows:
diff --git a/gluten-paimon/pom.xml b/gluten-paimon/pom.xml
new file mode 100644
index 0000000000..c11de27424
--- /dev/null
+++ b/gluten-paimon/pom.xml
@@ -0,0 +1,133 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>gluten-parent</artifactId>
+ <groupId>org.apache.gluten</groupId>
+ <version>1.5.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>gluten-paimon</artifactId>
+ <packaging>jar</packaging>
+ <name>Gluten Paimon</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-substrait</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-spark-${sparkbundle.version}</artifactId>
+ <version>${paimon.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- For test -->
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-substrait</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-catalyst_${scala.binary.version}</artifactId>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_${scala.binary.version}</artifactId>
+ <version>${spark.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+ <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+ <plugins>
+ <plugin>
+ <groupId>net.alchim31.maven</groupId>
+ <artifactId>scala-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.scalastyle</groupId>
+ <artifactId>scalastyle-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>com.diffplug.spotless</groupId>
+ <artifactId>spotless-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest-maven-plugin</artifactId>
+ <configuration>
+ <junitxml>.</junitxml>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>prepare-test-jar</id>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-resources-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/gluten-paimon/src-paimon/main/java/org/apache/gluten/substrait/rel/PaimonLocalFilesBuilder.java
b/gluten-paimon/src-paimon/main/java/org/apache/gluten/substrait/rel/PaimonLocalFilesBuilder.java
new file mode 100644
index 0000000000..4d6c33baee
--- /dev/null
+++
b/gluten-paimon/src-paimon/main/java/org/apache/gluten/substrait/rel/PaimonLocalFilesBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.substrait.rel;
+
+import java.util.List;
+import java.util.Map;
+
+public class PaimonLocalFilesBuilder {
+ public static PaimonLocalFilesNode makePaimonLocalFiles(
+ Integer index,
+ List<String> paths,
+ List<Long> starts,
+ List<Long> lengths,
+ List<Map<String, String>> partitionColumns,
+ LocalFilesNode.ReadFileFormat fileFormat,
+ List<String> preferredLocations,
+ Map<String, String> properties) {
+ return new PaimonLocalFilesNode(
+ index,
+ paths,
+ starts,
+ lengths,
+ partitionColumns,
+ fileFormat,
+ preferredLocations,
+ properties);
+ }
+}
diff --git
a/gluten-paimon/src-paimon/main/java/org/apache/gluten/substrait/rel/PaimonLocalFilesNode.java
b/gluten-paimon/src-paimon/main/java/org/apache/gluten/substrait/rel/PaimonLocalFilesNode.java
new file mode 100644
index 0000000000..30f47d87b3
--- /dev/null
+++
b/gluten-paimon/src-paimon/main/java/org/apache/gluten/substrait/rel/PaimonLocalFilesNode.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.substrait.rel;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class PaimonLocalFilesNode extends LocalFilesNode {
+
+ public PaimonLocalFilesNode(
+ Integer index,
+ List<String> paths,
+ List<Long> starts,
+ List<Long> lengths,
+ List<Map<String, String>> partitionColumns,
+ ReadFileFormat fileFormat,
+ List<String> preferredLocations,
+ Map<String, String> properties) {
+ super(
+ index,
+ paths,
+ starts,
+ lengths,
+ new ArrayList<>(),
+ new ArrayList<>(),
+ partitionColumns,
+ new ArrayList<>(),
+ fileFormat,
+ preferredLocations,
+ properties,
+ new ArrayList<>());
+ }
+}
diff --git
a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/OffloadPaimonScan.scala
b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/OffloadPaimonScan.scala
new file mode 100644
index 0000000000..19c0c22433
--- /dev/null
+++
b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/OffloadPaimonScan.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
+
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+
+case class OffloadPaimonScan() extends OffloadSingleNode {
+ override def offload(plan: SparkPlan): SparkPlan = plan match {
+ case scan: BatchScanExec if
PaimonScanTransformer.supportsBatchScan(scan.scan) =>
+ PaimonScanTransformer(scan)
+ case other => other
+ }
+}
diff --git
a/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
new file mode 100644
index 0000000000..31eb197091
--- /dev/null
+++
b/gluten-paimon/src-paimon/main/scala/org/apache/gluten/execution/PaimonScanTransformer.scala
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.exception.GlutenNotSupportException
+import org.apache.gluten.sql.shims.SparkShimLoader
+import org.apache.gluten.substrait.rel.{PaimonLocalFilesBuilder, SplitInfo}
+import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.softaffinity.SoftAffinity
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference,
DynamicPruningExpression, Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.connector.read.{InputPartition, Scan}
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import org.apache.paimon.CoreOptions
+import org.apache.paimon.spark.{PaimonInputPartition, PaimonScan}
+import org.apache.paimon.table.{DataTable, FileStoreTable}
+import org.apache.paimon.table.source.DataSplit
+import org.apache.paimon.utils.InternalRowPartitionComputer
+
+import java.lang.{Long => JLong}
+import java.util.{HashMap => JHashMap, Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+case class PaimonScanTransformer(
+ override val output: Seq[AttributeReference],
+ @transient override val scan: Scan,
+ override val runtimeFilters: Seq[Expression],
+ @transient override val table: Table,
+ override val keyGroupedPartitioning: Option[Seq[Expression]] = None,
+ override val commonPartitionValues: Option[Seq[(InternalRow, Int)]] = None)
+ extends BatchScanExecTransformerBase(
+ output = output,
+ scan = scan,
+ runtimeFilters = runtimeFilters,
+ table = table,
+ keyGroupedPartitioning = keyGroupedPartitioning,
+ commonPartitionValues = commonPartitionValues
+ ) {
+
+ private lazy val coreOptions: CoreOptions = scan match {
+ case scan: PaimonScan =>
+ scan.table match {
+ case dataTable: DataTable =>
+ dataTable.coreOptions()
+ case _ =>
+ throw new GlutenNotSupportException("Only support Paimon DataTable.")
+ }
+ case _ =>
+ throw new GlutenNotSupportException("Only support PaimonScan.")
+ }
+
+ override def filterExprs(): Seq[Expression] = pushdownFilters
+
+ override def getPartitionSchema: StructType = scan match {
+ case paimonScan: PaimonScan =>
+ val partitionKeys = paimonScan.table.partitionKeys()
+ StructType(scan.readSchema().filter(field =>
partitionKeys.contains(field.name)))
+ case _ =>
+ throw new GlutenNotSupportException("Only support PaimonScan.")
+ }
+
+ override def getDataSchema: StructType = new StructType()
+
+ override lazy val fileFormat: ReadFileFormat = {
+ val formatStr = coreOptions.fileFormatString()
+ if ("parquet".equalsIgnoreCase(formatStr)) {
+ ReadFileFormat.ParquetReadFormat
+ } else if ("orc".equalsIgnoreCase(formatStr)) {
+ ReadFileFormat.OrcReadFormat
+ } else {
+ ReadFileFormat.UnknownFormat
+ }
+ }
+
+ override def doValidateInternal(): ValidationResult = {
+ scan match {
+ case paimonScan: PaimonScan =>
+ paimonScan.table match {
+ case table: FileStoreTable =>
+ if (fileFormat == ReadFileFormat.UnknownFormat) {
+ return ValidationResult.failed("Only support parquet/orc Paimon
table.")
+ }
+ if (!table.primaryKeys().isEmpty ||
coreOptions.deletionVectorsEnabled()) {
+ return ValidationResult.failed("Not support Paimon PK/DV table.")
+ }
+ case table =>
+ return ValidationResult.failed(
+ s"Not support Paimon ${table.getClass.getSimpleName} table.")
+ }
+ case _ =>
+ return ValidationResult.failed("Only support PaimonScan.")
+ }
+ super.doValidateInternal()
+ }
+
+ override def doExecuteColumnar(): RDD[ColumnarBatch] = throw new
UnsupportedOperationException()
+
+ override def getSplitInfosFromPartitions(partitions: Seq[InputPartition]):
Seq[SplitInfo] = {
+ val partitionComputer =
PaimonScanTransformer.getRowDataPartitionComputer(scan)
+ getPartitions.zipWithIndex.map {
+ case (p, index) =>
+ p match {
+ case partition: PaimonInputPartition =>
+ val paths = mutable.ListBuffer.empty[String]
+ val starts = mutable.ListBuffer.empty[JLong]
+ val lengths = mutable.ListBuffer.empty[JLong]
+ val partitionColumns = mutable.ListBuffer.empty[JMap[String,
String]]
+ partition.splits.foreach {
+ split =>
+ val rawFilesOpt = split.convertToRawFiles()
+ if (rawFilesOpt.isPresent) {
+ val partitionRow = split.asInstanceOf[DataSplit].partition()
+ val partitionCols =
partitionComputer.generatePartValues(partitionRow)
+ val rawFiles = rawFilesOpt.get().asScala
+ paths ++= rawFiles.map(_.path())
+ starts ++= rawFiles.map(file => JLong.valueOf(file.offset()))
+ lengths ++= rawFiles.map(file =>
JLong.valueOf(file.length()))
+ partitionColumns ++=
mutable.ArrayBuffer.fill(rawFiles.size)(partitionCols)
+ } else {
+ throw new GlutenNotSupportException(
+ "Cannot get raw files from paimon SparkInputPartition.")
+ }
+ }
+ val preferredLoc =
+ SoftAffinity.getFilePartitionLocations(paths.toArray,
partition.preferredLocations())
+ PaimonLocalFilesBuilder.makePaimonLocalFiles(
+ index,
+ paths.asJava,
+ starts.asJava,
+ lengths.asJava,
+ partitionColumns.asJava,
+ fileFormat,
+ preferredLoc.toList.asJava,
+ new JHashMap[String, String]()
+ )
+ case _ =>
+ throw new GlutenNotSupportException("Only support paimon
SparkInputPartition.")
+ }
+ }
+ }
+
+ override def doCanonicalize(): PaimonScanTransformer = {
+ this.copy(
+ output = output.map(QueryPlan.normalizeExpressions(_, output)),
+ runtimeFilters = QueryPlan.normalizePredicates(
+ runtimeFilters.filterNot(_ ==
DynamicPruningExpression(Literal.TrueLiteral)),
+ output)
+ )
+ }
+
+ override protected[this] def supportsBatchScan(scan: Scan): Boolean =
+ PaimonScanTransformer.supportsBatchScan(scan)
+}
+
+object PaimonScanTransformer {
+ def apply(batchScan: BatchScanExec): PaimonScanTransformer = {
+ new PaimonScanTransformer(
+ batchScan.output,
+ batchScan.scan,
+ batchScan.runtimeFilters,
+ table = SparkShimLoader.getSparkShims.getBatchScanExecTable(batchScan),
+ keyGroupedPartitioning =
SparkShimLoader.getSparkShims.getKeyGroupedPartitioning(batchScan),
+ commonPartitionValues =
SparkShimLoader.getSparkShims.getCommonPartitionValues(batchScan)
+ )
+ }
+
+ private def getRowDataPartitionComputer(scan: Scan):
InternalRowPartitionComputer = {
+ scan match {
+ case paimonScan: PaimonScan =>
+ val table = paimonScan.table.asInstanceOf[FileStoreTable]
+ // use __HIVE_DEFAULT_PARTITION__ because velox using this
+ new InternalRowPartitionComputer(
+ ExternalCatalogUtils.DEFAULT_PARTITION_NAME,
+ table.schema().logicalPartitionType(),
+ table.partitionKeys.asScala.toArray,
+ false
+ )
+ case _ =>
+ throw new GlutenNotSupportException("Only support PaimonScan.")
+ }
+ }
+
+ def supportsBatchScan(scan: Scan): Boolean = {
+ scan.getClass.getName == "org.apache.paimon.spark.PaimonScan"
+ }
+}
diff --git
a/gluten-paimon/src-paimon/test/scala/org/apache/gluten/execution/PaimonSuite.scala
b/gluten-paimon/src-paimon/test/scala/org/apache/gluten/execution/PaimonSuite.scala
new file mode 100644
index 0000000000..91493283a0
--- /dev/null
+++
b/gluten-paimon/src-paimon/test/scala/org/apache/gluten/execution/PaimonSuite.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+
+// Base suite verifying that reads of Paimon tables are offloaded to Gluten's
+// PaimonScanTransformer rather than vanilla Spark's batch scan.
+abstract class PaimonSuite extends WholeStageTransformerSuite {
+ // Filesystem path of the test classpath root; the Paimon warehouse is
+ // created under this directory.
+ protected val rootPath: String = getClass.getResource("/").getPath
+ override protected val resourcePath: String = "/tpch-data-parquet"
+ override protected val fileFormat: String = "parquet"
+
+ // Session config: columnar shuffle, off-heap memory, no broadcast joins,
+ // the Paimon session extension, and spark_catalog replaced by Paimon's
+ // SparkCatalog with a file-based warehouse under rootPath.
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+ .set("spark.shuffle.manager",
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set("spark.sql.files.maxPartitionBytes", "1g")
+ .set("spark.sql.shuffle.partitions", "1")
+ .set("spark.memory.offHeap.size", "2g")
+ .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+ .set("spark.sql.autoBroadcastJoinThreshold", "-1")
+ .set(
+ "spark.sql.extensions",
+ "org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions")
+ .set("spark.sql.catalog.spark_catalog",
"org.apache.paimon.spark.SparkCatalog")
+ .set("spark.sql.catalog.spark_catalog.warehouse",
s"file://$rootPath/data-paimon")
+ }
+
+ // A full-table scan should be offloaded for both Paimon file formats.
+ test("paimon transformer exists") {
+ Seq("parquet", "orc").foreach {
+ format =>
+ withTable(s"paimon_${format}_tbl") {
+ sql(s"DROP TABLE IF EXISTS paimon_${format}_tbl")
+ sql(
+ s"CREATE TABLE paimon_${format}_tbl (id INT, name STRING) USING
PAIMON " +
+ s"TBLPROPERTIES ('file.format'='$format')")
+ sql(s"INSERT INTO paimon_${format}_tbl VALUES (1, 'Bob'), (2,
'Blue'), (3, 'Mike')")
+ runQueryAndCompare(s"""
+ |SELECT * FROM paimon_${format}_tbl;
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[PaimonScanTransformer]
+ }
+ }
+ }
+ }
+
+ // Filter/projection combinations over a two-level partitioned table; each
+ // query should still hit PaimonScanTransformer and match vanilla results.
+ test("paimon partitioned table") {
+ withTable("paimon_tbl") {
+ sql("DROP TABLE IF EXISTS paimon_tbl")
+ sql(
+ "CREATE TABLE paimon_tbl (id INT, p1 STRING, p2 STRING) USING paimon
PARTITIONED BY (p1, p2)")
+ sql("INSERT INTO paimon_tbl VALUES (1, '1', '1'), (2, '1', '2')")
+ runQueryAndCompare("""
+ |SELECT p1 FROM paimon_tbl WHERE p1 = '1' ORDER BY
id;
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[PaimonScanTransformer]
+ }
+ runQueryAndCompare("""
+ |SELECT p2 FROM paimon_tbl WHERE p1 = '1' ORDER BY
id;
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[PaimonScanTransformer]
+ }
+ runQueryAndCompare("""
+ |SELECT p1 FROM paimon_tbl WHERE p2 = '1';
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[PaimonScanTransformer]
+ }
+ runQueryAndCompare("""
+ |SELECT p2 FROM paimon_tbl WHERE p2 = '1';
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[PaimonScanTransformer]
+ }
+ runQueryAndCompare("""
+ |SELECT id, p2 FROM paimon_tbl WHERE p1 = '1' and
p2 = '2';
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[PaimonScanTransformer]
+ }
+ runQueryAndCompare("""
+ |SELECT id FROM paimon_tbl ORDER BY id;
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[PaimonScanTransformer]
+ }
+ }
+ }
+}
diff --git a/package/pom.xml b/package/pom.xml
index c46aa8dd9b..6c1c2e3981 100644
--- a/package/pom.xml
+++ b/package/pom.xml
@@ -113,6 +113,16 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>paimon</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-paimon</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ </profile>
</profiles>
<build>
diff --git a/pom.xml b/pom.xml
index b4334842dc..c93bad4a42 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,6 +76,7 @@
<spark.version>3.4.4</spark.version>
<sparkshim.artifactId>spark-sql-columnar-shims-spark34</sparkshim.artifactId>
<iceberg.version>1.5.0</iceberg.version>
+ <paimon.version>1.2.0</paimon.version>
<delta.package.name>delta-core</delta.package.name>
<delta.version>2.4.0</delta.version>
<delta.binary.version>24</delta.binary.version>
@@ -1008,6 +1009,79 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>paimon</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <modules>
+ <module>gluten-paimon</module>
+ </modules>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-paimon-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.basedir}/src-paimon/main/scala</source>
+ <source>${project.basedir}/src-paimon/main/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-paimon-resources</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>add-resource</goal>
+ </goals>
+ <configuration>
+ <resources>
+ <resource>
+
<directory>${project.basedir}/src-paimon/main/resources</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-paimon-test-sources</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.basedir}/src-paimon/test/scala</source>
+ <source>${project.basedir}/src-paimon/test/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-paimon-test-resources</id>
+ <phase>generate-test-resources</phase>
+ <goals>
+ <goal>add-test-resource</goal>
+ </goals>
+ <configuration>
+ <resources>
+ <resource>
+
<directory>${project.basedir}/src-paimon/test/resources</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
<profile>
<id>backends-velox</id>
<activation>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]