This is an automated email from the ASF dual-hosted git repository.
zhangzc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
     new ea0bcd5b70 [GLUTEN-8094][CH][Part-1] Support reading data from the iceberg with CH backend (#8095)
ea0bcd5b70 is described below
commit ea0bcd5b7027f86e7625f9b77f1c5542697fb2a0
Author: Zhichao Zhang <[email protected]>
AuthorDate: Fri Nov 29 16:38:05 2024 +0800
[GLUTEN-8094][CH][Part-1] Support reading data from the iceberg with CH backend (#8095)
* [GLUTEN-8094][CH][Part-1] Support reading data from the iceberg with CH backend
Support reading data from Iceberg tables with the ClickHouse (CH) backend:
- basic Iceberg scan transformer
- reading Iceberg tables written in copy-on-write mode (a minimal usage sketch follows)
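A rough usage sketch (not part of this patch; the session builder, warehouse path
and table name are illustrative, and Gluten itself is assumed to be enabled through
its usual plugin configuration). It mirrors the Iceberg catalog settings that the
new ClickHouseIcebergSuite applies:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      // Iceberg SQL extensions plus a Hadoop-type catalog, as in the suite's sparkConf.
      .config("spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.spark_catalog.type", "hadoop")
      .config("spark.sql.catalog.spark_catalog.warehouse", "file:///tmp/iceberg-warehouse")
      .getOrCreate()

    // Writes still run through vanilla Spark; only the scan is offloaded and
    // should plan as an IcebergScanTransformer (copy-on-write tables only in Part-1).
    spark.sql("create table iceberg_tb using iceberg as select 1 as col1, 2 as col2")
    spark.sql("select * from iceberg_tb").show()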
---
backends-clickhouse/pom.xml | 64 +++
.../backendsapi/clickhouse/CHTransformerApi.scala | 5 +
.../execution/iceberg/ClickHouseIcebergSuite.scala | 640 +++++++++++++++++++++
...ckHouseTPCHColumnarShuffleParquetAQESuite.scala | 26 +-
.../Storages/SubstraitSource/FormatFile.cpp | 4 +-
.../spark/source/GlutenIcebergSourceUtil.scala | 8 +-
.../apache/gluten/backendsapi/TransformerApi.scala | 3 +
7 files changed, 744 insertions(+), 6 deletions(-)
diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index c616266bf2..3a4a942247 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -13,6 +13,70 @@
<packaging>jar</packaging>
<name>Gluten Backends ClickHouse</name>
+ <profiles>
+ <profile>
+ <id>iceberg</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.gluten</groupId>
+ <artifactId>gluten-iceberg</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+        <artifactId>iceberg-spark-${sparkbundle.version}_${scala.binary.version}</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+        <artifactId>iceberg-spark-runtime-${sparkbundle.version}_${scala.binary.version}</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-iceberg-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.basedir}/src/main-iceberg/scala</source>
+ <source>${project.basedir}/src/main-iceberg/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-iceberg-test-sources</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.basedir}/src/test-iceberg/scala</source>
+ <source>${project.basedir}/src/test-iceberg/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
index 438c6cd3df..0be8cf2c25 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHTransformerApi.scala
@@ -42,6 +42,7 @@ import org.apache.spark.util.collection.BitSet
import com.google.common.collect.Lists
import com.google.protobuf.{Any, Message}
+import org.apache.hadoop.fs.Path
import java.util
@@ -269,4 +270,8 @@ class CHTransformerApi extends TransformerApi with Logging {
}
packPBMessage(write.build())
}
+
+  /** Use the Hadoop Path class to URI-encode the file path. */
+ override def encodeFilePathIfNeed(filePath: String): String =
+ (new Path(filePath)).toUri.toASCIIString
}
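For reference, a minimal sketch of what the new encodeFilePathIfNeed override does
(the sample path is illustrative): characters that are not valid in a URI, such as a
space in a partition directory name, are percent-encoded so the native reader
receives a well-formed path.

    import org.apache.hadoop.fs.Path

    // A data file path whose partition directory contains a space.
    val raw = "hdfs://nn:8020/warehouse/db/tbl/data/p=a b/part-00000.parquet"
    // Same expression as in CHTransformerApi.encodeFilePathIfNeed above.
    val encoded = new Path(raw).toUri.toASCIIString
    // Expected to yield something like:
    // hdfs://nn:8020/warehouse/db/tbl/data/p=a%20b/part-00000.parquet
    println(encoded)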
diff --git a/backends-clickhouse/src/test-iceberg/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala b/backends-clickhouse/src/test-iceberg/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
new file mode 100644
index 0000000000..676061235e
--- /dev/null
+++ b/backends-clickhouse/src/test-iceberg/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.iceberg
+
+import org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, IcebergScanTransformer}
+import org.apache.gluten.GlutenConfig
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Row
+
+class ClickHouseIcebergSuite extends GlutenClickHouseWholeStageTransformerSuite {
+
+ override protected def sparkConf: SparkConf = {
+ super.sparkConf
+      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+ .set("spark.sql.adaptive.enabled", "true")
+ .set("spark.sql.shuffle.partitions", "2")
+ .set("spark.memory.offHeap.size", "2g")
+ .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+ .set("spark.sql.autoBroadcastJoinThreshold", "-1")
+ .set(
+ "spark.sql.extensions",
+ "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+      .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
+      .set("spark.sql.catalog.spark_catalog.type", "hadoop")
+      .set("spark.sql.catalog.spark_catalog.warehouse", s"file://$basePath/tpch-data-iceberg")
+ }
+
+ test("iceberg transformer exists") {
+ withTable("iceberg_tb") {
+ spark.sql(
+ """
+ |create table iceberg_tb using iceberg as
+ |(select 1 as col1, 2 as col2, 3 as col3)
+ |""".stripMargin)
+
+ runQueryAndCompare(
+ """
+ |select * from iceberg_tb;
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[IcebergScanTransformer]
+ }
+ }
+ }
+
+  testWithSpecifiedSparkVersion("iceberg bucketed join", Array("3.3", "3.5")) {
+ val leftTable = "p_str_tb"
+ val rightTable = "p_int_tb"
+ withTable(leftTable, rightTable) {
+ // Partition key of string type.
+ withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+        // Gluten does not support writing Iceberg tables.
+ spark.sql(
+ s"""
+ |create table $leftTable(id int, name string, p string)
+ |using iceberg
+ |partitioned by (bucket(4, id));
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into table $leftTable values
+ |(4, 'a5', 'p4'),
+ |(1, 'a1', 'p1'),
+ |(2, 'a3', 'p2'),
+ |(1, 'a2', 'p1'),
+ |(3, 'a4', 'p3');
+ |""".stripMargin
+ )
+ }
+
+ // Partition key of integer type.
+ withSQLConf(
+ GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
+ ) {
+        // Gluten does not support writing Iceberg tables.
+ spark.sql(
+ s"""
+ |create table $rightTable(id int, name string, p int)
+ |using iceberg
+ |partitioned by (bucket(4, id));
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into table $rightTable values
+ |(3, 'b4', 23),
+ |(1, 'b2', 21),
+ |(4, 'b5', 24),
+ |(2, 'b3', 22),
+ |(1, 'b1', 21);
+ |""".stripMargin
+ )
+ }
+
+ withSQLConf(
+ "spark.sql.sources.v2.bucketing.enabled" -> "true",
+ "spark.sql.requireAllClusterKeysForCoPartition" -> "false",
+ "spark.sql.adaptive.enabled" -> "false",
+ "spark.sql.iceberg.planning.preserve-data-grouping" -> "true",
+ "spark.sql.autoBroadcastJoinThreshold" -> "-1",
+ "spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true"
+ ) {
+ runQueryAndCompare(
+ s"""
+ |select s.id, s.name, i.name, i.p
+ | from $leftTable s inner join $rightTable i
+ | on s.id = i.id;
+ |""".stripMargin) {
+ df => {
+ assert(
+ getExecutedPlan(df).count(
+ plan => {
+ plan.isInstanceOf[IcebergScanTransformer]
+ }) == 2)
+ getExecutedPlan(df).map {
+ case plan if plan.isInstanceOf[IcebergScanTransformer] =>
+ assert(
+                  plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
+              assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
+ case _ => // do nothing
+ }
+ checkLengthAndPlan(df, 7)
+ }
+ }
+ }
+ }
+ }
+
+  testWithSpecifiedSparkVersion("iceberg bucketed join with partition", Array("3.3", "3.5")) {
+ val leftTable = "p_str_tb"
+ val rightTable = "p_int_tb"
+ withTable(leftTable, rightTable) {
+ // Partition key of string type.
+ withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+        // Gluten does not support writing Iceberg tables.
+ spark.sql(
+ s"""
+ |create table $leftTable(id int, name string, p int)
+ |using iceberg
+ |partitioned by (bucket(4, id), p);
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into table $leftTable values
+ |(4, 'a5', 2),
+ |(1, 'a1', 1),
+ |(2, 'a3', 1),
+ |(1, 'a2', 1),
+ |(3, 'a4', 2);
+ |""".stripMargin
+ )
+ }
+
+ // Partition key of integer type.
+ withSQLConf(
+ GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
+ ) {
+        // Gluten does not support writing Iceberg tables.
+ spark.sql(
+ s"""
+ |create table $rightTable(id int, name string, p int)
+ |using iceberg
+ |partitioned by (bucket(4, id), p);
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into table $rightTable values
+ |(3, 'b4', 2),
+ |(1, 'b2', 1),
+ |(4, 'b5', 2),
+ |(2, 'b3', 1),
+ |(1, 'b1', 1);
+ |""".stripMargin
+ )
+ }
+
+ withSQLConf(
+ "spark.sql.sources.v2.bucketing.enabled" -> "true",
+ "spark.sql.requireAllClusterKeysForCoPartition" -> "false",
+ "spark.sql.adaptive.enabled" -> "false",
+ "spark.sql.iceberg.planning.preserve-data-grouping" -> "true",
+ "spark.sql.autoBroadcastJoinThreshold" -> "-1",
+ "spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true"
+ ) {
+ runQueryAndCompare(
+ s"""
+ |select s.id, s.name, i.name, i.p
+ | from $leftTable s inner join $rightTable i
+ | on s.id = i.id and s.p = i.p;
+ |""".stripMargin) {
+ df => {
+ assert(
+ getExecutedPlan(df).count(
+ plan => {
+ plan.isInstanceOf[IcebergScanTransformer]
+ }) == 2)
+ getExecutedPlan(df).map {
+ case plan if plan.isInstanceOf[IcebergScanTransformer] =>
+ assert(
+                  plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
+              assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
+ case _ => // do nothing
+ }
+ checkLengthAndPlan(df, 7)
+ }
+ }
+ }
+ }
+ }
+
+  testWithSpecifiedSparkVersion("iceberg bucketed join with partition filter", Array("3.3", "3.5")) {
+ val leftTable = "p_str_tb"
+ val rightTable = "p_int_tb"
+ withTable(leftTable, rightTable) {
+ // Partition key of string type.
+ withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+        // Gluten does not support writing Iceberg tables.
+ spark.sql(
+ s"""
+ |create table $leftTable(id int, name string, p int)
+ |using iceberg
+ |partitioned by (bucket(4, id), p);
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into table $leftTable values
+ |(4, 'a5', 2),
+ |(1, 'a1', 1),
+ |(2, 'a3', 1),
+ |(1, 'a2', 1),
+ |(3, 'a4', 2);
+ |""".stripMargin
+ )
+ }
+
+ // Partition key of integer type.
+ withSQLConf(
+ GlutenConfig.GLUTEN_ENABLED_KEY -> "false"
+ ) {
+        // Gluten does not support writing Iceberg tables.
+ spark.sql(
+ s"""
+ |create table $rightTable(id int, name string, p int)
+ |using iceberg
+ |partitioned by (bucket(4, id), p);
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into table $rightTable values
+ |(3, 'b4', 2),
+ |(1, 'b2', 1),
+ |(4, 'b5', 2),
+ |(2, 'b3', 1),
+ |(1, 'b1', 1);
+ |""".stripMargin
+ )
+ }
+
+ withSQLConf(
+ "spark.sql.sources.v2.bucketing.enabled" -> "true",
+ "spark.sql.requireAllClusterKeysForCoPartition" -> "false",
+ "spark.sql.adaptive.enabled" -> "false",
+ "spark.sql.iceberg.planning.preserve-data-grouping" -> "true",
+ "spark.sql.autoBroadcastJoinThreshold" -> "-1",
+ "spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true"
+ ) {
+ runQueryAndCompare(
+ s"""
+ |select s.id, s.name, i.name, i.p
+ | from $leftTable s inner join $rightTable i
+ | on s.id = i.id
+ | where s.p = 1 and i.p = 1;
+ |""".stripMargin) {
+ df => {
+ assert(
+ getExecutedPlan(df).count(
+ plan => {
+ plan.isInstanceOf[IcebergScanTransformer]
+ }) == 2)
+ getExecutedPlan(df).map {
+ case plan if plan.isInstanceOf[IcebergScanTransformer] =>
+ assert(
+                  plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
+              assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1)
+ case _ => // do nothing
+ }
+ checkLengthAndPlan(df, 5)
+ }
+ }
+ }
+ }
+ }
+
+ testWithSpecifiedSparkVersion("iceberg: time travel") {
+ withTable("iceberg_tm") {
+ spark.sql(
+ s"""
+ |create table iceberg_tm (id int, name string) using iceberg
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into iceberg_tm values (1, "v1"), (2, "v2")
+ |""".stripMargin)
+ spark.sql(
+ s"""
+ |insert into iceberg_tm values (3, "v3"), (4, "v4")
+ |""".stripMargin)
+
+ val df =
+        spark.sql("select snapshot_id from default.iceberg_tm.snapshots where parent_id is null")
+      val value = df.collectAsList().get(0).getAs[Long](0);
+      spark.sql(s"call system.set_current_snapshot('default.iceberg_tm',$value)");
+ val data = runQueryAndCompare("select * from iceberg_tm") { _ => }
+ checkLengthAndPlan(data, 2)
+ checkAnswer(data, Row(1, "v1") :: Row(2, "v2") :: Nil)
+ }
+ }
+
+ test("iceberg: partition filters") {
+ withTable("iceberg_pf") {
+ spark.sql(
+ s"""
+ |create table iceberg_pf (id int, name string)
+ | using iceberg partitioned by (name)
+ |""".stripMargin)
+ spark.sql(
+ s"""
+            |insert into iceberg_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, "v2")
+            |""".stripMargin)
+      val df1 = runQueryAndCompare("select * from iceberg_pf where name = 'v1'") { _ => }
+ checkLengthAndPlan(df1, 2)
+ checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
+ }
+ }
+
+ test("iceberg read mor table - delete and update") {
+ withTable("iceberg_mor_tb") {
+ withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+ spark.sql(
+ """
+ |create table iceberg_mor_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '2'
+ |)
+ |partitioned by (p);
+ |""".stripMargin)
+
+ // Insert some test rows.
+ spark.sql(
+ """
+ |insert into table iceberg_mor_tb
+ |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
+ | (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
+ |""".stripMargin)
+
+ // Delete row.
+ spark.sql(
+ """
+ |delete from iceberg_mor_tb where name = 'a1';
+ |""".stripMargin
+ )
+ // Update row.
+ spark.sql(
+ """
+          |update iceberg_mor_tb set name = 'new_a2' where id = 2;
+ |""".stripMargin
+ )
+ // Delete row again.
+ spark.sql(
+ """
+ |delete from iceberg_mor_tb where id = 6;
+ |""".stripMargin
+ )
+ }
+ runQueryAndCompare(
+ """
+ |select * from iceberg_mor_tb;
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[IcebergScanTransformer]
+ }
+ }
+ }
+
+ // TODO: support merge-on-read mode
+  ignore("iceberg read mor table - delete and update with merge-on-read mode") {
+ withTable("iceberg_mor_tb") {
+ withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+ spark.sql(
+ """
+ |create table iceberg_mor_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '2',
+ | 'write.delete.mode' = 'merge-on-read',
+ | 'write.update.mode' = 'merge-on-read',
+ | 'write.merge.mode' = 'merge-on-read'
+ |)
+ |partitioned by (p);
+ |""".stripMargin)
+
+ // Insert some test rows.
+ spark.sql(
+ """
+ |insert into table iceberg_mor_tb
+ |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
+ | (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
+ |""".stripMargin)
+
+ // Delete row.
+ spark.sql(
+ """
+ |delete from iceberg_mor_tb where name = 'a1';
+ |""".stripMargin
+ )
+ // Update row.
+ spark.sql(
+ """
+          |update iceberg_mor_tb set name = 'new_a2' where id = 2;
+ |""".stripMargin
+ )
+ // Delete row again.
+ spark.sql(
+ """
+ |delete from iceberg_mor_tb where id = 6;
+ |""".stripMargin
+ )
+ }
+ runQueryAndCompare(
+ """
+ |select * from iceberg_mor_tb;
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[IcebergScanTransformer]
+ }
+ }
+ }
+
+ test("iceberg read mor table - merge into") {
+ withTable("iceberg_mor_tb", "merge_into_source_tb") {
+ withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+ spark.sql(
+ """
+ |create table iceberg_mor_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '2'
+ |)
+ |partitioned by (p);
+ |""".stripMargin)
+ spark.sql(
+ """
+ |create table merge_into_source_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg;
+ |""".stripMargin)
+
+ // Insert some test rows.
+ spark.sql(
+ """
+ |insert into table iceberg_mor_tb
+ |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
+ |""".stripMargin)
+ spark.sql(
+ """
+ |insert into table merge_into_source_tb
+ |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 'p1'),
+ | (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2');
+ |""".stripMargin)
+
+ // Delete row.
+ spark.sql(
+ """
+ |delete from iceberg_mor_tb where name = 'a1';
+ |""".stripMargin
+ )
+ // Update row.
+ spark.sql(
+ """
+          |update iceberg_mor_tb set name = 'new_a2' where id = 2;
+ |""".stripMargin
+ )
+
+ // Merge into.
+ spark.sql(
+ """
+ |merge into iceberg_mor_tb t
+ |using (select * from merge_into_source_tb) s
+ |on t.id = s.id
+ |when matched then
+ | update set t.name = s.name, t.p = s.p
+ |when not matched then
+ | insert (id, name, p) values (s.id, s.name, s.p);
+ |""".stripMargin
+ )
+ }
+ runQueryAndCompare(
+ """
+ |select * from iceberg_mor_tb;
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[IcebergScanTransformer]
+ }
+ }
+ }
+
+ // TODO: support merge-on-read mode
+ ignore("iceberg read mor table - merge into with merge-on-read mode") {
+ withTable("iceberg_mor_tb", "merge_into_source_tb") {
+ withSQLConf(GlutenConfig.GLUTEN_ENABLED_KEY -> "false") {
+ spark.sql(
+ """
+ |create table iceberg_mor_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '2',
+ | 'write.delete.mode' = 'merge-on-read',
+ | 'write.update.mode' = 'merge-on-read',
+ | 'write.merge.mode' = 'merge-on-read'
+ |)
+ |partitioned by (p);
+ |""".stripMargin)
+ spark.sql(
+ """
+ |create table merge_into_source_tb (
+ | id int,
+ | name string,
+ | p string
+ |) using iceberg;
+ |""".stripMargin)
+
+ // Insert some test rows.
+ spark.sql(
+ """
+ |insert into table iceberg_mor_tb
+ |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
+ |""".stripMargin)
+ spark.sql(
+ """
+ |insert into table merge_into_source_tb
+ |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 'p1'),
+ | (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2');
+ |""".stripMargin)
+
+ // Delete row.
+ spark.sql(
+ """
+ |delete from iceberg_mor_tb where name = 'a1';
+ |""".stripMargin
+ )
+ // Update row.
+ spark.sql(
+ """
+          |update iceberg_mor_tb set name = 'new_a2' where id = 2;
+ |""".stripMargin
+ )
+
+ // Merge into.
+ spark.sql(
+ """
+ |merge into iceberg_mor_tb t
+ |using (select * from merge_into_source_tb) s
+ |on t.id = s.id
+ |when matched then
+ | update set t.name = s.name, t.p = s.p
+ |when not matched then
+ | insert (id, name, p) values (s.id, s.name, s.p);
+ |""".stripMargin
+ )
+ }
+ runQueryAndCompare(
+ """
+ |select * from iceberg_mor_tb;
+ |""".stripMargin) {
+ checkGlutenOperatorMatch[IcebergScanTransformer]
+ }
+ }
+ }
+
+  // Spark configuration spark.sql.iceberg.handle-timestamp-without-timezone is not supported
+ // in Spark 3.4
+ // TODO: there is a bug when using timestamp type as the partition column
+  testWithSpecifiedSparkVersion("iceberg partition type - timestamp", Array("")) {
+ Seq("true", "false").foreach {
+ flag =>
+ withSQLConf(
+ "spark.sql.iceberg.handle-timestamp-without-timezone" -> flag,
+          "spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables" -> flag) {
+ withTable("part_by_timestamp") {
+ spark.sql(
+ """
+ |create table part_by_timestamp (
+ | p timestamp
+ |) using iceberg
+ |tblproperties (
+ | 'format-version' = '1'
+ |)
+ |partitioned by (p);
+ |""".stripMargin)
+
+ // Insert some test rows.
+ spark.sql(
+ """
+ |insert into table part_by_timestamp
+ |values (TIMESTAMP '2022-01-01 00:01:20');
+ |""".stripMargin)
+ val df = spark.sql("select * from part_by_timestamp")
+            checkAnswer(df, Row(java.sql.Timestamp.valueOf("2022-01-01 00:01:20")) :: Nil)
+ }
+ }
+ }
+ }
+}
diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
index 1c3e4de4c6..ad9cb854d9 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/tpch/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala
@@ -18,7 +18,6 @@ package org.apache.gluten.execution.tpch
import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution._
-import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.utils.Arm
import org.apache.spark.SparkConf
@@ -116,6 +115,31 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite
}
}
+ // TODO: there is a bug when using timestamp type as the partition column
+ ignore("test timestamp as partition column") {
+ spark.sql("""
+ |create table part_by_timestamp (
+ | a int,
+ | b timestamp,
+ | c string,
+ | p timestamp
+ |) using parquet
+ |partitioned by (p);
+ |""".stripMargin)
+
+ // Insert some test rows.
+ spark.sql("""
+ |insert into table part_by_timestamp
+ |values
+ |(1, TIMESTAMP '2022-01-01 00:01:20', '2022-01-01 00:01:20',
+ |TIMESTAMP '2022-01-01 00:01:20');
+ |""".stripMargin)
+ compareResultsAgainstVanillaSpark(
+ "select a, b, to_timestamp(c), p from part_by_timestamp",
+ compareResult = true,
+ customCheck = { _ => })
+ }
+
test("TPCH Q2") {
runTPCHQuery(2) {
df =>
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp
index 3c68c70e66..ee54a8ff5e 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FormatFile.cpp
@@ -87,7 +87,7 @@ FormatFilePtr FormatFileUtil::createFile(
    DB::ContextPtr context, ReadBufferBuilderPtr read_buffer_builder, const substrait::ReadRel::LocalFiles::FileOrFiles & file)
{
#if USE_PARQUET
- if (file.has_parquet())
+    if (file.has_parquet() || (file.has_iceberg() && file.iceberg().has_parquet()))
{
auto config = ExecutorConfig::loadFromContext(context);
        return std::make_shared<ParquetFormatFile>(context, file, read_buffer_builder, config.use_local_format);
@@ -95,7 +95,7 @@ FormatFilePtr FormatFileUtil::createFile(
#endif
#if USE_ORC
- if (file.has_orc())
+ if (file.has_orc() || (file.has_iceberg() && file.iceberg().has_orc()))
        return std::make_shared<ORCFormatFile>(context, file, read_buffer_builder);
#endif
diff --git a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
index ad8222cff5..a7451355b0 100644
--- a/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
+++ b/gluten-iceberg/src/main/scala/org/apache/iceberg/spark/source/GlutenIcebergSourceUtil.scala
@@ -16,6 +16,7 @@
*/
package org.apache.iceberg.spark.source
+import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.exception.GlutenNotSupportException
import org.apache.gluten.substrait.rel.{IcebergLocalFilesBuilder, SplitInfo}
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
@@ -25,7 +26,7 @@ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
import org.apache.spark.sql.connector.read.{InputPartition, Scan}
import org.apache.spark.sql.types.StructType
-import org.apache.iceberg.{CombinedScanTask, DeleteFile, FileFormat, FileScanTask, ScanTask, Schema}
+import org.apache.iceberg._
import org.apache.iceberg.spark.SparkSchemaUtil
import java.lang.{Long => JLong}
@@ -50,8 +51,9 @@ object GlutenIcebergSourceUtil {
val tasks = partition.taskGroup[ScanTask]().tasks().asScala
asFileScanTask(tasks.toList).foreach {
task =>
- val filePath = task.file().path().toString
- paths.add(filePath)
+ paths.add(
+ BackendsApiManager.getTransformerApiInstance
+ .encodeFilePathIfNeed(task.file().path().toString))
starts.add(task.start())
lengths.add(task.length())
partitionColumns.add(getPartitionColumns(task, readPartitionSchema))
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
index 9642f63b3b..69cea9c547 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/TransformerApi.scala
@@ -76,4 +76,7 @@ trait TransformerApi {
def invalidateSQLExecutionResource(executionId: String): Unit = {}
  def genWriteParameters(fileFormat: FileFormat, writeOptions: Map[String, String]): Any
+
+  /** Encode the file path if needed; the default is a no-op, and the CH backend overrides it to URI-encode the path via the Hadoop Path class. */
+ def encodeFilePathIfNeed(filePath: String): String = filePath
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]