This is an automated email from the ASF dual-hosted git repository.

felixybw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new ee75c91741 Include iceberg module in spotless plugin and apply 
formatting fixes
ee75c91741 is described below

commit ee75c91741b045d4ad1b45f37a91962601d08ea8
Author: PHILO-HE <[email protected]>
AuthorDate: Wed Nov 5 09:59:08 2025 +0800

    Include iceberg module in spotless plugin and apply formatting fixes
    
    The backends-clickhouse module has its own setting for "includes" for scala 
code, which overrides the setting in parent pom. The code under "src-icerberg" 
should be included.
---
 backends-clickhouse/pom.xml                        |   2 +
 .../ClickHouseIcebergHiveTableSupport.scala        |  22 +-
 ...ClickHouseIcebergMOREqualityDeletionSuite.scala |  14 +-
 .../execution/iceberg/ClickHouseIcebergSuite.scala | 657 ++++++++++-----------
 4 files changed, 329 insertions(+), 366 deletions(-)

diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index b1ca75609e..ceef10d22d 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -417,6 +417,8 @@
               <include>src-delta/test/scala/**/*.scala</include>
               
<include>src-delta${delta.binary.version}/main/scala/**/*.scala</include>
               
<include>src-delta${delta.binary.version}/test/scala/**/*.scala</include>
+              <include>src-iceberg/main/scala/**/*.scala</include>
+              <include>src-iceberg/test/scala/**/*.scala</include>
             </includes>
             <excludes>
               
<exclude>src-delta${delta.binary.version}/main/scala/org/apache/spark/sql/delta/commands/*.scala</exclude>
diff --git 
a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
 
b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
index f17274cf98..efd26a8572 100644
--- 
a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
+++ 
b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
@@ -14,17 +14,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gluten.execution.iceberg
 
-import com.google.common.base.Strings
-
 import org.apache.gluten.config.GlutenConfig
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.SparkSession
 import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
 
+import com.google.common.base.Strings
+
 class ClickHouseIcebergHiveTableSupport {
 
   private val sparkConf: SparkConf = new SparkConf()
@@ -61,7 +60,8 @@ class ClickHouseIcebergHiveTableSupport {
       .set("spark.unsafe.exceptionOnMemoryLeak", "true")
       .set("spark.sql.autoBroadcastJoinThreshold", "-1")
       .setCHConfig("use_local_format", true)
-      .set("spark.sql.extensions",
+      .set(
+        "spark.sql.extensions",
         "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
       .set("spark.sql.catalog.spark_catalog", 
"org.apache.iceberg.spark.SparkSessionCatalog")
       .set("spark.sql.catalog.spark_catalog.type", "hive")
@@ -70,7 +70,8 @@ class ClickHouseIcebergHiveTableSupport {
       sparkConf.set("spark.hadoop.hive.metastore.uris", url)
     }
     if (!Strings.isNullOrEmpty(catalog)) {
-      sparkConf.set("spark.sql.catalog." + catalog, 
"org.apache.iceberg.spark.SparkCatalog")
+      sparkConf
+        .set("spark.sql.catalog." + catalog, 
"org.apache.iceberg.spark.SparkCatalog")
         .set("spark.sql.catalog." + catalog + ".type", "hive")
     }
     if (!Strings.isNullOrEmpty(path)) {
@@ -81,12 +82,11 @@ class ClickHouseIcebergHiveTableSupport {
 
   def initializeSession(): Unit = {
     if (_hiveSpark == null) {
-      _hiveSpark =
-        SparkSession
-          .builder()
-          .config(sparkConf)
-          .enableHiveSupport()
-          .getOrCreate()
+      _hiveSpark = SparkSession
+        .builder()
+        .config(sparkConf)
+        .enableHiveSupport()
+        .getOrCreate()
     }
   }
 
diff --git 
a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergMOREqualityDeletionSuite.scala
 
b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergMOREqualityDeletionSuite.scala
index 742827f7bf..1335ef0cc7 100644
--- 
a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergMOREqualityDeletionSuite.scala
+++ 
b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergMOREqualityDeletionSuite.scala
@@ -16,10 +16,12 @@
  */
 package org.apache.gluten.execution.iceberg
 
-import org.apache.commons.io.FileUtils
 import 
org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, 
IcebergScanTransformer}
+
 import org.apache.spark.SparkConf
 
+import org.apache.commons.io.FileUtils
+
 import java.io.File
 
 class ClickHouseIcebergMOREqualityDeletionSuite extends 
GlutenClickHouseWholeStageTransformerSuite {
@@ -58,15 +60,13 @@ class ClickHouseIcebergMOREqualityDeletionSuite extends 
GlutenClickHouseWholeSta
     FileUtils.copyDirectory(equalityDeletedData, icebergPathDir)
   }
 
-  testWithSpecifiedSparkVersion(
-    "iceberg read mor table with equality deletion", "3.3", "3.5") {
+  testWithSpecifiedSparkVersion("iceberg read mor table with equality 
deletion", "3.3", "3.5") {
     // The table 'test_upsert_query' was generated by Flink + Iceberg from the 
iceberg ut,
     // the root path must be the '/tmp/junit6640909127060857423/default'
     val testTableName = "local.db.test_upsert_query"
-    runQueryAndCompare(
-      s"""
-        |select * from $testTableName;
-        |""".stripMargin) {
+    runQueryAndCompare(s"""
+                          |select * from $testTableName;
+                          |""".stripMargin) {
       checkGlutenOperatorMatch[IcebergScanTransformer]
     }
   }
diff --git 
a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
 
b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
index 95de2e0414..5dc02d02ab 100644
--- 
a/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
+++ 
b/backends-clickhouse/src-iceberg/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergSuite.scala
@@ -18,6 +18,7 @@ package org.apache.gluten.execution.iceberg
 
 import org.apache.gluten.config.GlutenConfig
 import 
org.apache.gluten.execution.{GlutenClickHouseWholeStageTransformerSuite, 
IcebergScanTransformer}
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.Row
 
@@ -41,35 +42,31 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
 
   test("iceberg transformer exists") {
     withTable("iceberg_tb") {
-      spark.sql(
-        """
-          |create table iceberg_tb using iceberg as
-          |(select 1 as col1, 2 as col2, 3 as col3)
-          |""".stripMargin)
-
-      runQueryAndCompare(
-        """
-          |select * from iceberg_tb;
-          |""".stripMargin) {
+      spark.sql("""
+                  |create table iceberg_tb using iceberg as
+                  |(select 1 as col1, 2 as col2, 3 as col3)
+                  |""".stripMargin)
+
+      runQueryAndCompare("""
+                           |select * from iceberg_tb;
+                           |""".stripMargin) {
         checkGlutenOperatorMatch[IcebergScanTransformer]
       }
     }
   }
 
-  testWithSpecifiedSparkVersion(
-    "iceberg bucketed join", "3.3", "3.5") {
+  testWithSpecifiedSparkVersion("iceberg bucketed join", "3.3", "3.5") {
     val leftTable = "p_str_tb"
     val rightTable = "p_int_tb"
     withTable(leftTable, rightTable) {
       // Partition key of string type.
       withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
         // Gluten does not support write iceberg table.
-        spark.sql(
-          s"""
-             |create table $leftTable(id int, name string, p string)
-             |using iceberg
-             |partitioned by (bucket(4, id));
-             |""".stripMargin)
+        spark.sql(s"""
+                     |create table $leftTable(id int, name string, p string)
+                     |using iceberg
+                     |partitioned by (bucket(4, id));
+                     |""".stripMargin)
         spark.sql(
           s"""
              |insert into table $leftTable values
@@ -87,12 +84,11 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
         GlutenConfig.GLUTEN_ENABLED.key -> "false"
       ) {
         // Gluten does not support write iceberg table.
-        spark.sql(
-          s"""
-             |create table $rightTable(id int, name string, p int)
-             |using iceberg
-             |partitioned by (bucket(4, id));
-             |""".stripMargin)
+        spark.sql(s"""
+                     |create table $rightTable(id int, name string, p int)
+                     |using iceberg
+                     |partitioned by (bucket(4, id));
+                     |""".stripMargin)
         spark.sql(
           s"""
              |insert into table $rightTable values
@@ -113,46 +109,44 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
         "spark.sql.autoBroadcastJoinThreshold" -> "-1",
         "spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true"
       ) {
-        runQueryAndCompare(
-          s"""
-             |select s.id, s.name, i.name, i.p
-             | from $leftTable s inner join $rightTable i
-             | on s.id = i.id;
-             |""".stripMargin) {
-          df => {
-            assert(
-              getExecutedPlan(df).count(
-                plan => {
-                  plan.isInstanceOf[IcebergScanTransformer]
-                }) == 2)
-            getExecutedPlan(df).map {
-              case plan if plan.isInstanceOf[IcebergScanTransformer] =>
-                assert(
-                  
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-                
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
-              case _ => // do nothing
+        runQueryAndCompare(s"""
+                              |select s.id, s.name, i.name, i.p
+                              | from $leftTable s inner join $rightTable i
+                              | on s.id = i.id;
+                              |""".stripMargin) {
+          df =>
+            {
+              assert(
+                getExecutedPlan(df).count(
+                  plan => {
+                    plan.isInstanceOf[IcebergScanTransformer]
+                  }) == 2)
+              getExecutedPlan(df).map {
+                case plan if plan.isInstanceOf[IcebergScanTransformer] =>
+                  assert(
+                    
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
+                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
+                case _ => // do nothing
+              }
+              checkLengthAndPlan(df, 7)
             }
-            checkLengthAndPlan(df, 7)
-          }
         }
       }
     }
   }
 
-  testWithSpecifiedSparkVersion(
-    "iceberg bucketed join with partition", "3.3", "3.5") {
+  testWithSpecifiedSparkVersion("iceberg bucketed join with partition", "3.3", 
"3.5") {
     val leftTable = "p_str_tb"
     val rightTable = "p_int_tb"
     withTable(leftTable, rightTable) {
       // Partition key of string type.
       withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
         // Gluten does not support write iceberg table.
-        spark.sql(
-          s"""
-             |create table $leftTable(id int, name string, p int)
-             |using iceberg
-             |partitioned by (bucket(4, id), p);
-             |""".stripMargin)
+        spark.sql(s"""
+                     |create table $leftTable(id int, name string, p int)
+                     |using iceberg
+                     |partitioned by (bucket(4, id), p);
+                     |""".stripMargin)
         spark.sql(
           s"""
              |insert into table $leftTable values
@@ -170,12 +164,11 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
         GlutenConfig.GLUTEN_ENABLED.key -> "false"
       ) {
         // Gluten does not support write iceberg table.
-        spark.sql(
-          s"""
-             |create table $rightTable(id int, name string, p int)
-             |using iceberg
-             |partitioned by (bucket(4, id), p);
-             |""".stripMargin)
+        spark.sql(s"""
+                     |create table $rightTable(id int, name string, p int)
+                     |using iceberg
+                     |partitioned by (bucket(4, id), p);
+                     |""".stripMargin)
         spark.sql(
           s"""
              |insert into table $rightTable values
@@ -196,27 +189,27 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
         "spark.sql.autoBroadcastJoinThreshold" -> "-1",
         "spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true"
       ) {
-        runQueryAndCompare(
-          s"""
-             |select s.id, s.name, i.name, i.p
-             | from $leftTable s inner join $rightTable i
-             | on s.id = i.id and s.p = i.p;
-             |""".stripMargin) {
-          df => {
-            assert(
-              getExecutedPlan(df).count(
-                plan => {
-                  plan.isInstanceOf[IcebergScanTransformer]
-                }) == 2)
-            getExecutedPlan(df).map {
-              case plan if plan.isInstanceOf[IcebergScanTransformer] =>
-                assert(
-                  
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-                
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
-              case _ => // do nothing
+        runQueryAndCompare(s"""
+                              |select s.id, s.name, i.name, i.p
+                              | from $leftTable s inner join $rightTable i
+                              | on s.id = i.id and s.p = i.p;
+                              |""".stripMargin) {
+          df =>
+            {
+              assert(
+                getExecutedPlan(df).count(
+                  plan => {
+                    plan.isInstanceOf[IcebergScanTransformer]
+                  }) == 2)
+              getExecutedPlan(df).map {
+                case plan if plan.isInstanceOf[IcebergScanTransformer] =>
+                  assert(
+                    
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
+                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 3)
+                case _ => // do nothing
+              }
+              checkLengthAndPlan(df, 7)
             }
-            checkLengthAndPlan(df, 7)
-          }
         }
       }
     }
@@ -228,12 +221,11 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
     withTable(leftTable, rightTable) {
       withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
         // Gluten does not support write iceberg table.
-        spark.sql(
-          s"""
-             |create table $leftTable(id int, name string, p string)
-             |using iceberg
-             |partitioned by (bucket(4, id));
-             |""".stripMargin)
+        spark.sql(s"""
+                     |create table $leftTable(id int, name string, p string)
+                     |using iceberg
+                     |partitioned by (bucket(4, id));
+                     |""".stripMargin)
         spark.sql(
           s"""
              |insert into table $leftTable values
@@ -253,12 +245,11 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
              |(10, 'a4', 'p3');
              |""".stripMargin
         )
-        spark.sql(
-          s"""
-             |create table $rightTable(id int, name string, p int)
-             |using iceberg
-             |partitioned by (bucket(4, id));
-             |""".stripMargin)
+        spark.sql(s"""
+                     |create table $rightTable(id int, name string, p int)
+                     |using iceberg
+                     |partitioned by (bucket(4, id));
+                     |""".stripMargin)
         spark.sql(
           s"""
              |insert into table $rightTable values
@@ -277,43 +268,43 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
         "spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true",
         
"spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled" -> 
"false"
       ) {
-        runQueryAndCompare(
-          s"""
-             |select s.id, s.name, i.name, i.p
-             | from $leftTable s inner join $rightTable i
-             | on s.id = i.id;
-             |""".stripMargin) {
-          df => {
-            assert(
-              getExecutedPlan(df).count(
-                plan => {
-                  plan.isInstanceOf[IcebergScanTransformer]
-                }) == 2)
-            getExecutedPlan(df).map {
-              case plan: IcebergScanTransformer =>
-                assert(plan.getKeyGroupPartitioning.isDefined)
-                assert(plan.getSplitInfos.length == 3)
-              case _ => // do nothing
+        runQueryAndCompare(s"""
+                              |select s.id, s.name, i.name, i.p
+                              | from $leftTable s inner join $rightTable i
+                              | on s.id = i.id;
+                              |""".stripMargin) {
+          df =>
+            {
+              assert(
+                getExecutedPlan(df).count(
+                  plan => {
+                    plan.isInstanceOf[IcebergScanTransformer]
+                  }) == 2)
+              getExecutedPlan(df).map {
+                case plan: IcebergScanTransformer =>
+                  assert(plan.getKeyGroupPartitioning.isDefined)
+                  assert(plan.getSplitInfos.length == 3)
+                case _ => // do nothing
+              }
             }
-          }
         }
       }
     }
   }
 
   testWithMinSparkVersion(
-    "iceberg bucketed join partition value not exists partial cluster", "3.4") 
{
+    "iceberg bucketed join partition value not exists partial cluster",
+    "3.4") {
     val leftTable = "p_str_tb"
     val rightTable = "p_int_tb"
     withTable(leftTable, rightTable) {
       withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
         // Gluten does not support write iceberg table.
-        spark.sql(
-          s"""
-             |create table $leftTable(id int, name string, p string)
-             |using iceberg
-             |partitioned by (bucket(4, id));
-             |""".stripMargin)
+        spark.sql(s"""
+                     |create table $leftTable(id int, name string, p string)
+                     |using iceberg
+                     |partitioned by (bucket(4, id));
+                     |""".stripMargin)
         spark.sql(
           s"""
              |insert into table $leftTable values
@@ -333,12 +324,11 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
              |(10, 'a4', 'p3');
              |""".stripMargin
         )
-        spark.sql(
-          s"""
-             |create table $rightTable(id int, name string, p int)
-             |using iceberg
-             |partitioned by (bucket(4, id));
-             |""".stripMargin)
+        spark.sql(s"""
+                     |create table $rightTable(id int, name string, p int)
+                     |using iceberg
+                     |partitioned by (bucket(4, id));
+                     |""".stripMargin)
         spark.sql(
           s"""
              |insert into table $rightTable values
@@ -357,44 +347,42 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
         "spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true",
         
"spark.sql.sources.v2.bucketing.partiallyClusteredDistribution.enabled" -> 
"true"
       ) {
-        runQueryAndCompare(
-          s"""
-             |select s.id, s.name, i.name, i.p
-             | from $leftTable s inner join $rightTable i
-             | on s.id = i.id;
-             |""".stripMargin) {
-          df => {
-            assert(
-              getExecutedPlan(df).count(
-                plan => {
-                  plan.isInstanceOf[IcebergScanTransformer]
-                }) == 2)
-            getExecutedPlan(df).map {
-              case plan: IcebergScanTransformer =>
-                assert(plan.getKeyGroupPartitioning.isDefined)
-                assert(plan.getSplitInfos.length == 3)
-              case _ => // do nothing
+        runQueryAndCompare(s"""
+                              |select s.id, s.name, i.name, i.p
+                              | from $leftTable s inner join $rightTable i
+                              | on s.id = i.id;
+                              |""".stripMargin) {
+          df =>
+            {
+              assert(
+                getExecutedPlan(df).count(
+                  plan => {
+                    plan.isInstanceOf[IcebergScanTransformer]
+                  }) == 2)
+              getExecutedPlan(df).map {
+                case plan: IcebergScanTransformer =>
+                  assert(plan.getKeyGroupPartitioning.isDefined)
+                  assert(plan.getSplitInfos.length == 3)
+                case _ => // do nothing
+              }
             }
-          }
         }
       }
     }
   }
 
-  testWithSpecifiedSparkVersion(
-    "iceberg bucketed join with partition filter", "3.3", "3.5") {
+  testWithSpecifiedSparkVersion("iceberg bucketed join with partition filter", 
"3.3", "3.5") {
     val leftTable = "p_str_tb"
     val rightTable = "p_int_tb"
     withTable(leftTable, rightTable) {
       // Partition key of string type.
       withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
         // Gluten does not support write iceberg table.
-        spark.sql(
-          s"""
-             |create table $leftTable(id int, name string, p int)
-             |using iceberg
-             |partitioned by (bucket(4, id), p);
-             |""".stripMargin)
+        spark.sql(s"""
+                     |create table $leftTable(id int, name string, p int)
+                     |using iceberg
+                     |partitioned by (bucket(4, id), p);
+                     |""".stripMargin)
         spark.sql(
           s"""
              |insert into table $leftTable values
@@ -412,12 +400,11 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
         GlutenConfig.GLUTEN_ENABLED.key -> "false"
       ) {
         // Gluten does not support write iceberg table.
-        spark.sql(
-          s"""
-             |create table $rightTable(id int, name string, p int)
-             |using iceberg
-             |partitioned by (bucket(4, id), p);
-             |""".stripMargin)
+        spark.sql(s"""
+                     |create table $rightTable(id int, name string, p int)
+                     |using iceberg
+                     |partitioned by (bucket(4, id), p);
+                     |""".stripMargin)
         spark.sql(
           s"""
              |insert into table $rightTable values
@@ -438,28 +425,28 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
         "spark.sql.autoBroadcastJoinThreshold" -> "-1",
         "spark.sql.sources.v2.bucketing.pushPartValues.enabled" -> "true"
       ) {
-        runQueryAndCompare(
-          s"""
-             |select s.id, s.name, i.name, i.p
-             | from $leftTable s inner join $rightTable i
-             | on s.id = i.id
-             | where s.p = 1 and i.p = 1;
-             |""".stripMargin) {
-          df => {
-            assert(
-              getExecutedPlan(df).count(
-                plan => {
-                  plan.isInstanceOf[IcebergScanTransformer]
-                }) == 2)
-            getExecutedPlan(df).map {
-              case plan if plan.isInstanceOf[IcebergScanTransformer] =>
-                assert(
-                  
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
-                
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1)
-              case _ => // do nothing
+        runQueryAndCompare(s"""
+                              |select s.id, s.name, i.name, i.p
+                              | from $leftTable s inner join $rightTable i
+                              | on s.id = i.id
+                              | where s.p = 1 and i.p = 1;
+                              |""".stripMargin) {
+          df =>
+            {
+              assert(
+                getExecutedPlan(df).count(
+                  plan => {
+                    plan.isInstanceOf[IcebergScanTransformer]
+                  }) == 2)
+              getExecutedPlan(df).map {
+                case plan if plan.isInstanceOf[IcebergScanTransformer] =>
+                  assert(
+                    
plan.asInstanceOf[IcebergScanTransformer].getKeyGroupPartitioning.isDefined)
+                  
assert(plan.asInstanceOf[IcebergScanTransformer].getSplitInfos.length == 1)
+                case _ => // do nothing
+              }
+              checkLengthAndPlan(df, 5)
             }
-            checkLengthAndPlan(df, 5)
-          }
         }
       }
     }
@@ -467,18 +454,15 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
 
   test("iceberg: time travel") {
     withTable("iceberg_tm") {
-      spark.sql(
-        s"""
-           |create table iceberg_tm (id int, name string) using iceberg
-           |""".stripMargin)
-      spark.sql(
-        s"""
-           |insert into iceberg_tm values (1, "v1"), (2, "v2")
-           |""".stripMargin)
-      spark.sql(
-        s"""
-           |insert into iceberg_tm values (3, "v3"), (4, "v4")
-           |""".stripMargin)
+      spark.sql(s"""
+                   |create table iceberg_tm (id int, name string) using iceberg
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into iceberg_tm values (1, "v1"), (2, "v2")
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into iceberg_tm values (3, "v3"), (4, "v4")
+                   |""".stripMargin)
 
       val df =
         spark.sql("select snapshot_id from default.iceberg_tm.snapshots where 
parent_id is null")
@@ -492,15 +476,13 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
 
   test("iceberg: partition filters") {
     withTable("iceberg_pf") {
-      spark.sql(
-        s"""
-           |create table iceberg_pf (id int, name string)
-           | using iceberg partitioned by (name)
-           |""".stripMargin)
-      spark.sql(
-        s"""
-           |insert into iceberg_pf values (1, "v1"), (2, "v2"), (3, "v1"), (4, 
"v2")
-           |""".stripMargin)
+      spark.sql(s"""
+                   |create table iceberg_pf (id int, name string)
+                   | using iceberg partitioned by (name)
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |insert into iceberg_pf values (1, "v1"), (2, "v2"), (3, 
"v1"), (4, "v2")
+                   |""".stripMargin)
       val df1 = runQueryAndCompare("select * from iceberg_pf where name = 
'v1'") { _ => }
       checkLengthAndPlan(df1, 2)
       checkAnswer(df1, Row(1, "v1") :: Row(3, "v1") :: Nil)
@@ -510,26 +492,24 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
   test("iceberg read cow table - delete and update") {
     withTable("iceberg_cow_tb") {
       withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
-        spark.sql(
-          """
-            |create table iceberg_cow_tb (
-            |  id int,
-            |  name string,
-            |  p string
-            |) using iceberg
-            |tblproperties (
-            |  'format-version' = '2'
-            |)
-            |partitioned by (p);
-            |""".stripMargin)
+        spark.sql("""
+                    |create table iceberg_cow_tb (
+                    |  id int,
+                    |  name string,
+                    |  p string
+                    |) using iceberg
+                    |tblproperties (
+                    |  'format-version' = '2'
+                    |)
+                    |partitioned by (p);
+                    |""".stripMargin)
 
         // Insert some test rows.
-        spark.sql(
-          """
-            |insert into table iceberg_cow_tb
-            |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
-            |       (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
-            |""".stripMargin)
+        spark.sql("""
+                    |insert into table iceberg_cow_tb
+                    |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
+                    |       (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
+                    |""".stripMargin)
 
         // Delete row.
         spark.sql(
@@ -550,10 +530,9 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
             |""".stripMargin
         )
       }
-      runQueryAndCompare(
-        """
-          |select * from iceberg_cow_tb;
-          |""".stripMargin) {
+      runQueryAndCompare("""
+                           |select * from iceberg_cow_tb;
+                           |""".stripMargin) {
         checkGlutenOperatorMatch[IcebergScanTransformer]
       }
     }
@@ -562,29 +541,27 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
   test("iceberg read mor table - delete and update") {
     withTable("iceberg_mor_tb") {
       withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
-        spark.sql(
-          """
-            |create table iceberg_mor_tb (
-            |  id int,
-            |  name string,
-            |  p string
-            |) using iceberg
-            |tblproperties (
-            |  'format-version' = '2',
-            |  'write.delete.mode' = 'merge-on-read',
-            |  'write.update.mode' = 'merge-on-read',
-            |  'write.merge.mode' = 'merge-on-read'
-            |)
-            |partitioned by (p);
-            |""".stripMargin)
+        spark.sql("""
+                    |create table iceberg_mor_tb (
+                    |  id int,
+                    |  name string,
+                    |  p string
+                    |) using iceberg
+                    |tblproperties (
+                    |  'format-version' = '2',
+                    |  'write.delete.mode' = 'merge-on-read',
+                    |  'write.update.mode' = 'merge-on-read',
+                    |  'write.merge.mode' = 'merge-on-read'
+                    |)
+                    |partitioned by (p);
+                    |""".stripMargin)
 
         // Insert some test rows.
-        spark.sql(
-          """
-            |insert into table iceberg_mor_tb
-            |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
-            |       (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
-            |""".stripMargin)
+        spark.sql("""
+                    |insert into table iceberg_mor_tb
+                    |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2'),
+                    |       (4, 'a4', 'p1'), (5, 'a5', 'p2'), (6, 'a6', 'p1');
+                    |""".stripMargin)
 
         // Delete row.
         spark.sql(
@@ -605,10 +582,9 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
             |""".stripMargin
         )
       }
-      runQueryAndCompare(
-        """
-          |select * from iceberg_mor_tb;
-          |""".stripMargin) {
+      runQueryAndCompare("""
+                           |select * from iceberg_mor_tb;
+                           |""".stripMargin) {
         checkGlutenOperatorMatch[IcebergScanTransformer]
       }
     }
@@ -617,39 +593,35 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
   test("iceberg read cow table - merge into") {
     withTable("iceberg_cow_tb", "merge_into_source_tb") {
       withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
-        spark.sql(
-          """
-            |create table iceberg_cow_tb (
-            |  id int,
-            |  name string,
-            |  p string
-            |) using iceberg
-            |tblproperties (
-            |  'format-version' = '2'
-            |)
-            |partitioned by (p);
-            |""".stripMargin)
-        spark.sql(
-          """
-            |create table merge_into_source_tb (
-            |  id int,
-            |  name string,
-            |  p string
-            |) using iceberg;
-            |""".stripMargin)
+        spark.sql("""
+                    |create table iceberg_cow_tb (
+                    |  id int,
+                    |  name string,
+                    |  p string
+                    |) using iceberg
+                    |tblproperties (
+                    |  'format-version' = '2'
+                    |)
+                    |partitioned by (p);
+                    |""".stripMargin)
+        spark.sql("""
+                    |create table merge_into_source_tb (
+                    |  id int,
+                    |  name string,
+                    |  p string
+                    |) using iceberg;
+                    |""".stripMargin)
 
         // Insert some test rows.
-        spark.sql(
-          """
-            |insert into table iceberg_cow_tb
-            |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
-            |""".stripMargin)
-        spark.sql(
-          """
-            |insert into table merge_into_source_tb
-            |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 'p1'),
-            |       (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2');
-            |""".stripMargin)
+        spark.sql("""
+                    |insert into table iceberg_cow_tb
+                    |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
+                    |""".stripMargin)
+        spark.sql("""
+                    |insert into table merge_into_source_tb
+                    |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 
'p1'),
+                    |       (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2');
+                    |""".stripMargin)
 
         // Delete row.
         spark.sql(
@@ -677,10 +649,9 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
             |""".stripMargin
         )
       }
-      runQueryAndCompare(
-        """
-          |select * from iceberg_cow_tb;
-          |""".stripMargin) {
+      runQueryAndCompare("""
+                           |select * from iceberg_cow_tb;
+                           |""".stripMargin) {
         checkGlutenOperatorMatch[IcebergScanTransformer]
       }
     }
@@ -689,42 +660,38 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
   test("iceberg read mor table - merge into with merge-on-read mode") {
     withTable("iceberg_mor_tb", "merge_into_source_tb") {
       withSQLConf(GlutenConfig.GLUTEN_ENABLED.key -> "false") {
-        spark.sql(
-          """
-            |create table iceberg_mor_tb (
-            |  id int,
-            |  name string,
-            |  p string
-            |) using iceberg
-            |tblproperties (
-            |  'format-version' = '2',
-            |  'write.delete.mode' = 'merge-on-read',
-            |  'write.update.mode' = 'merge-on-read',
-            |  'write.merge.mode' = 'merge-on-read'
-            |)
-            |partitioned by (p);
-            |""".stripMargin)
-        spark.sql(
-          """
-            |create table merge_into_source_tb (
-            |  id int,
-            |  name string,
-            |  p string
-            |) using iceberg;
-            |""".stripMargin)
+        spark.sql("""
+                    |create table iceberg_mor_tb (
+                    |  id int,
+                    |  name string,
+                    |  p string
+                    |) using iceberg
+                    |tblproperties (
+                    |  'format-version' = '2',
+                    |  'write.delete.mode' = 'merge-on-read',
+                    |  'write.update.mode' = 'merge-on-read',
+                    |  'write.merge.mode' = 'merge-on-read'
+                    |)
+                    |partitioned by (p);
+                    |""".stripMargin)
+        spark.sql("""
+                    |create table merge_into_source_tb (
+                    |  id int,
+                    |  name string,
+                    |  p string
+                    |) using iceberg;
+                    |""".stripMargin)
 
         // Insert some test rows.
-        spark.sql(
-          """
-            |insert into table iceberg_mor_tb
-            |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
-            |""".stripMargin)
-        spark.sql(
-          """
-            |insert into table merge_into_source_tb
-            |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 'p1'),
-            |       (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2');
-            |""".stripMargin)
+        spark.sql("""
+                    |insert into table iceberg_mor_tb
+                    |values (1, 'a1', 'p1'), (2, 'a2', 'p1'), (3, 'a3', 'p2');
+                    |""".stripMargin)
+        spark.sql("""
+                    |insert into table merge_into_source_tb
+                    |values (1, 'a1_1', 'p2'), (2, 'a2_1', 'p2'), (3, 'a3_1', 
'p1'),
+                    |       (4, 'a4', 'p2'), (5, 'a5', 'p1'), (6, 'a6', 'p2');
+                    |""".stripMargin)
 
         // Delete row.
         spark.sql(
@@ -752,10 +719,9 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
             |""".stripMargin
         )
       }
-      runQueryAndCompare(
-        """
-          |select * from iceberg_mor_tb;
-          |""".stripMargin) {
+      runQueryAndCompare("""
+                           |select * from iceberg_mor_tb;
+                           |""".stripMargin) {
         checkGlutenOperatorMatch[IcebergScanTransformer]
       }
     }
@@ -771,23 +737,21 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
           "spark.sql.iceberg.handle-timestamp-without-timezone" -> flag,
           "spark.sql.iceberg.use-timestamp-without-timezone-in-new-tables" -> 
flag) {
           withTable("part_by_timestamp") {
-            spark.sql(
-              """
-                |create table part_by_timestamp (
-                |  p timestamp
-                |) using iceberg
-                |tblproperties (
-                |  'format-version' = '1'
-                |)
-                |partitioned by (p);
-                |""".stripMargin)
+            spark.sql("""
+                        |create table part_by_timestamp (
+                        |  p timestamp
+                        |) using iceberg
+                        |tblproperties (
+                        |  'format-version' = '1'
+                        |)
+                        |partitioned by (p);
+                        |""".stripMargin)
 
             // Insert some test rows.
-            spark.sql(
-              """
-                |insert into table part_by_timestamp
-                |values (TIMESTAMP '2022-01-01 00:01:20');
-                |""".stripMargin)
+            spark.sql("""
+                        |insert into table part_by_timestamp
+                        |values (TIMESTAMP '2022-01-01 00:01:20');
+                        |""".stripMargin)
             val df = spark.sql("select * from part_by_timestamp")
             checkAnswer(df, Row(java.sql.Timestamp.valueOf("2022-01-01 
00:01:20")) :: Nil)
           }
@@ -798,24 +762,21 @@ class ClickHouseIcebergSuite extends 
GlutenClickHouseWholeStageTransformerSuite
   test("test read v1 iceberg with partition drop") {
     val testTable = "test_table_with_partition"
     withTable(testTable) {
-      spark.sql(
-        s"""
-           |CREATE TABLE $testTable (id INT, data STRING, p1 STRING, p2 STRING)
-           |USING iceberg
-           |tblproperties (
-           |  'format-version' = '1'
-           |)
-           |PARTITIONED BY (p1, p2);
-           |""".stripMargin)
-      spark.sql(
-        s"""
-           |INSERT INTO $testTable VALUES
-           |(1, 'test_data', 'test_p1', 'test_p2');
-           |""".stripMargin)
-      spark.sql(
-        s"""
-           |ALTER TABLE $testTable DROP PARTITION FIELD p2
-           |""".stripMargin)
+      spark.sql(s"""
+                   |CREATE TABLE $testTable (id INT, data STRING, p1 STRING, 
p2 STRING)
+                   |USING iceberg
+                   |tblproperties (
+                   |  'format-version' = '1'
+                   |)
+                   |PARTITIONED BY (p1, p2);
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |INSERT INTO $testTable VALUES
+                   |(1, 'test_data', 'test_p1', 'test_p2');
+                   |""".stripMargin)
+      spark.sql(s"""
+                   |ALTER TABLE $testTable DROP PARTITION FIELD p2
+                   |""".stripMargin)
       val resultDf = spark.sql(s"SELECT id, data, p1, p2 FROM $testTable")
       val result = resultDf.collect()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to