This is an automated email from the ASF dual-hosted git repository.

honahx pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new e05d6ae09 Add Integration tests for Delta tables for Spark Client (#1500)
e05d6ae09 is described below

commit e05d6ae097db4cfcf2c65f7a90e67e03497ee837
Author: gh-yzou <167037035+gh-y...@users.noreply.github.com>
AuthorDate: Tue May 6 11:55:52 2025 -0700

    Add Integration tests for Delta tables for Spark Client (#1500)
---
 plugins/spark/README.md                            |   2 +-
 plugins/spark/v3.5/integration/build.gradle.kts    |  11 +-
 .../polaris/spark/quarkus/it/SparkDeltaIT.java     | 255 +++++++++++++++++++++
 .../apache/polaris/spark/quarkus/it/SparkIT.java   | 108 ++++++++-
 .../spark/quarkus/it/SparkIntegrationBase.java     |  48 ++--
 .../integration/src/intTest/resources/logback.xml  |  38 +++
 .../org/apache/polaris/spark/SparkCatalog.java     |  19 +-
 .../polaris/spark/utils/PolarisCatalogUtils.java   |  10 +
 .../in-dev/unreleased/polaris-spark-client.md      |   2 +-
 9 files changed, 453 insertions(+), 40 deletions(-)

diff --git a/plugins/spark/README.md b/plugins/spark/README.md
index 07fa8ed86..60978fe17 100644
--- a/plugins/spark/README.md
+++ b/plugins/spark/README.md
@@ -96,5 +96,5 @@ Following describes the current functionality limitations of the Polaris Spark c
    is also not supported, since it relies on the CTAS support.
 2) Create a Delta table without explicit location is not supported.
 3) Rename a Delta table is not supported.
-4) ALTER TABLE ... SET LOCATION/SET FILEFORMAT/ADD PARTITION is not supported for DELTA table.
+4) ALTER TABLE ... SET LOCATION is not supported for DELTA table.
 5) For other non-Iceberg tables like csv, it is not supported today.
diff --git a/plugins/spark/v3.5/integration/build.gradle.kts b/plugins/spark/v3.5/integration/build.gradle.kts
index 26841f356..b259f528a 100644
--- a/plugins/spark/v3.5/integration/build.gradle.kts
+++ b/plugins/spark/v3.5/integration/build.gradle.kts
@@ -49,12 +49,19 @@ dependencies {
   testImplementation(project(":polaris-spark-${sparkMajorVersion}_${scalaVersion}"))
 
   testImplementation("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
-    // exclude log4j dependencies
+    // exclude log4j dependencies; explicit dependencies for the log4j libraries are
+    // enforced below to ensure version compatibility
     exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
-    exclude("org.apache.logging.log4j", "log4j-api")
     exclude("org.apache.logging.log4j", "log4j-1.2-api")
+    exclude("org.apache.logging.log4j", "log4j-core")
     exclude("org.slf4j", "jul-to-slf4j")
   }
+  // enforce the usage of log4j 2.24.3 for log4j-api compatibility with the spark-sql dependency
+  testRuntimeOnly("org.apache.logging.log4j:log4j-core:2.24.3")
+  testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j2-impl:2.24.3")
+
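+  // delta-spark provides the Delta data source and classes (e.g., DeltaAnalysisException)
+  // used by the Delta integration tests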
+  testImplementation("io.delta:delta-spark_${scalaVersion}:3.3.1")
 
   testImplementation(platform(libs.jackson.bom))
   testImplementation("com.fasterxml.jackson.core:jackson-annotations")
diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkDeltaIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkDeltaIT.java
new file mode 100644
index 000000000..df5b478be
--- /dev/null
+++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkDeltaIT.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.quarkus.it;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import io.quarkus.test.junit.QuarkusIntegrationTest;
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.polaris.service.it.env.IntegrationTestsHelper;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.delta.DeltaAnalysisException;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+@QuarkusIntegrationTest
+public class SparkDeltaIT extends SparkIntegrationBase {
+  private String defaultNs;
+  private String tableRootDir;
+
+  private String getTableLocation(String tableName) {
+    return String.format("%s/%s", tableRootDir, tableName);
+  }
+
+  private String getTableNameWithRandomSuffix() {
+    return generateName("deltatb");
+  }
+
+  @BeforeEach
+  public void createDefaultResources(@TempDir Path tempDir) {
+    spark.sparkContext().setLogLevel("WARN");
+    defaultNs = generateName("delta");
+    // create a default namespace
+    sql("CREATE NAMESPACE %s", defaultNs);
+    sql("USE NAMESPACE %s", defaultNs);
+    tableRootDir =
+        IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(defaultNs).getPath();
+  }
+
+  @AfterEach
+  public void cleanupDeltaData() {
+    // clean up delta data
+    File dirToDelete = new File(tableRootDir);
+    FileUtils.deleteQuietly(dirToDelete);
+    sql("DROP NAMESPACE %s", defaultNs);
+  }
+
+  @Test
+  public void testBasicTableOperations() {
+    // create a regular delta table
+    String deltatb1 = "deltatb1";
+    sql(
+        "CREATE TABLE %s (id INT, name STRING) USING DELTA LOCATION '%s'",
+        deltatb1, getTableLocation(deltatb1));
+    sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", deltatb1);
+    List<Object[]> results = sql("SELECT * FROM %s WHERE id > 1 ORDER BY id DESC", deltatb1);
+    assertThat(results.size()).isEqualTo(1);
+    assertThat(results.get(0)).isEqualTo(new Object[] {2, "bob"});
+
+    // create a delta table with partitions
+    String deltatb2 = "deltatb2";
+    sql(
+        "CREATE TABLE %s (name String, age INT, country STRING) USING DELTA 
PARTITIONED BY (country) LOCATION '%s'",
+        deltatb2, getTableLocation(deltatb2));
+    sql(
+        "INSERT INTO %s VALUES ('anna', 10, 'US'), ('james', 32, 'US'), 
('yan', 16, 'CHINA')",
+        deltatb2);
+    results = sql("SELECT name, country FROM %s ORDER BY age", deltatb2);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results.get(0)).isEqualTo(new Object[] {"anna", "US"});
+    assertThat(results.get(1)).isEqualTo(new Object[] {"yan", "CHINA"});
+    assertThat(results.get(2)).isEqualTo(new Object[] {"james", "US"});
+
+    // verify the partition dir is created
+    List<String> subDirs = listDirs(getTableLocation(deltatb2));
+    assertThat(subDirs).contains("_delta_log", "country=CHINA", "country=US");
+
+    // test listTables
+    List<Object[]> tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(2);
+    assertThat(tables)
+        .contains(
+            new Object[] {defaultNs, deltatb1, false}, new Object[] {defaultNs, deltatb2, false});
+
+    sql("DROP TABLE %s", deltatb1);
+    sql("DROP TABLE %s", deltatb2);
+    tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(0);
+  }
+
+  @Test
+  public void testAlterOperations() {
+    String deltatb = getTableNameWithRandomSuffix();
+    sql(
+        "CREATE TABLE %s (id INT, name STRING) USING DELTA LOCATION '%s'",
+        deltatb, getTableLocation(deltatb));
+    sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", deltatb);
+
+    // test alter columns
+    // add two new columns to the table
+    sql("Alter TABLE %s ADD COLUMNS (city STRING, age INT)", deltatb);
+    // add one more row to the table
+    sql("INSERT INTO %s VALUES (3, 'john', 'SFO', 20)", deltatb);
+    // verify the table now has 4 columns with the correct results
+    List<Object[]> results = sql("SELECT * FROM %s ORDER BY id", deltatb);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results).contains(new Object[] {1, "anna", null, null});
+    assertThat(results).contains(new Object[] {2, "bob", null, null});
+    assertThat(results).contains(new Object[] {3, "john", "SFO", 20});
+
+    // dropping and renaming columns requires setting the delta.columnMapping property
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')", deltatb);
+    // drop column age
+    sql("Alter TABLE %s DROP COLUMN age", deltatb);
+    // verify the table now has 3 columns with the correct results
+    results = sql("SELECT * FROM %s ORDER BY id", deltatb);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results).contains(new Object[] {1, "anna", null});
+    assertThat(results).contains(new Object[] {2, "bob", null});
+    assertThat(results).contains(new Object[] {3, "john", "SFO"});
+
+    // rename column city to address
+    sql("Alter TABLE %s RENAME COLUMN city TO address", deltatb);
+    // verify column address exists
+    results = sql("SELECT id, address FROM %s ORDER BY id", deltatb);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results).contains(new Object[] {1, null});
+    assertThat(results).contains(new Object[] {2, null});
+    assertThat(results).contains(new Object[] {3, "SFO"});
+
+    // test alter properties
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('description' = 'people table', 
'test-owner' = 'test-user')",
+        deltatb);
+    List<Object[]> tableInfo = sql("DESCRIBE TABLE EXTENDED %s", deltatb);
+    // find the table properties result
+    String properties = null;
+    for (Object[] info : tableInfo) {
+      if (info[0].equals("Table Properties")) {
+        properties = (String) info[1];
+        break;
+      }
+    }
+    assertThat(properties).contains("description=people table,test-owner=test-user");
+    sql("DROP TABLE %s", deltatb);
+  }
+
+  @Test
+  public void testUnsupportedAlterTableOperations() {
+    String deltatb = getTableNameWithRandomSuffix();
+    sql(
+        "CREATE TABLE %s (name String, age INT, country STRING) USING DELTA 
PARTITIONED BY (country) LOCATION '%s'",
+        deltatb, getTableLocation(deltatb));
+
+    // ALTER TABLE ... RENAME TO ... fails
+    assertThatThrownBy(() -> sql("ALTER TABLE %s RENAME TO new_delta", 
deltatb))
+        .isInstanceOf(UnsupportedOperationException.class);
+
+    // ALTER TABLE ... SET LOCATION ... fails
+    assertThatThrownBy(() -> sql("ALTER TABLE %s SET LOCATION 
'/tmp/new/path'", deltatb))
+        .isInstanceOf(DeltaAnalysisException.class);
+
+    sql("DROP TABLE %s", deltatb);
+  }
+
+  @Test
+  public void testUnsupportedTableCreateOperations() {
+    String deltatb = getTableNameWithRandomSuffix();
+    // create delta table with no location
+    assertThatThrownBy(() -> sql("CREATE TABLE %s (id INT, name STRING) USING 
DELTA", deltatb))
+        .isInstanceOf(UnsupportedOperationException.class);
+
+    // CTAS fails
+    assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE TABLE %s USING DELTA LOCATION '%s' AS SELECT 1 AS 
id",
+                    deltatb, getTableLocation(deltatb)))
+        .isInstanceOf(IllegalArgumentException.class);
+  }
+
+  @Test
+  public void testDataframeSaveOperations() {
+    List<Row> data = Arrays.asList(RowFactory.create("Alice", 30), RowFactory.create("Bob", 25));
+    StructType schema =
+        new StructType(
+            new StructField[] {
+              new StructField("name", DataTypes.StringType, false, 
Metadata.empty()),
+              new StructField("age", DataTypes.IntegerType, false, 
Metadata.empty())
+            });
+    Dataset<Row> df = spark.createDataFrame(data, schema);
+
+    String deltatb = getTableNameWithRandomSuffix();
+    // saveAsTable for a delta table requires CTAS support for third-party catalogs
+    // in the delta catalog, which is currently not supported.
+    assertThatThrownBy(
+            () ->
+                df.write()
+                    .format("delta")
+                    .option("path", getTableLocation(deltatb))
+                    .saveAsTable(deltatb))
+        .isInstanceOf(IllegalArgumentException.class);
+
+    // verify regular dataframe saving still works
+    df.write().format("delta").save(getTableLocation(deltatb));
+
+    // verify the delta log dir is created
+    List<String> subDirs = listDirs(getTableLocation(deltatb));
+    assertThat(subDirs).contains("_delta_log");
+
+    // verify we can create a table out of the existing delta location
+    sql("CREATE TABLE %s USING DELTA LOCATION '%s'", deltatb, getTableLocation(deltatb));
+    List<Object[]> tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(1);
+    assertThat(tables).contains(new Object[] {defaultNs, deltatb, false});
+
+    sql("INSERT INTO %s VALUES ('Anna', 11)", deltatb);
+
+    List<Object[]> results = sql("SELECT * FROM %s ORDER BY name", deltatb);
+    assertThat(results.size()).isEqualTo(3);
+    assertThat(results.get(0)).isEqualTo(new Object[] {"Alice", 30});
+    assertThat(results.get(1)).isEqualTo(new Object[] {"Anna", 11});
+    assertThat(results.get(2)).isEqualTo(new Object[] {"Bob", 25});
+
+    sql("DROP TABLE %s", deltatb);
+  }
+}
diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java
index f9af2609d..a4e060a52 100644
--- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java
+++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIT.java
@@ -22,8 +22,13 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import io.quarkus.test.junit.QuarkusIntegrationTest;
+import java.io.File;
+import java.nio.file.Path;
 import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.polaris.service.it.env.IntegrationTestsHelper;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 @QuarkusIntegrationTest
 public class SparkIT extends SparkIntegrationBase {
@@ -64,7 +69,7 @@ public class SparkIT extends SparkIntegrationBase {
 
   @Test
   public void testCreatDropView() {
-    String namespace = "ns";
+    String namespace = generateName("ns");
     // create namespace ns
     sql("CREATE NAMESPACE %s", namespace);
     sql("USE %s", namespace);
@@ -88,23 +93,112 @@ public class SparkIT extends SparkIntegrationBase {
     sql("DROP VIEW %s", view2Name);
     views = sql("SHOW VIEWS");
     assertThat(views.size()).isEqualTo(0);
+
+    sql("DROP NAMESPACE %s", namespace);
   }
 
   @Test
-  public void renameView() {
-    sql("CREATE NAMESPACE ns");
-    sql("USE ns");
+  public void renameIcebergViewAndTable() {
+    String namespace = generateName("ns");
+    sql("CREATE NAMESPACE %s", namespace);
+    sql("USE %s", namespace);
 
+    // create one view and one table
     String viewName = "originalView";
-    String renamedView = "renamedView";
     sql("CREATE VIEW %s AS SELECT 1 AS id", viewName);
+
+    String icebergTable = "iceberg_table";
+    sql("CREATE TABLE %s (col1 int, col2 string)", icebergTable);
+
+    // verify the view and table are shown correctly
     List<Object[]> views = sql("SHOW VIEWS");
     assertThat(views.size()).isEqualTo(1);
-    assertThat(views).contains(new Object[] {"ns", viewName, false});
+    assertThat(views).contains(new Object[] {namespace, viewName, false});
+
+    List<Object[]> tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(1);
+    assertThat(tables).contains(new Object[] {namespace, icebergTable, false});
 
+    // rename the view
+    String renamedView = "renamedView";
     sql("ALTER VIEW %s RENAME TO %s", viewName, renamedView);
     views = sql("SHOW VIEWS");
     assertThat(views.size()).isEqualTo(1);
-    assertThat(views).contains(new Object[] {"ns", renamedView, false});
+    assertThat(views).contains(new Object[] {namespace, renamedView, false});
+
+    // rename the table
+    String newIcebergTable = "iceberg_table_new";
+    sql("ALTER TABLE %s RENAME TO %s", icebergTable, newIcebergTable);
+    tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(1);
+    assertThat(tables).contains(new Object[] {namespace, newIcebergTable, false});
+
+    // clean up the resources
+    sql("DROP VIEW %s", renamedView);
+    sql("DROP TABLE %s", newIcebergTable);
+    sql("DROP NAMESPACE %s", namespace);
+  }
+
+  @Test
+  public void testMixedTableAndViews(@TempDir Path tempDir) {
+    String namespace = generateName("ns");
+    sql("CREATE NAMESPACE %s", namespace);
+    sql("USE %s", namespace);
+
+    // create one iceberg table, one iceberg view, and one delta table
+    String icebergTable = "icebergtb";
+    sql("CREATE TABLE %s (col1 int, col2 String)", icebergTable);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", icebergTable);
+
+    String viewName = "icebergview";
+    sql("CREATE VIEW %s AS SELECT col1 + 2 AS col1, col2 FROM %s", viewName, 
icebergTable);
+
+    String deltaTable = "deltatb";
+    String deltaDir =
+        IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(namespace).getPath();
+    sql(
+        "CREATE TABLE %s (col1 int, col2 int) using delta location '%s/%s'",
+        deltaTable, deltaDir, deltaTable);
+    sql("INSERT INTO %s VALUES (1, 3), (2, 5), (11, 20)", deltaTable);
+    // join the iceberg and delta table
+    List<Object[]> joinResult =
+        sql(
+            "SELECT icebergtb.col1 as id, icebergtb.col2 as str_col, 
deltatb.col2 as int_col from icebergtb inner join deltatb on icebergtb.col1 = 
deltatb.col1 order by id");
+    assertThat(joinResult.get(0)).isEqualTo(new Object[] {1, "a", 3});
+    assertThat(joinResult.get(1)).isEqualTo(new Object[] {2, "b", 5});
+
+    // show tables shows all tables
+    List<Object[]> tables = sql("SHOW TABLES");
+    assertThat(tables.size()).isEqualTo(2);
+    assertThat(tables)
+        .contains(
+            new Object[] {namespace, icebergTable, false},
+            new Object[] {namespace, deltaTable, false});
+
+    // verify the iceberg table content
+    List<Object[]> results = sql("SELECT * FROM %s ORDER BY col1", icebergTable);
+    assertThat(results.size()).isEqualTo(2);
+    assertThat(results.get(0)).isEqualTo(new Object[] {1, "a"});
+    assertThat(results.get(1)).isEqualTo(new Object[] {2, "b"});
+
+    // verify the view content
+    results = sql("SELECT * FROM %s ORDER BY col1", viewName);
+    assertThat(results.size()).isEqualTo(2);
+    assertThat(results.get(0)).isEqualTo(new Object[] {3, "a"});
+    assertThat(results.get(1)).isEqualTo(new Object[] {4, "b"});
+
+    List<Object[]> views = sql("SHOW VIEWS");
+    assertThat(views.size()).isEqualTo(1);
+    assertThat(views).contains(new Object[] {namespace, viewName, false});
+
+    // drop views and tables
+    sql("DROP TABLE %s", icebergTable);
+    sql("DROP TABLE %s", deltaTable);
+    sql("DROP VIEW %s", viewName);
+    sql("DROP NAMESPACE %s", namespace);
+
+    // clean up delta directory
+    File dirToDelete = new File(deltaDir);
+    FileUtils.deleteQuietly(dirToDelete);
   }
 }
diff --git a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java
index b5006d6a7..be456716c 100644
--- a/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java
+++ b/plugins/spark/v3.5/integration/src/intTest/java/org/apache/polaris/spark/quarkus/it/SparkIntegrationBase.java
@@ -20,9 +20,14 @@ package org.apache.polaris.spark.quarkus.it;
 
 import com.google.common.collect.ImmutableList;
 import com.google.errorprone.annotations.FormatMethod;
+import java.io.File;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.DirectoryFileFilter;
+import org.apache.commons.io.filefilter.FalseFileFilter;
 import org.apache.polaris.service.it.ext.PolarisSparkIntegrationTestBase;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
@@ -32,12 +37,14 @@ public abstract class SparkIntegrationBase extends PolarisSparkIntegrationTestBa
   @Override
   protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) {
     return builder
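+        // Delta requires its session extension and a DeltaCatalog registered as spark_catalog
+        // to handle delta log operations; table metadata is still served by the Polaris
+        // SparkCatalog configured below.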
+        .config(
+            "spark.sql.extensions",
+            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,io.delta.sql.DeltaSparkSessionExtension")
+        .config(
+            "spark.sql.catalog.spark_catalog", 
"org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .config(
             String.format("spark.sql.catalog.%s", catalogName),
             "org.apache.polaris.spark.SparkCatalog")
-        .config(
-            "spark.sql.extensions",
-            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
         .config("spark.sql.warehouse.dir", warehouseDir.toString())
         .config(String.format("spark.sql.catalog.%s.type", catalogName), 
"rest")
         .config(
@@ -54,26 +61,6 @@ public abstract class SparkIntegrationBase extends PolarisSparkIntegrationTestBa
         .config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2");
   }
 
-  @Override
-  protected void cleanupCatalog(String catalogName) {
-    onSpark("USE " + catalogName);
-    List<Row> namespaces = onSpark("SHOW NAMESPACES").collectAsList();
-    for (Row namespace : namespaces) {
-      // TODO: once all table operations are supported, remove the override of this function
-      // List<Row> tables = onSpark("SHOW TABLES IN " + namespace.getString(0)).collectAsList();
-      // for (Row table : tables) {
-      // onSpark("DROP TABLE " + namespace.getString(0) + "." + table.getString(1));
-      // }
-      List<Row> views = onSpark("SHOW VIEWS IN " + namespace.getString(0)).collectAsList();
-      for (Row view : views) {
-        onSpark("DROP VIEW " + namespace.getString(0) + "." + view.getString(1));
-      }
-      onSpark("DROP NAMESPACE " + namespace.getString(0));
-    }
-
-    managementApi.deleteCatalog(catalogName);
-  }
-
   @FormatMethod
   protected List<Object[]> sql(String query, Object... args) {
     List<Row> rows = spark.sql(String.format(query, args)).collectAsList();
@@ -108,4 +95,19 @@ public abstract class SparkIntegrationBase extends PolarisSparkIntegrationTestBa
             })
         .toArray(Object[]::new);
   }
+
+  /** List the names of the directories under a given path, non-recursively. */
+  protected List<String> listDirs(String path) {
+    File directory = new File(path);
+    return FileUtils.listFilesAndDirs(
+            directory, FalseFileFilter.INSTANCE, DirectoryFileFilter.DIRECTORY)
+        .stream()
+        .map(File::getName)
+        .toList();
+  }
+
+  /** Generate a name with the given prefix and a random suffix. */
+  protected String generateName(String prefix) {
+    return prefix + "_" + UUID.randomUUID().toString().replaceAll("-", "");
+  }
 }
diff --git a/plugins/spark/v3.5/integration/src/intTest/resources/logback.xml b/plugins/spark/v3.5/integration/src/intTest/resources/logback.xml
new file mode 100644
index 000000000..5ec1efd5f
--- /dev/null
+++ b/plugins/spark/v3.5/integration/src/intTest/resources/logback.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied.  See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<!--
+Spark sets its logging to DEBUG level by default, which dumps a lot of internal
+details into the IntelliJ console and significantly impacts IDE performance when
+running the tests. This configuration logs only ERROR-level messages; comment it
+out if you would like to see all Spark debug logs during the run.
+-->
+<configuration>
+  <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+    <encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
+      <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level - %msg%n</pattern>
+    </encoder>
+  </appender>
+
+  <root level="ERROR">
+    <appender-ref ref="CONSOLE"/>
+  </root>
+</configuration>
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
index e88628a70..9d8d1163e 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
@@ -151,13 +151,20 @@ public class SparkCatalog
     String provider = properties.get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);
     if (PolarisCatalogUtils.useIceberg(provider)) {
       return this.icebergsSparkCatalog.createTable(ident, schema, transforms, properties);
-    } else if (PolarisCatalogUtils.useDelta(provider)) {
-      // For delta table, we load the delta catalog to help dealing with the
-      // delta log creation.
-      TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
-      return deltaCatalog.createTable(ident, schema, transforms, properties);
     } else {
-      return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties);
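+      // A table created without an explicit location or path would have its location
+      // managed by the Spark session catalog, which is not supported by Polaris.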
+      if (PolarisCatalogUtils.isTableWithSparkManagedLocation(properties)) {
+        throw new UnsupportedOperationException(
+            "Create table without location key is not supported by Polaris. 
Please provide location or path on table creation.");
+      }
+
+      if (PolarisCatalogUtils.useDelta(provider)) {
+        // For delta tables, we load the delta catalog to help deal with the
+        // delta log creation.
+        TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
+        return deltaCatalog.createTable(ident, schema, transforms, properties);
+      } else {
+        return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties);
+      }
     }
   }
 
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java
index 01a4af45d..8dac78b23 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java
@@ -50,6 +50,16 @@ public class PolarisCatalogUtils {
     return "delta".equalsIgnoreCase(provider);
   }
 
+  /**
+   * For tables whose location is managed by the Spark session catalog, there will be no
+   * location or path in the properties.
+   */
+  public static boolean isTableWithSparkManagedLocation(Map<String, String> properties) {
+    boolean hasLocationClause = properties.containsKey(TableCatalog.PROP_LOCATION);
+    boolean hasPathClause = properties.containsKey(TABLE_PATH_KEY);
+    return !hasLocationClause && !hasPathClause;
+  }
+
   /**
    * Load spark table using DataSourceV2.
    *
diff --git a/site/content/in-dev/unreleased/polaris-spark-client.md b/site/content/in-dev/unreleased/polaris-spark-client.md
index 46796cdc6..9466020a4 100644
--- a/site/content/in-dev/unreleased/polaris-spark-client.md
+++ b/site/content/in-dev/unreleased/polaris-spark-client.md
@@ -126,5 +126,5 @@ The Polaris Spark client has the following functionality limitations:
    is also not supported, since it relies on the CTAS support.
 2) Create a Delta table without explicit location is not supported.
 3) Rename a Delta table is not supported.
-4) ALTER TABLE ... SET LOCATION/SET FILEFORMAT/ADD PARTITION is not supported for DELTA table.
+4) ALTER TABLE ... SET LOCATION is not supported for DELTA table.
 5) For other non-Iceberg tables like csv, it is not supported.
