This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new fb5c8e452b [GLUTEN-8799][VL]Support Iceberg with Gluten test framework 
(#8800)
fb5c8e452b is described below

commit fb5c8e452b32217dc14596686628bd9e221f03df
Author: Jin Chengcheng <[email protected]>
AuthorDate: Thu Mar 6 15:53:45 2025 +0000

    [GLUTEN-8799][VL]Support Iceberg with Gluten test framework (#8800)
---
 backends-velox/pom.xml                             |  88 +++
 .../execution/TestStoragePartitionedJoins.java     | 665 +++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkTestBase.java    | 291 +++++++++
 3 files changed, 1044 insertions(+)

diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml
index ff1314a627..e19cd63330 100755
--- a/backends-velox/pom.xml
+++ b/backends-velox/pom.xml
@@ -100,6 +100,27 @@
           <version>${iceberg.version}</version>
           <scope>provided</scope>
         </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          
<artifactId>iceberg-spark-${sparkbundle.version}_${scala.binary.version}</artifactId>
+          <version>${iceberg.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          <artifactId>iceberg-hive-metastore</artifactId>
+          <version>${iceberg.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.iceberg</groupId>
+          <artifactId>iceberg-api</artifactId>
+          <version>${iceberg.version}</version>
+          <type>test-jar</type>
+          <scope>test</scope>
+        </dependency>
       </dependencies>
     </profile>
     <profile>
@@ -147,6 +168,73 @@
         </dependency>
       </dependencies>
     </profile>
+    <profile>
+      <id>spark-3.2</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <testExcludes>
+                <testExclude>**/org/apache/gluten/spark34/**</testExclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark-3.3</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <testExcludes>
+                <testExclude>**/org/apache/gluten/spark34/**</testExclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>spark-3.5</id>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <testExcludes>
+                <testExclude>**/org/apache/gluten/spark34/**</testExclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
+    <profile>
+      <id>default</id>
+      <activation>
+        <activeByDefault>true</activeByDefault>
+      </activation>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <testExcludes>
+                <testExclude>**/org/apache/gluten/spark34/**</testExclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
   </profiles>
 
   <dependencies>
diff --git 
a/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/execution/TestStoragePartitionedJoins.java
 
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/execution/TestStoragePartitionedJoins.java
new file mode 100644
index 0000000000..8155b630b1
--- /dev/null
+++ 
b/backends-velox/src-iceberg/test/java/org/apache/gluten/spark34/execution/TestStoragePartitionedJoins.java
@@ -0,0 +1,665 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.spark34.execution;
+
+import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
+import static org.apache.iceberg.PlanningMode.LOCAL;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.iceberg.PlanningMode;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSQLProperties;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkTestBaseWithCatalog;
+import org.apache.iceberg.spark.SparkWriteOptions;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestStoragePartitionedJoins extends SparkTestBaseWithCatalog {
+
+  @Parameterized.Parameters(name = "planningMode = {0}")
+  public static Object[] parameters() {
+    return new Object[] {LOCAL, DISTRIBUTED};
+  }
+
+  private static final String OTHER_TABLE_NAME = "other_table";
+
+  // open file cost and split size are set as 16 MB to produce a split per file
+  private static final Map<String, String> TABLE_PROPERTIES =
+          ImmutableMap.of(
+                  TableProperties.SPLIT_SIZE, "16777216", 
TableProperties.SPLIT_OPEN_FILE_COST, "16777216");
+
+  // only v2 bucketing and preserve data grouping properties have to be 
enabled to trigger SPJ
+  // other properties are only to simplify testing and validation
+  private static final Map<String, String> ENABLED_SPJ_SQL_CONF =
+          ImmutableMap.of(
+                  SQLConf.V2_BUCKETING_ENABLED().key(),
+                  "true",
+                  SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED().key(),
+                  "true",
+                  SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(),
+                  "false",
+                  SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
+                  "false",
+                  SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(),
+                  "-1",
+                  SparkSQLProperties.PRESERVE_DATA_GROUPING,
+                  "true");
+
+  private static final Map<String, String> DISABLED_SPJ_SQL_CONF =
+          ImmutableMap.of(
+                  SQLConf.V2_BUCKETING_ENABLED().key(),
+                  "false",
+                  SQLConf.REQUIRE_ALL_CLUSTER_KEYS_FOR_CO_PARTITION().key(),
+                  "false",
+                  SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(),
+                  "false",
+                  SQLConf.AUTO_BROADCASTJOIN_THRESHOLD().key(),
+                  "-1",
+                  SparkSQLProperties.PRESERVE_DATA_GROUPING,
+                  "true");
+
+  private final PlanningMode planningMode;
+
+  public TestStoragePartitionedJoins(PlanningMode planningMode) {
+    this.planningMode = planningMode;
+  }
+
+  @BeforeClass
+  public static void setupSparkConf() {
+    spark.conf().set("spark.sql.shuffle.partitions", "4");
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s", tableName(OTHER_TABLE_NAME));
+  }
+
+  // TODO: add tests for truncate transforms once SPARK-40295 is released
+
+  @Test
+  public void testJoinsWithBucketingOnByteColumn() throws NoSuchTableException 
{
+    checkJoin("byte_col", "TINYINT", "bucket(4, byte_col)");
+  }
+
+  @Test
+  public void testJoinsWithBucketingOnShortColumn() throws 
NoSuchTableException {
+    checkJoin("short_col", "SMALLINT", "bucket(4, short_col)");
+  }
+
+  @Test
+  public void testJoinsWithBucketingOnIntColumn() throws NoSuchTableException {
+    checkJoin("int_col", "INT", "bucket(16, int_col)");
+  }
+
+  @Test
+  public void testJoinsWithBucketingOnLongColumn() throws NoSuchTableException 
{
+    checkJoin("long_col", "BIGINT", "bucket(16, long_col)");
+  }
+
+  @Test
+  public void testJoinsWithBucketingOnTimestampColumn() throws 
NoSuchTableException {
+    checkJoin("timestamp_col", "TIMESTAMP", "bucket(16, timestamp_col)");
+  }
+
+  @Test
+  public void testJoinsWithBucketingOnTimestampNtzColumn() throws 
NoSuchTableException {
+    checkJoin("timestamp_col", "TIMESTAMP_NTZ", "bucket(16, timestamp_col)");
+  }
+
+  @Test
+  public void testJoinsWithBucketingOnDateColumn() throws NoSuchTableException 
{
+    checkJoin("date_col", "DATE", "bucket(8, date_col)");
+  }
+
+  @Test
+  public void testJoinsWithBucketingOnDecimalColumn() throws 
NoSuchTableException {
+    checkJoin("decimal_col", "DECIMAL(20, 2)", "bucket(8, decimal_col)");
+  }
+
+  @Test
+  public void testJoinsWithBucketingOnBinaryColumn() throws 
NoSuchTableException {
+    checkJoin("binary_col", "BINARY", "bucket(8, binary_col)");
+  }
+
+  @Test
+  public void testJoinsWithYearsOnTimestampColumn() throws 
NoSuchTableException {
+    checkJoin("timestamp_col", "TIMESTAMP", "years(timestamp_col)");
+  }
+
+  @Test
+  public void testJoinsWithYearsOnTimestampNtzColumn() throws 
NoSuchTableException {
+    checkJoin("timestamp_col", "TIMESTAMP_NTZ", "years(timestamp_col)");
+  }
+
+  @Test
+  public void testJoinsWithYearsOnDateColumn() throws NoSuchTableException {
+    checkJoin("date_col", "DATE", "years(date_col)");
+  }
+
+  @Test
+  public void testJoinsWithMonthsOnTimestampColumn() throws 
NoSuchTableException {
+    checkJoin("timestamp_col", "TIMESTAMP", "months(timestamp_col)");
+  }
+
+  @Test
+  public void testJoinsWithMonthsOnTimestampNtzColumn() throws 
NoSuchTableException {
+    checkJoin("timestamp_col", "TIMESTAMP_NTZ", "months(timestamp_col)");
+  }
+
+  @Test
+  public void testJoinsWithMonthsOnDateColumn() throws NoSuchTableException {
+    checkJoin("date_col", "DATE", "months(date_col)");
+  }
+
+  @Test
+  public void testJoinsWithDaysOnTimestampColumn() throws NoSuchTableException 
{
+    checkJoin("timestamp_col", "TIMESTAMP", "days(timestamp_col)");
+  }
+
+  @Test
+  public void testJoinsWithDaysOnTimestampNtzColumn() throws 
NoSuchTableException {
+    checkJoin("timestamp_col", "TIMESTAMP_NTZ", "days(timestamp_col)");
+  }
+
+  @Test
+  public void testJoinsWithDaysOnDateColumn() throws NoSuchTableException {
+    checkJoin("date_col", "DATE", "days(date_col)");
+  }
+
+  @Test
+  public void testJoinsWithHoursOnTimestampColumn() throws 
NoSuchTableException {
+    checkJoin("timestamp_col", "TIMESTAMP", "hours(timestamp_col)");
+  }
+
+  @Test
+  public void testJoinsWithHoursOnTimestampNtzColumn() throws 
NoSuchTableException {
+    checkJoin("timestamp_col", "TIMESTAMP_NTZ", "hours(timestamp_col)");
+  }
+
+  @Test
+  public void testJoinsWithMultipleTransformTypes() throws 
NoSuchTableException {
+    String createTableStmt =
+            "CREATE TABLE %s ("
+                    + "  id BIGINT, int_col INT, date_col1 DATE, date_col2 
DATE, date_col3 DATE,"
+                    + "  timestamp_col TIMESTAMP, string_col STRING, dep 
STRING)"
+                    + "USING iceberg "
+                    + "PARTITIONED BY ("
+                    + "  years(date_col1), months(date_col2), days(date_col3), 
hours(timestamp_col), "
+                    + "  bucket(8, int_col), dep)"
+                    + "TBLPROPERTIES (%s)";
+
+    sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES));
+    sql(createTableStmt, tableName(OTHER_TABLE_NAME), 
tablePropsAsString(TABLE_PROPERTIES));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Dataset<Row> dataDF = randomDataDF(table.schema(), 16);
+
+    // write to the first table 1 time to generate 1 file per partition
+    append(tableName, dataDF);
+
+    // write to the second table 2 times to generate 2 files per partition
+    append(tableName(OTHER_TABLE_NAME), dataDF);
+    append(tableName(OTHER_TABLE_NAME), dataDF);
+
+    // Spark SPJ support is limited at the moment and requires all source 
partitioning columns,
+    // which were projected in the query, to be part of the join condition
+    // suppose a table is partitioned by `p1`, `bucket(8, pk)`
+    // queries covering `p1` and `pk` columns must include equality predicates
+    // on both `p1` and `pk` to benefit from SPJ
+    // this is a temporary Spark limitation that will be removed in a future 
release
+
+    assertPartitioningAwarePlan(
+            1, /* expected num of shuffles with SPJ */
+            3, /* expected num of shuffles without SPJ */
+            "SELECT t1.id "
+                    + "FROM %s t1 "
+                    + "INNER JOIN %s t2 "
+                    + "ON t1.id = t2.id AND t1.dep = t2.dep "
+                    + "ORDER BY t1.id",
+            tableName,
+            tableName(OTHER_TABLE_NAME));
+
+    assertPartitioningAwarePlan(
+            1, /* expected num of shuffles with SPJ */
+            3, /* expected num of shuffles without SPJ */
+            "SELECT t1.id, t1.int_col, t1.date_col1 "
+                    + "FROM %s t1 "
+                    + "INNER JOIN %s t2 "
+                    + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND 
t1.date_col1 = t2.date_col1 "
+                    + "ORDER BY t1.id, t1.int_col, t1.date_col1",
+            tableName,
+            tableName(OTHER_TABLE_NAME));
+
+    assertPartitioningAwarePlan(
+            1, /* expected num of shuffles with SPJ */
+            3, /* expected num of shuffles without SPJ */
+            "SELECT t1.id, t1.timestamp_col, t1.string_col "
+                    + "FROM %s t1 "
+                    + "INNER JOIN %s t2 "
+                    + "ON t1.id = t2.id AND t1.timestamp_col = 
t2.timestamp_col AND t1.string_col = t2.string_col "
+                    + "ORDER BY t1.id, t1.timestamp_col, t1.string_col",
+            tableName,
+            tableName(OTHER_TABLE_NAME));
+
+    assertPartitioningAwarePlan(
+            1, /* expected num of shuffles with SPJ */
+            3, /* expected num of shuffles without SPJ */
+            "SELECT t1.id, t1.date_col1, t1.date_col2, t1.date_col3 "
+                    + "FROM %s t1 "
+                    + "INNER JOIN %s t2 "
+                    + "ON t1.id = t2.id AND t1.date_col1 = t2.date_col1 AND 
t1.date_col2 = t2.date_col2 AND t1.date_col3 = t2.date_col3 "
+                    + "ORDER BY t1.id, t1.date_col1, t1.date_col2, 
t1.date_col3",
+            tableName,
+            tableName(OTHER_TABLE_NAME));
+
+    assertPartitioningAwarePlan(
+            1, /* expected num of shuffles with SPJ */
+            3, /* expected num of shuffles without SPJ */
+            "SELECT t1.id, t1.int_col, t1.timestamp_col, t1.dep "
+                    + "FROM %s t1 "
+                    + "INNER JOIN %s t2 "
+                    + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND 
t1.timestamp_col = t2.timestamp_col AND t1.dep = t2.dep "
+                    + "ORDER BY t1.id, t1.int_col, t1.timestamp_col, t1.dep",
+            tableName,
+            tableName(OTHER_TABLE_NAME));
+  }
+
+  @Test
+  public void testJoinsWithCompatibleSpecEvolution() {
+    // create a table with an empty spec
+    sql(
+            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+                    + "USING iceberg "
+                    + "TBLPROPERTIES (%s)",
+            tableName, tablePropsAsString(TABLE_PROPERTIES));
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    // evolve the spec in the first table by adding `dep`
+    table.updateSpec().addField("dep").commit();
+
+    // insert data into the first table partitioned by `dep`
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);
+
+    // evolve the spec in the first table by adding `bucket(int_col, 8)`
+    table.updateSpec().addField(Expressions.bucket("int_col", 8)).commit();
+
+    // insert data into the first table partitioned by `dep`, `bucket(8, 
int_col)`
+    sql("REFRESH TABLE %s", tableName);
+    sql("INSERT INTO %s VALUES (2L, 200, 'hr')", tableName);
+
+    // create another table partitioned by `other_dep`
+    sql(
+            "CREATE TABLE %s (other_id BIGINT, other_int_col INT, other_dep 
STRING)"
+                    + "USING iceberg "
+                    + "PARTITIONED BY (other_dep)"
+                    + "TBLPROPERTIES (%s)",
+            tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));
+
+    // insert data into the second table partitioned by 'other_dep'
+    sql("INSERT INTO %s VALUES (1L, 100, 'software')", 
tableName(OTHER_TABLE_NAME));
+    sql("INSERT INTO %s VALUES (2L, 200, 'hr')", tableName(OTHER_TABLE_NAME));
+
+    // SPJ would apply as the grouping keys are compatible
+    // the first table: `dep` (an intersection of all active partition fields 
across scanned specs)
+    // the second table: `other_dep` (the only partition field).
+
+    assertPartitioningAwarePlan(
+            1, /* expected num of shuffles with SPJ */
+            3, /* expected num of shuffles without SPJ */
+            "SELECT * "
+                    + "FROM %s "
+                    + "INNER JOIN %s "
+                    + "ON id = other_id AND int_col = other_int_col AND dep = 
other_dep "
+                    + "ORDER BY id, int_col, dep",
+            tableName,
+            tableName(OTHER_TABLE_NAME));
+  }
+
+  @Test
+  public void testJoinsWithIncompatibleSpecs() {
+    sql(
+            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+                    + "USING iceberg "
+                    + "PARTITIONED BY (dep)"
+                    + "TBLPROPERTIES (%s)",
+            tableName, tablePropsAsString(TABLE_PROPERTIES));
+
+    sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);
+    sql("INSERT INTO %s VALUES (2L, 200, 'software')", tableName);
+    sql("INSERT INTO %s VALUES (3L, 300, 'software')", tableName);
+
+    sql(
+            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+                    + "USING iceberg "
+                    + "PARTITIONED BY (bucket(8, int_col))"
+                    + "TBLPROPERTIES (%s)",
+            tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));
+
+    sql("INSERT INTO %s VALUES (1L, 100, 'software')", 
tableName(OTHER_TABLE_NAME));
+    sql("INSERT INTO %s VALUES (2L, 200, 'software')", 
tableName(OTHER_TABLE_NAME));
+    sql("INSERT INTO %s VALUES (3L, 300, 'software')", 
tableName(OTHER_TABLE_NAME));
+
+    // queries can't benefit from SPJ as specs are not compatible
+    // the first table: `dep`
+    // the second table: `bucket(8, int_col)`
+
+    assertPartitioningAwarePlan(
+            3, /* expected num of shuffles with SPJ */
+            3, /* expected num of shuffles with SPJ */
+            "SELECT * "
+                    + "FROM %s t1 "
+                    + "INNER JOIN %s t2 "
+                    + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND t1.dep 
= t2.dep "
+                    + "ORDER BY t1.id, t1.int_col, t1.dep, t2.id, t2.int_col, 
t2.dep",
+            tableName,
+            tableName(OTHER_TABLE_NAME));
+  }
+
+  @Test
+  public void testJoinsWithUnpartitionedTables() {
+    sql(
+            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+                    + "USING iceberg "
+                    + "TBLPROPERTIES ("
+                    + "  'read.split.target-size' = 16777216,"
+                    + "  'read.split.open-file-cost' = 16777216)",
+            tableName);
+
+    sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);
+    sql("INSERT INTO %s VALUES (2L, 200, 'software')", tableName);
+    sql("INSERT INTO %s VALUES (3L, 300, 'software')", tableName);
+
+    sql(
+            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+                    + "USING iceberg "
+                    + "TBLPROPERTIES ("
+                    + "  'read.split.target-size' = 16777216,"
+                    + "  'read.split.open-file-cost' = 16777216)",
+            tableName(OTHER_TABLE_NAME));
+
+    sql("INSERT INTO %s VALUES (1L, 100, 'software')", 
tableName(OTHER_TABLE_NAME));
+    sql("INSERT INTO %s VALUES (2L, 200, 'software')", 
tableName(OTHER_TABLE_NAME));
+    sql("INSERT INTO %s VALUES (3L, 300, 'software')", 
tableName(OTHER_TABLE_NAME));
+
+    // queries covering unpartitioned tables can't benefit from SPJ but 
shouldn't fail
+
+    assertPartitioningAwarePlan(
+            3, /* expected num of shuffles with SPJ */
+            3, /* expected num of shuffles without SPJ */
+            "SELECT * "
+                    + "FROM %s t1 "
+                    + "INNER JOIN %s t2 "
+                    + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND t1.dep 
= t2.dep "
+                    + "ORDER BY t1.id, t1.int_col, t1.dep, t2.id, t2.int_col, 
t2.dep",
+            tableName,
+            tableName(OTHER_TABLE_NAME));
+  }
+
+  @Test
+  public void testJoinsWithEmptyTable() {
+    sql(
+            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+                    + "USING iceberg "
+                    + "PARTITIONED BY (dep)"
+                    + "TBLPROPERTIES (%s)",
+            tableName, tablePropsAsString(TABLE_PROPERTIES));
+
+    sql(
+            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+                    + "USING iceberg "
+                    + "PARTITIONED BY (dep)"
+                    + "TBLPROPERTIES (%s)",
+            tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));
+
+    sql("INSERT INTO %s VALUES (1L, 100, 'software')", 
tableName(OTHER_TABLE_NAME));
+    sql("INSERT INTO %s VALUES (2L, 200, 'software')", 
tableName(OTHER_TABLE_NAME));
+    sql("INSERT INTO %s VALUES (3L, 300, 'software')", 
tableName(OTHER_TABLE_NAME));
+
+    assertPartitioningAwarePlan(
+            3, /* expected num of shuffles with SPJ */
+            3, /* expected num of shuffles without SPJ */
+            "SELECT * "
+                    + "FROM %s t1 "
+                    + "INNER JOIN %s t2 "
+                    + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND t1.dep 
= t2.dep "
+                    + "ORDER BY t1.id, t1.int_col, t1.dep, t2.id, t2.int_col, 
t2.dep",
+            tableName,
+            tableName(OTHER_TABLE_NAME));
+  }
+
+  @Test
+  public void testJoinsWithOneSplitTables() {
+    sql(
+            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+                    + "USING iceberg "
+                    + "PARTITIONED BY (dep)"
+                    + "TBLPROPERTIES (%s)",
+            tableName, tablePropsAsString(TABLE_PROPERTIES));
+
+    sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);
+
+    sql(
+            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+                    + "USING iceberg "
+                    + "PARTITIONED BY (dep)"
+                    + "TBLPROPERTIES (%s)",
+            tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));
+
+    sql("INSERT INTO %s VALUES (1L, 100, 'software')", 
tableName(OTHER_TABLE_NAME));
+
+    // Spark should be able to avoid shuffles even without SPJ if each side 
has only one split
+
+    assertPartitioningAwarePlan(
+            0, /* expected num of shuffles with SPJ */
+            0, /* expected num of shuffles without SPJ */
+            "SELECT * "
+                    + "FROM %s t1 "
+                    + "INNER JOIN %s t2 "
+                    + "ON t1.id = t2.id AND t1.int_col = t2.int_col AND t1.dep 
= t2.dep "
+                    + "ORDER BY t1.id, t1.int_col, t1.dep, t2.id, t2.int_col, 
t2.dep",
+            tableName,
+            tableName(OTHER_TABLE_NAME));
+  }
+
+  @Test
+  public void testJoinsWithMismatchingPartitionKeys() {
+    sql(
+            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+                    + "USING iceberg "
+                    + "PARTITIONED BY (dep)"
+                    + "TBLPROPERTIES (%s)",
+            tableName, tablePropsAsString(TABLE_PROPERTIES));
+
+    sql("INSERT INTO %s VALUES (1L, 100, 'software')", tableName);
+    sql("INSERT INTO %s VALUES (2L, 100, 'hr')", tableName);
+
+    sql(
+            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+                    + "USING iceberg "
+                    + "PARTITIONED BY (dep)"
+                    + "TBLPROPERTIES (%s)",
+            tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));
+
+    sql("INSERT INTO %s VALUES (1L, 100, 'software')", 
tableName(OTHER_TABLE_NAME));
+    sql("INSERT INTO %s VALUES (3L, 300, 'hardware')", 
tableName(OTHER_TABLE_NAME));
+
+    assertPartitioningAwarePlan(
+            1, /* expected num of shuffles with SPJ */
+            3, /* expected num of shuffles without SPJ */
+            "SELECT * "
+                    + "FROM %s t1 "
+                    + "INNER JOIN %s t2 "
+                    + "ON t1.id = t2.id AND t1.dep = t2.dep "
+                    + "ORDER BY t1.id, t1.int_col, t1.dep, t2.id, t2.int_col, 
t2.dep",
+            tableName,
+            tableName(OTHER_TABLE_NAME));
+  }
+
+  @Test
+  public void testAggregates() throws NoSuchTableException {
+    sql(
+            "CREATE TABLE %s (id BIGINT, int_col INT, dep STRING)"
+                    + "USING iceberg "
+                    + "PARTITIONED BY (dep, bucket(8, int_col))"
+                    + "TBLPROPERTIES (%s)",
+            tableName, tablePropsAsString(TABLE_PROPERTIES));
+
+    // write to the table 3 times to generate 3 files per partition
+    Table table = validationCatalog.loadTable(tableIdent);
+    Dataset<Row> dataDF = randomDataDF(table.schema(), 100);
+    append(tableName, dataDF);
+
+    assertPartitioningAwarePlan(
+            1, /* expected num of shuffles with SPJ */
+            3, /* expected num of shuffles without SPJ */
+            "SELECT COUNT (DISTINCT id) AS count FROM %s GROUP BY dep, int_col 
ORDER BY count",
+            tableName,
+            tableName(OTHER_TABLE_NAME));
+
+    assertPartitioningAwarePlan(
+            1, /* expected num of shuffles with SPJ */
+            3, /* expected num of shuffles without SPJ */
+            "SELECT COUNT (DISTINCT id) AS count FROM %s GROUP BY dep ORDER BY 
count",
+            tableName,
+            tableName(OTHER_TABLE_NAME));
+  }
+
+  private void checkJoin(String sourceColumnName, String sourceColumnType, 
String transform)
+          throws NoSuchTableException {
+
+    String createTableStmt =
+            "CREATE TABLE %s (id BIGINT, salary INT, %s %s)"
+                    + "USING iceberg "
+                    + "PARTITIONED BY (%s)"
+                    + "TBLPROPERTIES (%s)";
+
+    sql(
+            createTableStmt,
+            tableName,
+            sourceColumnName,
+            sourceColumnType,
+            transform,
+            tablePropsAsString(TABLE_PROPERTIES));
+    configurePlanningMode(tableName, planningMode);
+
+    sql(
+            createTableStmt,
+            tableName(OTHER_TABLE_NAME),
+            sourceColumnName,
+            sourceColumnType,
+            transform,
+            tablePropsAsString(TABLE_PROPERTIES));
+    configurePlanningMode(tableName(OTHER_TABLE_NAME), planningMode);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Dataset<Row> dataDF = randomDataDF(table.schema(), 200);
+    append(tableName, dataDF);
+    append(tableName(OTHER_TABLE_NAME), dataDF);
+
+    assertPartitioningAwarePlan(
+            1, /* expected num of shuffles with SPJ */
+            3, /* expected num of shuffles without SPJ */
+            "SELECT t1.id, t1.salary, t1.%s "
+                    + "FROM %s t1 "
+                    + "INNER JOIN %s t2 "
+                    + "ON t1.id = t2.id AND t1.%s = t2.%s "
+                    + "ORDER BY t1.id, t1.%s, t1.salary", // add order by 
salary to make test stable
+            sourceColumnName,
+            tableName,
+            tableName(OTHER_TABLE_NAME),
+            sourceColumnName,
+            sourceColumnName,
+            sourceColumnName);
+  }
+
+  private void assertPartitioningAwarePlan(
+          int expectedNumShufflesWithSPJ,
+          int expectedNumShufflesWithoutSPJ,
+          String query,
+          Object... args) {
+
+    AtomicReference<List<Object[]>> rowsWithSPJ = new AtomicReference<>();
+    AtomicReference<List<Object[]>> rowsWithoutSPJ = new AtomicReference<>();
+
+    withSQLConf(
+            ENABLED_SPJ_SQL_CONF,
+            () -> {
+              String plan = executeAndKeepPlan(query, args).toString();
+              int actualNumShuffles = StringUtils.countMatches(plan, 
"Exchange");
+              Assert.assertEquals(
+                      "Number of shuffles with enabled SPJ must match",
+                      expectedNumShufflesWithSPJ,
+                      actualNumShuffles);
+
+              rowsWithSPJ.set(sql(query, args));
+            });
+
+    withSQLConf(
+            DISABLED_SPJ_SQL_CONF,
+            () -> {
+              String plan = executeAndKeepPlan(query, args).toString();
+              int actualNumShuffles = StringUtils.countMatches(plan, 
"Exchange");
+              Assert.assertEquals(
+                      "Number of shuffles with disabled SPJ must match",
+                      expectedNumShufflesWithoutSPJ,
+                      actualNumShuffles);
+
+              rowsWithoutSPJ.set(sql(query, args));
+            });
+
+    assertEquals("SPJ should not change query output", rowsWithoutSPJ.get(), 
rowsWithSPJ.get());
+  }
+
+  private Dataset<Row> randomDataDF(Schema schema, int numRows) {
+    Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0);
+    JavaRDD<InternalRow> rowRDD = 
sparkContext.parallelize(Lists.newArrayList(rows));
+    StructType rowSparkType = SparkSchemaUtil.convert(schema);
+    return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, 
false);
+  }
+
+  private void append(String table, Dataset<Row> df) throws 
NoSuchTableException {
+    // fanout writes are enabled as write-time clustering is not supported 
without Spark extensions
+    df.coalesce(1).writeTo(table).option(SparkWriteOptions.FANOUT_ENABLED, 
"true").append();
+  }
+}
+
diff --git 
a/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/SparkTestBase.java
 
b/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/SparkTestBase.java
new file mode 100644
index 0000000000..4a9c1d5a7e
--- /dev/null
+++ 
b/backends-velox/src-iceberg/test/java/org/apache/iceberg/spark/SparkTestBase.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.hive.HiveCatalog;
+import org.apache.iceberg.hive.TestHiveMetastore;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.execution.QueryExecution;
+import org.apache.spark.sql.execution.SparkPlan;
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.QueryExecutionListener;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+
+public abstract class SparkTestBase extends SparkTestHelperBase {
+
+    protected static TestHiveMetastore metastore = null;
+    protected static HiveConf hiveConf = null;
+    protected static SparkSession spark = null;
+    protected static JavaSparkContext sparkContext = null;
+    protected static HiveCatalog catalog = null;
+
+    @BeforeClass
+    public static void startMetastoreAndSpark() {
+        SparkTestBase.metastore = new TestHiveMetastore();
+        metastore.start();
+        SparkTestBase.hiveConf = metastore.hiveConf();
+
+        SparkTestBase.spark =
+                SparkSession.builder()
+                        .master("local[2]")
+                        .config(SQLConf.PARTITION_OVERWRITE_MODE().key(), 
"dynamic")
+                        .config("spark.hadoop." + METASTOREURIS.varname, 
hiveConf.get(METASTOREURIS.varname))
+                        
.config("spark.sql.legacy.respectNullabilityInTextDatasetConversion", "true")
+                        .config("spark.plugins", 
"org.apache.gluten.GlutenPlugin")
+                        .config("spark.default.parallelism", "1")
+                        .config("spark.memory.offHeap.enabled", "true")
+                        .config("spark.memory.offHeap.size", "1024MB")
+                        .config("spark.ui.enabled", "false")
+                        .config("spark.gluten.ui.enabled", "false")
+                        .enableHiveSupport()
+                        .getOrCreate();
+
+        SparkTestBase.sparkContext = 
JavaSparkContext.fromSparkContext(spark.sparkContext());
+
+        SparkTestBase.catalog =
+                (HiveCatalog)
+                        CatalogUtil.loadCatalog(
+                                HiveCatalog.class.getName(), "hive", 
ImmutableMap.of(), hiveConf);
+
+        try {
+            catalog.createNamespace(Namespace.of("default"));
+        } catch (AlreadyExistsException ignored) {
+            // the default namespace already exists. ignore the create error
+        }
+    }
+
+    @AfterClass
+    public static void stopMetastoreAndSpark() throws Exception {
+        SparkTestBase.catalog = null;
+        if (metastore != null) {
+            metastore.stop();
+            SparkTestBase.metastore = null;
+        }
+        if (spark != null) {
+            spark.stop();
+            SparkTestBase.spark = null;
+            SparkTestBase.sparkContext = null;
+        }
+    }
+
+    protected long waitUntilAfter(long timestampMillis) {
+        long current = System.currentTimeMillis();
+        while (current <= timestampMillis) {
+            current = System.currentTimeMillis();
+        }
+        return current;
+    }
+
+    protected List<Object[]> sql(String query, Object... args) {
+        List<Row> rows = spark.sql(String.format(query, args)).collectAsList();
+        if (rows.size() < 1) {
+            return ImmutableList.of();
+        }
+
+        return rowsToJava(rows);
+    }
+
+    protected Object scalarSql(String query, Object... args) {
+        List<Object[]> rows = sql(query, args);
+        Assert.assertEquals("Scalar SQL should return one row", 1, 
rows.size());
+        Object[] row = Iterables.getOnlyElement(rows);
+        Assert.assertEquals("Scalar SQL should return one value", 1, 
row.length);
+        return row[0];
+    }
+
+    protected Object[] row(Object... values) {
+        return values;
+    }
+
+    protected static String dbPath(String dbName) {
+        return metastore.getDatabasePath(dbName);
+    }
+
+    protected void withUnavailableFiles(Iterable<? extends ContentFile<?>> 
files, Action action) {
+        Iterable<String> fileLocations = Iterables.transform(files, file -> 
file.path().toString());
+        withUnavailableLocations(fileLocations, action);
+    }
+
+    private void move(String location, String newLocation) {
+        Path path = Paths.get(URI.create(location));
+        Path tempPath = Paths.get(URI.create(newLocation));
+
+        try {
+            Files.move(path, tempPath);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Failed to move: " + location, e);
+        }
+    }
+
+    protected void withUnavailableLocations(Iterable<String> locations, Action 
action) {
+        for (String location : locations) {
+            move(location, location + "_temp");
+        }
+
+        try {
+            action.invoke();
+        } finally {
+            for (String location : locations) {
+                move(location + "_temp", location);
+            }
+        }
+    }
+
+    protected void withDefaultTimeZone(String zoneId, Action action) {
+        TimeZone currentZone = TimeZone.getDefault();
+        try {
+            TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
+            action.invoke();
+        } finally {
+            TimeZone.setDefault(currentZone);
+        }
+    }
+
+    protected void withSQLConf(Map<String, String> conf, Action action) {
+        SQLConf sqlConf = SQLConf.get();
+
+        Map<String, String> currentConfValues = Maps.newHashMap();
+        conf.keySet()
+                .forEach(
+                        confKey -> {
+                            if (sqlConf.contains(confKey)) {
+                                String currentConfValue = 
sqlConf.getConfString(confKey);
+                                currentConfValues.put(confKey, 
currentConfValue);
+                            }
+                        });
+
+        conf.forEach(
+                (confKey, confValue) -> {
+                    if (SQLConf.isStaticConfigKey(confKey)) {
+                        throw new RuntimeException("Cannot modify the value of 
a static config: " + confKey);
+                    }
+                    sqlConf.setConfString(confKey, confValue);
+                });
+
+        try {
+            action.invoke();
+        } finally {
+            conf.forEach(
+                    (confKey, confValue) -> {
+                        if (currentConfValues.containsKey(confKey)) {
+                            sqlConf.setConfString(confKey, 
currentConfValues.get(confKey));
+                        } else {
+                            sqlConf.unsetConf(confKey);
+                        }
+                    });
+        }
+    }
+
+    protected Dataset<Row> jsonToDF(String schema, String... records) {
+        Dataset<String> jsonDF = 
spark.createDataset(ImmutableList.copyOf(records), Encoders.STRING());
+        return spark.read().schema(schema).json(jsonDF);
+    }
+
+    protected void append(String table, String... jsonRecords) {
+        try {
+            String schema = spark.table(table).schema().toDDL();
+            Dataset<Row> df = jsonToDF(schema, jsonRecords);
+            df.coalesce(1).writeTo(table).append();
+        } catch (NoSuchTableException e) {
+            throw new RuntimeException("Failed to write data", e);
+        }
+    }
+
+    protected String tablePropsAsString(Map<String, String> tableProps) {
+        StringBuilder stringBuilder = new StringBuilder();
+
+        for (Map.Entry<String, String> property : tableProps.entrySet()) {
+            if (stringBuilder.length() > 0) {
+                stringBuilder.append(", ");
+            }
+            stringBuilder.append(String.format("'%s' '%s'", property.getKey(), 
property.getValue()));
+        }
+
+        return stringBuilder.toString();
+    }
+
+    protected SparkPlan executeAndKeepPlan(String query, Object... args) {
+        return executeAndKeepPlan(() -> sql(query, args));
+    }
+
+    protected SparkPlan executeAndKeepPlan(Action action) {
+        AtomicReference<SparkPlan> executedPlanRef = new AtomicReference<>();
+
+        QueryExecutionListener listener =
+                new QueryExecutionListener() {
+                    @Override
+                    public void onSuccess(String funcName, QueryExecution qe, 
long durationNs) {
+                        executedPlanRef.set(qe.executedPlan());
+                    }
+
+                    @Override
+                    public void onFailure(String funcName, QueryExecution qe, 
Exception exception) {}
+                };
+
+        spark.listenerManager().register(listener);
+
+        action.invoke();
+
+        try {
+            spark.sparkContext().listenerBus().waitUntilEmpty();
+        } catch (TimeoutException e) {
+            throw new RuntimeException("Timeout while waiting for processing 
events", e);
+        }
+
+        SparkPlan executedPlan = executedPlanRef.get();
+        if (executedPlan instanceof AdaptiveSparkPlanExec) {
+            return ((AdaptiveSparkPlanExec) executedPlan).executedPlan();
+        } else {
+            return executedPlan;
+        }
+    }
+
+    @FunctionalInterface
+    protected interface Action {
+        void invoke();
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to