This is an automated email from the ASF dual-hosted git repository.

honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push: new e05d6ae09 Add Integration tests for Delta tables for Spark Client (#1500) e05d6ae09 is described below commit e05d6ae097db4cfcf2c65f7a90e67e03497ee837 Author: gh-yzou <167037035+gh-y...@users.noreply.github.com> AuthorDate: Tue May 6 11:55:52 2025 -0700 Add Integration tests for Delta tables for Spark Client (#1500) --- plugins/spark/README.md | 2 +- plugins/spark/v3.5/integration/build.gradle.kts | 11 +- .../polaris/spark/quarkus/it/SparkDeltaIT.java | 255 +++++++++++++++++++++ .../apache/polaris/spark/quarkus/it/SparkIT.java | 108 ++++++++- .../spark/quarkus/it/SparkIntegrationBase.java | 48 ++-- .../integration/src/intTest/resources/logback.xml | 38 +++ .../org/apache/polaris/spark/SparkCatalog.java | 19 +- .../polaris/spark/utils/PolarisCatalogUtils.java | 10 + .../in-dev/unreleased/polaris-spark-client.md | 2 +- 9 files changed, 453 insertions(+), 40 deletions(-) diff --git a/plugins/spark/README.md b/plugins/spark/README.md index 07fa8ed86..60978fe17 100644 --- a/plugins/spark/README.md +++ b/plugins/spark/README.md @@ -96,5 +96,5 @@ Following describes the current functionality limitations of the Polaris Spark c is also not supported, since it relies on the CTAS support. 2) Create a Delta table without explicit location is not supported. 3) Rename a Delta table is not supported. -4) ALTER TABLE ... SET LOCATION/SET FILEFORMAT/ADD PARTITION is not supported for DELTA table. +4) ALTER TABLE ... SET LOCATION is not supported for DELTA table. 5) For other non-Iceberg tables like csv, it is not supported today. diff --git a/plugins/spark/v3.5/integration/build.gradle.kts b/plugins/spark/v3.5/integration/build.gradle.kts index 26841f356..b259f528a 100644 --- a/plugins/spark/v3.5/integration/build.gradle.kts +++ b/plugins/spark/v3.5/integration/build.gradle.kts @@ -49,12 +49,19 @@ dependencies { testImplementation(project(":polaris-spark-${sparkMajorVersion}_${scalaVersion}")) testImplementation("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") { - // exclude log4j dependencies + // exclude log4j dependencies. Explicit dependencies for the log4j libraries are + // enforced below to ensure the version compatibility exclude("org.apache.logging.log4j", "log4j-slf4j2-impl") - exclude("org.apache.logging.log4j", "log4j-api") exclude("org.apache.logging.log4j", "log4j-1.2-api") + exclude("org.apache.logging.log4j", "log4j-core") exclude("org.slf4j", "jul-to-slf4j") } + // enforce the usage of log4j 2.24.3. This is for the log4j-api compatibility + // of spark-sql dependency + testRuntimeOnly("org.apache.logging.log4j:log4j-core:2.24.3") + testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j2-impl:2.24.3") + + testImplementation("io.delta:delta-spark_${scalaVersion}:3.3.1") testImplementation(platform(libs.jackson.bom)) testImplementation("com.fasterxml.jackson.core:jackson-annotations") diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkDeltaIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkDeltaIT.java new file mode 100644 index 000000000..df5b478be --- /dev/null +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkDeltaIT.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.spark.quarkus.it; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.quarkus.test.junit.QuarkusIntegrationTest; +import java.io.File; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.polaris.service.it.env.IntegrationTestsHelper; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.delta.DeltaAnalysisException; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +@QuarkusIntegrationTest +public class SparkDeltaIT extends SparkIntegrationBase { + private String defaultNs; + private String tableRootDir; + + private String getTableLocation(String tableName) { + return String.format("%s/%s", tableRootDir, tableName); + } + + private String getTableNameWithRandomSuffix() { + return generateName("deltatb"); + } + + @BeforeEach + public void createDefaultResources(@TempDir Path tempDir) { + spark.sparkContext().setLogLevel("WARN"); + defaultNs = generateName("delta"); + // create a default namespace + sql("CREATE NAMESPACE %s", defaultNs); + sql("USE NAMESPACE %s", defaultNs); + tableRootDir = + IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(defaultNs).getPath(); + } + + @AfterEach + public void cleanupDeltaData() { + // clean up delta data + File dirToDelete = new File(tableRootDir); + FileUtils.deleteQuietly(dirToDelete); + sql("DROP NAMESPACE %s", defaultNs); + } + + @Test + public void testBasicTableOperations() { + // create a regular delta table + String deltatb1 = "deltatb1"; + sql( + "CREATE TABLE %s (id INT, name STRING) USING DELTA LOCATION '%s'", + deltatb1, getTableLocation(deltatb1)); + sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", deltatb1); + List<Object[]> results = sql("SELECT * FROM %s WHERE id > 1 ORDER BY id DESC", deltatb1); + assertThat(results.size()).isEqualTo(1); + assertThat(results.get(0)).isEqualTo(new Object[] {2, "bob"}); + + // create a delta table with partition + String deltatb2 = "deltatb2"; + sql( + "CREATE TABLE %s (name String, age INT, country STRING) USING DELTA PARTITIONED BY (country) LOCATION '%s'", + deltatb2, getTableLocation(deltatb2)); + sql( + "INSERT INTO %s VALUES ('anna', 10, 'US'), ('james', 32, 'US'), ('yan', 16, 'CHINA')", + deltatb2); + results = sql("SELECT name, country FROM %s ORDER BY age", deltatb2); + assertThat(results.size()).isEqualTo(3); + assertThat(results.get(0)).isEqualTo(new Object[] {"anna", "US"}); +
assertThat(results.get(1)).isEqualTo(new Object[] {"yan", "CHINA"}); + assertThat(results.get(2)).isEqualTo(new Object[] {"james", "US"}); + + // verify the partition dir is created + List<String> subDirs = listDirs(getTableLocation(deltatb2)); + assertThat(subDirs).contains("_delta_log", "country=CHINA", "country=US"); + + // test listTables + List<Object[]> tables = sql("SHOW TABLES"); + assertThat(tables.size()).isEqualTo(2); + assertThat(tables) + .contains( + new Object[] {defaultNs, deltatb1, false}, new Object[] {defaultNs, deltatb2, false}); + + sql("DROP TABLE %s", deltatb1); + sql("DROP TABLE %s", deltatb2); + tables = sql("SHOW TABLES"); + assertThat(tables.size()).isEqualTo(0); + } + + @Test + public void testAlterOperations() { + String deltatb = getTableNameWithRandomSuffix(); + sql( + "CREATE TABLE %s (id INT, name STRING) USING DELTA LOCATION '%s'", + deltatb, getTableLocation(deltatb)); + sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", deltatb); + + // test alter columns + // add two new columns to the table + sql("Alter TABLE %s ADD COLUMNS (city STRING, age INT)", deltatb); + // add one more row to the table + sql("INSERT INTO %s VALUES (3, 'john', 'SFO', 20)", deltatb); + // verify the table now has 4 columns with the correct result + List<Object[]> results = sql("SELECT * FROM %s ORDER BY id", deltatb); + assertThat(results.size()).isEqualTo(3); + assertThat(results).contains(new Object[] {1, "anna", null, null}); + assertThat(results).contains(new Object[] {2, "bob", null, null}); + assertThat(results).contains(new Object[] {3, "john", "SFO", 20}); + + // dropping and renaming columns requires setting the delta.columnMapping property + sql("ALTER TABLE %s SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')", deltatb); + // drop column age + sql("Alter TABLE %s DROP COLUMN age", deltatb); + // verify the table now has 3 columns with the correct result + results = sql("SELECT * FROM %s ORDER BY id", deltatb); + assertThat(results.size()).isEqualTo(3); + assertThat(results).contains(new Object[] {1, "anna", null}); + assertThat(results).contains(new Object[] {2, "bob", null}); + assertThat(results).contains(new Object[] {3, "john", "SFO"}); + + // rename column city to address + sql("Alter TABLE %s RENAME COLUMN city TO address", deltatb); + // verify column address exists + results = sql("SELECT id, address FROM %s ORDER BY id", deltatb); + assertThat(results.size()).isEqualTo(3); + assertThat(results).contains(new Object[] {1, null}); + assertThat(results).contains(new Object[] {2, null}); + assertThat(results).contains(new Object[] {3, "SFO"}); + + // test alter properties + sql( + "ALTER TABLE %s SET TBLPROPERTIES ('description' = 'people table', 'test-owner' = 'test-user')", + deltatb); + List<Object[]> tableInfo = sql("DESCRIBE TABLE EXTENDED %s", deltatb); + // find the table properties result + String properties = null; + for (Object[] info : tableInfo) { + if (info[0].equals("Table Properties")) { + properties = (String) info[1]; + break; + } + } + assertThat(properties).contains("description=people table,test-owner=test-user"); + sql("DROP TABLE %s", deltatb); + } + + @Test + public void testUnsupportedAlterTableOperations() { + String deltatb = getTableNameWithRandomSuffix(); + sql( + "CREATE TABLE %s (name String, age INT, country STRING) USING DELTA PARTITIONED BY (country) LOCATION '%s'", + deltatb, getTableLocation(deltatb)); + + // ALTER TABLE ... RENAME TO ...
fails + assertThatThrownBy(() -> sql("ALTER TABLE %s RENAME TO new_delta", deltatb)) + .isInstanceOf(UnsupportedOperationException.class); + + // ALTER TABLE ... SET LOCATION ... fails + assertThatThrownBy(() -> sql("ALTER TABLE %s SET LOCATION '/tmp/new/path'", deltatb)) + .isInstanceOf(DeltaAnalysisException.class); + + sql("DROP TABLE %s", deltatb); + } + + @Test + public void testUnsupportedTableCreateOperations() { + String deltatb = getTableNameWithRandomSuffix(); + // create delta table with no location + assertThatThrownBy(() -> sql("CREATE TABLE %s (id INT, name STRING) USING DELTA", deltatb)) + .isInstanceOf(UnsupportedOperationException.class); + + // CTAS fails + assertThatThrownBy( + () -> + sql( + "CREATE TABLE %s USING DELTA LOCATION '%s' AS SELECT 1 AS id", + deltatb, getTableLocation(deltatb))) + .isInstanceOf(IllegalArgumentException.class); + } + + @Test + public void testDataframeSaveOperations() { + List<Row> data = Arrays.asList(RowFactory.create("Alice", 30), RowFactory.create("Bob", 25)); + StructType schema = + new StructType( + new StructField[] { + new StructField("name", DataTypes.StringType, false, Metadata.empty()), + new StructField("age", DataTypes.IntegerType, false, Metadata.empty()) + }); + Dataset<Row> df = spark.createDataFrame(data, schema); + + String deltatb = getTableNameWithRandomSuffix(); + // saveAsTable for delta requires CTAS support for third party catalogs + // in the delta catalog, which is currently not supported. + assertThatThrownBy( + () -> + df.write() + .format("delta") + .option("path", getTableLocation(deltatb)) + .saveAsTable(deltatb)) + .isInstanceOf(IllegalArgumentException.class); + + // verify regular dataframe saving still works + df.write().format("delta").save(getTableLocation(deltatb)); + + // verify the partition dir is created + List<String> subDirs = listDirs(getTableLocation(deltatb)); + assertThat(subDirs).contains("_delta_log"); + + // verify we can create a table out of the existing delta location + sql("CREATE TABLE %s USING DELTA LOCATION '%s'", deltatb, getTableLocation(deltatb)); + List<Object[]> tables = sql("SHOW TABLES"); + assertThat(tables.size()).isEqualTo(1); + assertThat(tables).contains(new Object[] {defaultNs, deltatb, false}); + + sql("INSERT INTO %s VALUES ('Anna', 11)", deltatb); + + List<Object[]> results = sql("SELECT * FROM %s ORDER BY name", deltatb); + assertThat(results.size()).isEqualTo(3); + assertThat(results.get(0)).isEqualTo(new Object[] {"Alice", 30}); + assertThat(results.get(1)).isEqualTo(new Object[] {"Anna", 11}); + assertThat(results.get(2)).isEqualTo(new Object[] {"Bob", 25}); + + sql("DROP TABLE %s", deltatb); + } +} diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java index f9af2609d..a4e060a52 100644 --- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java @@ -22,8 +22,13 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import io.quarkus.test.junit.QuarkusIntegrationTest; +import java.io.File; +import java.nio.file.Path; import java.util.List; +import org.apache.commons.io.FileUtils; +import org.apache.polaris.service.it.env.IntegrationTestsHelper; import org.junit.jupiter.api.Test; +import
org.junit.jupiter.api.io.TempDir; @QuarkusIntegrationTest public class SparkIT extends SparkIntegrationBase { @@ -64,7 +69,7 @@ public class SparkIT extends SparkIntegrationBase { @Test public void testCreatDropView() { - String namespace = "ns"; + String namespace = generateName("ns"); // create namespace ns sql("CREATE NAMESPACE %s", namespace); sql("USE %s", namespace); @@ -88,23 +93,112 @@ public class SparkIT extends SparkIntegrationBase { sql("DROP VIEW %s", view2Name); views = sql("SHOW VIEWS"); assertThat(views.size()).isEqualTo(0); + + sql("DROP NAMESPACE %s", namespace); } @Test - public void renameView() { - sql("CREATE NAMESPACE ns"); - sql("USE ns"); + public void renameIcebergViewAndTable() { + String namespace = generateName("ns"); + sql("CREATE NAMESPACE %s", namespace); + sql("USE %s", namespace); + // create one view and one table String viewName = "originalView"; - String renamedView = "renamedView"; sql("CREATE VIEW %s AS SELECT 1 AS id", viewName); + + String icebergTable = "iceberg_table"; + sql("CREATE TABLE %s (col1 int, col2 string)", icebergTable); + + // verify view and table is showing correctly List<Object[]> views = sql("SHOW VIEWS"); assertThat(views.size()).isEqualTo(1); - assertThat(views).contains(new Object[] {"ns", viewName, false}); + assertThat(views).contains(new Object[] {namespace, viewName, false}); + + List<Object[]> tables = sql("SHOW TABLES"); + assertThat(tables.size()).isEqualTo(1); + assertThat(tables).contains(new Object[] {namespace, icebergTable, false}); + // rename the view + String renamedView = "renamedView"; sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView); views = sql("SHOW VIEWS"); assertThat(views.size()).isEqualTo(1); - assertThat(views).contains(new Object[] {"ns", renamedView, false}); + assertThat(views).contains(new Object[] {namespace, renamedView, false}); + + // rename the table + String newIcebergTable = "iceberg_table_new"; + sql("ALTER TABLE %s RENAME TO %s", icebergTable, newIcebergTable); + tables = sql("SHOW TABLES"); + assertThat(tables.size()).isEqualTo(1); + assertThat(tables).contains(new Object[] {namespace, newIcebergTable, false}); + + // clean up the resources + sql("DROP VIEW %s", renamedView); + sql("DROP TABLE %s", newIcebergTable); + sql("DROP NAMESPACE %s", namespace); + } + + @Test + public void testMixedTableAndViews(@TempDir Path tempDir) { + String namespace = generateName("ns"); + sql("CREATE NAMESPACE %s", namespace); + sql("USE %s", namespace); + + // create one iceberg table, iceberg view and one delta table + String icebergTable = "icebergtb"; + sql("CREATE TABLE %s (col1 int, col2 String)", icebergTable); + sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", icebergTable); + + String viewName = "icebergview"; + sql("CREATE VIEW %s AS SELECT col1 + 2 AS col1, col2 FROM %s", viewName, icebergTable); + + String deltaTable = "deltatb"; + String deltaDir = + IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(namespace).getPath(); + sql( + "CREATE TABLE %s (col1 int, col2 int) using delta location '%s/%s'", + deltaTable, deltaDir, deltaTable); + sql("INSERT INTO %s VALUES (1, 3), (2, 5), (11, 20)", deltaTable); + // join the iceberg and delta table + List<Object[]> joinResult = + sql( + "SELECT icebergtb.col1 as id, icebergtb.col2 as str_col, deltatb.col2 as int_col from icebergtb inner join deltatb on icebergtb.col1 = deltatb.col1 order by id"); + assertThat(joinResult.get(0)).isEqualTo(new Object[] {1, "a", 3}); + assertThat(joinResult.get(1)).isEqualTo(new Object[] {2, "b", 5}); + + 
// show tables shows all tables + List<Object[]> tables = sql("SHOW TABLES"); + assertThat(tables.size()).isEqualTo(2); + assertThat(tables) + .contains( + new Object[] {namespace, icebergTable, false}, + new Object[] {namespace, deltaTable, false}); + + // verify the table and view content + List<Object[]> results = sql("SELECT * FROM %s ORDER BY col1", icebergTable); + assertThat(results.size()).isEqualTo(2); + assertThat(results.get(0)).isEqualTo(new Object[] {1, "a"}); + assertThat(results.get(1)).isEqualTo(new Object[] {2, "b"}); + + // verify the table and view content + results = sql("SELECT * FROM %s ORDER BY col1", viewName); + assertThat(results.size()).isEqualTo(2); + assertThat(results.get(0)).isEqualTo(new Object[] {3, "a"}); + assertThat(results.get(1)).isEqualTo(new Object[] {4, "b"}); + + List<Object[]> views = sql("SHOW VIEWS"); + assertThat(views.size()).isEqualTo(1); + assertThat(views).contains(new Object[] {namespace, viewName, false}); + + // drop views and tables + sql("DROP TABLE %s", icebergTable); + sql("DROP TABLE %s", deltaTable); + sql("DROP VIEW %s", viewName); + sql("DROP NAMESPACE %s", namespace); + + // clean up delta directory + File dirToDelete = new File(deltaDir); + FileUtils.deleteQuietly(dirToDelete); } } diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java index b5006d6a7..be456716c 100644 --- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java +++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java @@ -20,9 +20,14 @@ package org.apache.polaris.spark.quarkus.it; import com.google.common.collect.ImmutableList; import com.google.errorprone.annotations.FormatMethod; +import java.io.File; import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.apache.commons.io.FileUtils; +import org.apache.commons.io.filefilter.DirectoryFileFilter; +import org.apache.commons.io.filefilter.FalseFileFilter; import org.apache.polaris.service.it.ext.PolarisSparkIntegrationTestBase; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; @@ -32,12 +37,14 @@ public abstract class SparkIntegrationBase extends PolarisSparkIntegrationTestBa @Override protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) { return builder + .config( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension") + .config( + "spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") .config( String.format("spark.sql.catalog.%s", catalogName), "org.apache.polaris.spark.SparkCatalog") - .config( - "spark.sql.extensions", - "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") .config("spark.sql.warehouse.dir", warehouseDir.toString()) .config(String.format("spark.sql.catalog.%s.type", catalogName), "rest") .config( @@ -54,26 +61,6 @@ public abstract class SparkIntegrationBase extends PolarisSparkIntegrationTestBa .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2"); } - @Override - protected void cleanupCatalog(String catalogName) { - onSpark("USE " + catalogName); - List<Row> namespaces = onSpark("SHOW 
NAMESPACES").collectAsList(); - for (Row namespace : namespaces) { - // TODO: once all table operations are supported, remove the override of this function - // List<Row> tables = onSpark("SHOW TABLES IN " + namespace.getString(0)).collectAsList(); - // for (Row table : tables) { - // onSpark("DROP TABLE " + namespace.getString(0) + "." + table.getString(1)); - // } - List<Row> views = onSpark("SHOW VIEWS IN " + namespace.getString(0)).collectAsList(); - for (Row view : views) { - onSpark("DROP VIEW " + namespace.getString(0) + "." + view.getString(1)); - } - onSpark("DROP NAMESPACE " + namespace.getString(0)); - } - - managementApi.deleteCatalog(catalogName); - } - @FormatMethod protected List<Object[]> sql(String query, Object... args) { List<Row> rows = spark.sql(String.format(query, args)).collectAsList(); @@ -108,4 +95,19 @@ public abstract class SparkIntegrationBase extends PolarisSparkIntegrationTestBa }) .toArray(Object[]::new); } + + /** List the name of directories under a given path non-recursively. */ + protected List<String> listDirs(String path) { + File directory = new File(path); + return FileUtils.listFilesAndDirs( + directory, FalseFileFilter.INSTANCE, DirectoryFileFilter.DIRECTORY) + .stream() + .map(File::getName) + .toList(); + } + + /** Generate a string name with given prefix and a random suffix */ + protected String generateName(String prefix) { + return prefix + "_" + UUID.randomUUID().toString().replaceAll("-", ""); + } } diff --git a/plugins/spark/v3.5/integration/src/intTest/resources/logback.xml b/plugins/spark/v3.5/integration/src/intTest/resources/logback.xml new file mode 100644 index 000000000..5ec1efd5f --- /dev/null +++ b/plugins/spark/v3.5/integration/src/intTest/resources/logback.xml @@ -0,0 +1,38 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +--> +<!-- +Spark by default set the logging at DEBUG level, which dumps a lot of code details in +the Intelij console, which impacts the IDE performance significantly when running the +tests. This configuration allows only log back the ERROR logs for the IDE, you can comment +out the configuration if you would like ot see all spark debug log during the run. 
+--> +<configuration> + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder"> + <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level - %msg%n</pattern> + </encoder> + </appender> + + <root level="ERROR"> + <appender-ref ref="CONSOLE"/> + </root> +</configuration> diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java index e88628a70..9d8d1163e 100644 --- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java +++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java @@ -151,13 +151,20 @@ public class SparkCatalog String provider = properties.get(PolarisCatalogUtils.TABLE_PROVIDER_KEY); if (PolarisCatalogUtils.useIceberg(provider)) { return this.icebergsSparkCatalog.createTable(ident, schema, transforms, properties); - } else if (PolarisCatalogUtils.useDelta(provider)) { - // For delta table, we load the delta catalog to help dealing with the - // delta log creation. - TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog); - return deltaCatalog.createTable(ident, schema, transforms, properties); } else { - return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties); + } else { + if (PolarisCatalogUtils.isTableWithSparkManagedLocation(properties)) { + throw new UnsupportedOperationException( + "Create table without location key is not supported by Polaris. Please provide location or path on table creation."); + } + + if (PolarisCatalogUtils.useDelta(provider)) { + // For delta table, we load the delta catalog to help dealing with the + // delta log creation. + TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog); + return deltaCatalog.createTable(ident, schema, transforms, properties); + } else { + return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties); + } } } diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java index 01a4af45d..8dac78b23 100644 --- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java +++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java @@ -50,6 +50,16 @@ public class PolarisCatalogUtils { return "delta".equalsIgnoreCase(provider); } + /** + * For tables whose location is managed by Spark Session Catalog, there will be no location or path + * in the properties. + */ + public static boolean isTableWithSparkManagedLocation(Map<String, String> properties) { + boolean hasLocationClause = properties.containsKey(TableCatalog.PROP_LOCATION); + boolean hasPathClause = properties.containsKey(TABLE_PATH_KEY); + return !hasLocationClause && !hasPathClause; + } + + /** * Load spark table using DataSourceV2. * diff --git a/site/content/in-dev/unreleased/polaris-spark-client.md b/site/content/in-dev/unreleased/polaris-spark-client.md index 46796cdc6..9466020a4 100644 --- a/site/content/in-dev/unreleased/polaris-spark-client.md +++ b/site/content/in-dev/unreleased/polaris-spark-client.md @@ -126,5 +126,5 @@ The Polaris Spark client has the following functionality limitations: is also not supported, since it relies on the CTAS support.
2) Create a Delta table without explicit location is not supported. 3) Rename a Delta table is not supported. -4) ALTER TABLE ... SET LOCATION/SET FILEFORMAT/ADD PARTITION is not supported for DELTA table. +4) ALTER TABLE ... SET LOCATION is not supported for DELTA table. 5) For other non-Iceberg tables like csv, it is not supported.
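For readers who want to try the behaviour exercised by these tests outside the Quarkus test harness, here is a minimal, self-contained sketch of a Spark session wired up the same way as SparkIntegrationBase above (Iceberg and Delta session extensions, DeltaCatalog as the session catalog, and the Polaris Spark client as a REST catalog). It is not part of this commit: the catalog name "polaris", the service URI, the credential, the warehouse name, and the table location are placeholders, and the uri/credential/warehouse option names follow the standard Iceberg REST catalog settings rather than anything introduced here. It assumes the Polaris Spark client and delta-spark jars are on the classpath.

import org.apache.spark.sql.SparkSession;

public class PolarisDeltaExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .master("local[1]")
            // Enable both the Iceberg and Delta SQL extensions, as in SparkIntegrationBase.
            .config(
                "spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
                    + "io.delta.sql.DeltaSparkSessionExtension")
            // Delegate the built-in session catalog to Delta so delta-log handling works.
            .config(
                "spark.sql.catalog.spark_catalog",
                "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            // Register the Polaris Spark client as a REST catalog named "polaris" (placeholder).
            .config("spark.sql.catalog.polaris", "org.apache.polaris.spark.SparkCatalog")
            .config("spark.sql.catalog.polaris.type", "rest")
            // Placeholder connection settings; substitute your own Polaris URI and credentials.
            .config("spark.sql.catalog.polaris.uri", "http://localhost:8181/api/catalog")
            .config("spark.sql.catalog.polaris.credential", "<client-id>:<client-secret>")
            .config("spark.sql.catalog.polaris.warehouse", "<catalog-name>")
            .getOrCreate();

    spark.sql("CREATE NAMESPACE IF NOT EXISTS polaris.delta_demo");
    spark.sql("USE polaris.delta_demo");

    // Delta tables must be created with an explicit location (limitation 2 above).
    spark.sql(
        "CREATE TABLE people (id INT, name STRING) USING DELTA "
            + "LOCATION '/tmp/polaris_delta_demo/people'");
    spark.sql("INSERT INTO people VALUES (1, 'anna'), (2, 'bob')");
    spark.sql("SELECT * FROM people ORDER BY id").show();

    // Per the limitations listed above, the following are expected to fail:
    //   CREATE TABLE t USING DELTA ...            without a LOCATION clause
    //   CREATE TABLE t USING DELTA ... AS SELECT  (CTAS)
    //   ALTER TABLE people RENAME TO people2
    //   ALTER TABLE people SET LOCATION '/new/path'

    spark.stop();
  }
}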