This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 81b3310ab4 Spark 3.5: Support preserving schema nullability in CTAS 
and RTAS (#10074)
81b3310ab4 is described below

commit 81b3310ab469408022cc14af51257b7e8b36614f
Author: Yujiang Zhong <[email protected]>
AuthorDate: Sat Apr 13 05:12:28 2024 +0800

    Spark 3.5: Support preserving schema nullability in CTAS and RTAS (#10074)
---
 docs/docs/spark-configuration.md                   |   1 +
 .../java/org/apache/iceberg/spark/BaseCatalog.java |  20 ++++
 .../org/apache/iceberg/spark/SparkCatalog.java     |   2 +
 .../apache/iceberg/spark/SparkSessionCatalog.java  |   2 +
 .../iceberg/spark/TestSparkCatalogOperations.java  | 102 +++++++++++++++++++++
 5 files changed, 127 insertions(+)

diff --git a/docs/docs/spark-configuration.md b/docs/docs/spark-configuration.md
index 6ac4f1e9c8..9ff7396498 100644
--- a/docs/docs/spark-configuration.md
+++ b/docs/docs/spark-configuration.md
@@ -77,6 +77,7 @@ Both catalogs are configured using properties nested under 
the catalog name. Com
 | spark.sql.catalog._catalog-name_.cache.expiration-interval-ms | `30000` (30 
seconds) | Duration after which cached catalog entries are expired; Only 
effective if `cache-enabled` is `true`. `-1` disables cache expiration and `0` 
disables caching entirely, irrespective of `cache-enabled`. Default is `30000` 
(30 seconds) |
 | spark.sql.catalog._catalog-name_.table-default._propertyKey_  |              
                 | Default Iceberg table property value for property key 
_propertyKey_, which will be set on tables created by this catalog if not 
overridden                                                                      
                         |
 | spark.sql.catalog._catalog-name_.table-override._propertyKey_ |              
                 | Enforced Iceberg table property value for property key 
_propertyKey_, which cannot be overridden by user                               
                                                                                
                  |
+| spark.sql.catalog._catalog-name_.use-nullable-query-schema | `true` or 
`false` | Whether to preserve fields' nullability when creating the table using 
CTAS and RTAS. If set to `true`, all fields will be marked as nullable. If set 
to `false`, fields' nullability will be preserved. The default value is `true`. 
Available in Spark 3.5 and above.   |
 
 Additional properties can be found in common [catalog 
configuration](configuration.md#catalog-properties).
 
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
index 38f15a4295..2082c05846 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
@@ -21,12 +21,14 @@ package org.apache.iceberg.spark;
 import org.apache.iceberg.spark.procedures.SparkProcedures;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.iceberg.spark.source.HasIcebergCatalog;
+import org.apache.iceberg.util.PropertyUtil;
 import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
 import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
 import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
 abstract class BaseCatalog
     implements StagingTableCatalog,
@@ -34,6 +36,10 @@ abstract class BaseCatalog
         SupportsNamespaces,
         HasIcebergCatalog,
         SupportsFunctions {
+  private static final String USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS = 
"use-nullable-query-schema";
+  private static final boolean USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT = 
true;
+
+  private boolean useNullableQuerySchema = 
USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT;
 
   @Override
   public Procedure loadProcedure(Identifier ident) throws 
NoSuchProcedureException {
@@ -66,6 +72,20 @@ abstract class BaseCatalog
     return namespaceExists(namespace);
   }
 
+  @Override
+  public void initialize(String name, CaseInsensitiveStringMap options) {
+    this.useNullableQuerySchema =
+        PropertyUtil.propertyAsBoolean(
+            options,
+            USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS,
+            USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT);
+  }
+
+  @Override
+  public boolean useNullableQuerySchema() {
+    return useNullableQuerySchema;
+  }
+
   private static boolean isSystemNamespace(String[] namespace) {
     return namespace.length == 1 && namespace[0].equalsIgnoreCase("system");
   }
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 0be9fd9484..0c36159862 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -726,6 +726,8 @@ public class SparkCatalog extends BaseCatalog
 
   @Override
   public final void initialize(String name, CaseInsensitiveStringMap options) {
+    super.initialize(name, options);
+
     this.cacheEnabled =
         PropertyUtil.propertyAsBoolean(
             options, CatalogProperties.CACHE_ENABLED, 
CatalogProperties.CACHE_ENABLED_DEFAULT);
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
index 33384e3eff..fa3f1fbe4b 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -304,6 +304,8 @@ public class SparkSessionCatalog<T extends TableCatalog & 
FunctionCatalog & Supp
 
   @Override
   public final void initialize(String name, CaseInsensitiveStringMap options) {
+    super.initialize(name, options);
+
     if (options.containsKey(CatalogUtil.ICEBERG_CATALOG_TYPE)
         && options
             .get(CatalogUtil.ICEBERG_CATALOG_TYPE)
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkCatalogOperations.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkCatalogOperations.java
index 0f29faf274..d0860ff014 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkCatalogOperations.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkCatalogOperations.java
@@ -20,8 +20,13 @@ package org.apache.iceberg.spark;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.iceberg.Parameters;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.Table;
@@ -33,6 +38,47 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.TestTemplate;
 
 public class TestSparkCatalogOperations extends CatalogTestBase {
+  private static boolean useNullableQuerySchema = 
ThreadLocalRandom.current().nextBoolean();
+
+  @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+  protected static Object[][] parameters() {
+    return new Object[][] {
+      {
+        SparkCatalogConfig.HIVE.catalogName(),
+        SparkCatalogConfig.HIVE.implementation(),
+        ImmutableMap.of(
+            "type", "hive",
+            "default-namespace", "default",
+            "use-nullable-query-schema", 
Boolean.toString(useNullableQuerySchema))
+      },
+      {
+        SparkCatalogConfig.HADOOP.catalogName(),
+        SparkCatalogConfig.HADOOP.implementation(),
+        ImmutableMap.of(
+            "type",
+            "hadoop",
+            "cache-enabled",
+            "false",
+            "use-nullable-query-schema",
+            Boolean.toString(useNullableQuerySchema))
+      },
+      {
+        SparkCatalogConfig.SPARK.catalogName(),
+        SparkCatalogConfig.SPARK.implementation(),
+        ImmutableMap.of(
+            "type",
+            "hive",
+            "default-namespace",
+            "default",
+            "parquet-enabled",
+            "true",
+            "cache-enabled",
+            "false", // Spark will delete tables using v1, leaving the cache 
out of sync
+            "use-nullable-query-schema",
+            Boolean.toString(useNullableQuerySchema)),
+      }
+    };
+  }
 
   @BeforeEach
   public void createTable() {
@@ -86,4 +132,60 @@ public class TestSparkCatalogOperations extends 
CatalogTestBase {
     sql("REFRESH TABLE %s", tableName);
     sql("SELECT count(1) FROM %s", tableName);
   }
+
+  @TestTemplate
+  public void testCTASUseNullableQuerySchema() {
+    sql("INSERT INTO %s VALUES(1, 'abc'), (2, null)", tableName);
+
+    String ctasTableName = tableName("ctas_table");
+
+    sql("CREATE TABLE %s USING iceberg AS SELECT * FROM %s", ctasTableName, 
tableName);
+
+    org.apache.iceberg.Table ctasTable =
+        
validationCatalog.loadTable(TableIdentifier.parse("default.ctas_table"));
+
+    Schema expectedSchema =
+        new Schema(
+            useNullableQuerySchema
+                ? Types.NestedField.optional(1, "id", Types.LongType.get())
+                : Types.NestedField.required(1, "id", Types.LongType.get()),
+            Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+    assertThat(ctasTable.schema().asStruct())
+        .as("Should have expected schema")
+        .isEqualTo(expectedSchema.asStruct());
+
+    sql("DROP TABLE IF EXISTS %s", ctasTableName);
+  }
+
+  @TestTemplate
+  public void testRTASUseNullableQuerySchema() {
+    sql("INSERT INTO %s VALUES(1, 'abc'), (2, null)", tableName);
+
+    String rtasTableName = tableName("rtas_table");
+    sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg", 
rtasTableName);
+
+    sql("REPLACE TABLE %s USING iceberg AS SELECT * FROM %s", rtasTableName, 
tableName);
+
+    org.apache.iceberg.Table rtasTable =
+        
validationCatalog.loadTable(TableIdentifier.parse("default.rtas_table"));
+
+    Schema expectedSchema =
+        new Schema(
+            useNullableQuerySchema
+                ? Types.NestedField.optional(1, "id", Types.LongType.get())
+                : Types.NestedField.required(1, "id", Types.LongType.get()),
+            Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+    assertThat(rtasTable.schema().asStruct())
+        .as("Should have expected schema")
+        .isEqualTo(expectedSchema.asStruct());
+
+    assertEquals(
+        "Should have rows matching the source table",
+        sql("SELECT * FROM %s ORDER BY id", tableName),
+        sql("SELECT * FROM %s ORDER BY id", rtasTableName));
+
+    sql("DROP TABLE IF EXISTS %s", rtasTableName);
+  }
 }

Reply via email to