This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 33180440f5 [GLUTEN-8846][CH] [Part 4] Add full-chain UT (#9256)
33180440f5 is described below

commit 33180440f5daf668d51cb24a619ca129ae1d3966
Author: jlf <[email protected]>
AuthorDate: Tue Apr 22 17:16:57 2025 +0800

    [GLUTEN-8846][CH] [Part 4] Add full-chain UT (#9256)
    
    * [CH][draft] iceberg UT
    
    * [CH][draft] add UT
    
    * [CH][draft] run iceberg UT with diff spark version
    
    * Fixed occasional UT anomalies
---
 backends-clickhouse/pom.xml                        | 396 +++++++++++++--
 .../gluten/execution/iceberg/TestFlinkUpsert.java  | 538 +++++++++++++++++++++
 .../iceberg/TestPositionDeletesTableGluten.java    |  91 ++++
 .../ClickHouseIcebergHiveTableSupport.scala        | 101 ++++
 .../iceberg/TestPositionDeletesTableGluten.java    |  89 ++++
 .../ClickHouseIcebergHiveTableSupport.scala        | 101 ++++
 .../cache/GlutenClickHouseCacheBaseTestSuite.scala |   4 +-
 .../org/apache/gluten/utils/CacheTestHelper.scala  |  33 +-
 .../Storages/SubstraitSource/FileReader.cpp        |   4 +-
 9 files changed, 1299 insertions(+), 58 deletions(-)

diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index 8a20a0a2cb..f095d49903 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -43,11 +43,189 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>spark-3.3</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-spark33-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${project.basedir}/src-spark33/main/scala</source>
+                    <source>${project.basedir}src-spark33/main/java</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-spark33-resources</id>
+                <phase>generate-resources</phase>
+                <goals>
+                  <goal>add-resource</goal>
+                </goals>
+                <configuration>
+                  <resources>
+                    <resource>
+                      
<directory>${project.basedir}/src-spark33/main/resources</directory>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-spark33-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${project.basedir}/src-spark33/test/scala</source>
+                    <source>${project.basedir}/src-spark33/test/java</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-spark33-test-resources</id>
+                <phase>generate-test-resources</phase>
+                <goals>
+                  <goal>add-test-resource</goal>
+                </goals>
+                <configuration>
+                  <resources>
+                    <resource>
+                      
<directory>${project.basedir}/src-spark33/test/resources</directory>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark-3.5</id>
+      <properties>
+        <slf4j.version>1.7.36</slf4j.version>
+      </properties>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+          <version>${hadoop.version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.apache.logging.log4j</groupId>
+              <artifactId>log4j-slf4j-impl</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+          <version>${hadoop.version}</version>
+          <scope>test</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.apache.logging.log4j</groupId>
+              <artifactId>log4j-slf4j-impl</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+          <version>${guava.version}</version>
+          <scope>provided</scope>
+        </dependency>
+      </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.codehaus.mojo</groupId>
+            <artifactId>build-helper-maven-plugin</artifactId>
+            <executions>
+              <execution>
+                <id>add-spark35-sources</id>
+                <phase>generate-sources</phase>
+                <goals>
+                  <goal>add-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${project.basedir}/src-spark35/main/scala</source>
+                    <source>${project.basedir}src-spark35/main/java</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-spark35-resources</id>
+                <phase>generate-resources</phase>
+                <goals>
+                  <goal>add-resource</goal>
+                </goals>
+                <configuration>
+                  <resources>
+                    <resource>
+                      
<directory>${project.basedir}/src-spark35/main/resources</directory>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-spark35-test-sources</id>
+                <phase>generate-test-sources</phase>
+                <goals>
+                  <goal>add-test-source</goal>
+                </goals>
+                <configuration>
+                  <sources>
+                    <source>${project.basedir}/src-spark35/test/scala</source>
+                    <source>${project.basedir}/src-spark35/test/java</source>
+                  </sources>
+                </configuration>
+              </execution>
+              <execution>
+                <id>add-spark35-test-resources</id>
+                <phase>generate-test-resources</phase>
+                <goals>
+                  <goal>add-test-resource</goal>
+                </goals>
+                <configuration>
+                  <resources>
+                    <resource>
+                      
<directory>${project.basedir}/src-spark35/test/resources</directory>
+                    </resource>
+                  </resources>
+                </configuration>
+              </execution>
+            </executions>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
     <profile>
       <id>iceberg</id>
       <activation>
         <activeByDefault>false</activeByDefault>
       </activation>
+      <properties>
+        <flink.version>1.16.2</flink.version>
+      </properties>
       <dependencies>
         <dependency>
           <groupId>org.apache.gluten</groupId>
@@ -64,6 +242,133 @@
           <groupId>org.scala-lang.modules</groupId>
           
<artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
         </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          <artifactId>iceberg-api</artifactId>
+          <version>${iceberg.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          <artifactId>iceberg-api</artifactId>
+          <version>${iceberg.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          <artifactId>iceberg-flink-runtime-1.16</artifactId>
+          <version>${iceberg.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          <artifactId>iceberg-flink-1.16</artifactId>
+          <version>${iceberg.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          <artifactId>iceberg-flink-1.16</artifactId>
+          <version>${iceberg.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          <artifactId>iceberg-hive-metastore</artifactId>
+          <version>${iceberg.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          <artifactId>iceberg-hive-metastore</artifactId>
+          <version>${iceberg.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <!-- flink iceberg ut need junit 5, and use 5.10.1 -->
+        <dependency>
+          <groupId>org.junit.jupiter</groupId>
+          <artifactId>junit-jupiter</artifactId>
+          <scope>test</scope>
+          <version>5.10.1</version>
+        </dependency>
+        <dependency>
+          <groupId>org.junit.jupiter</groupId>
+          <artifactId>junit-jupiter-engine</artifactId>
+          <scope>test</scope>
+          <version>5.10.1</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-streaming-java</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-api-bridge-base</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-api-java</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-api-java-bridge</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-test-utils</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+        </dependency>
+        <!-- flink not support scala 2.13 -->
+        <dependency>
+          <groupId>org.apache.flink</groupId>
+          <artifactId>flink-table-planner_2.12</artifactId>
+          <version>${flink.version}</version>
+          <scope>provided</scope>
+          <exclusions>
+            <exclusion>
+              <groupId>org.scala-lang</groupId>
+              <artifactId>scala-library</artifactId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          
<artifactId>iceberg-spark-${sparkbundle.version}_${scala.binary.version}</artifactId>
+          <version>${iceberg.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          
<artifactId>iceberg-spark-${sparkbundle.version}_${scala.binary.version}</artifactId>
+          <version>${iceberg.version}</version>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          <artifactId>iceberg-data</artifactId>
+          <version>${iceberg.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          <artifactId>iceberg-data</artifactId>
+          <version>${iceberg.version}</version>
+          <scope>test</scope>
+        </dependency>
       </dependencies>
     </profile>
     <profile>
@@ -116,6 +421,10 @@
     </profile>
   </profiles>
 
+  <properties>
+    <surefire.skipTests>true</surefire.skipTests>
+  </properties>
+
   <dependencies>
     <dependency>
       <groupId>org.apache.gluten</groupId>
@@ -240,44 +549,44 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-       <groupId>org.apache.hive.hcatalog</groupId>
-       <artifactId>hive-hcatalog-core</artifactId>
-           <version>2.3.9</version>
-           <scope>test</scope>
-           <exclusions>
-          <exclusion>
-            <groupId>org.pentaho</groupId>
-            <artifactId>pentaho-aggdesigner-algorithm</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>net.minidev</groupId>
-            <artifactId>json-smart</artifactId>
-          </exclusion>
-          <exclusion>
-            <groupId>org.apache.hive</groupId>
-            <artifactId>hive-exec</artifactId>
-          </exclusion>
-          <exclusion>
-            <artifactId>guava</artifactId>
-            <groupId>com.google.guava</groupId>
-          </exclusion>
-          <exclusion>
-            <artifactId>hadoop-common</artifactId>
-            <groupId>org.apache.hadoop</groupId>
-          </exclusion>
-          <exclusion>
-            <artifactId>hadoop-hdfs</artifactId>
-            <groupId>org.apache.hadoop</groupId>
-          </exclusion>
-          <exclusion>
-            <artifactId>protobuf-java</artifactId>
-            <groupId>com.google.protobuf</groupId>
-          </exclusion>
-          <exclusion>
-            <artifactId>jdk.tools</artifactId>
-            <groupId>jdk.tools</groupId>
-          </exclusion>
-        </exclusions>
+      <groupId>org.apache.hive.hcatalog</groupId>
+      <artifactId>hive-hcatalog-core</artifactId>
+      <version>2.3.9</version>
+      <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.pentaho</groupId>
+          <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.minidev</groupId>
+          <artifactId>json-smart</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hive</groupId>
+          <artifactId>hive-exec</artifactId>
+        </exclusion>
+        <exclusion>
+          <artifactId>guava</artifactId>
+          <groupId>com.google.guava</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-common</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>hadoop-hdfs</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>protobuf-java</artifactId>
+          <groupId>com.google.protobuf</groupId>
+        </exclusion>
+        <exclusion>
+          <artifactId>jdk.tools</artifactId>
+          <groupId>jdk.tools</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
@@ -551,6 +860,17 @@
           <treatWarningsAsErrors>true</treatWarningsAsErrors>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <includes>
+            <include>**/*Test.java</include>
+            <include>**/Test*.java</include>
+          </includes>
+          <skipTests>${surefire.skipTests}</skipTests>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 </project>
diff --git 
a/backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java
 
b/backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java
new file mode 100644
index 0000000000..429feab086
--- /dev/null
+++ 
b/backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.iceberg;
+
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.flink.CatalogTestBase;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+
+public class TestFlinkUpsert extends CatalogTestBase {
+
+  @Parameter(index = 2)
+  private FileFormat format;
+
+  @Parameter(index = 3)
+  private boolean isStreamingJob;
+
+  private final Map<String, String> tableUpsertProps = Maps.newHashMap();
+  private TableEnvironment tEnv;
+  private SparkSession spark;
+  private ClickHouseIcebergHiveTableSupport hiveTableSupport;
+
+  @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}, 
isStreaming={3}")
+  public static List<Object[]> parameters() {
+    List<Object[]> parameters = Lists.newArrayList();
+    // ignore ORC and AVRO, ch backend only support PARQUET
+    for (FileFormat format : new FileFormat[] {FileFormat.PARQUET}) {
+      for (Boolean isStreaming : new Boolean[] {true, false}) {
+        // Only test with one catalog as this is a file operation concern.
+        // FlinkCatalogTestBase requires the catalog name start with 
testhadoop if using hadoop
+        // catalog.
+        String catalogName = "testhive";
+        Namespace baseNamespace = Namespace.empty();
+        parameters.add(new Object[] {catalogName, baseNamespace, format, 
isStreaming});
+      }
+    }
+    return parameters;
+  }
+
+  @Override
+  protected TableEnvironment getTableEnv() {
+    if (tEnv == null) {
+      synchronized (this) {
+        EnvironmentSettings.Builder settingsBuilder = 
EnvironmentSettings.newInstance();
+        if (isStreamingJob) {
+          settingsBuilder.inStreamingMode();
+          StreamExecutionEnvironment env =
+              StreamExecutionEnvironment.getExecutionEnvironment(
+                  MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+          env.enableCheckpointing(400);
+          env.setMaxParallelism(2);
+          env.setParallelism(2);
+          tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
+        } else {
+          settingsBuilder.inBatchMode();
+          tEnv = TableEnvironment.create(settingsBuilder.build());
+        }
+      }
+    }
+    return tEnv;
+  }
+
+  @Override
+  @BeforeEach
+  public void before() {
+    super.before();
+    sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
+    sql("USE CATALOG %s", catalogName);
+    sql("USE %s", DATABASE);
+    tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+    tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
+    tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+
+    hiveTableSupport = new ClickHouseIcebergHiveTableSupport();
+    hiveTableSupport.initSparkConf(
+        hiveConf.get("hive.metastore.uris"),
+        catalogName,
+        String.format("file://%s", this.warehouseRoot()));
+    hiveTableSupport.initializeSession();
+    spark = hiveTableSupport.spark();
+  }
+
+  @Override
+  @AfterEach
+  public void clean() {
+    sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+    super.clean();
+
+    hiveTableSupport.clean();
+  }
+
+  static String toWithClause(Map<String, String> props) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("(");
+    int propCount = 0;
+    for (Map.Entry<String, String> entry : props.entrySet()) {
+      if (propCount > 0) {
+        builder.append(",");
+      }
+      builder
+          .append("'")
+          .append(entry.getKey())
+          .append("'")
+          .append("=")
+          .append("'")
+          .append(entry.getValue())
+          .append("'");
+      propCount++;
+    }
+    builder.append(")");
+    return builder.toString();
+  }
+
+  @TestTemplate
+  public void testUpsertAndQuery() {
+    String tableName = "test_upsert_query";
+    LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
+    LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
+    Date dt20220301Spark =
+        
Date.from(dt20220301.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());
+    Date dt20220302Spark =
+        
Date.from(dt20220302.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());
+
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, dt DATE, "
+            + "PRIMARY KEY(id,dt) NOT ENFORCED) "
+            + "PARTITIONED BY (dt) WITH %s",
+        tableName, toWithClause(tableUpsertProps));
+
+    try {
+      sql(
+          "INSERT INTO %s VALUES "
+              + "(1, 'Bill', DATE '2022-03-01'),"
+              + "(1, 'Jane', DATE '2022-03-01'),"
+              + "(2, 'Jane', DATE '2022-03-01')",
+          tableName);
+
+      sql(
+          "INSERT INTO %s VALUES "
+              + "(2, 'Bill', DATE '2022-03-01'),"
+              + "(1, 'Jane', DATE '2022-03-02'),"
+              + "(2, 'Jane', DATE '2022-03-02')",
+          tableName);
+
+      List<Row> rowsOn20220301 =
+          Lists.newArrayList(Row.of(1, "Jane", dt20220301), Row.of(2, "Bill", 
dt20220301));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), 
rowsOn20220301);
+      List<Row> rowsOn20220301Spark =
+          Lists.newArrayList(
+              Row.of(1, "Jane", dt20220301Spark), Row.of(2, "Bill", 
dt20220301Spark));
+      TestHelpers.assertRows(
+          convertToFlinkRows(
+              spark.sql(
+                  String.format(
+                      Locale.ROOT,
+                      "SELECT * FROM %s.db.%s WHERE dt < '2022-03-02'",
+                      catalogName,
+                      tableName)),
+              3),
+          rowsOn20220301Spark);
+
+      List<Row> rowsOn20220302 =
+          Lists.newArrayList(Row.of(1, "Jane", dt20220302), Row.of(2, "Jane", 
dt20220302));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), 
rowsOn20220302);
+      List<Row> rowsOn20220302Spark =
+          Lists.newArrayList(
+              Row.of(1, "Jane", dt20220302Spark), Row.of(2, "Jane", 
dt20220302Spark));
+      TestHelpers.assertRows(
+          convertToFlinkRows(
+              spark.sql(
+                  String.format(
+                      Locale.ROOT,
+                      "SELECT * FROM %s.db.%s WHERE dt = '2022-03-02'",
+                      catalogName,
+                      tableName)),
+              3),
+          rowsOn20220302Spark);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Iterables.concat(rowsOn20220301, 
rowsOn20220302)));
+      TestHelpers.assertRows(
+          convertToFlinkRows(
+              spark.sql(
+                  String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", 
catalogName, tableName)),
+              3),
+          Lists.newArrayList(Iterables.concat(rowsOn20220301Spark, 
rowsOn20220302Spark)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  private List<Row> convertToFlinkRows(Dataset<org.apache.spark.sql.Row> rows, 
int columnCount) {
+    return rows.collectAsList().stream()
+        .map(
+            r -> {
+              switch (columnCount) {
+                case 1:
+                  return Row.of(r.get(0));
+                case 2:
+                  return Row.of(r.get(0), r.get(1));
+                case 3:
+                  return Row.of(r.get(0), r.get(1), r.get(2));
+                default:
+                  throw new IllegalArgumentException("Unsupported column 
count: " + columnCount);
+              }
+            })
+        .collect(Collectors.toList());
+  }
+
+  @TestTemplate
+  public void testUpsertOptions() {
+    String tableName = "test_upsert_options";
+    LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
+    LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
+    Date dt20220301Spark =
+        
Date.from(dt20220301.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());
+    Date dt20220302Spark =
+        
Date.from(dt20220302.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());
+
+    Map<String, String> optionsUpsertProps = Maps.newHashMap(tableUpsertProps);
+    optionsUpsertProps.remove(TableProperties.UPSERT_ENABLED);
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, dt DATE, "
+            + "PRIMARY KEY(id,dt) NOT ENFORCED) "
+            + "PARTITIONED BY (dt) WITH %s",
+        tableName, toWithClause(optionsUpsertProps));
+
+    try {
+      sql(
+          "INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/  VALUES "
+              + "(1, 'Bill', DATE '2022-03-01'),"
+              + "(1, 'Jane', DATE '2022-03-01'),"
+              + "(2, 'Jane', DATE '2022-03-01')",
+          tableName);
+
+      sql(
+          "INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/  VALUES "
+              + "(2, 'Bill', DATE '2022-03-01'),"
+              + "(1, 'Jane', DATE '2022-03-02'),"
+              + "(2, 'Jane', DATE '2022-03-02')",
+          tableName);
+
+      List<Row> rowsOn20220301 =
+          Lists.newArrayList(Row.of(1, "Jane", dt20220301), Row.of(2, "Bill", 
dt20220301));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName), 
rowsOn20220301);
+      List<Row> rowsOn20220301Spark =
+          Lists.newArrayList(
+              Row.of(1, "Jane", dt20220301Spark), Row.of(2, "Bill", 
dt20220301Spark));
+      TestHelpers.assertRows(
+          convertToFlinkRows(
+              spark.sql(
+                  String.format(
+                      Locale.ROOT,
+                      "SELECT * FROM %s.db.%s WHERE dt < '2022-03-02'",
+                      catalogName,
+                      tableName)),
+              3),
+          rowsOn20220301Spark);
+
+      List<Row> rowsOn20220302 =
+          Lists.newArrayList(Row.of(1, "Jane", dt20220302), Row.of(2, "Jane", 
dt20220302));
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName), 
rowsOn20220302);
+      List<Row> rowsOn20220302Spark =
+          Lists.newArrayList(
+              Row.of(1, "Jane", dt20220302Spark), Row.of(2, "Jane", 
dt20220302Spark));
+      TestHelpers.assertRows(
+          convertToFlinkRows(
+              spark.sql(
+                  String.format(
+                      Locale.ROOT,
+                      "SELECT * FROM %s.db.%s WHERE dt = '2022-03-02'",
+                      catalogName,
+                      tableName)),
+              3),
+          rowsOn20220302Spark);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Iterables.concat(rowsOn20220301, 
rowsOn20220302)));
+      TestHelpers.assertRows(
+          convertToFlinkRows(
+              spark.sql(
+                  String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", 
catalogName, tableName)),
+              3),
+          Lists.newArrayList(Iterables.concat(rowsOn20220301Spark, 
rowsOn20220302Spark)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  @TestTemplate
+  public void testPrimaryKeyEqualToPartitionKey() {
+    // This is an SQL based reproduction of 
TestFlinkIcebergSinkV2#testUpsertOnDataKey
+    String tableName = "upsert_on_id_key";
+    try {
+      sql(
+          "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, PRIMARY 
KEY(id) NOT ENFORCED) "
+              + "PARTITIONED BY (id) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql("INSERT INTO %s VALUES " + "(1, 'Bill')," + "(1, 'Jane')," + "(2, 
'Bill')", tableName);
+
+      List<Row> rows = Lists.newArrayList(Row.of(1, "Jane"), Row.of(2, 
"Bill"));
+      TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), rows);
+      TestHelpers.assertRows(
+          convertToFlinkRows(
+              spark.sql(
+                  String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", 
catalogName, tableName)),
+              2),
+          rows);
+
+      sql("INSERT INTO %s VALUES " + "(1, 'Bill')," + "(2, 'Jane')", 
tableName);
+
+      List<Row> rows2 = Lists.newArrayList(Row.of(1, "Bill"), Row.of(2, 
"Jane"));
+      TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), rows2);
+      spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s", 
catalogName, tableName));
+      TestHelpers.assertRows(
+          convertToFlinkRows(
+              spark.sql(
+                  String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", 
catalogName, tableName)),
+              2),
+          rows2);
+
+      sql("INSERT INTO %s VALUES " + "(3, 'Bill')," + "(4, 'Jane')", 
tableName);
+
+      List<Row> rows3 =
+          Lists.newArrayList(
+              Row.of(1, "Bill"), Row.of(2, "Jane"), Row.of(3, "Bill"), 
Row.of(4, "Jane"));
+      TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), rows3);
+      spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s", 
catalogName, tableName));
+      TestHelpers.assertRows(
+          convertToFlinkRows(
+              spark.sql(
+                  String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", 
catalogName, tableName)),
+              2),
+          rows3);
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  @TestTemplate
+  public void testPrimaryKeyFieldsAtBeginningOfSchema() {
+    String tableName = "upsert_on_pk_at_schema_start";
+    LocalDate dt = LocalDate.of(2022, 3, 1);
+    Date dtSpark = 
Date.from(dt.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());
+
+    try {
+      sql(
+          "CREATE TABLE %s(id INT, dt DATE NOT NULL, name STRING NOT NULL, "
+              + "PRIMARY KEY(id,dt) NOT ENFORCED) "
+              + "PARTITIONED BY (dt) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql(
+          "INSERT INTO %s VALUES "
+              + "(1, DATE '2022-03-01', 'Andy'),"
+              + "(1, DATE '2022-03-01', 'Bill'),"
+              + "(2, DATE '2022-03-01', 'Jane')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(1, dt, "Bill"), Row.of(2, dt, "Jane")));
+      TestHelpers.assertRows(
+          convertToFlinkRows(
+              spark.sql(
+                  String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", 
catalogName, tableName)),
+              3),
+          Lists.newArrayList(Row.of(1, dtSpark, "Bill"), Row.of(2, dtSpark, 
"Jane")));
+
+      sql(
+          "INSERT INTO %s VALUES "
+              + "(1, DATE '2022-03-01', 'Jane'),"
+              + "(2, DATE '2022-03-01', 'Bill')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of(1, dt, "Jane"), Row.of(2, dt, "Bill")));
+      spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s", 
catalogName, tableName));
+      TestHelpers.assertRows(
+          convertToFlinkRows(
+              spark.sql(
+                  String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", 
catalogName, tableName)),
+              3),
+          Lists.newArrayList(Row.of(1, dtSpark, "Jane"), Row.of(2, dtSpark, 
"Bill")));
+
+      sql(
+          "INSERT INTO %s VALUES "
+              + "(3, DATE '2022-03-01', 'Duke'),"
+              + "(4, DATE '2022-03-01', 'Leon')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(
+              Row.of(1, dt, "Jane"),
+              Row.of(2, dt, "Bill"),
+              Row.of(3, dt, "Duke"),
+              Row.of(4, dt, "Leon")));
+      spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s", 
catalogName, tableName));
+      TestHelpers.assertRows(
+          convertToFlinkRows(
+              spark.sql(
+                  String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", 
catalogName, tableName)),
+              3),
+          Lists.newArrayList(
+              Row.of(1, dtSpark, "Jane"),
+              Row.of(2, dtSpark, "Bill"),
+              Row.of(3, dtSpark, "Duke"),
+              Row.of(4, dtSpark, "Leon")));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+
+  @TestTemplate
+  public void testPrimaryKeyFieldsAtEndOfTableSchema() {
+    // This is the same test case as testPrimaryKeyFieldsAtBeginningOfSchema, 
but the primary key
+    // fields
+    // are located at the end of the flink schema.
+    String tableName = "upsert_on_pk_at_schema_end";
+    LocalDate dt = LocalDate.of(2022, 3, 1);
+    Date dtSpark = 
Date.from(dt.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());
+    try {
+      sql(
+          "CREATE TABLE %s(name STRING NOT NULL, id INT, dt DATE NOT NULL, "
+              + "PRIMARY KEY(id,dt) NOT ENFORCED) "
+              + "PARTITIONED BY (dt) WITH %s",
+          tableName, toWithClause(tableUpsertProps));
+
+      sql(
+          "INSERT INTO %s VALUES "
+              + "('Andy', 1, DATE '2022-03-01'),"
+              + "('Bill', 1, DATE '2022-03-01'),"
+              + "('Jane', 2, DATE '2022-03-01')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("Bill", 1, dt), Row.of("Jane", 2, dt)));
+      TestHelpers.assertRows(
+          convertToFlinkRows(
+              spark.sql(
+                  String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", 
catalogName, tableName)),
+              3),
+          Lists.newArrayList(Row.of("Bill", 1, dtSpark), Row.of("Jane", 2, 
dtSpark)));
+
+      sql(
+          "INSERT INTO %s VALUES "
+              + "('Jane', 1, DATE '2022-03-01'),"
+              + "('Bill', 2, DATE '2022-03-01')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(Row.of("Jane", 1, dt), Row.of("Bill", 2, dt)));
+      spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s", 
catalogName, tableName));
+      TestHelpers.assertRows(
+          convertToFlinkRows(
+              spark.sql(
+                  String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", 
catalogName, tableName)),
+              3),
+          Lists.newArrayList(Row.of("Jane", 1, dtSpark), Row.of("Bill", 2, 
dtSpark)));
+
+      sql(
+          "INSERT INTO %s VALUES "
+              + "('Duke', 3, DATE '2022-03-01'),"
+              + "('Leon', 4, DATE '2022-03-01')",
+          tableName);
+
+      TestHelpers.assertRows(
+          sql("SELECT * FROM %s", tableName),
+          Lists.newArrayList(
+              Row.of("Jane", 1, dt),
+              Row.of("Bill", 2, dt),
+              Row.of("Duke", 3, dt),
+              Row.of("Leon", 4, dt)));
+      spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s", 
catalogName, tableName));
+      TestHelpers.assertRows(
+          convertToFlinkRows(
+              spark.sql(
+                  String.format(Locale.ROOT, "SELECT * FROM %s.db.%s", 
catalogName, tableName)),
+              3),
+          Lists.newArrayList(
+              Row.of("Jane", 1, dtSpark),
+              Row.of("Bill", 2, dtSpark),
+              Row.of("Duke", 3, dtSpark),
+              Row.of("Leon", 4, dtSpark)));
+    } finally {
+      sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+    }
+  }
+}
diff --git 
a/backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java
 
b/backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java
new file mode 100644
index 0000000000..103cdd7438
--- /dev/null
+++ 
b/backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.execution.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.source.TestPositionDeletesTable;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runners.Parameterized;
+
+public class TestPositionDeletesTableGluten extends TestPositionDeletesTable {
+  private static final Map<String, String> CATALOG_PROPS =
+      ImmutableMap.of("type", "hive", "default-namespace", "default", 
"cache-enabled", "false");
+  private static ClickHouseIcebergHiveTableSupport hiveTableSupport;
+
+  @BeforeClass
+  public static void startMetastoreAndSpark() {
+    metastore = new TestHiveMetastore();
+    metastore.start();
+    hiveConf = metastore.hiveConf();
+    hiveTableSupport = new ClickHouseIcebergHiveTableSupport();
+    hiveTableSupport.initSparkConf(
+        hiveConf.get("hive.metastore.uris"), 
SparkCatalogConfig.HIVE.catalogName(), null);
+    hiveTableSupport.initializeSession();
+    spark = hiveTableSupport.spark();
+    sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    catalog =
+        (HiveCatalog)
+            CatalogUtil.loadCatalog(
+                HiveCatalog.class.getName(), "hive", ImmutableMap.of(), 
hiveConf);
+
+    try {
+      catalog.createNamespace(Namespace.of(new String[] {"default"}));
+    } catch (AlreadyExistsException ignore) {
+    }
+  }
+
+  @AfterClass
+  public static void stopMetastoreAndSpark() throws Exception {
+    catalog = null;
+    if (metastore != null) {
+      metastore.stop();
+      metastore = null;
+    }
+    hiveTableSupport.clean();
+  }
+
+  @Parameterized.Parameters(
+      name =
+          "formatVersion = {0}, catalogName = {1}, implementation = {2}, 
config = {3}, fileFormat = {4}")
+  public static Object[][] parameters() {
+    // ignore ORC and AVRO, ch backend only support PARQUET
+    return new Object[][] {
+      {
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        CATALOG_PROPS,
+        FileFormat.PARQUET
+      }
+    };
+  }
+
+  public TestPositionDeletesTableGluten(
+      String catalogName, String implementation, Map<String, String> config, 
FileFormat format) {
+    super(catalogName, implementation, config, format);
+  }
+}
diff --git 
a/backends-clickhouse/src-spark33/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
 
b/backends-clickhouse/src-spark33/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
new file mode 100644
index 0000000000..c9e6a8ee5e
--- /dev/null
+++ 
b/backends-clickhouse/src-spark33/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.execution.iceberg
+
+import com.google.common.base.Strings
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
+
+class ClickHouseIcebergHiveTableSupport {
+
+  private val sparkConf: SparkConf = new SparkConf()
+
+  private var _hiveSpark: SparkSession = _
+
+  def spark: SparkSession = _hiveSpark
+
+  def initSparkConf(url: String, catalog: String, path: String): SparkConf = {
+    import org.apache.gluten.backendsapi.clickhouse.CHConfig._
+
+    sparkConf
+      .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
+      .set("spark.memory.offHeap.enabled", "true")
+      .set("spark.memory.offHeap.size", "536870912")
+      .set("spark.sql.catalogImplementation", "hive")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.sql.files.maxPartitionBytes", "1g")
+      .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.sql.adaptive.enabled", "false")
+      .set("spark.sql.files.minPartitionNum", "1")
+      .set(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1")
+      .set("spark.gluten.sql.columnar.iterator", "true")
+      .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true")
+      .set("spark.gluten.sql.enable.native.validation", "false")
+      .set("spark.gluten.sql.parquet.maxmin.index", "true")
+      .set("spark.hive.exec.dynamic.partition.mode", "nonstrict")
+      .set("spark.gluten.supported.hive.udfs", "my_add")
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.sql.shuffle.partitions", "2")
+      .set("spark.memory.offHeap.size", "2g")
+      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
+      .setCHConfig("use_local_format", true)
+      .set("spark.sql.extensions",
+        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+      .set("spark.sql.catalog.spark_catalog", 
"org.apache.iceberg.spark.SparkSessionCatalog")
+      .set("spark.sql.catalog.spark_catalog.type", "hive")
+      .setMaster("local[*]")
+    if (!Strings.isNullOrEmpty(url)) {
+      sparkConf.set("spark.hadoop.hive.metastore.uris", url)
+    }
+    if (!Strings.isNullOrEmpty(catalog)) {
+      sparkConf.set("spark.sql.catalog." + catalog, 
"org.apache.iceberg.spark.SparkCatalog")
+        .set("spark.sql.catalog." + catalog + ".type", "hive")
+    }
+    if (!Strings.isNullOrEmpty(path)) {
+      sparkConf.set("spark.sql.warehouse.dir", path)
+    }
+    sparkConf
+  }
+
+  def initializeSession(): Unit = {
+    if (_hiveSpark == null) {
+      _hiveSpark =
+        SparkSession
+          .builder()
+          .config(sparkConf)
+          .enableHiveSupport()
+          .getOrCreate()
+    }
+  }
+
+  def clean(): Unit = {
+    try {
+      if (_hiveSpark != null) {
+        _hiveSpark.stop()
+        _hiveSpark = null
+      }
+    } finally {
+      SparkSession.clearActiveSession()
+      SparkSession.clearDefaultSession()
+    }
+  }
+}
diff --git 
a/backends-clickhouse/src-spark35/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java
 
b/backends-clickhouse/src-spark35/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java
new file mode 100644
index 0000000000..3421a094fa
--- /dev/null
+++ 
b/backends-clickhouse/src-spark35/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.execution.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.source.TestPositionDeletesTable;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith({ParameterizedTestExtension.class})
+public class TestPositionDeletesTableGluten extends TestPositionDeletesTable {
+  private static final Map<String, String> CATALOG_PROPS =
+      ImmutableMap.of("type", "hive", "default-namespace", "default", 
"cache-enabled", "false");
+  private static ClickHouseIcebergHiveTableSupport hiveTableSupport;
+
+  @BeforeAll
+  public static void startMetastoreAndSpark() {
+    metastore = new TestHiveMetastore();
+    metastore.start();
+    hiveConf = metastore.hiveConf();
+    hiveTableSupport = new ClickHouseIcebergHiveTableSupport();
+    hiveTableSupport.initSparkConf(
+        hiveConf.get("hive.metastore.uris"), 
SparkCatalogConfig.HIVE.catalogName(), null);
+    hiveTableSupport.initializeSession();
+    spark = hiveTableSupport.spark();
+    sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    catalog =
+        (HiveCatalog)
+            CatalogUtil.loadCatalog(
+                HiveCatalog.class.getName(), "hive", ImmutableMap.of(), 
hiveConf);
+
+    try {
+      catalog.createNamespace(Namespace.of(new String[] {"default"}));
+    } catch (AlreadyExistsException ignore) {
+    }
+  }
+
+  @AfterAll
+  public static void stopMetastoreAndSpark() throws Exception {
+    catalog = null;
+    if (metastore != null) {
+      metastore.stop();
+      metastore = null;
+    }
+    hiveTableSupport.clean();
+  }
+
+  public TestPositionDeletesTableGluten() {}
+
+  @Parameters(name = "catalogName = {1}, implementation = {2}, config = {3}, 
fileFormat = {4}")
+  public static Object[][] parameters() {
+    // ignore ORC and AVRO, ch backend only support PARQUET
+    return new Object[][] {
+      {
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        CATALOG_PROPS,
+        FileFormat.PARQUET
+      }
+    };
+  }
+}
diff --git 
a/backends-clickhouse/src-spark35/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
 
b/backends-clickhouse/src-spark35/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
new file mode 100644
index 0000000000..c9e6a8ee5e
--- /dev/null
+++ 
b/backends-clickhouse/src-spark35/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.execution.iceberg
+
+import com.google.common.base.Strings
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import 
org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
+
+class ClickHouseIcebergHiveTableSupport {
+
+  private val sparkConf: SparkConf = new SparkConf()
+
+  private var _hiveSpark: SparkSession = _
+
+  def spark: SparkSession = _hiveSpark
+
+  def initSparkConf(url: String, catalog: String, path: String): SparkConf = {
+    import org.apache.gluten.backendsapi.clickhouse.CHConfig._
+
+    sparkConf
+      .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
+      .set("spark.memory.offHeap.enabled", "true")
+      .set("spark.memory.offHeap.size", "536870912")
+      .set("spark.sql.catalogImplementation", "hive")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.sql.files.maxPartitionBytes", "1g")
+      .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+      .set("spark.sql.shuffle.partitions", "5")
+      .set("spark.sql.adaptive.enabled", "false")
+      .set("spark.sql.files.minPartitionNum", "1")
+      .set(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1")
+      .set("spark.gluten.sql.columnar.iterator", "true")
+      .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true")
+      .set("spark.gluten.sql.enable.native.validation", "false")
+      .set("spark.gluten.sql.parquet.maxmin.index", "true")
+      .set("spark.hive.exec.dynamic.partition.mode", "nonstrict")
+      .set("spark.gluten.supported.hive.udfs", "my_add")
+      .set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+      .set("spark.sql.adaptive.enabled", "true")
+      .set("spark.sql.shuffle.partitions", "2")
+      .set("spark.memory.offHeap.size", "2g")
+      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
+      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
+      .setCHConfig("use_local_format", true)
+      .set("spark.sql.extensions",
+        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
+      .set("spark.sql.catalog.spark_catalog", 
"org.apache.iceberg.spark.SparkSessionCatalog")
+      .set("spark.sql.catalog.spark_catalog.type", "hive")
+      .setMaster("local[*]")
+    if (!Strings.isNullOrEmpty(url)) {
+      sparkConf.set("spark.hadoop.hive.metastore.uris", url)
+    }
+    if (!Strings.isNullOrEmpty(catalog)) {
+      sparkConf.set("spark.sql.catalog." + catalog, 
"org.apache.iceberg.spark.SparkCatalog")
+        .set("spark.sql.catalog." + catalog + ".type", "hive")
+    }
+    if (!Strings.isNullOrEmpty(path)) {
+      sparkConf.set("spark.sql.warehouse.dir", path)
+    }
+    sparkConf
+  }
+
+  def initializeSession(): Unit = {
+    if (_hiveSpark == null) {
+      _hiveSpark =
+        SparkSession
+          .builder()
+          .config(sparkConf)
+          .enableHiveSupport()
+          .getOrCreate()
+    }
+  }
+
+  def clean(): Unit = {
+    try {
+      if (_hiveSpark != null) {
+        _hiveSpark.stop()
+        _hiveSpark = null
+      }
+    } finally {
+      SparkSession.clearActiveSession()
+      SparkSession.clearDefaultSession()
+    }
+  }
+}
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala
index bb72e34e3e..47307cf2fa 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala
@@ -37,7 +37,9 @@ abstract class GlutenClickHouseCacheBaseTestSuite
   override protected val queriesResults: String = rootPath + "queries-output"
 
   // Abstract methods to be implemented by subclasses
-  protected def cleanupCache(): Unit = cacheHelper.deleteCache(spark, 
tablesPath)
+  protected def cleanupCache(): Unit =
+    cacheHelper.deleteCache(spark, s"$tablesPath/lineitem", 
s"$tablesPath/$SPARK_DIR_NAME")
+
   protected def copyDataIfNeeded(): Unit
 
   // Initialize the cache helper - accessible to subclasses
diff --git 
a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala
 
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala
index 6cdc16e2a1..4b694086a9 100644
--- 
a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala
+++ 
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala
@@ -45,22 +45,21 @@ class CacheTestHelper(val TMP_PREFIX: String) {
   }
 
   /** Delete cache files for all tables in the data path */
-  def deleteCache(spark: SparkSession, dataPath: String): Unit = {
-    val targetFile = new Path(dataPath)
-    val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
-    fs.listStatus(targetFile)
-      .foreach(
-        table => {
-          if (table.isDirectory) {
-            fs.listStatus(table.getPath)
-              .foreach(
-                data => {
-                  if (data.isFile) {
-                    CHNativeCacheManager
-                      .removeFiles(data.getPath.toUri.getPath.substring(1), 
CACHE_NAME)
-                  }
-                })
-          }
-        })
+  def deleteCache(spark: SparkSession, dataPaths: String*): Unit = {
+    dataPaths.foreach(
+      dataPath => {
+        val targetFile = new Path(dataPath)
+        val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
+        if (fs.isDirectory(targetFile)) {
+          fs.listStatus(targetFile)
+            .foreach(
+              data => {
+                if (data.isFile) {
+                  CHNativeCacheManager
+                    .removeFiles(data.getPath.toUri.getPath.substring(1), 
CACHE_NAME)
+                }
+              })
+        }
+      })
   }
 }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp 
b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
index fedd043500..dfbbe4267d 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
@@ -59,12 +59,12 @@ DB::Columns BaseReader::addVirtualColumn(DB::Chunk 
dataChunk, size_t rowNum) con
         std::back_inserter(res_columns),
         [&](const auto & column) -> DB::ColumnPtr
         {
-            if (readHeader.has(column.name))
-                return read_columns[readHeader.getPositionByName(column.name)];
             if (auto it = 
normalized_partition_values.find(boost::to_lower_copy(column.name)); it != 
normalized_partition_values.end())
                 return createPartitionColumn(it->second, column.type, rows);
             if (file->fileMetaColumns().virtualColumn(column.name))
                 return file->fileMetaColumns().createMetaColumn(column.name, 
column.type, rows);
+            if (readHeader.has(column.name))
+                return read_columns[readHeader.getPositionByName(column.name)];
             throw DB::Exception(
                 DB::ErrorCodes::LOGICAL_ERROR, "Not found column = {} when 
reading file: {}.", column.name, file->getURIPath());
         });


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to