This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 475d10a9c Spark: Add rest table operations (drop, list, purge and rename etc) for Spark Client (#1368)
475d10a9c is described below

commit 475d10a9c85cda80528ba5f24c10a7ed71df5cdf
Author: gh-yzou <167037035+gh-y...@users.noreply.github.com>
AuthorDate: Thu Apr 17 17:00:08 2025 -0700

    Spark: Add rest table operations (drop, list, purge and rename etc) for Spark Client (#1368)
---
 .../apache/polaris/spark/PolarisRESTCatalog.java   |  55 ++++-
 .../apache/polaris/spark/PolarisSparkCatalog.java  |  20 +-
 .../org/apache/polaris/spark/SparkCatalog.java     |  40 ++-
 .../spark/rest/ListGenericTablesRESTResponse.java  |  45 ++++
 .../org/apache/polaris/spark/NoopDeltaCatalog.java |   9 +
 .../org/apache/polaris/spark/SparkCatalogTest.java | 268 +++++++++++++++++----
 .../polaris/spark/rest/DeserializationTest.java    |  57 ++++-
 7 files changed, 432 insertions(+), 62 deletions(-)
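
A note on the new rest-page-size catalog option introduced in this change: PolarisRESTCatalog
reads it from the merged catalog properties, so with Spark it can be supplied through the
spark.sql.catalog.<name>.* configuration. A minimal sketch, assuming a catalog registered
under the name "polaris" (the catalog name and property wiring here are illustrative, not
part of this commit):

    import org.apache.spark.sql.SparkSession;

    SparkSession spark =
        SparkSession.builder()
            .config("spark.sql.catalog.polaris", "org.apache.polaris.spark.SparkCatalog")
            // Optional: page size used by the paginated list calls added below.
            .config("spark.sql.catalog.polaris.rest-page-size", "100")
            .getOrCreate();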

diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
index 0b8743132..72d258511 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
@@ -19,8 +19,10 @@
 package org.apache.polaris.spark;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -31,6 +33,7 @@ import java.util.function.Function;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.rest.Endpoint;
 import org.apache.iceberg.rest.ErrorHandlers;
@@ -39,7 +42,9 @@ import org.apache.iceberg.rest.RESTClient;
 import org.apache.iceberg.rest.ResourcePaths;
 import org.apache.iceberg.rest.auth.OAuth2Util;
 import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.rest.responses.ListTablesResponse;
 import org.apache.iceberg.util.EnvironmentUtil;
+import org.apache.iceberg.util.PropertyUtil;
 import org.apache.polaris.core.rest.PolarisEndpoints;
 import org.apache.polaris.core.rest.PolarisResourcePaths;
 import org.apache.polaris.service.types.GenericTable;
@@ -52,6 +57,8 @@ import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse;
  * objects.
  */
 public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
+  public static final String REST_PAGE_SIZE = "rest-page-size";
+
   private final Function<Map<String, String>, RESTClient> clientBuilder;
 
   private RESTClient restClient = null;
@@ -59,6 +66,7 @@ public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
   private Set<Endpoint> endpoints;
   private OAuth2Util.AuthSession catalogAuth = null;
   private PolarisResourcePaths pathGenerator = null;
+  private Integer pageSize = null;
 
  // the default endpoints to configure if the server doesn't specify the 'endpoints' configuration.
  private static final Set<Endpoint> DEFAULT_ENDPOINTS = PolarisEndpoints.GENERIC_TABLE_ENDPOINTS;
@@ -101,6 +109,12 @@ public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
     this.pathGenerator = PolarisResourcePaths.forCatalogProperties(mergedProps);
     this.restClient = clientBuilder.apply(mergedProps).withAuthSession(catalogAuth);
 
+    this.pageSize = PropertyUtil.propertyAsNullableInt(mergedProps, REST_PAGE_SIZE);
+    if (pageSize != null) {
+      Preconditions.checkArgument(
+          pageSize > 0, "Invalid value for %s, must be a positive integer", REST_PAGE_SIZE);
+    }
+
     this.closeables = new CloseableGroup();
     this.closeables.addCloseable(this.restClient);
     this.closeables.setSuppressCloseFailure(true);
@@ -138,12 +152,49 @@ public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
 
   @Override
   public List<TableIdentifier> listGenericTables(Namespace ns) {
-    throw new UnsupportedOperationException("listTables not supported");
+    Endpoint.check(endpoints, PolarisEndpoints.V1_LIST_GENERIC_TABLES);
+
+    Map<String, String> queryParams = Maps.newHashMap();
+    ImmutableList.Builder<TableIdentifier> tables = ImmutableList.builder();
+    String pageToken = "";
+    if (pageSize != null) {
+      queryParams.put("pageSize", String.valueOf(pageSize));
+    }
+
+    do {
+      queryParams.put("pageToken", pageToken);
+      ListTablesResponse response =
+          restClient
+              .withAuthSession(this.catalogAuth)
+              .get(
+                  pathGenerator.genericTables(ns),
+                  queryParams,
+                  ListTablesResponse.class,
+                  Map.of(),
+                  ErrorHandlers.namespaceErrorHandler());
+      pageToken = response.nextPageToken();
+      tables.addAll(response.identifiers());
+    } while (pageToken != null);
+
+    return tables.build();
   }
 
   @Override
   public boolean dropGenericTable(TableIdentifier identifier) {
-    throw new UnsupportedOperationException("dropTable not supported");
+    Endpoint.check(endpoints, PolarisEndpoints.V1_DELETE_GENERIC_TABLE);
+
+    try {
+      restClient
+          .withAuthSession(this.catalogAuth)
+          .delete(
+              pathGenerator.genericTable(identifier),
+              null,
+              Map.of(),
+              ErrorHandlers.tableErrorHandler());
+      return true;
+    } catch (NoSuchTableException e) {
+      return false;
+    }
   }
 
   @Override
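
The listGenericTables implementation above follows the usual REST pagination contract: start
with an empty pageToken, accumulate identifiers, and keep fetching while the server returns a
non-null next-page-token. A minimal sketch of the same loop, where Page and fetchPage are
hypothetical stand-ins for ListTablesResponse and the authenticated GET:

    List<TableIdentifier> all = new ArrayList<>();
    String token = "";
    do {
      Page page = fetchPage(token);    // stand-in for the REST GET with pageToken=token
      all.addAll(page.identifiers());  // accumulate this page's results
      token = page.nextPageToken();    // null on the final page terminates the loop
    } while (token != null);
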
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
index 8f8c07fba..e1658312b 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
@@ -19,6 +19,7 @@
 package org.apache.polaris.spark;
 
 import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.polaris.service.types.GenericTable;
@@ -90,12 +91,19 @@ public class PolarisSparkCatalog implements TableCatalog {
   @Override
   public Table alterTable(Identifier identifier, TableChange... changes)
       throws NoSuchTableException {
-    throw new NoSuchTableException(identifier);
+    // alterTable is currently not supported for generic tables
+    throw new UnsupportedOperationException("alterTable operation is not supported");
+  }
+
+  @Override
+  public boolean purgeTable(Identifier ident) {
+    // purgeTable for a generic table only drops the table without purging data
+    return dropTable(ident);
   }
 
   @Override
   public boolean dropTable(Identifier identifier) {
-    return false;
+    return this.polarisCatalog.dropGenericTable(Spark3Util.identifierToTableIdentifier(identifier));
   }
 
   @Override
@@ -106,6 +114,12 @@ public class PolarisSparkCatalog implements TableCatalog {
 
   @Override
   public Identifier[] listTables(String[] namespace) {
-    throw new UnsupportedOperationException("listTables operation is not 
supported");
+    try {
+      return 
this.polarisCatalog.listGenericTables(Namespace.of(namespace)).stream()
+          .map(ident -> Identifier.of(ident.namespace().levels(), 
ident.name()))
+          .toArray(Identifier[]::new);
+    } catch (UnsupportedOperationException ex) {
+      return new Identifier[0];
+    }
   }
 }
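
With these changes, PolarisSparkCatalog exposes the standard Spark TableCatalog list/drop/purge
surface for generic tables. An illustrative call sequence, assuming an already-initialized
PolarisSparkCatalog instance (the variable name genericCatalog is ours):

    Identifier ident = Identifier.of(new String[] {"ns"}, "my_delta_table");
    Identifier[] tables = genericCatalog.listTables(new String[] {"ns"}); // generic tables only
    boolean dropped = genericCatalog.dropTable(ident);  // true if the table existed
    boolean purged = genericCatalog.purgeTable(ident);  // delegates to dropTable; no file purge
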
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
index cf46d9a15..e88628a70 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
@@ -20,7 +20,9 @@ package org.apache.polaris.spark;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import java.util.Arrays;
 import java.util.Map;
+import java.util.stream.Stream;
 import org.apache.arrow.util.VisibleForTesting;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
@@ -161,33 +163,59 @@ public class SparkCatalog
 
   @Override
   public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
-    throw new UnsupportedOperationException("alterTable");
+    try {
+      return this.icebergsSparkCatalog.alterTable(ident, changes);
+    } catch (NoSuchTableException e) {
+      Table table = this.polarisSparkCatalog.loadTable(ident);
+      String provider = table.properties().get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);
+      if (PolarisCatalogUtils.useDelta(provider)) {
+        // For Delta tables, most alter operations are delta log manipulations,
+        // so we load the delta catalog to help handle the alter table operation.
+        // NOTE: This currently doesn't work for changing the file location or file format
+        //     using ALTER TABLE ... SET LOCATION and ALTER TABLE ... SET FILEFORMAT.
+        TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
+        return deltaCatalog.alterTable(ident, changes);
+      }
+      return this.polarisSparkCatalog.alterTable(ident);
+    }
   }
 
   @Override
   public boolean dropTable(Identifier ident) {
-    throw new UnsupportedOperationException("dropTable");
+    return this.icebergsSparkCatalog.dropTable(ident) || this.polarisSparkCatalog.dropTable(ident);
   }
 
   @Override
   public void renameTable(Identifier from, Identifier to)
       throws NoSuchTableException, TableAlreadyExistsException {
-    throw new UnsupportedOperationException("renameTable");
+    try {
+      this.icebergsSparkCatalog.renameTable(from, to);
+    } catch (NoSuchTableException e) {
+      this.polarisSparkCatalog.renameTable(from, to);
+    }
   }
 
   @Override
   public void invalidateTable(Identifier ident) {
-    throw new UnsupportedOperationException("invalidateTable");
+    this.icebergsSparkCatalog.invalidateTable(ident);
   }
 
   @Override
   public boolean purgeTable(Identifier ident) {
-    throw new UnsupportedOperationException("purgeTable");
+    if (this.icebergsSparkCatalog.purgeTable(ident)) {
+      return true;
+    } else {
+      return this.polarisSparkCatalog.purgeTable(ident);
+    }
   }
 
   @Override
   public Identifier[] listTables(String[] namespace) {
-    throw new UnsupportedOperationException("listTables");
+    Identifier[] icebergIdents = this.icebergsSparkCatalog.listTables(namespace);
+    Identifier[] genericTableIdents = this.polarisSparkCatalog.listTables(namespace);
+
+    return Stream.concat(Arrays.stream(icebergIdents), Arrays.stream(genericTableIdents))
+        .toArray(Identifier[]::new);
   }
 
   @Override
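
The listTables merge above is plain array concatenation, Iceberg identifiers first. A
self-contained demo of the same Stream.concat pattern (class name and sample values are ours):

    import java.util.Arrays;
    import java.util.stream.Stream;

    public class ConcatDemo {
      public static void main(String[] args) {
        String[] iceberg = {"ns.iceberg1", "ns.iceberg2"};
        String[] generic = {"ns.delta1", "ns.csv1"};
        // Concatenate the two listings into a single array.
        String[] all =
            Stream.concat(Arrays.stream(iceberg), Arrays.stream(generic))
                .toArray(String[]::new);
        System.out.println(Arrays.toString(all)); // [ns.iceberg1, ns.iceberg2, ns.delta1, ns.csv1]
      }
    }
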
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/ListGenericTablesRESTResponse.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/ListGenericTablesRESTResponse.java
new file mode 100644
index 000000000..ede2c89a9
--- /dev/null
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/ListGenericTablesRESTResponse.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Set;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTResponse;
+import org.apache.polaris.service.types.ListGenericTablesResponse;
+
+/**
+ * RESTResponse definition for ListGenericTable which extends the Iceberg RESTResponse. This is
+ * currently required because the Iceberg HTTPClient requires the request and response to be
+ * instances of RESTRequest and RESTResponse.
+ */
+public class ListGenericTablesRESTResponse extends ListGenericTablesResponse
+    implements RESTResponse {
+
+  @JsonCreator
+  public ListGenericTablesRESTResponse(
+      @JsonProperty(value = "next-page-token") String nextPageToken,
+      @JsonProperty(value = "identifiers") Set<TableIdentifier> identifiers) {
+    super(nextPageToken, identifiers);
+  }
+
+  @Override
+  public void validate() {}
+}
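
A hedged sketch of the JSON shape this class maps, showing the kebab-case next-page-token
field; deserializing a non-empty identifier set additionally requires the Iceberg
RESTSerializers registration used in the DeserializationTest changes below:

    // Illustrative only: a default ObjectMapper suffices for the empty case.
    ObjectMapper mapper = new ObjectMapper();
    String json = "{\"next-page-token\":\"tok\",\"identifiers\":[]}";
    ListGenericTablesRESTResponse resp =
        mapper.readValue(json, ListGenericTablesRESTResponse.class);
    // resp.getNextPageToken() -> "tok"
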
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java
index c11e8de3b..f698615e6 100644
--- a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java
@@ -18,7 +18,11 @@
  */
 package org.apache.polaris.spark;
 
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableChange;
 
 /**
  * This is a fake delta catalog class that is used for testing. This class is a noop class that
@@ -29,4 +33,9 @@ public class NoopDeltaCatalog extends DelegatingCatalogExtension {
   // This is a mock of isUnityCatalog scala val in
   // org.apache.spark.sql.delta.catalog.DeltaCatalog.
   private boolean isUnityCatalog = false;
+
+  @Override
+  public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
+    return super.loadTable(ident);
+  }
 }
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
index 0d142cbcb..6aa4a3c08 100644
--- a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
@@ -28,20 +28,25 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.actions.DeleteReachableFiles;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.actions.DeleteReachableFilesSparkAction;
+import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.polaris.spark.utils.DeltaHelper;
 import org.apache.polaris.spark.utils.PolarisCatalogUtils;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
 import org.apache.spark.sql.connector.catalog.TableProvider;
 import org.apache.spark.sql.connector.catalog.V1Table;
 import org.apache.spark.sql.connector.catalog.View;
@@ -106,6 +111,8 @@ public class SparkCatalogTest {
   private String catalogName;
 
   private static final String[] defaultNS = new String[] {"ns"};
+  private static StructType defaultSchema =
+      new StructType().add("id", "long").add("name", "string");
 
   @BeforeEach
   public void setup() throws Exception {
@@ -321,8 +328,6 @@ public class SparkCatalogTest {
     // create a new namespace under the default NS
     String[] namespace = new String[] {"ns", "nsl2"};
     catalog.createNamespace(namespace, Maps.newHashMap());
-    // table schema
-    StructType schema = new StructType().add("id", "long").add("name", "string");
     // create  under defaultNS
     String view1Name = "test-view1";
     String view1SQL = "select id from test-table where id >= 3";
@@ -331,7 +336,7 @@ public class SparkCatalogTest {
         view1SQL,
         catalogName,
         defaultNS,
-        schema,
+        defaultSchema,
         new String[0],
         new String[0],
         new String[0],
@@ -348,7 +353,7 @@ public class SparkCatalogTest {
           nsl2ViewSQLs[i],
           catalogName,
           namespace,
-          schema,
+          defaultSchema,
           new String[0],
           new String[0],
           new String[0],
@@ -368,39 +373,222 @@ public class SparkCatalogTest {
   }
 
   @Test
-  void testCreateAndLoadIcebergTable() throws Exception {
+  void testIcebergTableOperations() throws Exception {
     Identifier identifier = Identifier.of(defaultNS, "iceberg-table");
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "iceberg");
-    properties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
-    StructType schema = new StructType().add("boolType", "boolean");
-
-    Table createdTable = catalog.createTable(identifier, schema, new 
Transform[0], properties);
-    assertThat(createdTable).isInstanceOf(SparkTable.class);
+    createAndValidateGenericTableWithLoad(catalog, identifier, defaultSchema, "iceberg");
 
     // load the table
     Table table = catalog.loadTable(identifier);
     // verify iceberg SparkTable is loaded
     assertThat(table).isInstanceOf(SparkTable.class);
 
+    Identifier[] icebergTables = catalog.listTables(defaultNS);
+    assertThat(icebergTables.length).isEqualTo(1);
+    assertThat(icebergTables[0]).isEqualTo(Identifier.of(defaultNS, "iceberg-table"));
+
     // verify create table with the same identifier fails with spark TableAlreadyExistsException
-    StructType newSchema = new StructType().add("LongType", "Long");
     Map<String, String> newProperties = Maps.newHashMap();
     newProperties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "iceberg");
     newProperties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
     assertThatThrownBy(
-            () -> catalog.createTable(identifier, newSchema, new Transform[0], newProperties))
+            () -> catalog.createTable(identifier, defaultSchema, new Transform[0], newProperties))
         .isInstanceOf(TableAlreadyExistsException.class);
+
+    // drop the iceberg table
+    catalog.dropTable(identifier);
+    assertThatThrownBy(() -> catalog.loadTable(identifier))
+        .isInstanceOf(NoSuchTableException.class);
+    assertThat(catalog.listTables(defaultNS)).isEmpty();
   }
 
   @ParameterizedTest
   @ValueSource(strings = {"delta", "csv"})
   void testCreateAndLoadGenericTable(String format) throws Exception {
     Identifier identifier = Identifier.of(defaultNS, "generic-test-table");
+    createAndValidateGenericTableWithLoad(catalog, identifier, defaultSchema, format);
+
+    Identifier[] icebergTables = catalog.listTables(defaultNS);
+    assertThat(icebergTables.length).isEqualTo(1);
+    assertThat(icebergTables[0]).isEqualTo(Identifier.of(defaultNS, "generic-test-table"));
+
+    Map<String, String> newProperties = Maps.newHashMap();
+    newProperties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "parquet");
+    newProperties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
+    assertThatThrownBy(
+            () -> catalog.createTable(identifier, defaultSchema, new Transform[0], newProperties))
+        .isInstanceOf(TableAlreadyExistsException.class);
+
+    // drop the generic table
+    catalog.dropTable(identifier);
+    assertThatThrownBy(() -> catalog.loadTable(identifier))
+        .isInstanceOf(NoSuchTableException.class);
+    assertThat(catalog.listTables(defaultNS)).isEmpty();
+  }
+
+  @Test
+  void testMixedTables() throws Exception {
+    // create two iceberg tables, and three non-iceberg tables
+    String[] tableNames = new String[] {"iceberg1", "iceberg2", "delta1", "csv1", "delta2"};
+    String[] tableFormats = new String[] {"iceberg", null, "delta", "csv", "delta"};
+    for (int i = 0; i < tableNames.length; i++) {
+      Identifier identifier = Identifier.of(defaultNS, tableNames[i]);
+      createAndValidateGenericTableWithLoad(catalog, identifier, defaultSchema, tableFormats[i]);
+    }
+
+    // list all tables
+    Identifier[] tableIdents = catalog.listTables(defaultNS);
+    assertThat(tableIdents.length).isEqualTo(tableNames.length);
+    for (String name : tableNames) {
+      assertThat(tableIdents).contains(Identifier.of(defaultNS, name));
+    }
+
+    // drop the iceberg2 and delta2 tables
+    catalog.dropTable(Identifier.of(defaultNS, "iceberg2"));
+    catalog.dropTable(Identifier.of(defaultNS, "delta2"));
+
+    String[] remainingTableNames = new String[] {"iceberg1", "delta1", "csv1"};
+    Identifier[] remainingTableIdents = catalog.listTables(defaultNS);
+    assertThat(remainingTableIdents.length).isEqualTo(remainingTableNames.length);
+    for (String name : remainingTableNames) {
+      assertThat(remainingTableIdents).contains(Identifier.of(defaultNS, name));
+    }
+
+    // drop the remaining tables
+    for (String name : remainingTableNames) {
+      catalog.dropTable(Identifier.of(defaultNS, name));
+    }
+    assertThat(catalog.listTables(defaultNS)).isEmpty();
+  }
+
+  @Test
+  void testAlterAndRenameTable() throws Exception {
+    String icebergTableName = "iceberg-table";
+    String deltaTableName = "delta-table";
+    String csvTableName = "csv-table";
+    Identifier icebergIdent = Identifier.of(defaultNS, icebergTableName);
+    Identifier deltaIdent = Identifier.of(defaultNS, deltaTableName);
+    Identifier csvIdent = Identifier.of(defaultNS, csvTableName);
+    createAndValidateGenericTableWithLoad(catalog, icebergIdent, defaultSchema, "iceberg");
+    createAndValidateGenericTableWithLoad(catalog, deltaIdent, defaultSchema, "delta");
+    createAndValidateGenericTableWithLoad(catalog, csvIdent, defaultSchema, "csv");
+
+    // verify alter iceberg table
+    Table newIcebergTable =
+        catalog.alterTable(icebergIdent, TableChange.setProperty("iceberg_key", "iceberg_value"));
+    assertThat(newIcebergTable).isInstanceOf(SparkTable.class);
+    assertThat(newIcebergTable.properties()).contains(Map.entry("iceberg_key", "iceberg_value"));
+
+    // verify rename iceberg table works
+    Identifier newIcebergIdent = Identifier.of(defaultNS, "new-iceberg-table");
+    catalog.renameTable(icebergIdent, newIcebergIdent);
+    assertThatThrownBy(() -> catalog.loadTable(icebergIdent))
+        .isInstanceOf(NoSuchTableException.class);
+    Table icebergTable = catalog.loadTable(newIcebergIdent);
+    assertThat(icebergTable).isInstanceOf(SparkTable.class);
+
+    // verify alter delta table is a no-op, and alter csv table throws an exception
+    SQLConf conf = new SQLConf();
+    try (MockedStatic<SparkSession> mockedStaticSparkSession =
+            Mockito.mockStatic(SparkSession.class);
+        MockedStatic<DataSource> mockedStaticDS = 
Mockito.mockStatic(DataSource.class);
+        MockedStatic<DataSourceV2Utils> mockedStaticDSV2 =
+            Mockito.mockStatic(DataSourceV2Utils.class)) {
+      SparkSession mockedSession = Mockito.mock(SparkSession.class);
+      mockedStaticSparkSession.when(SparkSession::active).thenReturn(mockedSession);
+      SessionState mockedState = Mockito.mock(SessionState.class);
+      Mockito.when(mockedSession.sessionState()).thenReturn(mockedState);
+      Mockito.when(mockedState.conf()).thenReturn(conf);
+
+      TableProvider deltaProvider = Mockito.mock(TableProvider.class);
+      mockedStaticDS
+          .when(() -> DataSource.lookupDataSourceV2(Mockito.eq("delta"), Mockito.any()))
+          .thenReturn(Option.apply(deltaProvider));
+      V1Table deltaTable = Mockito.mock(V1Table.class);
+      Map<String, String> deltaProps = Maps.newHashMap();
+      deltaProps.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "delta");
+      deltaProps.put(TableCatalog.PROP_LOCATION, "file:///tmp/delta/path/to/table/test-delta/");
+      Mockito.when(deltaTable.properties()).thenReturn(deltaProps);
+      mockedStaticDSV2
+          .when(
+              () ->
+                  DataSourceV2Utils.getTableFromProvider(
+                      Mockito.eq(deltaProvider), Mockito.any(), Mockito.any()))
+          .thenReturn(deltaTable);
+
+      Table delta =
+          catalog.alterTable(deltaIdent, TableChange.setProperty("delta_key", "delta_value"));
+      assertThat(delta).isInstanceOf(V1Table.class);
+
+      TableProvider csvProvider = Mockito.mock(TableProvider.class);
+      mockedStaticDS
+          .when(() -> DataSource.lookupDataSourceV2(Mockito.eq("csv"), Mockito.any()))
+          .thenReturn(Option.apply(csvProvider));
+      Map<String, String> csvProps = Maps.newHashMap();
+      csvProps.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "csv");
+      V1Table csvTable = Mockito.mock(V1Table.class);
+      Mockito.when(csvTable.properties()).thenReturn(csvProps);
+      mockedStaticDSV2
+          .when(
+              () ->
+                  DataSourceV2Utils.getTableFromProvider(
+                      Mockito.eq(csvProvider), Mockito.any(), Mockito.any()))
+          .thenReturn(csvTable);
+      assertThatThrownBy(
+              () -> catalog.alterTable(csvIdent, TableChange.setProperty("csv_key", "csv_value")))
+          .isInstanceOf(UnsupportedOperationException.class);
+    }
+
+    // verify rename non-iceberg table is not supported
+    assertThatThrownBy(
+            () -> catalog.renameTable(deltaIdent, Identifier.of(defaultNS, "new-delta-table")))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(
+            () -> catalog.renameTable(csvIdent, Identifier.of(defaultNS, "new-csv-table")))
+        .isInstanceOf(UnsupportedOperationException.class);
+  }
+
+  @Test
+  void testPurgeInvalidateTable() throws Exception {
+    Identifier icebergIdent = Identifier.of(defaultNS, "iceberg-table");
+    Identifier deltaIdent = Identifier.of(defaultNS, "delta-table");
+    createAndValidateGenericTableWithLoad(catalog, icebergIdent, defaultSchema, "iceberg");
+    createAndValidateGenericTableWithLoad(catalog, deltaIdent, defaultSchema, "delta");
+
+    // invalidateTable is currently a no-op
+    catalog.invalidateTable(icebergIdent);
+    catalog.invalidateTable(deltaIdent);
+
+    Identifier[] tableIdents = catalog.listTables(defaultNS);
+    assertThat(tableIdents.length).isEqualTo(2);
+
+    // verify purge tables drops the table
+    catalog.purgeTable(deltaIdent);
+    assertThat(catalog.listTables(defaultNS).length).isEqualTo(1);
+
+    // purge iceberg table triggers file deletion
+    try (MockedStatic<SparkActions> mockedStaticActions = 
Mockito.mockStatic(SparkActions.class)) {
+      SparkActions actions = Mockito.mock(SparkActions.class);
+      DeleteReachableFilesSparkAction deleteAction =
+          Mockito.mock(DeleteReachableFilesSparkAction.class);
+      mockedStaticActions.when(SparkActions::get).thenReturn(actions);
+      Mockito.when(actions.deleteReachableFiles(Mockito.any())).thenReturn(deleteAction);
+      Mockito.when(deleteAction.io(Mockito.any())).thenReturn(deleteAction);
+      Mockito.when(deleteAction.execute())
+          .thenReturn(Mockito.mock(DeleteReachableFiles.Result.class));
+
+      catalog.purgeTable(icebergIdent);
+    }
+    assertThat(catalog.listTables(defaultNS).length).isEqualTo(0);
+  }
+
+  private void createAndValidateGenericTableWithLoad(
+      InMemorySparkCatalog sparkCatalog, Identifier identifier, StructType schema, String format)
+      throws Exception {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, format);
-    properties.put(TableCatalog.PROP_LOCATION, "file:///tmp/delta/path/to/table/");
-    StructType schema = new StructType().add("boolType", "boolean");
+    properties.put(
+        TableCatalog.PROP_LOCATION,
+        String.format("file:///tmp/delta/path/to/table/%s/", identifier.name()));
 
     SQLConf conf = new SQLConf();
     try (MockedStatic<SparkSession> mockedStaticSparkSession =
@@ -425,40 +613,20 @@ public class SparkCatalogTest {
                   DataSourceV2Utils.getTableFromProvider(
                       Mockito.eq(provider), Mockito.any(), Mockito.any()))
           .thenReturn(table);
-      Table createdTable = catalog.createTable(identifier, schema, new Transform[0], properties);
-      assertThat(createdTable).isInstanceOf(V1Table.class);
-
-      // load the table
-      Table loadedTable = catalog.loadTable(identifier);
-      assertThat(loadedTable).isInstanceOf(V1Table.class);
+      Table createdTable =
+          sparkCatalog.createTable(identifier, schema, new Transform[0], properties);
+      Table loadedTable = sparkCatalog.loadTable(identifier);
+
+      // verify the create and load table result
+      if (PolarisCatalogUtils.useIceberg(format)) {
+        // iceberg SparkTable is returned for iceberg tables
+        assertThat(createdTable).isInstanceOf(SparkTable.class);
+        assertThat(loadedTable).isInstanceOf(SparkTable.class);
+      } else {
+        // Spark V1 table is returned for non-iceberg tables
+        assertThat(createdTable).isInstanceOf(V1Table.class);
+        assertThat(loadedTable).isInstanceOf(V1Table.class);
+      }
     }
-
-    StructType newSchema = new StructType().add("LongType", "Long");
-    Map<String, String> newProperties = Maps.newHashMap();
-    newProperties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "parquet");
-    newProperties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
-    assertThatThrownBy(
-            () -> catalog.createTable(identifier, newSchema, new Transform[0], newProperties))
-        .isInstanceOf(TableAlreadyExistsException.class);
-  }
-
-  @Test
-  public void testUnsupportedOperations() {
-    String[] namespace = new String[] {"ns1"};
-    Identifier identifier = Identifier.of(namespace, "table1");
-    Identifier new_identifier = Identifier.of(namespace, "table2");
-    // table methods
-    assertThatThrownBy(() -> catalog.alterTable(identifier))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.dropTable(identifier))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.renameTable(identifier, new_identifier))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.listTables(namespace))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.invalidateTable(identifier))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.purgeTable(identifier))
-        .isInstanceOf(UnsupportedOperationException.class);
   }
 }
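
The tests above lean on Mockito's static mocking throughout. A minimal sketch of the pattern
(requires a Mockito version with inline static mocking; the snippet is illustrative):

    // Static behavior is mocked only inside the try-with-resources scope and is
    // restored automatically when the MockedStatic handle closes.
    try (MockedStatic<SparkSession> mocked = Mockito.mockStatic(SparkSession.class)) {
      SparkSession session = Mockito.mock(SparkSession.class);
      mocked.when(SparkSession::active).thenReturn(session);
      assert SparkSession.active() == session; // the static call now returns the mock
    }
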
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
index 542fd05d8..e6747e653 100644
--- a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
@@ -20,23 +20,48 @@ package org.apache.polaris.spark.rest;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonFactoryBuilder;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Stream;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTSerializers;
 import org.apache.polaris.service.types.GenericTable;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 public class DeserializationTest {
   private ObjectMapper mapper;
+  private static final JsonFactory FACTORY =
+      new JsonFactoryBuilder()
+          .configure(JsonFactory.Feature.INTERN_FIELD_NAMES, false)
+          .configure(JsonFactory.Feature.FAIL_ON_SYMBOL_HASH_OVERFLOW, false)
+          .build();
 
   @BeforeEach
   public void setUp() {
-    mapper = new ObjectMapper();
+    // NOTE: This is the same setting as Iceberg's RESTObjectMapper.java. However,
+    // RESTObjectMapper is not a public class, therefore, we duplicate the
+    // setting here for serialization and deserialization tests.
+    mapper = new ObjectMapper(FACTORY);
+    mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+    mapper.setVisibility(PropertyAccessor.CREATOR, JsonAutoDetect.Visibility.ANY);
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    mapper.setPropertyNamingStrategy(new PropertyNamingStrategies.KebabCaseStrategy());
+    RESTSerializers.registerAll(mapper);
   }
 
   @ParameterizedTest
@@ -75,6 +100,36 @@ public class DeserializationTest {
     assertThat(deserializedRequest.getProperties().size()).isEqualTo(properties.size());
   }
 
+  @Test
+  public void testListGenericTablesRESTResponse() throws JsonProcessingException {
+    Namespace namespace = Namespace.of("test-ns");
+    Set<TableIdentifier> idents =
+        ImmutableSet.of(
+            TableIdentifier.of(namespace, "table1"),
+            TableIdentifier.of(namespace, "table2"),
+            TableIdentifier.of(namespace, "table3"));
+
+    // page token is null
+    ListGenericTablesRESTResponse response = new ListGenericTablesRESTResponse(null, idents);
+    String json = mapper.writeValueAsString(response);
+    ListGenericTablesRESTResponse deserializedResponse =
+        mapper.readValue(json, ListGenericTablesRESTResponse.class);
+    assertThat(deserializedResponse.getNextPageToken()).isNull();
+    assertThat(deserializedResponse.getIdentifiers().size()).isEqualTo(idents.size());
+    for (TableIdentifier identifier : idents) {
+      assertThat(deserializedResponse.getIdentifiers()).contains(identifier);
+    }
+
+    // page token is not null
+    response = new ListGenericTablesRESTResponse("page-token", idents);
+    json = mapper.writeValueAsString(response);
+    deserializedResponse = mapper.readValue(json, ListGenericTablesRESTResponse.class);
+    assertThat(deserializedResponse.getNextPageToken()).isEqualTo("page-token");
+    for (TableIdentifier identifier : idents) {
+      assertThat(deserializedResponse.getIdentifiers()).contains(identifier);
+    }
+  }
+
   private static Stream<Arguments> genericTableTestCases() {
     var doc = "table for testing";
     var properties = Maps.newHashMap();

