This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 33180440f5 [GLUTEN-8846][CH] [Part 4] Add full-chain UT (#9256)
33180440f5 is described below
commit 33180440f5daf668d51cb24a619ca129ae1d3966
Author: jlf <[email protected]>
AuthorDate: Tue Apr 22 17:16:57 2025 +0800
[GLUTEN-8846][CH] [Part 4] Add full-chain UT (#9256)
* [CH][draft] iceberg UT
* [CH][draft] add UT
* [CH][draft] run iceberg UT with diff spark version
* Fixed occasional UT anomalies
---
backends-clickhouse/pom.xml | 396 +++++++++++++--
.../gluten/execution/iceberg/TestFlinkUpsert.java | 538 +++++++++++++++++++++
.../iceberg/TestPositionDeletesTableGluten.java | 91 ++++
.../ClickHouseIcebergHiveTableSupport.scala | 101 ++++
.../iceberg/TestPositionDeletesTableGluten.java | 89 ++++
.../ClickHouseIcebergHiveTableSupport.scala | 101 ++++
.../cache/GlutenClickHouseCacheBaseTestSuite.scala | 4 +-
.../org/apache/gluten/utils/CacheTestHelper.scala | 33 +-
.../Storages/SubstraitSource/FileReader.cpp | 4 +-
9 files changed, 1299 insertions(+), 58 deletions(-)
diff --git a/backends-clickhouse/pom.xml b/backends-clickhouse/pom.xml
index 8a20a0a2cb..f095d49903 100644
--- a/backends-clickhouse/pom.xml
+++ b/backends-clickhouse/pom.xml
@@ -43,11 +43,189 @@
</dependency>
</dependencies>
</profile>
+ <profile>
+ <id>spark-3.3</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-spark33-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.basedir}/src-spark33/main/scala</source>
+              <source>${project.basedir}/src-spark33/main/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-spark33-resources</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>add-resource</goal>
+ </goals>
+ <configuration>
+ <resources>
+ <resource>
+
<directory>${project.basedir}/src-spark33/main/resources</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-spark33-test-sources</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.basedir}/src-spark33/test/scala</source>
+ <source>${project.basedir}/src-spark33/test/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-spark33-test-resources</id>
+ <phase>generate-test-resources</phase>
+ <goals>
+ <goal>add-test-resource</goal>
+ </goals>
+ <configuration>
+ <resources>
+ <resource>
+
<directory>${project.basedir}/src-spark33/test/resources</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ <profile>
+ <id>spark-3.5</id>
+ <properties>
+ <slf4j.version>1.7.36</slf4j.version>
+ </properties>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>add-spark35-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>add-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.basedir}/src-spark35/main/scala</source>
+              <source>${project.basedir}/src-spark35/main/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-spark35-resources</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>add-resource</goal>
+ </goals>
+ <configuration>
+ <resources>
+ <resource>
+
<directory>${project.basedir}/src-spark35/main/resources</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-spark35-test-sources</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.basedir}/src-spark35/test/scala</source>
+ <source>${project.basedir}/src-spark35/test/java</source>
+ </sources>
+ </configuration>
+ </execution>
+ <execution>
+ <id>add-spark35-test-resources</id>
+ <phase>generate-test-resources</phase>
+ <goals>
+ <goal>add-test-resource</goal>
+ </goals>
+ <configuration>
+ <resources>
+ <resource>
+
<directory>${project.basedir}/src-spark35/test/resources</directory>
+ </resource>
+ </resources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
<profile>
<id>iceberg</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
+ <properties>
+ <flink.version>1.16.2</flink.version>
+ </properties>
<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
@@ -64,6 +242,133 @@
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-collection-compat_${scala.binary.version}</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-api</artifactId>
+ <version>${iceberg.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-api</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-flink-runtime-1.16</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-flink-1.16</artifactId>
+ <version>${iceberg.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-flink-1.16</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-hive-metastore</artifactId>
+ <version>${iceberg.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-hive-metastore</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
+      <!-- flink iceberg UTs need JUnit 5; use 5.10.1 -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter</artifactId>
+ <scope>test</scope>
+ <version>5.10.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ <version>5.10.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-bridge-base</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-api-java-bridge</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
+      <!-- flink does not support scala 2.13 -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-planner_2.12</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+
<artifactId>iceberg-spark-${sparkbundle.version}_${scala.binary.version}</artifactId>
+ <version>${iceberg.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+
<artifactId>iceberg-spark-${sparkbundle.version}_${scala.binary.version}</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-data</artifactId>
+ <version>${iceberg.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-data</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</profile>
<profile>
@@ -116,6 +421,10 @@
</profile>
</profiles>
+ <properties>
+ <surefire.skipTests>true</surefire.skipTests>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.gluten</groupId>
@@ -240,44 +549,44 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.hive.hcatalog</groupId>
- <artifactId>hive-hcatalog-core</artifactId>
- <version>2.3.9</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.pentaho</groupId>
- <artifactId>pentaho-aggdesigner-algorithm</artifactId>
- </exclusion>
- <exclusion>
- <groupId>net.minidev</groupId>
- <artifactId>json-smart</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>guava</artifactId>
- <groupId>com.google.guava</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-common</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>hadoop-hdfs</artifactId>
- <groupId>org.apache.hadoop</groupId>
- </exclusion>
- <exclusion>
- <artifactId>protobuf-java</artifactId>
- <groupId>com.google.protobuf</groupId>
- </exclusion>
- <exclusion>
- <artifactId>jdk.tools</artifactId>
- <groupId>jdk.tools</groupId>
- </exclusion>
- </exclusions>
+ <groupId>org.apache.hive.hcatalog</groupId>
+ <artifactId>hive-hcatalog-core</artifactId>
+ <version>2.3.9</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.pentaho</groupId>
+ <artifactId>pentaho-aggdesigner-algorithm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>net.minidev</groupId>
+ <artifactId>json-smart</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </exclusion>
+ <exclusion>
+ <artifactId>guava</artifactId>
+ <groupId>com.google.guava</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-common</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>hadoop-hdfs</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>protobuf-java</artifactId>
+ <groupId>com.google.protobuf</groupId>
+ </exclusion>
+ <exclusion>
+ <artifactId>jdk.tools</artifactId>
+ <groupId>jdk.tools</groupId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -551,6 +860,17 @@
<treatWarningsAsErrors>true</treatWarningsAsErrors>
</configuration>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <includes>
+ <include>**/*Test.java</include>
+ <include>**/Test*.java</include>
+ </includes>
+ <skipTests>${surefire.skipTests}</skipTests>
+ </configuration>
+ </plugin>
</plugins>
</build>
</project>
diff --git
a/backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java
b/backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java
new file mode 100644
index 0000000000..429feab086
--- /dev/null
+++
b/backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestFlinkUpsert.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution.iceberg;
+
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Parameter;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.flink.CatalogTestBase;
+import org.apache.iceberg.flink.MiniClusterResource;
+import org.apache.iceberg.flink.TestHelpers;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+
+public class TestFlinkUpsert extends CatalogTestBase {
+
+ @Parameter(index = 2)
+ private FileFormat format;
+
+ @Parameter(index = 3)
+ private boolean isStreamingJob;
+
+ private final Map<String, String> tableUpsertProps = Maps.newHashMap();
+ private TableEnvironment tEnv;
+ private SparkSession spark;
+ private ClickHouseIcebergHiveTableSupport hiveTableSupport;
+
+ @Parameters(name = "catalogName={0}, baseNamespace={1}, format={2},
isStreaming={3}")
+ public static List<Object[]> parameters() {
+ List<Object[]> parameters = Lists.newArrayList();
+ // ignore ORC and AVRO, ch backend only support PARQUET
+ for (FileFormat format : new FileFormat[] {FileFormat.PARQUET}) {
+ for (Boolean isStreaming : new Boolean[] {true, false}) {
+ // Only test with one catalog as this is a file operation concern.
+ // FlinkCatalogTestBase requires the catalog name start with
testhadoop if using hadoop
+ // catalog.
+ String catalogName = "testhive";
+ Namespace baseNamespace = Namespace.empty();
+ parameters.add(new Object[] {catalogName, baseNamespace, format,
isStreaming});
+ }
+ }
+ return parameters;
+ }
+
+ @Override
+ protected TableEnvironment getTableEnv() {
+ if (tEnv == null) {
+ synchronized (this) {
+ EnvironmentSettings.Builder settingsBuilder =
EnvironmentSettings.newInstance();
+ if (isStreamingJob) {
+ settingsBuilder.inStreamingMode();
+ StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment(
+ MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+ env.enableCheckpointing(400);
+ env.setMaxParallelism(2);
+ env.setParallelism(2);
+ tEnv = StreamTableEnvironment.create(env, settingsBuilder.build());
+ } else {
+ settingsBuilder.inBatchMode();
+ tEnv = TableEnvironment.create(settingsBuilder.build());
+ }
+ }
+ }
+ return tEnv;
+ }
+
+ @Override
+ @BeforeEach
+ public void before() {
+ super.before();
+ sql("CREATE DATABASE IF NOT EXISTS %s", flinkDatabase);
+ sql("USE CATALOG %s", catalogName);
+ sql("USE %s", DATABASE);
+ tableUpsertProps.put(TableProperties.FORMAT_VERSION, "2");
+ tableUpsertProps.put(TableProperties.UPSERT_ENABLED, "true");
+ tableUpsertProps.put(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+
+ hiveTableSupport = new ClickHouseIcebergHiveTableSupport();
+ hiveTableSupport.initSparkConf(
+ hiveConf.get("hive.metastore.uris"),
+ catalogName,
+ String.format("file://%s", this.warehouseRoot()));
+ hiveTableSupport.initializeSession();
+ spark = hiveTableSupport.spark();
+ }
+
+ @Override
+ @AfterEach
+ public void clean() {
+ sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+ super.clean();
+
+ hiveTableSupport.clean();
+ }
+
+ static String toWithClause(Map<String, String> props) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("(");
+ int propCount = 0;
+ for (Map.Entry<String, String> entry : props.entrySet()) {
+ if (propCount > 0) {
+ builder.append(",");
+ }
+ builder
+ .append("'")
+ .append(entry.getKey())
+ .append("'")
+ .append("=")
+ .append("'")
+ .append(entry.getValue())
+ .append("'");
+ propCount++;
+ }
+ builder.append(")");
+ return builder.toString();
+ }
+
+ @TestTemplate
+ public void testUpsertAndQuery() {
+ String tableName = "test_upsert_query";
+ LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
+ LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
+ Date dt20220301Spark =
+
Date.from(dt20220301.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());
+ Date dt20220302Spark =
+
Date.from(dt20220302.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());
+
+ sql(
+ "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, dt DATE, "
+ + "PRIMARY KEY(id,dt) NOT ENFORCED) "
+ + "PARTITIONED BY (dt) WITH %s",
+ tableName, toWithClause(tableUpsertProps));
+
+ try {
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(1, 'Bill', DATE '2022-03-01'),"
+ + "(1, 'Jane', DATE '2022-03-01'),"
+ + "(2, 'Jane', DATE '2022-03-01')",
+ tableName);
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(2, 'Bill', DATE '2022-03-01'),"
+ + "(1, 'Jane', DATE '2022-03-02'),"
+ + "(2, 'Jane', DATE '2022-03-02')",
+ tableName);
+
+ List<Row> rowsOn20220301 =
+ Lists.newArrayList(Row.of(1, "Jane", dt20220301), Row.of(2, "Bill",
dt20220301));
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName),
rowsOn20220301);
+ List<Row> rowsOn20220301Spark =
+ Lists.newArrayList(
+ Row.of(1, "Jane", dt20220301Spark), Row.of(2, "Bill",
dt20220301Spark));
+ TestHelpers.assertRows(
+ convertToFlinkRows(
+ spark.sql(
+ String.format(
+ Locale.ROOT,
+ "SELECT * FROM %s.db.%s WHERE dt < '2022-03-02'",
+ catalogName,
+ tableName)),
+ 3),
+ rowsOn20220301Spark);
+
+ List<Row> rowsOn20220302 =
+ Lists.newArrayList(Row.of(1, "Jane", dt20220302), Row.of(2, "Jane",
dt20220302));
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName),
rowsOn20220302);
+ List<Row> rowsOn20220302Spark =
+ Lists.newArrayList(
+ Row.of(1, "Jane", dt20220302Spark), Row.of(2, "Jane",
dt20220302Spark));
+ TestHelpers.assertRows(
+ convertToFlinkRows(
+ spark.sql(
+ String.format(
+ Locale.ROOT,
+ "SELECT * FROM %s.db.%s WHERE dt = '2022-03-02'",
+ catalogName,
+ tableName)),
+ 3),
+ rowsOn20220302Spark);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Iterables.concat(rowsOn20220301,
rowsOn20220302)));
+ TestHelpers.assertRows(
+ convertToFlinkRows(
+ spark.sql(
+ String.format(Locale.ROOT, "SELECT * FROM %s.db.%s",
catalogName, tableName)),
+ 3),
+ Lists.newArrayList(Iterables.concat(rowsOn20220301Spark,
rowsOn20220302Spark)));
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
+ }
+
+ private List<Row> convertToFlinkRows(Dataset<org.apache.spark.sql.Row> rows,
int columnCount) {
+ return rows.collectAsList().stream()
+ .map(
+ r -> {
+ switch (columnCount) {
+ case 1:
+ return Row.of(r.get(0));
+ case 2:
+ return Row.of(r.get(0), r.get(1));
+ case 3:
+ return Row.of(r.get(0), r.get(1), r.get(2));
+ default:
+ throw new IllegalArgumentException("Unsupported column
count: " + columnCount);
+ }
+ })
+ .collect(Collectors.toList());
+ }
+
+ @TestTemplate
+ public void testUpsertOptions() {
+ String tableName = "test_upsert_options";
+ LocalDate dt20220301 = LocalDate.of(2022, 3, 1);
+ LocalDate dt20220302 = LocalDate.of(2022, 3, 2);
+ Date dt20220301Spark =
+
Date.from(dt20220301.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());
+ Date dt20220302Spark =
+
Date.from(dt20220302.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());
+
+ Map<String, String> optionsUpsertProps = Maps.newHashMap(tableUpsertProps);
+ optionsUpsertProps.remove(TableProperties.UPSERT_ENABLED);
+ sql(
+ "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, dt DATE, "
+ + "PRIMARY KEY(id,dt) NOT ENFORCED) "
+ + "PARTITIONED BY (dt) WITH %s",
+ tableName, toWithClause(optionsUpsertProps));
+
+ try {
+ sql(
+ "INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ VALUES "
+ + "(1, 'Bill', DATE '2022-03-01'),"
+ + "(1, 'Jane', DATE '2022-03-01'),"
+ + "(2, 'Jane', DATE '2022-03-01')",
+ tableName);
+
+ sql(
+ "INSERT INTO %s /*+ OPTIONS('upsert-enabled'='true')*/ VALUES "
+ + "(2, 'Bill', DATE '2022-03-01'),"
+ + "(1, 'Jane', DATE '2022-03-02'),"
+ + "(2, 'Jane', DATE '2022-03-02')",
+ tableName);
+
+ List<Row> rowsOn20220301 =
+ Lists.newArrayList(Row.of(1, "Jane", dt20220301), Row.of(2, "Bill",
dt20220301));
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s WHERE dt < '2022-03-02'", tableName),
rowsOn20220301);
+ List<Row> rowsOn20220301Spark =
+ Lists.newArrayList(
+ Row.of(1, "Jane", dt20220301Spark), Row.of(2, "Bill",
dt20220301Spark));
+ TestHelpers.assertRows(
+ convertToFlinkRows(
+ spark.sql(
+ String.format(
+ Locale.ROOT,
+ "SELECT * FROM %s.db.%s WHERE dt < '2022-03-02'",
+ catalogName,
+ tableName)),
+ 3),
+ rowsOn20220301Spark);
+
+ List<Row> rowsOn20220302 =
+ Lists.newArrayList(Row.of(1, "Jane", dt20220302), Row.of(2, "Jane",
dt20220302));
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s WHERE dt = '2022-03-02'", tableName),
rowsOn20220302);
+ List<Row> rowsOn20220302Spark =
+ Lists.newArrayList(
+ Row.of(1, "Jane", dt20220302Spark), Row.of(2, "Jane",
dt20220302Spark));
+ TestHelpers.assertRows(
+ convertToFlinkRows(
+ spark.sql(
+ String.format(
+ Locale.ROOT,
+ "SELECT * FROM %s.db.%s WHERE dt = '2022-03-02'",
+ catalogName,
+ tableName)),
+ 3),
+ rowsOn20220302Spark);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Iterables.concat(rowsOn20220301,
rowsOn20220302)));
+ TestHelpers.assertRows(
+ convertToFlinkRows(
+ spark.sql(
+ String.format(Locale.ROOT, "SELECT * FROM %s.db.%s",
catalogName, tableName)),
+ 3),
+ Lists.newArrayList(Iterables.concat(rowsOn20220301Spark,
rowsOn20220302Spark)));
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
+ }
+
+ @TestTemplate
+ public void testPrimaryKeyEqualToPartitionKey() {
+ // This is an SQL based reproduction of
TestFlinkIcebergSinkV2#testUpsertOnDataKey
+ String tableName = "upsert_on_id_key";
+ try {
+ sql(
+ "CREATE TABLE %s(id INT NOT NULL, name STRING NOT NULL, PRIMARY
KEY(id) NOT ENFORCED) "
+ + "PARTITIONED BY (id) WITH %s",
+ tableName, toWithClause(tableUpsertProps));
+
+ sql("INSERT INTO %s VALUES " + "(1, 'Bill')," + "(1, 'Jane')," + "(2,
'Bill')", tableName);
+
+ List<Row> rows = Lists.newArrayList(Row.of(1, "Jane"), Row.of(2,
"Bill"));
+ TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), rows);
+ TestHelpers.assertRows(
+ convertToFlinkRows(
+ spark.sql(
+ String.format(Locale.ROOT, "SELECT * FROM %s.db.%s",
catalogName, tableName)),
+ 2),
+ rows);
+
+ sql("INSERT INTO %s VALUES " + "(1, 'Bill')," + "(2, 'Jane')",
tableName);
+
+ List<Row> rows2 = Lists.newArrayList(Row.of(1, "Bill"), Row.of(2,
"Jane"));
+ TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), rows2);
+ spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s",
catalogName, tableName));
+ TestHelpers.assertRows(
+ convertToFlinkRows(
+ spark.sql(
+ String.format(Locale.ROOT, "SELECT * FROM %s.db.%s",
catalogName, tableName)),
+ 2),
+ rows2);
+
+ sql("INSERT INTO %s VALUES " + "(3, 'Bill')," + "(4, 'Jane')",
tableName);
+
+ List<Row> rows3 =
+ Lists.newArrayList(
+ Row.of(1, "Bill"), Row.of(2, "Jane"), Row.of(3, "Bill"),
Row.of(4, "Jane"));
+ TestHelpers.assertRows(sql("SELECT * FROM %s", tableName), rows3);
+ spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s",
catalogName, tableName));
+ TestHelpers.assertRows(
+ convertToFlinkRows(
+ spark.sql(
+ String.format(Locale.ROOT, "SELECT * FROM %s.db.%s",
catalogName, tableName)),
+ 2),
+ rows3);
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
+ }
+
+ @TestTemplate
+ public void testPrimaryKeyFieldsAtBeginningOfSchema() {
+ String tableName = "upsert_on_pk_at_schema_start";
+ LocalDate dt = LocalDate.of(2022, 3, 1);
+ Date dtSpark =
Date.from(dt.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());
+
+ try {
+ sql(
+ "CREATE TABLE %s(id INT, dt DATE NOT NULL, name STRING NOT NULL, "
+ + "PRIMARY KEY(id,dt) NOT ENFORCED) "
+ + "PARTITIONED BY (dt) WITH %s",
+ tableName, toWithClause(tableUpsertProps));
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(1, DATE '2022-03-01', 'Andy'),"
+ + "(1, DATE '2022-03-01', 'Bill'),"
+ + "(2, DATE '2022-03-01', 'Jane')",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of(1, dt, "Bill"), Row.of(2, dt, "Jane")));
+ TestHelpers.assertRows(
+ convertToFlinkRows(
+ spark.sql(
+ String.format(Locale.ROOT, "SELECT * FROM %s.db.%s",
catalogName, tableName)),
+ 3),
+ Lists.newArrayList(Row.of(1, dtSpark, "Bill"), Row.of(2, dtSpark,
"Jane")));
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(1, DATE '2022-03-01', 'Jane'),"
+ + "(2, DATE '2022-03-01', 'Bill')",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of(1, dt, "Jane"), Row.of(2, dt, "Bill")));
+ spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s",
catalogName, tableName));
+ TestHelpers.assertRows(
+ convertToFlinkRows(
+ spark.sql(
+ String.format(Locale.ROOT, "SELECT * FROM %s.db.%s",
catalogName, tableName)),
+ 3),
+ Lists.newArrayList(Row.of(1, dtSpark, "Jane"), Row.of(2, dtSpark,
"Bill")));
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "(3, DATE '2022-03-01', 'Duke'),"
+ + "(4, DATE '2022-03-01', 'Leon')",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(
+ Row.of(1, dt, "Jane"),
+ Row.of(2, dt, "Bill"),
+ Row.of(3, dt, "Duke"),
+ Row.of(4, dt, "Leon")));
+ spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s",
catalogName, tableName));
+ TestHelpers.assertRows(
+ convertToFlinkRows(
+ spark.sql(
+ String.format(Locale.ROOT, "SELECT * FROM %s.db.%s",
catalogName, tableName)),
+ 3),
+ Lists.newArrayList(
+ Row.of(1, dtSpark, "Jane"),
+ Row.of(2, dtSpark, "Bill"),
+ Row.of(3, dtSpark, "Duke"),
+ Row.of(4, dtSpark, "Leon")));
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
+ }
+
+ @TestTemplate
+ public void testPrimaryKeyFieldsAtEndOfTableSchema() {
+ // This is the same test case as testPrimaryKeyFieldsAtBeginningOfSchema,
but the primary key
+ // fields
+ // are located at the end of the flink schema.
+ String tableName = "upsert_on_pk_at_schema_end";
+ LocalDate dt = LocalDate.of(2022, 3, 1);
+ Date dtSpark =
Date.from(dt.atStartOfDay().atZone(ZoneId.systemDefault()).toInstant());
+ try {
+ sql(
+ "CREATE TABLE %s(name STRING NOT NULL, id INT, dt DATE NOT NULL, "
+ + "PRIMARY KEY(id,dt) NOT ENFORCED) "
+ + "PARTITIONED BY (dt) WITH %s",
+ tableName, toWithClause(tableUpsertProps));
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "('Andy', 1, DATE '2022-03-01'),"
+ + "('Bill', 1, DATE '2022-03-01'),"
+ + "('Jane', 2, DATE '2022-03-01')",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of("Bill", 1, dt), Row.of("Jane", 2, dt)));
+ TestHelpers.assertRows(
+ convertToFlinkRows(
+ spark.sql(
+ String.format(Locale.ROOT, "SELECT * FROM %s.db.%s",
catalogName, tableName)),
+ 3),
+ Lists.newArrayList(Row.of("Bill", 1, dtSpark), Row.of("Jane", 2,
dtSpark)));
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "('Jane', 1, DATE '2022-03-01'),"
+ + "('Bill', 2, DATE '2022-03-01')",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(Row.of("Jane", 1, dt), Row.of("Bill", 2, dt)));
+ spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s",
catalogName, tableName));
+ TestHelpers.assertRows(
+ convertToFlinkRows(
+ spark.sql(
+ String.format(Locale.ROOT, "SELECT * FROM %s.db.%s",
catalogName, tableName)),
+ 3),
+ Lists.newArrayList(Row.of("Jane", 1, dtSpark), Row.of("Bill", 2,
dtSpark)));
+
+ sql(
+ "INSERT INTO %s VALUES "
+ + "('Duke', 3, DATE '2022-03-01'),"
+ + "('Leon', 4, DATE '2022-03-01')",
+ tableName);
+
+ TestHelpers.assertRows(
+ sql("SELECT * FROM %s", tableName),
+ Lists.newArrayList(
+ Row.of("Jane", 1, dt),
+ Row.of("Bill", 2, dt),
+ Row.of("Duke", 3, dt),
+ Row.of("Leon", 4, dt)));
+ spark.sql(String.format(Locale.ROOT, "REFRESH TABLE %s.db.%s",
catalogName, tableName));
+ TestHelpers.assertRows(
+ convertToFlinkRows(
+ spark.sql(
+ String.format(Locale.ROOT, "SELECT * FROM %s.db.%s",
catalogName, tableName)),
+ 3),
+ Lists.newArrayList(
+ Row.of("Jane", 1, dtSpark),
+ Row.of("Bill", 2, dtSpark),
+ Row.of("Duke", 3, dtSpark),
+ Row.of("Leon", 4, dtSpark)));
+ } finally {
+ sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, tableName);
+ }
+ }
+}
diff --git
a/backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java
b/backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java
new file mode 100644
index 0000000000..103cdd7438
--- /dev/null
+++
b/backends-clickhouse/src-spark33/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.execution.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.source.TestPositionDeletesTable;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runners.Parameterized;
+
+public class TestPositionDeletesTableGluten extends TestPositionDeletesTable {
+ private static final Map<String, String> CATALOG_PROPS =
+ ImmutableMap.of("type", "hive", "default-namespace", "default",
"cache-enabled", "false");
+ private static ClickHouseIcebergHiveTableSupport hiveTableSupport;
+
+ @BeforeClass
+ public static void startMetastoreAndSpark() {
+ metastore = new TestHiveMetastore();
+ metastore.start();
+ hiveConf = metastore.hiveConf();
+ hiveTableSupport = new ClickHouseIcebergHiveTableSupport();
+ hiveTableSupport.initSparkConf(
+ hiveConf.get("hive.metastore.uris"),
SparkCatalogConfig.HIVE.catalogName(), null);
+ hiveTableSupport.initializeSession();
+ spark = hiveTableSupport.spark();
+ sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ catalog =
+ (HiveCatalog)
+ CatalogUtil.loadCatalog(
+ HiveCatalog.class.getName(), "hive", ImmutableMap.of(),
hiveConf);
+
+ try {
+ catalog.createNamespace(Namespace.of(new String[] {"default"}));
+ } catch (AlreadyExistsException ignore) {
+ }
+ }
+
+ @AfterClass
+ public static void stopMetastoreAndSpark() throws Exception {
+ catalog = null;
+ if (metastore != null) {
+ metastore.stop();
+ metastore = null;
+ }
+ hiveTableSupport.clean();
+ }
+
+ @Parameterized.Parameters(
+ name =
+ "formatVersion = {0}, catalogName = {1}, implementation = {2},
config = {3}, fileFormat = {4}")
+ public static Object[][] parameters() {
+ // ignore ORC and AVRO, ch backend only support PARQUET
+ return new Object[][] {
+ {
+ SparkCatalogConfig.HIVE.catalogName(),
+ SparkCatalogConfig.HIVE.implementation(),
+ CATALOG_PROPS,
+ FileFormat.PARQUET
+ }
+ };
+ }
+
+ public TestPositionDeletesTableGluten(
+ String catalogName, String implementation, Map<String, String> config,
FileFormat format) {
+ super(catalogName, implementation, config, format);
+ }
+}
diff --git
a/backends-clickhouse/src-spark33/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
b/backends-clickhouse/src-spark33/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
new file mode 100644
index 0000000000..c9e6a8ee5e
--- /dev/null
+++
b/backends-clickhouse/src-spark33/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.execution.iceberg
+
+import com.google.common.base.Strings
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import
org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
+
/**
 * Builds and owns a Gluten/ClickHouse-enabled Spark session configured for
 * Iceberg tests backed by a Hive metastore. Call `initSparkConf` first,
 * then `initializeSession`; call `clean` in test teardown.
 */
class ClickHouseIcebergHiveTableSupport {

  private val sparkConf: SparkConf = new SparkConf()

  private var _hiveSpark: SparkSession = _

  /** The session created by `initializeSession`; null before that call. */
  def spark: SparkSession = _hiveSpark

  /**
   * Populates the held `SparkConf` with the Gluten/ClickHouse backend and
   * Iceberg catalog settings.
   *
   * @param url Hive metastore thrift URI; skipped when null/empty
   * @param catalog name of an extra Iceberg `SparkCatalog` to register; skipped when null/empty
   * @param path warehouse directory; skipped when null/empty
   * @return the mutated `SparkConf` (the same instance held by this class)
   */
  def initSparkConf(url: String, catalog: String, path: String): SparkConf = {
    import org.apache.gluten.backendsapi.clickhouse.CHConfig._

    // NOTE: a previous revision set spark.sql.adaptive.enabled,
    // spark.sql.shuffle.partitions and spark.memory.offHeap.size several
    // times with conflicting values; only the last assignment wins on a
    // SparkConf, so the duplicates were removed and the final effective
    // values (true / 2 / 2g) are kept below.
    sparkConf
      .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
      .set("spark.memory.offHeap.enabled", "true")
      .set("spark.memory.offHeap.size", "2g")
      .set("spark.sql.catalogImplementation", "hive")
      .set("spark.sql.adaptive.enabled", "true")
      .set("spark.sql.files.maxPartitionBytes", "1g")
      .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
      .set("spark.sql.shuffle.partitions", "2")
      .set("spark.sql.files.minPartitionNum", "1")
      .set(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1")
      .set("spark.gluten.sql.columnar.iterator", "true")
      .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true")
      .set("spark.gluten.sql.enable.native.validation", "false")
      .set("spark.gluten.sql.parquet.maxmin.index", "true")
      .set("spark.hive.exec.dynamic.partition.mode", "nonstrict")
      .set("spark.gluten.supported.hive.udfs", "my_add")
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
      .setCHConfig("use_local_format", true)
      .set(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .set("spark.sql.catalog.spark_catalog.type", "hive")
      .setMaster("local[*]")
    if (!Strings.isNullOrEmpty(url)) {
      sparkConf.set("spark.hadoop.hive.metastore.uris", url)
    }
    if (!Strings.isNullOrEmpty(catalog)) {
      sparkConf
        .set("spark.sql.catalog." + catalog, "org.apache.iceberg.spark.SparkCatalog")
        .set("spark.sql.catalog." + catalog + ".type", "hive")
    }
    if (!Strings.isNullOrEmpty(path)) {
      sparkConf.set("spark.sql.warehouse.dir", path)
    }
    sparkConf
  }

  /** Creates the Hive-enabled session once; subsequent calls are no-ops. */
  def initializeSession(): Unit = {
    if (_hiveSpark == null) {
      _hiveSpark = SparkSession
        .builder()
        .config(sparkConf)
        .enableHiveSupport()
        .getOrCreate()
    }
  }

  /** Stops the session (if any) and always clears global session state. */
  def clean(): Unit = {
    try {
      if (_hiveSpark != null) {
        _hiveSpark.stop()
        _hiveSpark = null
      }
    } finally {
      SparkSession.clearActiveSession()
      SparkSession.clearDefaultSession()
    }
  }
}
diff --git
a/backends-clickhouse/src-spark35/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java
b/backends-clickhouse/src-spark35/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java
new file mode 100644
index 0000000000..3421a094fa
--- /dev/null
+++
b/backends-clickhouse/src-spark35/test/java/org/apache/gluten/execution/iceberg/TestPositionDeletesTableGluten.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.execution.iceberg;
+
+import java.util.Map;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.ParameterizedTestExtension;
+import org.apache.iceberg.Parameters;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.spark.SparkCatalogConfig;
+import org.apache.iceberg.spark.source.TestPositionDeletesTable;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.ExtendWith;
+
@ExtendWith({ParameterizedTestExtension.class})
public class TestPositionDeletesTableGluten extends TestPositionDeletesTable {
  // Hive catalog properties shared by every parameterized run; catalog
  // caching is disabled so each test observes metastore changes immediately.
  private static final Map<String, String> CATALOG_PROPS =
      ImmutableMap.of("type", "hive", "default-namespace", "default", "cache-enabled", "false");
  private static ClickHouseIcebergHiveTableSupport hiveTableSupport;

  /**
   * Starts an embedded Hive metastore and a Gluten/ClickHouse-enabled Spark
   * session, then wires the inherited {@code spark}/{@code sparkContext}/
   * {@code catalog} handles to them so the upstream Iceberg test body runs
   * against the ClickHouse backend.
   */
  @BeforeAll
  public static void startMetastoreAndSpark() {
    metastore = new TestHiveMetastore();
    metastore.start();
    hiveConf = metastore.hiveConf();
    hiveTableSupport = new ClickHouseIcebergHiveTableSupport();
    hiveTableSupport.initSparkConf(
        hiveConf.get("hive.metastore.uris"), SparkCatalogConfig.HIVE.catalogName(), null);
    hiveTableSupport.initializeSession();
    spark = hiveTableSupport.spark();
    sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
    catalog =
        (HiveCatalog)
            CatalogUtil.loadCatalog(
                HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf);

    try {
      catalog.createNamespace(Namespace.of(new String[] {"default"}));
    } catch (AlreadyExistsException ignore) {
      // The namespace may survive from a previous run; that is fine.
    }
  }

  /** Stops the Spark session and the embedded metastore. */
  @AfterAll
  public static void stopMetastoreAndSpark() throws Exception {
    catalog = null;
    if (metastore != null) {
      metastore.stop();
      metastore = null;
    }
    hiveTableSupport.clean();
  }

  public TestPositionDeletesTableGluten() {}

  // NOTE(review): the name template references {1}-{4} but the arrays below
  // carry only four values ({0}-{3}); whether the parent class contributes a
  // leading parameter (e.g. formatVersion) that shifts the indices cannot be
  // seen from here — confirm against TestPositionDeletesTable's @Parameters.
  @Parameters(name = "catalogName = {1}, implementation = {2}, config = {3}, fileFormat = {4}")
  public static Object[][] parameters() {
    // Ignore ORC and AVRO; the ClickHouse backend only supports PARQUET.
    return new Object[][] {
      {
        SparkCatalogConfig.HIVE.catalogName(),
        SparkCatalogConfig.HIVE.implementation(),
        CATALOG_PROPS,
        FileFormat.PARQUET
      }
    };
  }
}
diff --git
a/backends-clickhouse/src-spark35/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
b/backends-clickhouse/src-spark35/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
new file mode 100644
index 0000000000..c9e6a8ee5e
--- /dev/null
+++
b/backends-clickhouse/src-spark35/test/scala/org/apache/gluten/execution/iceberg/ClickHouseIcebergHiveTableSupport.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.execution.iceberg
+
+import com.google.common.base.Strings
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.SparkSession
+import
org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig
+
/**
 * Builds and owns a Gluten/ClickHouse-enabled Spark session configured for
 * Iceberg tests backed by a Hive metastore. Call `initSparkConf` first,
 * then `initializeSession`; call `clean` in test teardown.
 */
class ClickHouseIcebergHiveTableSupport {

  private val sparkConf: SparkConf = new SparkConf()

  private var _hiveSpark: SparkSession = _

  /** The session created by `initializeSession`; null before that call. */
  def spark: SparkSession = _hiveSpark

  /**
   * Populates the held `SparkConf` with the Gluten/ClickHouse backend and
   * Iceberg catalog settings.
   *
   * @param url Hive metastore thrift URI; skipped when null/empty
   * @param catalog name of an extra Iceberg `SparkCatalog` to register; skipped when null/empty
   * @param path warehouse directory; skipped when null/empty
   * @return the mutated `SparkConf` (the same instance held by this class)
   */
  def initSparkConf(url: String, catalog: String, path: String): SparkConf = {
    import org.apache.gluten.backendsapi.clickhouse.CHConfig._

    // NOTE: a previous revision set spark.sql.adaptive.enabled,
    // spark.sql.shuffle.partitions and spark.memory.offHeap.size several
    // times with conflicting values; only the last assignment wins on a
    // SparkConf, so the duplicates were removed and the final effective
    // values (true / 2 / 2g) are kept below.
    sparkConf
      .set("spark.plugins", "org.apache.gluten.GlutenPlugin")
      .set("spark.memory.offHeap.enabled", "true")
      .set("spark.memory.offHeap.size", "2g")
      .set("spark.sql.catalogImplementation", "hive")
      .set("spark.sql.adaptive.enabled", "true")
      .set("spark.sql.files.maxPartitionBytes", "1g")
      .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
      .set("spark.sql.shuffle.partitions", "2")
      .set("spark.sql.files.minPartitionNum", "1")
      .set(ClickHouseConfig.CLICKHOUSE_WORKER_ID, "1")
      .set("spark.gluten.sql.columnar.iterator", "true")
      .set("spark.gluten.sql.columnar.hashagg.enablefinal", "true")
      .set("spark.gluten.sql.enable.native.validation", "false")
      .set("spark.gluten.sql.parquet.maxmin.index", "true")
      .set("spark.hive.exec.dynamic.partition.mode", "nonstrict")
      .set("spark.gluten.supported.hive.udfs", "my_add")
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
      .set("spark.unsafe.exceptionOnMemoryLeak", "true")
      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
      .setCHConfig("use_local_format", true)
      .set(
        "spark.sql.extensions",
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .set("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog")
      .set("spark.sql.catalog.spark_catalog.type", "hive")
      .setMaster("local[*]")
    if (!Strings.isNullOrEmpty(url)) {
      sparkConf.set("spark.hadoop.hive.metastore.uris", url)
    }
    if (!Strings.isNullOrEmpty(catalog)) {
      sparkConf
        .set("spark.sql.catalog." + catalog, "org.apache.iceberg.spark.SparkCatalog")
        .set("spark.sql.catalog." + catalog + ".type", "hive")
    }
    if (!Strings.isNullOrEmpty(path)) {
      sparkConf.set("spark.sql.warehouse.dir", path)
    }
    sparkConf
  }

  /** Creates the Hive-enabled session once; subsequent calls are no-ops. */
  def initializeSession(): Unit = {
    if (_hiveSpark == null) {
      _hiveSpark = SparkSession
        .builder()
        .config(sparkConf)
        .enableHiveSupport()
        .getOrCreate()
    }
  }

  /** Stops the session (if any) and always clears global session state. */
  def clean(): Unit = {
    try {
      if (_hiveSpark != null) {
        _hiveSpark.stop()
        _hiveSpark = null
      }
    } finally {
      SparkSession.clearActiveSession()
      SparkSession.clearDefaultSession()
    }
  }
}
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala
index bb72e34e3e..47307cf2fa 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/cache/GlutenClickHouseCacheBaseTestSuite.scala
@@ -37,7 +37,9 @@ abstract class GlutenClickHouseCacheBaseTestSuite
override protected val queriesResults: String = rootPath + "queries-output"
// Abstract methods to be implemented by subclasses
- protected def cleanupCache(): Unit = cacheHelper.deleteCache(spark,
tablesPath)
+ protected def cleanupCache(): Unit =
+ cacheHelper.deleteCache(spark, s"$tablesPath/lineitem",
s"$tablesPath/$SPARK_DIR_NAME")
+
protected def copyDataIfNeeded(): Unit
// Initialize the cache helper - accessible to subclasses
diff --git
a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala
index 6cdc16e2a1..4b694086a9 100644
---
a/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala
+++
b/backends-clickhouse/src/test/scala/org/apache/gluten/utils/CacheTestHelper.scala
@@ -45,22 +45,21 @@ class CacheTestHelper(val TMP_PREFIX: String) {
}
/** Delete cache files for all tables under the given data paths */
- def deleteCache(spark: SparkSession, dataPath: String): Unit = {
- val targetFile = new Path(dataPath)
- val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
- fs.listStatus(targetFile)
- .foreach(
- table => {
- if (table.isDirectory) {
- fs.listStatus(table.getPath)
- .foreach(
- data => {
- if (data.isFile) {
- CHNativeCacheManager
- .removeFiles(data.getPath.toUri.getPath.substring(1),
CACHE_NAME)
- }
- })
- }
- })
+ def deleteCache(spark: SparkSession, dataPaths: String*): Unit = {
+ dataPaths.foreach(
+ dataPath => {
+ val targetFile = new Path(dataPath)
+ val fs = targetFile.getFileSystem(spark.sessionState.newHadoopConf())
+ if (fs.isDirectory(targetFile)) {
+ fs.listStatus(targetFile)
+ .foreach(
+ data => {
+ if (data.isFile) {
+ CHNativeCacheManager
+ .removeFiles(data.getPath.toUri.getPath.substring(1),
CACHE_NAME)
+ }
+ })
+ }
+ })
}
}
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
index fedd043500..dfbbe4267d 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/FileReader.cpp
@@ -59,12 +59,12 @@ DB::Columns BaseReader::addVirtualColumn(DB::Chunk
dataChunk, size_t rowNum) con
std::back_inserter(res_columns),
[&](const auto & column) -> DB::ColumnPtr
{
- if (readHeader.has(column.name))
- return read_columns[readHeader.getPositionByName(column.name)];
if (auto it =
normalized_partition_values.find(boost::to_lower_copy(column.name)); it !=
normalized_partition_values.end())
return createPartitionColumn(it->second, column.type, rows);
if (file->fileMetaColumns().virtualColumn(column.name))
return file->fileMetaColumns().createMetaColumn(column.name,
column.type, rows);
+ if (readHeader.has(column.name))
+ return read_columns[readHeader.getPositionByName(column.name)];
throw DB::Exception(
DB::ErrorCodes::LOGICAL_ERROR, "Not found column = {} when
reading file: {}.", column.name, file->getURIPath());
});
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]