This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
     new 475d10a9c  Spark: Add REST table operations (drop, list, purge, rename, etc.) for Spark Client (#1368)
475d10a9c is described below

commit 475d10a9c85cda80528ba5f24c10a7ed71df5cdf
Author: gh-yzou <167037035+gh-y...@users.noreply.github.com>
AuthorDate: Thu Apr 17 17:00:08 2025 -0700

    Spark: Add REST table operations (drop, list, purge, rename, etc.) for Spark Client (#1368)
---
 .../apache/polaris/spark/PolarisRESTCatalog.java   |  55 ++++-
 .../apache/polaris/spark/PolarisSparkCatalog.java  |  20 +-
 .../org/apache/polaris/spark/SparkCatalog.java     |  40 ++-
 .../spark/rest/ListGenericTablesRESTResponse.java  |  45 ++++
 .../org/apache/polaris/spark/NoopDeltaCatalog.java |   9 +
 .../org/apache/polaris/spark/SparkCatalogTest.java | 268 +++++++++++++++++----
 .../polaris/spark/rest/DeserializationTest.java    |  57 ++++-
 7 files changed, 432 insertions(+), 62 deletions(-)
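For context, the operations this change wires up are the standard Spark TableCatalog calls. The sketch below shows how they would be exercised against a configured Polaris catalog; the catalog name "polaris" and the namespace and table names are illustrative assumptions, not values from this commit.

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.TableCatalog;

public class TableOpsSketch {
  public static void main(String[] args) throws Exception {
    SparkSession spark = SparkSession.active();
    // Resolve the configured Polaris Spark catalog by name (assumed: "polaris").
    TableCatalog catalog =
        (TableCatalog) spark.sessionState().catalogManager().catalog("polaris");

    String[] ns = new String[] {"ns"};
    // list: now returns Iceberg tables and generic (e.g. Delta, CSV) tables together
    for (Identifier ident : catalog.listTables(ns)) {
      System.out.println(ident);
    }
    // rename: supported for Iceberg tables; throws for generic tables
    catalog.renameTable(Identifier.of(ns, "old-table"), Identifier.of(ns, "new-table"));
    // drop / purge: for generic tables, purge degrades to a plain drop
    catalog.purgeTable(Identifier.of(ns, "new-table"));
  }
}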
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
index 0b8743132..72d258511 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
@@ -19,8 +19,10 @@ package org.apache.polaris.spark;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -31,6 +33,7 @@ import java.util.function.Function;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.rest.Endpoint;
 import org.apache.iceberg.rest.ErrorHandlers;
@@ -39,7 +42,9 @@ import org.apache.iceberg.rest.RESTClient;
 import org.apache.iceberg.rest.ResourcePaths;
 import org.apache.iceberg.rest.auth.OAuth2Util;
 import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.rest.responses.ListTablesResponse;
 import org.apache.iceberg.util.EnvironmentUtil;
+import org.apache.iceberg.util.PropertyUtil;
 import org.apache.polaris.core.rest.PolarisEndpoints;
 import org.apache.polaris.core.rest.PolarisResourcePaths;
 import org.apache.polaris.service.types.GenericTable;
@@ -52,6 +57,8 @@ import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse;
  * objects.
  */
 public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
+  public static final String REST_PAGE_SIZE = "rest-page-size";
+
   private final Function<Map<String, String>, RESTClient> clientBuilder;
 
   private RESTClient restClient = null;
@@ -59,6 +66,7 @@ public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
   private Set<Endpoint> endpoints;
   private OAuth2Util.AuthSession catalogAuth = null;
   private PolarisResourcePaths pathGenerator = null;
+  private Integer pageSize = null;
 
   // the default endpoints to configure if the server doesn't specify the 'endpoints' configuration.
   private static final Set<Endpoint> DEFAULT_ENDPOINTS = PolarisEndpoints.GENERIC_TABLE_ENDPOINTS;
@@ -101,6 +109,12 @@ public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
     this.pathGenerator = PolarisResourcePaths.forCatalogProperties(mergedProps);
     this.restClient = clientBuilder.apply(mergedProps).withAuthSession(catalogAuth);
 
+    this.pageSize = PropertyUtil.propertyAsNullableInt(mergedProps, REST_PAGE_SIZE);
+    if (pageSize != null) {
+      Preconditions.checkArgument(
+          pageSize > 0, "Invalid value for %s, must be a positive integer", REST_PAGE_SIZE);
+    }
+
     this.closeables = new CloseableGroup();
     this.closeables.addCloseable(this.restClient);
     this.closeables.setSuppressCloseFailure(true);
@@ -138,12 +152,49 @@ public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
 
   @Override
   public List<TableIdentifier> listGenericTables(Namespace ns) {
-    throw new UnsupportedOperationException("listTables not supported");
+    Endpoint.check(endpoints, PolarisEndpoints.V1_LIST_GENERIC_TABLES);
+
+    Map<String, String> queryParams = Maps.newHashMap();
+    ImmutableList.Builder<TableIdentifier> tables = ImmutableList.builder();
+    String pageToken = "";
+    if (pageSize != null) {
+      queryParams.put("pageSize", String.valueOf(pageSize));
+    }
+
+    do {
+      queryParams.put("pageToken", pageToken);
+      ListTablesResponse response =
+          restClient
+              .withAuthSession(this.catalogAuth)
+              .get(
+                  pathGenerator.genericTables(ns),
+                  queryParams,
+                  ListTablesResponse.class,
+                  Map.of(),
+                  ErrorHandlers.namespaceErrorHandler());
+      pageToken = response.nextPageToken();
+      tables.addAll(response.identifiers());
+    } while (pageToken != null);
+
+    return tables.build();
   }
 
   @Override
   public boolean dropGenericTable(TableIdentifier identifier) {
-    throw new UnsupportedOperationException("dropTable not supported");
+    Endpoint.check(endpoints, PolarisEndpoints.V1_DELETE_GENERIC_TABLE);
+
+    try {
+      restClient
+          .withAuthSession(this.catalogAuth)
+          .delete(
+              pathGenerator.genericTable(identifier),
+              null,
+              Map.of(),
+              ErrorHandlers.tableErrorHandler());
+      return true;
+    } catch (NoSuchTableException e) {
+      return false;
+    }
   }
 
   @Override
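The new rest-page-size option is read from the merged catalog properties, so it should be possible to set it like any other catalog option on the Spark session. A sketch under that assumption; the catalog name "polaris" and the endpoint URI are placeholders, not values from this commit:

import org.apache.spark.sql.SparkSession;

public class PageSizeConfigSketch {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .master("local[1]")
            .config("spark.sql.catalog.polaris", "org.apache.polaris.spark.SparkCatalog")
            .config("spark.sql.catalog.polaris.uri", "http://localhost:8181/api/catalog")
            // page through list results 50 identifiers at a time;
            // must be a positive integer or catalog initialization fails
            .config("spark.sql.catalog.polaris.rest-page-size", "50")
            .getOrCreate();

    spark.sql("SHOW TABLES IN polaris.ns").show();
  }
}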
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
index 8f8c07fba..e1658312b 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
@@ -19,6 +19,7 @@ package org.apache.polaris.spark;
 
 import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.polaris.service.types.GenericTable;
@@ -90,12 +91,19 @@ public class PolarisSparkCatalog implements TableCatalog {
   @Override
   public Table alterTable(Identifier identifier, TableChange... changes)
       throws NoSuchTableException {
-    throw new NoSuchTableException(identifier);
+    // alterTable is currently not supported for generic tables
+    throw new UnsupportedOperationException("alterTable operation is not supported");
+  }
+
+  @Override
+  public boolean purgeTable(Identifier ident) {
+    // purgeTable for a generic table only does a drop, without purging data files
+    return dropTable(ident);
   }
 
   @Override
   public boolean dropTable(Identifier identifier) {
-    return false;
+    return this.polarisCatalog.dropGenericTable(Spark3Util.identifierToTableIdentifier(identifier));
   }
 
   @Override
@@ -106,6 +114,12 @@ public class PolarisSparkCatalog implements TableCatalog {
   @Override
   public Identifier[] listTables(String[] namespace) {
-    throw new UnsupportedOperationException("listTables operation is not supported");
+    try {
+      return this.polarisCatalog.listGenericTables(Namespace.of(namespace)).stream()
+          .map(ident -> Identifier.of(ident.namespace().levels(), ident.name()))
+          .toArray(Identifier[]::new);
+    } catch (UnsupportedOperationException ex) {
+      return new Identifier[0];
+    }
   }
 }
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
index cf46d9a15..e88628a70 100644
--- a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
@@ -20,7 +20,9 @@ package org.apache.polaris.spark;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import java.util.Arrays;
 import java.util.Map;
+import java.util.stream.Stream;
 import org.apache.arrow.util.VisibleForTesting;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
@@ -161,33 +163,59 @@ public class SparkCatalog
 
   @Override
   public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
-    throw new UnsupportedOperationException("alterTable");
+    try {
+      return this.icebergsSparkCatalog.alterTable(ident, changes);
+    } catch (NoSuchTableException e) {
+      Table table = this.polarisSparkCatalog.loadTable(ident);
+      String provider = table.properties().get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);
+      if (PolarisCatalogUtils.useDelta(provider)) {
+        // For a Delta table, most alter operations are delta log manipulations, so
+        // we load the Delta catalog to help handle the alter table operation.
+        // NOTE: This currently doesn't work for changing the file location or file format
+        // using ALTER TABLE ... SET LOCATION and ALTER TABLE ... SET FILEFORMAT.
+        TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
+        return deltaCatalog.alterTable(ident, changes);
+      }
+      return this.polarisSparkCatalog.alterTable(ident);
+    }
   }
 
   @Override
   public boolean dropTable(Identifier ident) {
-    throw new UnsupportedOperationException("dropTable");
+    return this.icebergsSparkCatalog.dropTable(ident) || this.polarisSparkCatalog.dropTable(ident);
   }
 
   @Override
   public void renameTable(Identifier from, Identifier to)
       throws NoSuchTableException, TableAlreadyExistsException {
-    throw new UnsupportedOperationException("renameTable");
+    try {
+      this.icebergsSparkCatalog.renameTable(from, to);
+    } catch (NoSuchTableException e) {
+      this.polarisSparkCatalog.renameTable(from, to);
+    }
   }
 
   @Override
   public void invalidateTable(Identifier ident) {
-    throw new UnsupportedOperationException("invalidateTable");
+    this.icebergsSparkCatalog.invalidateTable(ident);
   }
 
   @Override
   public boolean purgeTable(Identifier ident) {
-    throw new UnsupportedOperationException("purgeTable");
+    if (this.icebergsSparkCatalog.purgeTable(ident)) {
+      return true;
+    } else {
+      return this.polarisSparkCatalog.purgeTable(ident);
+    }
   }
 
   @Override
   public Identifier[] listTables(String[] namespace) {
-    throw new UnsupportedOperationException("listTables");
+    Identifier[] icebergIdents = this.icebergsSparkCatalog.listTables(namespace);
+    Identifier[] genericTableIdents = this.polarisSparkCatalog.listTables(namespace);
+
+    return Stream.concat(Arrays.stream(icebergIdents), Arrays.stream(genericTableIdents))
+        .toArray(Identifier[]::new);
   }
 
   @Override
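Each of these overrides follows the same idiom: try the Iceberg Spark catalog first, then fall back to the Polaris generic-table catalog. Distilled into a standalone sketch (the primary/fallback naming is illustrative, not from this commit):

import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;

final class CatalogFallbackSketch {
  private CatalogFallbackSketch() {}

  // Load from the primary (Iceberg) catalog; on NoSuchTableException,
  // retry against the fallback (generic-table) catalog.
  static Table loadWithFallback(TableCatalog primary, TableCatalog fallback, Identifier ident)
      throws NoSuchTableException {
    try {
      return primary.loadTable(ident);
    } catch (NoSuchTableException e) {
      return fallback.loadTable(ident);
    }
  }
}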
diff --git a/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/ListGenericTablesRESTResponse.java b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/ListGenericTablesRESTResponse.java
new file mode 100644
index 000000000..ede2c89a9
--- /dev/null
+++ b/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/rest/ListGenericTablesRESTResponse.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Set;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTResponse;
+import org.apache.polaris.service.types.ListGenericTablesResponse;
+
+/**
+ * RESTResponse definition for ListGenericTable, which extends the Iceberg RESTResponse. This is
+ * currently required because the Iceberg HTTPClient requires the request and response to be
+ * instances of RESTRequest and RESTResponse.
+ */
+public class ListGenericTablesRESTResponse extends ListGenericTablesResponse
+    implements RESTResponse {
+
+  @JsonCreator
+  public ListGenericTablesRESTResponse(
+      @JsonProperty(value = "next-page-token") String nextPageToken,
+      @JsonProperty(value = "identifiers") Set<TableIdentifier> identifiers) {
+    super(nextPageToken, identifiers);
+  }
+
+  @Override
+  public void validate() {}
+}
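To make the wire format concrete, a rough round-trip of the payload this wrapper handles is sketched below. The kebab-case field names follow the @JsonProperty annotations above, the mapper setup loosely mirrors the DeserializationTest further down, and the JSON values are made up:

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import org.apache.iceberg.rest.RESTSerializers;
import org.apache.polaris.spark.rest.ListGenericTablesRESTResponse;

public class ListResponseRoundTripSketch {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    mapper.setPropertyNamingStrategy(new PropertyNamingStrategies.KebabCaseStrategy());
    // registers the TableIdentifier (de)serializers used by the identifiers field
    RESTSerializers.registerAll(mapper);

    String json =
        "{\"next-page-token\": \"page-token\","
            + " \"identifiers\": [{\"namespace\": [\"ns\"], \"name\": \"table1\"}]}";
    ListGenericTablesRESTResponse response =
        mapper.readValue(json, ListGenericTablesRESTResponse.class);
    System.out.println(response.getNextPageToken()); // page-token
  }
}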
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java
index c11e8de3b..f698615e6 100644
--- a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java
@@ -18,7 +18,11 @@
  */
 package org.apache.polaris.spark;
 
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableChange;
 
 /**
  * This is a fake delta catalog class that is used for testing. This class is a noop class that
@@ -29,4 +33,9 @@ public class NoopDeltaCatalog extends DelegatingCatalogExtension {
   // This is a mock of isUnityCatalog scala val in
   // org.apache.spark.sql.delta.catalog.DeltaCatalog.
   private boolean isUnityCatalog = false;
+
+  @Override
+  public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException {
+    return super.loadTable(ident);
+  }
 }
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
index 0d142cbcb..6aa4a3c08 100644
--- a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
@@ -28,20 +28,25 @@ import java.util.Map;
 import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.actions.DeleteReachableFiles;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.spark.actions.DeleteReachableFilesSparkAction;
+import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.source.SparkTable;
 import org.apache.polaris.spark.utils.DeltaHelper;
 import org.apache.polaris.spark.utils.PolarisCatalogUtils;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
 import org.apache.spark.sql.connector.catalog.Identifier;
 import org.apache.spark.sql.connector.catalog.NamespaceChange;
 import org.apache.spark.sql.connector.catalog.Table;
 import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
 import org.apache.spark.sql.connector.catalog.TableProvider;
 import org.apache.spark.sql.connector.catalog.V1Table;
 import org.apache.spark.sql.connector.catalog.View;
@@ -106,6 +111,8 @@ public class SparkCatalogTest {
   private String catalogName;
 
   private static final String[] defaultNS = new String[] {"ns"};
+  private static StructType defaultSchema =
+      new StructType().add("id", "long").add("name", "string");
 
   @BeforeEach
   public void setup() throws Exception {
@@ -321,8 +328,6 @@ public class SparkCatalogTest {
     // create a new namespace under the default NS
     String[] namespace = new String[] {"ns", "nsl2"};
     catalog.createNamespace(namespace, Maps.newHashMap());
-    // table schema
-    StructType schema = new StructType().add("id", "long").add("name", "string");
     // create under defaultNS
     String view1Name = "test-view1";
     String view1SQL = "select id from test-table where id >= 3";
@@ -331,7 +336,7 @@
         view1SQL,
         catalogName,
         defaultNS,
-        schema,
+        defaultSchema,
         new String[0],
         new String[0],
         new String[0],
@@ -348,7 +353,7 @@
           nsl2ViewSQLs[i],
           catalogName,
           namespace,
-          schema,
+          defaultSchema,
           new String[0],
           new String[0],
           new String[0],
@@ -368,39 +373,222 @@ public class SparkCatalogTest {
   }
 
   @Test
-  void testCreateAndLoadIcebergTable() throws Exception {
+  void testIcebergTableOperations() throws Exception {
     Identifier identifier = Identifier.of(defaultNS, "iceberg-table");
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "iceberg");
-    properties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
-    StructType schema = new StructType().add("boolType", "boolean");
-
-    Table createdTable = catalog.createTable(identifier, schema, new Transform[0], properties);
-    assertThat(createdTable).isInstanceOf(SparkTable.class);
+    createAndValidateGenericTableWithLoad(catalog, identifier, defaultSchema, "iceberg");
 
     // load the table
     Table table = catalog.loadTable(identifier);
     // verify iceberg SparkTable is loaded
     assertThat(table).isInstanceOf(SparkTable.class);
 
+    Identifier[] icebergTables = catalog.listTables(defaultNS);
+    assertThat(icebergTables.length).isEqualTo(1);
+    assertThat(icebergTables[0]).isEqualTo(Identifier.of(defaultNS, "iceberg-table"));
+
     // verify create table with the same identifier fails with spark TableAlreadyExistsException
-    StructType newSchema = new StructType().add("LongType", "Long");
     Map<String, String> newProperties = Maps.newHashMap();
     newProperties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "iceberg");
     newProperties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
     assertThatThrownBy(
-            () -> catalog.createTable(identifier, newSchema, new Transform[0], newProperties))
+            () -> catalog.createTable(identifier, defaultSchema, new Transform[0], newProperties))
         .isInstanceOf(TableAlreadyExistsException.class);
+
+    // drop the iceberg table
+    catalog.dropTable(identifier);
+    assertThatThrownBy(() -> catalog.loadTable(identifier))
+        .isInstanceOf(NoSuchTableException.class);
+    assertThat(catalog.listTables(defaultNS)).isEmpty();
   }
 
   @ParameterizedTest
   @ValueSource(strings = {"delta", "csv"})
   void testCreateAndLoadGenericTable(String format) throws Exception {
     Identifier identifier = Identifier.of(defaultNS, "generic-test-table");
+    createAndValidateGenericTableWithLoad(catalog, identifier, defaultSchema, format);
+
+    Identifier[] genericTables = catalog.listTables(defaultNS);
+    assertThat(genericTables.length).isEqualTo(1);
+    assertThat(genericTables[0]).isEqualTo(Identifier.of(defaultNS, "generic-test-table"));
+
+    Map<String, String> newProperties = Maps.newHashMap();
+    newProperties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "parquet");
+    newProperties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
+    assertThatThrownBy(
+            () -> catalog.createTable(identifier, defaultSchema, new Transform[0], newProperties))
+        .isInstanceOf(TableAlreadyExistsException.class);
+
+    // drop the generic table
+    catalog.dropTable(identifier);
+    assertThatThrownBy(() -> catalog.loadTable(identifier))
+        .isInstanceOf(NoSuchTableException.class);
+    assertThat(catalog.listTables(defaultNS)).isEmpty();
+  }
+
+  @Test
+  void testMixedTables() throws Exception {
+    // create two iceberg tables, and three non-iceberg tables
+    String[] tableNames = new String[] {"iceberg1", "iceberg2", "delta1", "csv1", "delta2"};
+    String[] tableFormats = new String[] {"iceberg", null, "delta", "csv", "delta"};
+    for (int i = 0; i < tableNames.length; i++) {
+      Identifier identifier = Identifier.of(defaultNS, tableNames[i]);
+      createAndValidateGenericTableWithLoad(catalog, identifier, defaultSchema, tableFormats[i]);
+    }
+
+    // list all tables
+    Identifier[] tableIdents = catalog.listTables(defaultNS);
+    assertThat(tableIdents.length).isEqualTo(tableNames.length);
+    for (String name : tableNames) {
+      assertThat(tableIdents).contains(Identifier.of(defaultNS, name));
+    }
+
+    // drop the iceberg2 and delta2 tables
+    catalog.dropTable(Identifier.of(defaultNS, "iceberg2"));
+    catalog.dropTable(Identifier.of(defaultNS, "delta2"));
+
+    String[] remainingTableNames = new String[] {"iceberg1", "delta1", "csv1"};
+    Identifier[] remainingTableIdents = catalog.listTables(defaultNS);
+    assertThat(remainingTableIdents.length).isEqualTo(remainingTableNames.length);
+    for (String name : remainingTableNames) {
+      assertThat(remainingTableIdents).contains(Identifier.of(defaultNS, name));
+    }
+
+    // drop the remaining tables
+    for (String name : remainingTableNames) {
+      catalog.dropTable(Identifier.of(defaultNS, name));
+    }
+    assertThat(catalog.listTables(defaultNS)).isEmpty();
+  }
+
+  @Test
+  void testAlterAndRenameTable() throws Exception {
+    String icebergTableName = "iceberg-table";
+    String deltaTableName = "delta-table";
+    String csvTableName = "csv-table";
+    Identifier icebergIdent = Identifier.of(defaultNS, icebergTableName);
+    Identifier deltaIdent = Identifier.of(defaultNS, deltaTableName);
+    Identifier csvIdent = Identifier.of(defaultNS, csvTableName);
+    createAndValidateGenericTableWithLoad(catalog, icebergIdent, defaultSchema, "iceberg");
+    createAndValidateGenericTableWithLoad(catalog, deltaIdent, defaultSchema, "delta");
+    createAndValidateGenericTableWithLoad(catalog, csvIdent, defaultSchema, "csv");
+
+    // verify alter iceberg table
+    Table newIcebergTable =
+        catalog.alterTable(icebergIdent, TableChange.setProperty("iceberg_key", "iceberg_value"));
+    assertThat(newIcebergTable).isInstanceOf(SparkTable.class);
+    assertThat(newIcebergTable.properties()).contains(Map.entry("iceberg_key", "iceberg_value"));
+
+    // verify rename iceberg table works
+    Identifier newIcebergIdent = Identifier.of(defaultNS, "new-iceberg-table");
+    catalog.renameTable(icebergIdent, newIcebergIdent);
+    assertThatThrownBy(() -> catalog.loadTable(icebergIdent))
+        .isInstanceOf(NoSuchTableException.class);
+    Table icebergTable = catalog.loadTable(newIcebergIdent);
+    assertThat(icebergTable).isInstanceOf(SparkTable.class);
+
+    // verify alter delta table is a no-op, and alter csv table throws an exception
+    SQLConf conf = new SQLConf();
+    try (MockedStatic<SparkSession> mockedStaticSparkSession =
+            Mockito.mockStatic(SparkSession.class);
+        MockedStatic<DataSource> mockedStaticDS = Mockito.mockStatic(DataSource.class);
+        MockedStatic<DataSourceV2Utils> mockedStaticDSV2 =
+            Mockito.mockStatic(DataSourceV2Utils.class)) {
+      SparkSession mockedSession = Mockito.mock(SparkSession.class);
+      mockedStaticSparkSession.when(SparkSession::active).thenReturn(mockedSession);
+      SessionState mockedState = Mockito.mock(SessionState.class);
+      Mockito.when(mockedSession.sessionState()).thenReturn(mockedState);
+      Mockito.when(mockedState.conf()).thenReturn(conf);
+
+      TableProvider deltaProvider = Mockito.mock(TableProvider.class);
+      mockedStaticDS
+          .when(() -> DataSource.lookupDataSourceV2(Mockito.eq("delta"), Mockito.any()))
+          .thenReturn(Option.apply(deltaProvider));
+      V1Table deltaTable = Mockito.mock(V1Table.class);
+      Map<String, String> deltaProps = Maps.newHashMap();
+      deltaProps.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "delta");
+      deltaProps.put(TableCatalog.PROP_LOCATION, "file:///tmp/delta/path/to/table/test-delta/");
+      Mockito.when(deltaTable.properties()).thenReturn(deltaProps);
+      mockedStaticDSV2
+          .when(
+              () ->
+                  DataSourceV2Utils.getTableFromProvider(
+                      Mockito.eq(deltaProvider), Mockito.any(), Mockito.any()))
+          .thenReturn(deltaTable);
+
+      Table delta =
+          catalog.alterTable(deltaIdent, TableChange.setProperty("delta_key", "delta_value"));
+      assertThat(delta).isInstanceOf(V1Table.class);
+
+      TableProvider csvProvider = Mockito.mock(TableProvider.class);
+      mockedStaticDS
+          .when(() -> DataSource.lookupDataSourceV2(Mockito.eq("csv"), Mockito.any()))
+          .thenReturn(Option.apply(csvProvider));
+      Map<String, String> csvProps = Maps.newHashMap();
+      csvProps.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "csv");
+      V1Table csvTable = Mockito.mock(V1Table.class);
+      Mockito.when(csvTable.properties()).thenReturn(csvProps);
+      mockedStaticDSV2
+          .when(
+              () ->
+                  DataSourceV2Utils.getTableFromProvider(
+                      Mockito.eq(csvProvider), Mockito.any(), Mockito.any()))
+          .thenReturn(csvTable);
+      assertThatThrownBy(
+              () -> catalog.alterTable(csvIdent, TableChange.setProperty("csv_key", "csv_value")))
+          .isInstanceOf(UnsupportedOperationException.class);
+    }
+
+    // verify rename non-iceberg table is not supported
+    assertThatThrownBy(
+            () -> catalog.renameTable(deltaIdent, Identifier.of(defaultNS, "new-delta-table")))
+        .isInstanceOf(UnsupportedOperationException.class);
+    assertThatThrownBy(
+            () -> catalog.renameTable(csvIdent, Identifier.of(defaultNS, "new-csv-table")))
+        .isInstanceOf(UnsupportedOperationException.class);
+  }
+
+  @Test
+  void testPurgeInvalidateTable() throws Exception {
+    Identifier icebergIdent = Identifier.of(defaultNS, "iceberg-table");
+    Identifier deltaIdent = Identifier.of(defaultNS, "delta-table");
+    createAndValidateGenericTableWithLoad(catalog, icebergIdent, defaultSchema, "iceberg");
+    createAndValidateGenericTableWithLoad(catalog, deltaIdent, defaultSchema, "delta");
+
+    // invalidateTable is a no-op today
+    catalog.invalidateTable(icebergIdent);
+    catalog.invalidateTable(deltaIdent);
+
+    Identifier[] tableIdents = catalog.listTables(defaultNS);
+    assertThat(tableIdents.length).isEqualTo(2);
+
+    // verify purgeTable drops the table
+    catalog.purgeTable(deltaIdent);
+    assertThat(catalog.listTables(defaultNS).length).isEqualTo(1);
+
+    // purge iceberg table triggers file deletion
+    try (MockedStatic<SparkActions> mockedStaticActions = Mockito.mockStatic(SparkActions.class)) {
+      SparkActions actions = Mockito.mock(SparkActions.class);
+      DeleteReachableFilesSparkAction deleteAction =
+          Mockito.mock(DeleteReachableFilesSparkAction.class);
+      mockedStaticActions.when(SparkActions::get).thenReturn(actions);
+      Mockito.when(actions.deleteReachableFiles(Mockito.any())).thenReturn(deleteAction);
+      Mockito.when(deleteAction.io(Mockito.any())).thenReturn(deleteAction);
+      Mockito.when(deleteAction.execute())
+          .thenReturn(Mockito.mock(DeleteReachableFiles.Result.class));
+
+      catalog.purgeTable(icebergIdent);
+    }
+    assertThat(catalog.listTables(defaultNS).length).isEqualTo(0);
+  }
+
+  private void createAndValidateGenericTableWithLoad(
+      InMemorySparkCatalog sparkCatalog, Identifier identifier, StructType schema, String format)
+      throws Exception {
     Map<String, String> properties = Maps.newHashMap();
     properties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, format);
-    properties.put(TableCatalog.PROP_LOCATION, "file:///tmp/delta/path/to/table/");
-    StructType schema = new StructType().add("boolType", "boolean");
+    properties.put(
+        TableCatalog.PROP_LOCATION,
+        String.format("file:///tmp/delta/path/to/table/%s/", identifier.name()));
 
     SQLConf conf = new SQLConf();
     try (MockedStatic<SparkSession> mockedStaticSparkSession =
@@ -425,40 +613,20 @@ public class SparkCatalogTest {
                   DataSourceV2Utils.getTableFromProvider(
                       Mockito.eq(provider), Mockito.any(), Mockito.any()))
           .thenReturn(table);
-      Table createdTable = catalog.createTable(identifier, schema, new Transform[0], properties);
-      assertThat(createdTable).isInstanceOf(V1Table.class);
-
-      // load the table
-      Table loadedTable = catalog.loadTable(identifier);
-      assertThat(loadedTable).isInstanceOf(V1Table.class);
+      Table createdTable =
+          sparkCatalog.createTable(identifier, schema, new Transform[0], properties);
+      Table loadedTable = sparkCatalog.loadTable(identifier);
+
+      // verify the create and load table result
+      if (PolarisCatalogUtils.useIceberg(format)) {
+        // iceberg SparkTable is returned for iceberg tables
+        assertThat(createdTable).isInstanceOf(SparkTable.class);
+        assertThat(loadedTable).isInstanceOf(SparkTable.class);
+      } else {
+        // Spark V1 table is returned for non-iceberg tables
+        assertThat(createdTable).isInstanceOf(V1Table.class);
+        assertThat(loadedTable).isInstanceOf(V1Table.class);
+      }
     }
-
-    StructType newSchema = new StructType().add("LongType", "Long");
-    Map<String, String> newProperties = Maps.newHashMap();
-    newProperties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "parquet");
-    newProperties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
-    assertThatThrownBy(
-            () -> catalog.createTable(identifier, newSchema, new Transform[0], newProperties))
-        .isInstanceOf(TableAlreadyExistsException.class);
-  }
-
-  @Test
-  public void testUnsupportedOperations() {
-    String[] namespace = new String[] {"ns1"};
-    Identifier identifier = Identifier.of(namespace, "table1");
-    Identifier new_identifier = Identifier.of(namespace, "table2");
-    // table methods
-    assertThatThrownBy(() -> catalog.alterTable(identifier))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.dropTable(identifier))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.renameTable(identifier, new_identifier))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.listTables(namespace))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.invalidateTable(identifier))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.purgeTable(identifier))
-        .isInstanceOf(UnsupportedOperationException.class);
   }
 }
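The static mocking in testPurgeInvalidateTable stands in for the file-deletion chain that the Iceberg Spark catalog runs when purging an Iceberg table. The mocked sequence corresponds roughly to the following sketch, where metadataFileLocation and io are illustrative placeholders:

import org.apache.iceberg.actions.DeleteReachableFiles;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.spark.actions.SparkActions;

final class PurgeChainSketch {
  private PurgeChainSketch() {}

  // Mirrors the chain stubbed in the test:
  // SparkActions.get().deleteReachableFiles(...).io(...).execute()
  static DeleteReachableFiles.Result deleteAllFiles(String metadataFileLocation, FileIO io) {
    return SparkActions.get().deleteReachableFiles(metadataFileLocation).io(io).execute();
  }
}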
diff --git a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
index 542fd05d8..e6747e653 100644
--- a/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
+++ b/plugins/spark/v3.5/spark/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
@@ -20,23 +20,48 @@ package org.apache.polaris.spark.rest;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonFactoryBuilder;
 import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Stream;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.rest.RESTSerializers;
 import org.apache.polaris.service.types.GenericTable;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 public class DeserializationTest {
   private ObjectMapper mapper;
+  private static final JsonFactory FACTORY =
+      new JsonFactoryBuilder()
+          .configure(JsonFactory.Feature.INTERN_FIELD_NAMES, false)
+          .configure(JsonFactory.Feature.FAIL_ON_SYMBOL_HASH_OVERFLOW, false)
+          .build();
 
   @BeforeEach
   public void setUp() {
-    mapper = new ObjectMapper();
+    // NOTE: These are the same settings as Iceberg's RESTObjectMapper.java. However,
+    // RESTObjectMapper is not a public class, so we duplicate the settings here
+    // for the serialization and deserialization tests.
+    mapper = new ObjectMapper(FACTORY);
+    mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+    mapper.setVisibility(PropertyAccessor.CREATOR, JsonAutoDetect.Visibility.ANY);
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+    mapper.setPropertyNamingStrategy(new PropertyNamingStrategies.KebabCaseStrategy());
+    RESTSerializers.registerAll(mapper);
   }
 
   @ParameterizedTest
@@ -75,6 +100,36 @@ public class DeserializationTest {
     assertThat(deserializedRequest.getProperties().size()).isEqualTo(properties.size());
   }
 
+  @Test
+  public void testListGenericTablesRESTResponse() throws JsonProcessingException {
+    Namespace namespace = Namespace.of("test-ns");
+    Set<TableIdentifier> idents =
+        ImmutableSet.of(
+            TableIdentifier.of(namespace, "table1"),
+            TableIdentifier.of(namespace, "table2"),
+            TableIdentifier.of(namespace, "table3"));
+
+    // page token is null
+    ListGenericTablesRESTResponse response = new ListGenericTablesRESTResponse(null, idents);
+    String json = mapper.writeValueAsString(response);
+    ListGenericTablesRESTResponse deserializedResponse =
+        mapper.readValue(json, ListGenericTablesRESTResponse.class);
+    assertThat(deserializedResponse.getNextPageToken()).isNull();
+    assertThat(deserializedResponse.getIdentifiers().size()).isEqualTo(idents.size());
+    for (TableIdentifier identifier : idents) {
+      assertThat(deserializedResponse.getIdentifiers()).contains(identifier);
+    }
+
+    // page token is not null
+    response = new ListGenericTablesRESTResponse("page-token", idents);
+    json = mapper.writeValueAsString(response);
+    deserializedResponse = mapper.readValue(json, ListGenericTablesRESTResponse.class);
+    assertThat(deserializedResponse.getNextPageToken()).isEqualTo("page-token");
+    for (TableIdentifier identifier : idents) {
+      assertThat(deserializedResponse.getIdentifiers()).contains(identifier);
+    }
+  }
+
   private static Stream<Arguments> genericTableTestCases() {
     var doc = "table for testing";
     var properties = Maps.newHashMap();