This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 81b3310ab4 Spark 3.5: Support preserving schema nullability in CTAS
and RTAS (#10074)
81b3310ab4 is described below
commit 81b3310ab469408022cc14af51257b7e8b36614f
Author: Yujiang Zhong <[email protected]>
AuthorDate: Sat Apr 13 05:12:28 2024 +0800
Spark 3.5: Support preserving schema nullability in CTAS and RTAS (#10074)
---
docs/docs/spark-configuration.md | 1 +
.../java/org/apache/iceberg/spark/BaseCatalog.java | 20 ++++
.../org/apache/iceberg/spark/SparkCatalog.java | 2 +
.../apache/iceberg/spark/SparkSessionCatalog.java | 2 +
.../iceberg/spark/TestSparkCatalogOperations.java | 102 +++++++++++++++++++++
5 files changed, 127 insertions(+)
diff --git a/docs/docs/spark-configuration.md b/docs/docs/spark-configuration.md
index 6ac4f1e9c8..9ff7396498 100644
--- a/docs/docs/spark-configuration.md
+++ b/docs/docs/spark-configuration.md
@@ -77,6 +77,7 @@ Both catalogs are configured using properties nested under
the catalog name. Com
| spark.sql.catalog._catalog-name_.cache.expiration-interval-ms | `30000` (30
seconds) | Duration after which cached catalog entries are expired; Only
effective if `cache-enabled` is `true`. `-1` disables cache expiration and `0`
disables caching entirely, irrespective of `cache-enabled`. Default is `30000`
(30 seconds) |
| spark.sql.catalog._catalog-name_.table-default._propertyKey_ |
| Default Iceberg table property value for property key
_propertyKey_, which will be set on tables created by this catalog if not
overridden
|
| spark.sql.catalog._catalog-name_.table-override._propertyKey_ |
| Enforced Iceberg table property value for property key
_propertyKey_, which cannot be overridden by user
|
+| spark.sql.catalog._catalog-name_.use-nullable-query-schema | `true` or
`false` | Whether to use a nullable schema for the query output when creating
or replacing a table using CTAS and RTAS. If set to `true`, all fields will be
marked as nullable. If set to `false`, the nullability of the query's fields
will be preserved. The default value is `true`. Available in Spark 3.5 and above. |
Additional properties can be found in common [catalog
configuration](configuration.md#catalog-properties).
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
index 38f15a4295..2082c05846 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/BaseCatalog.java
@@ -21,12 +21,14 @@ package org.apache.iceberg.spark;
import org.apache.iceberg.spark.procedures.SparkProcedures;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.spark.source.HasIcebergCatalog;
+import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.sql.catalyst.analysis.NoSuchProcedureException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.iceberg.catalog.Procedure;
import org.apache.spark.sql.connector.iceberg.catalog.ProcedureCatalog;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
abstract class BaseCatalog
implements StagingTableCatalog,
@@ -34,6 +36,10 @@ abstract class BaseCatalog
SupportsNamespaces,
HasIcebergCatalog,
SupportsFunctions {
+ private static final String USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS =
"use-nullable-query-schema";
+ private static final boolean USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT =
true;
+
+ private boolean useNullableQuerySchema =
USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT;
@Override
public Procedure loadProcedure(Identifier ident) throws
NoSuchProcedureException {
@@ -66,6 +72,20 @@ abstract class BaseCatalog
return namespaceExists(namespace);
}
+ @Override
+ public void initialize(String name, CaseInsensitiveStringMap options) {
+ this.useNullableQuerySchema =
+ PropertyUtil.propertyAsBoolean(
+ options,
+ USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS,
+ USE_NULLABLE_QUERY_SCHEMA_CTAS_RTAS_DEFAULT);
+ }
+
+ @Override
+ public boolean useNullableQuerySchema() {
+ return useNullableQuerySchema;
+ }
+
private static boolean isSystemNamespace(String[] namespace) {
return namespace.length == 1 && namespace[0].equalsIgnoreCase("system");
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
index 0be9fd9484..0c36159862 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java
@@ -726,6 +726,8 @@ public class SparkCatalog extends BaseCatalog
@Override
public final void initialize(String name, CaseInsensitiveStringMap options) {
+ super.initialize(name, options);
+
this.cacheEnabled =
PropertyUtil.propertyAsBoolean(
options, CatalogProperties.CACHE_ENABLED,
CatalogProperties.CACHE_ENABLED_DEFAULT);
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
index 33384e3eff..fa3f1fbe4b 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSessionCatalog.java
@@ -304,6 +304,8 @@ public class SparkSessionCatalog<T extends TableCatalog &
FunctionCatalog & Supp
@Override
public final void initialize(String name, CaseInsensitiveStringMap options) {
+ super.initialize(name, options);
+
if (options.containsKey(CatalogUtil.ICEBERG_CATALOG_TYPE)
&& options
.get(CatalogUtil.ICEBERG_CATALOG_TYPE)
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkCatalogOperations.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkCatalogOperations.java
index 0f29faf274..d0860ff014 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkCatalogOperations.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkCatalogOperations.java
@@ -20,8 +20,13 @@ package org.apache.iceberg.spark;
import static org.assertj.core.api.Assertions.assertThat;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.iceberg.Parameters;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
@@ -33,6 +38,47 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
public class TestSparkCatalogOperations extends CatalogTestBase {
+ private static boolean useNullableQuerySchema =
ThreadLocalRandom.current().nextBoolean();
+
+ @Parameters(name = "catalogName = {0}, implementation = {1}, config = {2}")
+ protected static Object[][] parameters() {
+ return new Object[][] {
+ {
+ SparkCatalogConfig.HIVE.catalogName(),
+ SparkCatalogConfig.HIVE.implementation(),
+ ImmutableMap.of(
+ "type", "hive",
+ "default-namespace", "default",
+ "use-nullable-query-schema",
Boolean.toString(useNullableQuerySchema))
+ },
+ {
+ SparkCatalogConfig.HADOOP.catalogName(),
+ SparkCatalogConfig.HADOOP.implementation(),
+ ImmutableMap.of(
+ "type",
+ "hadoop",
+ "cache-enabled",
+ "false",
+ "use-nullable-query-schema",
+ Boolean.toString(useNullableQuerySchema))
+ },
+ {
+ SparkCatalogConfig.SPARK.catalogName(),
+ SparkCatalogConfig.SPARK.implementation(),
+ ImmutableMap.of(
+ "type",
+ "hive",
+ "default-namespace",
+ "default",
+ "parquet-enabled",
+ "true",
+ "cache-enabled",
+ "false", // Spark will delete tables using v1, leaving the cache
out of sync
+ "use-nullable-query-schema",
+ Boolean.toString(useNullableQuerySchema)),
+ }
+ };
+ }
@BeforeEach
public void createTable() {
@@ -86,4 +132,60 @@ public class TestSparkCatalogOperations extends
CatalogTestBase {
sql("REFRESH TABLE %s", tableName);
sql("SELECT count(1) FROM %s", tableName);
}
+
+ @TestTemplate
+ public void testCTASUseNullableQuerySchema() {
+ sql("INSERT INTO %s VALUES(1, 'abc'), (2, null)", tableName);
+
+ String ctasTableName = tableName("ctas_table");
+
+ sql("CREATE TABLE %s USING iceberg AS SELECT * FROM %s", ctasTableName,
tableName);
+
+ org.apache.iceberg.Table ctasTable =
+
validationCatalog.loadTable(TableIdentifier.parse("default.ctas_table"));
+
+ Schema expectedSchema =
+ new Schema(
+ useNullableQuerySchema
+ ? Types.NestedField.optional(1, "id", Types.LongType.get())
+ : Types.NestedField.required(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+ assertThat(ctasTable.schema().asStruct())
+ .as("Should have expected schema")
+ .isEqualTo(expectedSchema.asStruct());
+
+ sql("DROP TABLE IF EXISTS %s", ctasTableName);
+ }
+
+ @TestTemplate
+ public void testRTASUseNullableQuerySchema() {
+ sql("INSERT INTO %s VALUES(1, 'abc'), (2, null)", tableName);
+
+ String rtasTableName = tableName("rtas_table");
+ sql("CREATE TABLE %s (id bigint NOT NULL, data string) USING iceberg",
rtasTableName);
+
+ sql("REPLACE TABLE %s USING iceberg AS SELECT * FROM %s", rtasTableName,
tableName);
+
+ org.apache.iceberg.Table rtasTable =
+
validationCatalog.loadTable(TableIdentifier.parse("default.rtas_table"));
+
+ Schema expectedSchema =
+ new Schema(
+ useNullableQuerySchema
+ ? Types.NestedField.optional(1, "id", Types.LongType.get())
+ : Types.NestedField.required(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+ assertThat(rtasTable.schema().asStruct())
+ .as("Should have expected schema")
+ .isEqualTo(expectedSchema.asStruct());
+
+ assertEquals(
+ "Should have rows matching the source table",
+ sql("SELECT * FROM %s ORDER BY id", tableName),
+ sql("SELECT * FROM %s ORDER BY id", rtasTableName));
+
+ sql("DROP TABLE IF EXISTS %s", rtasTableName);
+ }
}