This is an automated email from the ASF dual-hosted git repository.

zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ea0bcd5b70 [GLUTEN-8094][CH][Part-1] Support reading data from Iceberg with the CH backend (#8095)
ea0bcd5b70 is described below

commit ea0bcd5b7027f86e7625f9b77f1c5542697fb2a0
Author: Zhichao Zhang <[email protected]>
AuthorDate: Fri Nov 29 16:38:05 2024 +0800

    [GLUTEN-8094][CH][Part-1] Support reading data from Iceberg with the CH backend (#8095)
    
    * [GLUTEN-8094][CH][Part-1] Support reading data from Iceberg with the CH backend
    
    Support reading data from Iceberg tables with the CH backend:
    
    - add a basic Iceberg scan transformer
    - support reading Iceberg tables written in copy-on-write mode
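
    For illustration, a minimal read path in copy-on-write mode, sketched
    against the catalog settings used by the new ClickHouseIcebergSuite (the
    Gluten plugin class and the warehouse location are assumptions, not part
    of this commit):

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder()
          // Assumed plugin entry point; configure Gluten as usual.
          .config("spark.plugins", "org.apache.gluten.GlutenPlugin")
          .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
          .config("spark.sql.catalog.spark_catalog",
            "org.apache.iceberg.spark.SparkCatalog")
          .config("spark.sql.catalog.spark_catalog.type", "hadoop")
          .config("spark.sql.catalog.spark_catalog.warehouse",
            "file:///tmp/iceberg-warehouse") // placeholder location
          .getOrCreate()

        // Writes still run on vanilla Spark; only the scan is offloaded to CH.
        spark.sql("create table iceberg_tb using iceberg as select 1 as col1")
        spark.sql("select * from iceberg_tb").show()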
---
 backends-clickhouse/pom.xml                        |  64 +++
 .../backendsapi/clickhouse/CHTransformerApi.scala  |   5 +
 .../execution/iceberg/ClickHouseIcebergSuite.scala | 640 +++++++++++++++++++++
 ...ckHouseTPCHColumnarShuffleParquetAQESuite.scala |  26 +-
 .../Storages/SubstraitSource/FormatFile.cpp        |   4 +-
 .../spark/source/GlutenIcebergSourceUtil.scala     |   8 +-
 .../apache/gluten/backendsapi/TransformerApi.scala |   3 +
 7 files changed, 744 insertions(+), 6 deletions(-)

diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index c616266bf2..3a4a942247 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -13,6 +13,70 @@
   <packaging>jar</packaging>
   <name>Gluten Backends ClickHouse</name>
 
+  <profiles>
+    <profile>
+      <id>iceberg</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.gluten</groupId>
+          <artifactId>gluten-iceberg</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          <artifactId>iceberg-spark-${sparkbundle.version}_${scala.binary.version}</artifactId>
+          <version>${iceberg.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          <artifactId>iceberg-spark-runtime-${sparkbundle.version}_${scala.binary.version}</artifactId>
+          <version>${iceberg.version}</version>
+          <scope>test</scope>
+        </dependency>
+      </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-iceberg-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${project.basedir}/src/main-iceberg/scala</source>
+                    <source>${project.basedir}/src/main-iceberg/java</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-iceberg-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${project.basedir}/src/test-iceberg/scala</source>
+                    <source>${project.basedir}/src/test-iceberg/java</source>
+                  </sources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+  </profiles>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.gluten</groupId>
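
Note that the iceberg profile above is inactive by default, so the
src/main-iceberg and src/test-iceberg sources compile only when the profile
is enabled explicitly via Maven's standard profile switch, e.g. (a sketch;
extra build flags may be required in this repository):

    mvn test -pl backends-clickhouse -Piceberg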
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
index 438c6cd3df..0be8cf2c25 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
@@ -42,6 +42,7 @@ import org.apache.spark.util.collection.BitSet
 
 import com.google.common.collect.Lists
 import com.google.protobuf.{Any, Message}
+import org.apache.hadoop.fs.Path
 
 import java.util
 
@@ -269,4 +270,8 @@ class CHTransformerApi extends TransformerApi with Logging {
     }
     packPBMessage(write.build())
   }
+
+  /** Use the Hadoop Path class to encode the file path. */
+  override def encodeFilePathIfNeed(filePath: String): String =
+    (new Path(filePath)).toUri.toASCIIString
 }
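
As a sketch of what the new override does (the sample path below is
hypothetical), the Hadoop Path round-trip percent-encodes characters that
are not legal in a URI, so Iceberg file paths containing spaces or
non-ASCII characters survive the trip to the native reader:

    import org.apache.hadoop.fs.Path

    // Hypothetical input: the space in the partition value becomes %20.
    new Path("/warehouse/db/tbl/p=a b/part-0.parquet").toUri.toASCIIString
    // => "/warehouse/db/tbl/p=a%20b/part-0.parquet"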
diff --git a/backends-clickhouse/src/test-iceberg/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala b/backends-clickhouse/src/test-iceberg/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
new file mode 100644
index 0000000000..676061235e
--- /dev/null
+++ b/backends-clickhouse/src/test-iceberg/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.iceberg
+
+import org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, IcebergScanTransformer}
+import org.apache.gluten.GlutenConfig
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+
+class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite {
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.sql.shuffle.partitions", "2")
+      .set("spark.memory.offHeap.size", "2g")
+      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
+      .set(
+        "spark.sql.extensions",
+        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+      .set("spark.sql.catalog.spark_catalog", 
"org.apache.iceberg.spark.SparkCatalog")
+      .set("spark.sql.catalog.spark_catalog.type", "hadoop")
+      .set("spark.sql.catalog.spark_catalog.warehouse", 
s"file://$basePath/tpch-data-iceberg")
+  }
+
+  test("iceberg transformer exists") {
+    withTable("iceberg_tb") {
+      spark.sql(
+        """
+          |create table iceberg_tb using iceberg as
+          |(select 1 as col1, 2 as col2, 3 as col3)
+          |""".stripMargin)
+
+      runQueryAndCompare(
+        """
+          |select * from iceberg_tb;
+          |""".stripMargin) {
+        checkGlutenOperatorMatch[IcebergScanTransformer]
+      }
+    }
+  }
+
+  testWithSpecifiedSparkVersion(
+    "iceberg bucketed join", Array("3.3", "3.5")) {
+    val leftTable = "p_str_tb"
+    val rightTable = "p_int_tb"
+    withTable(leftTable, rightTable) {
+      // Partition key of string type.
+      withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+        // Gluten does not support writing Iceberg tables.
+        spark.sql(
+          s"""
+             |create table $leftTable(id int, name string, p string)
+             |using iceberg
+             |partitioned by (bucket(4, id));
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into table $leftTable values
+             |(4, 'a5', 'p4'),
+             |(1, 'a1', 'p1'),
+             |(2, 'a3', 'p2'),
+             |(1, 'a2', 'p1'),
+             |(3, 'a4', 'p3');
+             |""".stripMargin
+        )
+      }
+
+      // Partition key of integer type.
+      withSQLConf(
+        GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
+      ) {
+        // Gluten does not support writing Iceberg tables.
+        spark.sql(
+          s"""
+             |create table $rightTable(id int, name string, p int)
+             |using iceberg
+             |partitioned by (bucket(4, id));
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into table $rightTable values
+             |(3, 'b4', 23),
+             |(1, 'b2', 21),
+             |(4, 'b5', 24),
+             |(2, 'b3', 22),
+             |(1, 'b1', 21);
+             |""".stripMargin
+        )
+      }
+
+      withSQLConf(
+        "spark.sql.sources.v2.bucketing.enabled" -> "true",
+        "spark.sql.requireAllClusterKeysForCoPartition" -> "false",
+        "spark.sql.adaptive.enabled" -> "false",
+        "spark.sql.iceberg.planning.preserve-data-grouping" -> "true",
+        "spark.sql.autoBroadcastJoinThreshold" -> "-1",
+        "spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true"
+      ) {
+        runQueryAndCompare(
+          s"""
+             |select s.id, s.name, i.name, i.p
+             | from $leftTable s inner join $rightTable i
+             | on s.id = i.id;
+             |""".stripMargin) {
+          df => {
+            assert(
+              getExecutedPlan(df).count(
+                plan => {
+                  plan.isInstanceOf[IcebergScanTransformer]
+                }) == 2)
+            getExecutedPlan(df).map {
+              case plan if plan.isInstanceOf[IcebergScanTransformer] =>
+                assert(
+                  plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
+                assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
+              case _ => // do nothing
+            }
+            checkLengthAndPlan(df, 7)
+          }
+        }
+      }
+    }
+  }
+
+  testWithSpecifiedSparkVersion(
+    "iceberg bucketed join with partition", Array("3.3", "3.5")) {
+    val leftTable = "p_str_tb"
+    val rightTable = "p_int_tb"
+    withTable(leftTable, rightTable) {
+      // Partition key of string type.
+      withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+        // Gluten does not support writing Iceberg tables.
+        spark.sql(
+          s"""
+             |create table $leftTable(id int, name string, p int)
+             |using iceberg
+             |partitioned by (bucket(4, id), p);
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into table $leftTable values
+             |(4, 'a5', 2),
+             |(1, 'a1', 1),
+             |(2, 'a3', 1),
+             |(1, 'a2', 1),
+             |(3, 'a4', 2);
+             |""".stripMargin
+        )
+      }
+
+      // Partition key of integer type.
+      withSQLConf(
+        GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
+      ) {
+        // Gluten does not support writing Iceberg tables.
+        spark.sql(
+          s"""
+             |create table $rightTable(id int, name string, p int)
+             |using iceberg
+             |partitioned by (bucket(4, id), p);
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into table $rightTable values
+             |(3, 'b4', 2),
+             |(1, 'b2', 1),
+             |(4, 'b5', 2),
+             |(2, 'b3', 1),
+             |(1, 'b1', 1);
+             |""".stripMargin
+        )
+      }
+
+      withSQLConf(
+        "spark.sql.sources.v2.bucketing.enabled" -> "true",
+        "spark.sql.requireAllClusterKeysForCoPartition" -> "false",
+        "spark.sql.adaptive.enabled" -> "false",
+        "spark.sql.iceberg.planning.preserve-data-grouping" -> "true",
+        "spark.sql.autoBroadcastJoinThreshold" -> "-1",
+        "spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true"
+      ) {
+        runQueryAndCompare(
+          s"""
+             |select s.id, s.name, i.name, i.p
+             | from $leftTable s inner join $rightTable i
+             | on s.id = i.id and s.p = i.p;
+             |""".stripMargin) {
+          df => {
+            assert(
+              getExecutedPlan(df).count(
+                plan => {
+                  plan.isInstanceOf[IcebergScanTransformer]
+                }) == 2)
+            getExecutedPlan(df).map {
+              case plan if plan.isInstanceOf[IcebergScanTransformer] =>
+                assert(
+                  plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
+                assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
+              case _ => // do nothing
+            }
+            checkLengthAndPlan(df, 7)
+          }
+        }
+      }
+    }
+  }
+
+  testWithSpecifiedSparkVersion(
+    "iceberg bucketed join with partition filter", Array("3.3", "3.5")) {
+    val leftTable = "p_str_tb"
+    val rightTable = "p_int_tb"
+    withTable(leftTable, rightTable) {
+      // Partition key of string type.
+      withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+        // Gluten does not support writing Iceberg tables.
+        spark.sql(
+          s"""
+             |create table $leftTable(id int, name string, p int)
+             |using iceberg
+             |partitioned by (bucket(4, id), p);
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into table $leftTable values
+             |(4, 'a5', 2),
+             |(1, 'a1', 1),
+             |(2, 'a3', 1),
+             |(1, 'a2', 1),
+             |(3, 'a4', 2);
+             |""".stripMargin
+        )
+      }
+
+      // Partition key of integer type.
+      withSQLConf(
+        GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
+      ) {
+        // Gluten does not support writing Iceberg tables.
+        spark.sql(
+          s"""
+             |create table $rightTable(id int, name string, p int)
+             |using iceberg
+             |partitioned by (bucket(4, id), p);
+             |""".stripMargin)
+        spark.sql(
+          s"""
+             |insert into table $rightTable values
+             |(3, 'b4', 2),
+             |(1, 'b2', 1),
+             |(4, 'b5', 2),
+             |(2, 'b3', 1),
+             |(1, 'b1', 1);
+             |""".stripMargin
+        )
+      }
+
+      withSQLConf(
+        "spark.sql.sources.v2.bucketing.enabled" -> "true",
+        "spark.sql.requireAllClusterKeysForCoPartition" -> "false",
+        "spark.sql.adaptive.enabled" -> "false",
+        "spark.sql.iceberg.planning.preserve-data-grouping" -> "true",
+        "spark.sql.autoBroadcastJoinThreshold" -> "-1",
+        "spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true"
+      ) {
+        runQueryAndCompare(
+          s"""
+             |select s.id, s.name, i.name, i.p
+             | from $leftTable s inner join $rightTable i
+             | on s.id = i.id
+             | where s.p = 1 and i.p = 1;
+             |""".stripMargin) {
+          df => {
+            assert(
+              getExecutedPlan(df).count(
+                plan => {
+                  plan.isInstanceOf[IcebergScanTransformer]
+                }) == 2)
+            getExecutedPlan(df).map {
+              case plan if plan.isInstanceOf[IcebergScanTransformer] =>
+                assert(
+                  plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
+                assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1)
+              case _ => // do nothing
+            }
+            checkLengthAndPlan(df, 5)
+          }
+        }
+      }
+    }
+  }
+
+  testWithSpecifiedSparkVersion("iceberg: time travel") {
+    withTable("iceberg_tm") {
+      spark.sql(
+        s"""
+           |create table iceberg_tm (id int, name string) using iceberg
+           |""".stripMargin)
+      spark.sql(
+        s"""
+           |insert into iceberg_tm values (1, "v1"), (2, "v2")
+           |""".stripMargin)
+      spark.sql(
+        s"""
+           |insert into iceberg_tm values (3, "v3"), (4, "v4")
+           |""".stripMargin)
+
+      val df =
+        spark.sql("select snapshot_id from default.iceberg_tm.snapshots where 
parent_id is null")
+      val value = df.collectAsList().get(0).getAs[Long](0);
+      spark.sql(s"call 
system.set_current_snapshot('default.iceberg_tm',$value)");
+      val data = runQueryAndCompare("select * from iceberg_tm") { _ => }
+      checkLengthAndPlan(data, 2)
+      checkAnswer(data, Row(1, "v1") :: Row(2, "v2") :: Nil)
+    }
+  }
+
+  test("iceberg: partition filters") {
+    withTable("iceberg_pf") {
+      spark.sql(
+        s"""
+           |create table iceberg_pf (id int, name string)
+           | using iceberg partitioned by (name)
+           |""".stripMargin)
+      spark.sql(
+        s"""
+           |insert into iceberg_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2")
+           |""".stripMargin)
+      val df1 = runQueryAndCompare("select * from iceberg_pf where name = 'v1'") { _ => }
+      checkLengthAndPlan(df1, 2)
+      checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
+    }
+  }
+
+  test("iceberg read mor table - delete and update") {
+    withTable("iceberg_mor_tb") {
+      withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+        spark.sql(
+          """
+            |create table iceberg_mor_tb (
+            |  id int,
+            |  name string,
+            |  p string
+            |) using iceberg
+            |tblproperties (
+            |  'format-version' = '2'
+            |)
+            |partitioned by (p);
+            |""".stripMargin)
+
+        // Insert some test rows.
+        spark.sql(
+          """
+            |insert into table iceberg_mor_tb
+            |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
+            |       (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
+            |""".stripMargin)
+
+        // Delete row.
+        spark.sql(
+          """
+            |delete from iceberg_mor_tb where name = 'a1';
+            |""".stripMargin
+        )
+        // Update row.
+        spark.sql(
+          """
+            |update iceberg_mor_tb set name = 'new_a2' where id = 2;
+            |""".stripMargin
+        )
+        // Delete row again.
+        spark.sql(
+          """
+            |delete from iceberg_mor_tb where id = 6;
+            |""".stripMargin
+        )
+      }
+      runQueryAndCompare(
+        """
+          |select * from iceberg_mor_tb;
+          |""".stripMargin) {
+        checkGlutenOperatorMatch[IcebergScanTransformer]
+      }
+    }
+  }
+
+  // TODO: support merge-on-read mode
+  ignore("iceberg read mor table - delete and update with merge-on-read mode") 
{
+    withTable("iceberg_mor_tb") {
+      withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+        spark.sql(
+          """
+            |create table iceberg_mor_tb (
+            |  id int,
+            |  name string,
+            |  p string
+            |) using iceberg
+            |tblproperties (
+            |  'format-version' = '2',
+            |  'write.delete.mode' = 'merge-on-read',
+            |  'write.update.mode' = 'merge-on-read',
+            |  'write.merge.mode' = 'merge-on-read'
+            |)
+            |partitioned by (p);
+            |""".stripMargin)
+
+        // Insert some test rows.
+        spark.sql(
+          """
+            |insert into table iceberg_mor_tb
+            |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
+            |       (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
+            |""".stripMargin)
+
+        // Delete row.
+        spark.sql(
+          """
+            |delete from iceberg_mor_tb where name = 'a1';
+            |""".stripMargin
+        )
+        // Update row.
+        spark.sql(
+          """
+            |update iceberg_mor_tb set name = 'new_a2' where id = 2;
+            |""".stripMargin
+        )
+        // Delete row again.
+        spark.sql(
+          """
+            |delete from iceberg_mor_tb where id = 6;
+            |""".stripMargin
+        )
+      }
+      runQueryAndCompare(
+        """
+          |select * from iceberg_mor_tb;
+          |""".stripMargin) {
+        checkGlutenOperatorMatch[IcebergScanTransformer]
+      }
+    }
+  }
+
+  test("iceberg read mor table - merge into") {
+    withTable("iceberg_mor_tb", "merge_into_source_tb") {
+      withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+        spark.sql(
+          """
+            |create table iceberg_mor_tb (
+            |  id int,
+            |  name string,
+            |  p string
+            |) using iceberg
+            |tblproperties (
+            |  'format-version' = '2'
+            |)
+            |partitioned by (p);
+            |""".stripMargin)
+        spark.sql(
+          """
+            |create table merge_into_source_tb (
+            |  id int,
+            |  name string,
+            |  p string
+            |) using iceberg;
+            |""".stripMargin)
+
+        // Insert some test rows.
+        spark.sql(
+          """
+            |insert into table iceberg_mor_tb
+            |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
+            |""".stripMargin)
+        spark.sql(
+          """
+            |insert into table merge_into_source_tb
+            |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 'p1'),
+            |       (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2');
+            |""".stripMargin)
+
+        // Delete row.
+        spark.sql(
+          """
+            |delete from iceberg_mor_tb where name = 'a1';
+            |""".stripMargin
+        )
+        // Update row.
+        spark.sql(
+          """
+            |update iceberg_mor_tb set name = 'new_a2' where id = 2;
+            |""".stripMargin
+        )
+
+        // Merge into.
+        spark.sql(
+          """
+            |merge into iceberg_mor_tb t
+            |using (select * from merge_into_source_tb) s
+            |on t.id = s.id
+            |when matched then
+            | update set t.name = s.name, t.p = s.p
+            |when not matched then
+            | insert (id, name, p) values (s.id, s.name, s.p);
+            |""".stripMargin
+        )
+      }
+      runQueryAndCompare(
+        """
+          |select * from iceberg_mor_tb;
+          |""".stripMargin) {
+        checkGlutenOperatorMatch[IcebergScanTransformer]
+      }
+    }
+  }
+
+  // TODO: support merge-on-read mode
+  ignore("iceberg read mor table - merge into with merge-on-read mode") {
+    withTable("iceberg_mor_tb", "merge_into_source_tb") {
+      withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+        spark.sql(
+          """
+            |create table iceberg_mor_tb (
+            |  id int,
+            |  name string,
+            |  p string
+            |) using iceberg
+            |tblproperties (
+            |  'format-version' = '2',
+            |  'write.delete.mode' = 'merge-on-read',
+            |  'write.update.mode' = 'merge-on-read',
+            |  'write.merge.mode' = 'merge-on-read'
+            |)
+            |partitioned by (p);
+            |""".stripMargin)
+        spark.sql(
+          """
+            |create table merge_into_source_tb (
+            |  id int,
+            |  name string,
+            |  p string
+            |) using iceberg;
+            |""".stripMargin)
+
+        // Insert some test rows.
+        spark.sql(
+          """
+            |insert into table iceberg_mor_tb
+            |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
+            |""".stripMargin)
+        spark.sql(
+          """
+            |insert into table merge_into_source_tb
+            |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 'p1'),
+            |       (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2');
+            |""".stripMargin)
+
+        // Delete row.
+        spark.sql(
+          """
+            |delete from iceberg_mor_tb where name = 'a1';
+            |""".stripMargin
+        )
+        // Update row.
+        spark.sql(
+          """
+            |update iceberg_mor_tb set name = 'new_a2' where id = 2;
+            |""".stripMargin
+        )
+
+        // Merge into.
+        spark.sql(
+          """
+            |merge into iceberg_mor_tb t
+            |using (select * from merge_into_source_tb) s
+            |on t.id = s.id
+            |when matched then
+            | update set t.name = s.name, t.p = s.p
+            |when not matched then
+            | insert (id, name, p) values (s.id, s.name, s.p);
+            |""".stripMargin
+        )
+      }
+      runQueryAndCompare(
+        """
+          |select * from iceberg_mor_tb;
+          |""".stripMargin) {
+        checkGlutenOperatorMatch[IcebergScanTransformer]
+      }
+    }
+  }
+
+  // Spark configuration spark.sql.iceberg.handle-timestamp-without-timezone
+  // is not supported in Spark 3.4.
+  // TODO: there is a bug when using timestamp type as the partition column
+  testWithSpecifiedSparkVersion("iceberg partition type - timestamp", 
Array("")) {
+    Seq("true", "false").foreach {
+      flag =>
+        withSQLConf(
+          "spark.sql.iceberg.handle-timestamp-without-timezone" -> flag,
+          "spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables" -> 
flag) {
+          withTable("part_by_timestamp") {
+            spark.sql(
+              """
+                |create table part_by_timestamp (
+                |  p timestamp
+                |) using iceberg
+                |tblproperties (
+                |  'format-version' = '1'
+                |)
+                |partitioned by (p);
+                |""".stripMargin)
+
+            // Insert some test rows.
+            spark.sql(
+              """
+                |insert into table part_by_timestamp
+                |values (TIMESTAMP '2022-01-01 00:01:20');
+                |""".stripMargin)
+            val df = spark.sql("select * from part_by_timestamp")
+            checkAnswer(df, Row(java.sql.Timestamp.valueOf("2022-01-01 00:01:20")) :: Nil)
+          }
+        }
+    }
+  }
+}
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
index 1c3e4de4c6..ad9cb854d9 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.execution.tpch
 
 import org.apache.gluten.GlutenConfig
 import org.apache.gluten.execution._
-import org.apache.gluten.execution.GlutenPlan
 import org.apache.gluten.utils.Arm
 
 import org.apache.spark.SparkConf
@@ -116,6 +115,31 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
     }
   }
 
+  // TODO: there is a bug when using timestamp type as the partition column
+  ignore("test timestamp as partition column") {
+    spark.sql("""
+                |create table part_by_timestamp (
+                |  a int,
+                |  b timestamp,
+                |  c string,
+                |  p timestamp
+                |) using parquet
+                |partitioned by (p);
+                |""".stripMargin)
+
+    // Insert some test rows.
+    spark.sql("""
+                |insert into table part_by_timestamp
+                |values
+                |(1, TIMESTAMP '2022-01-01 00:01:20', '2022-01-01 00:01:20',
+                |TIMESTAMP '2022-01-01 00:01:20');
+                |""".stripMargin)
+    compareResultsAgainstVanillaSpark(
+      "select a, b, to_timestamp(c), p from part_by_timestamp",
+      compareResult = true,
+      customCheck = { _ => })
+  }
+
   test("TPCH Q2") {
     runTPCHQuery(2) {
       df =>
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp
index 3c68c70e66..ee54a8ff5e 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp
@@ -87,7 +87,7 @@ FormatFilePtr FormatFileUtil::createFile(
    DB::ContextPtr context, ReadBufferBuilderPtr read_buffer_builder, const substrait::ReadRel::LocalFiles::FileOrFiles & file)
 {
 #if USE_PARQUET
-    if (file.has_parquet())
+    if (file.has_parquet() || (file.has_iceberg() && file.iceberg().has_parquet()))
     {
         auto config = ExecutorConfig::loadFromContext(context);
        return std::make_shared<ParquetFormatFile>(context, file, read_buffer_builder, config.use_local_format);
@@ -95,7 +95,7 @@ FormatFilePtr FormatFileUtil::createFile(
 #endif
 
 #if USE_ORC
-    if (file.has_orc())
+    if (file.has_orc() || (file.has_iceberg() && file.iceberg().has_orc()))
        return std::make_shared<ORCFormatFile>(context, file, read_buffer_builder);
 #endif
 
diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
index ad8222cff5..a7451355b0 100644
--- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.iceberg.spark.source
 
+import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.exception.GlutenNotSupportException
 import org.apache.gluten.substrait.rel.{IcebergLocalFilesBuilder, SplitInfo}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
@@ -25,7 +26,7 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.connector.read.{InputPartition, Scan}
 import org.apache.spark.sql.types.StructType
 
-import org.apache.iceberg.{CombinedScanTask, DeleteFile, FileFormat, FileScanTask, ScanTask, Schema}
+import org.apache.iceberg._
 import org.apache.iceberg.spark.SparkSchemaUtil
 
 import java.lang.{Long => JLong}
@@ -50,8 +51,9 @@ object GlutenIcebergSourceUtil {
       val tasks = partition.taskGroup[ScanTask]().tasks().asScala
       asFileScanTask(tasks.toList).foreach {
         task =>
-          val filePath = task.file().path().toString
-          paths.add(filePath)
+          paths.add(
+            BackendsApiManager.getTransformerApiInstance
+              .encodeFilePathIfNeed(task.file().path().toString))
           starts.add(task.start())
           lengths.add(task.length())
           partitionColumns.add(getPartitionColumns(task, readPartitionSchema))
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
index 9642f63b3b..69cea9c547 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
@@ -76,4 +76,7 @@ trait TransformerApi {
   def invalidateSQLExecutionResource(executionId: String): Unit = {}
 
   def genWriteParameters(fileFormat: FileFormat, writeOptions: Map[String, String]): Any
+
+  /** Encode the file path if needed. The default is a no-op; backends may override it. */
+  def encodeFilePathIfNeed(filePath: String): String = filePath
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
