This is an automated email from the ASF dual-hosted git repository. yufei pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push: new ab799a8ac Spark: Add Namespaces and View support for SparkCatalog (#1332) ab799a8ac is described below commit ab799a8ac8099f137216e2675a0be19cac0901fd Author: gh-yzou <167037035+gh-y...@users.noreply.github.com> AuthorDate: Wed Apr 9 11:16:29 2025 -0700 Spark: Add Namespaces and View support for SparkCatalog (#1332) --- .../org/apache/polaris/spark/SparkCatalog.java | 120 +++++++-- .../org/apache/polaris/spark/SparkCatalogTest.java | 278 ++++++++++++++++++--- 2 files changed, 350 insertions(+), 48 deletions(-) diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java index 2ec0450a0..e38bbe1ad 100644 --- a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java +++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java @@ -21,14 +21,35 @@ package org.apache.polaris.spark; import com.google.common.collect.ImmutableSet; import java.util.Map; import java.util.Set; -import org.apache.iceberg.exceptions.NoSuchTableException; -import org.apache.spark.sql.catalyst.analysis.*; -import org.apache.spark.sql.connector.catalog.*; +import org.apache.iceberg.spark.SupportsReplaceView; +import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.NamespaceChange; +import org.apache.spark.sql.connector.catalog.StagedTable; +import org.apache.spark.sql.connector.catalog.StagingTableCatalog; +import org.apache.spark.sql.connector.catalog.SupportsNamespaces; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.TableChange; +import org.apache.spark.sql.connector.catalog.View; +import org.apache.spark.sql.connector.catalog.ViewCatalog; +import org.apache.spark.sql.connector.catalog.ViewChange; import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; -public class SparkCatalog implements TableCatalog, SupportsNamespaces, ViewCatalog { +public class SparkCatalog + implements StagingTableCatalog, + TableCatalog, + SupportsNamespaces, + ViewCatalog, + SupportsReplaceView { + private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER); private String catalogName = null; private org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null; @@ -43,6 +64,8 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces, ViewCatal @Override public void initialize(String name, CaseInsensitiveStringMap options) { this.catalogName = name; + this.icebergsSparkCatalog = new org.apache.iceberg.spark.SparkCatalog(); + this.icebergsSparkCatalog.initialize(name, options); } @Override @@ -73,58 +96,88 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces, ViewCatal throw new UnsupportedOperationException("renameTable"); } + @Override + public void invalidateTable(Identifier ident) { + throw new UnsupportedOperationException("invalidateTable"); + } + + @Override + public boolean purgeTable(Identifier ident) { + throw new UnsupportedOperationException("purgeTable"); + } + @Override public Identifier[] listTables(String[] namespace) { throw new UnsupportedOperationException("listTables"); } + @Override + public StagedTable stageCreate( + Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties) + throws TableAlreadyExistsException { + return this.icebergsSparkCatalog.stageCreate(ident, schema, transforms, properties); + } + + @Override + public StagedTable stageReplace( + Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties) + throws NoSuchTableException { + return this.icebergsSparkCatalog.stageReplace(ident, schema, transforms, properties); + } + + @Override + public StagedTable stageCreateOrReplace( + Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties) { + return this.icebergsSparkCatalog.stageCreateOrReplace(ident, schema, transforms, properties); + } + @Override public String[] defaultNamespace() { - throw new UnsupportedOperationException("defaultNamespace"); + return this.icebergsSparkCatalog.defaultNamespace(); } @Override public String[][] listNamespaces() { - throw new UnsupportedOperationException("listNamespaces"); + return this.icebergsSparkCatalog.listNamespaces(); } @Override public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException { - throw new UnsupportedOperationException("listNamespaces"); + return this.icebergsSparkCatalog.listNamespaces(namespace); } @Override public Map<String, String> loadNamespaceMetadata(String[] namespace) throws NoSuchNamespaceException { - throw new UnsupportedOperationException("loadNamespaceMetadata"); + return this.icebergsSparkCatalog.loadNamespaceMetadata(namespace); } @Override public void createNamespace(String[] namespace, Map<String, String> metadata) throws NamespaceAlreadyExistsException { - throw new UnsupportedOperationException("createNamespace"); + this.icebergsSparkCatalog.createNamespace(namespace, metadata); } @Override public void alterNamespace(String[] namespace, NamespaceChange... changes) throws NoSuchNamespaceException { - throw new UnsupportedOperationException("alterNamespace"); + this.icebergsSparkCatalog.alterNamespace(namespace, changes); } @Override public boolean dropNamespace(String[] namespace, boolean cascade) throws NoSuchNamespaceException { - throw new UnsupportedOperationException("dropNamespace"); + return this.icebergsSparkCatalog.dropNamespace(namespace, cascade); } @Override public Identifier[] listViews(String... namespace) { - throw new UnsupportedOperationException("listViews"); + return this.icebergsSparkCatalog.listViews(namespace); } @Override public View loadView(Identifier ident) throws NoSuchViewException { - throw new UnsupportedOperationException("loadView"); + return this.icebergsSparkCatalog.loadView(ident); } @Override @@ -139,23 +192,56 @@ public class SparkCatalog implements TableCatalog, SupportsNamespaces, ViewCatal String[] columnComments, Map<String, String> properties) throws ViewAlreadyExistsException, NoSuchNamespaceException { - throw new UnsupportedOperationException("createView"); + return this.icebergsSparkCatalog.createView( + ident, + sql, + currentCatalog, + currentNamespace, + schema, + queryColumnNames, + columnAliases, + columnComments, + properties); } @Override public View alterView(Identifier ident, ViewChange... changes) throws NoSuchViewException, IllegalArgumentException { - throw new UnsupportedOperationException("alterView"); + return this.icebergsSparkCatalog.alterView(ident, changes); } @Override public boolean dropView(Identifier ident) { - throw new UnsupportedOperationException("dropView"); + return this.icebergsSparkCatalog.dropView(ident); } @Override public void renameView(Identifier fromIdentifier, Identifier toIdentifier) throws NoSuchViewException, ViewAlreadyExistsException { - throw new UnsupportedOperationException("renameView"); + this.icebergsSparkCatalog.renameView(fromIdentifier, toIdentifier); + } + + @Override + public View replaceView( + Identifier ident, + String sql, + String currentCatalog, + String[] currentNamespace, + StructType schema, + String[] queryColumnNames, + String[] columnAliases, + String[] columnComments, + Map<String, String> properties) + throws NoSuchNamespaceException, NoSuchViewException { + return this.icebergsSparkCatalog.replaceView( + ident, + sql, + currentCatalog, + currentNamespace, + schema, + queryColumnNames, + columnAliases, + columnComments, + properties); } } diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java index 50c1e645a..70e9b00c5 100644 --- a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java +++ b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java @@ -19,30 +19,271 @@ package org.apache.polaris.spark; import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import com.google.common.collect.Maps; +import java.util.Arrays; import java.util.Map; import java.util.UUID; -import org.apache.iceberg.relocated.com.google.common.collect.Maps; -import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.Schema; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.types.Types; +import org.apache.spark.SparkContext; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; +import org.apache.spark.sql.connector.catalog.*; +import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; import org.mockito.Mockito; public class SparkCatalogTest { private SparkCatalog catalog; private String catalogName; + private static final String[] defaultNS = new String[] {"ns"}; + private static final Schema defaultSchema = + new Schema( + 5, + required(3, "id", Types.IntegerType.get(), "unique ID"), + required(4, "data", Types.StringType.get())); + @BeforeEach - public void setup() { + public void setup() throws Exception { catalogName = "test_" + UUID.randomUUID(); Map<String, String> catalogConfig = Maps.newHashMap(); catalogConfig.put(CATALOG_IMPL, "org.apache.iceberg.inmemory.InMemoryCatalog"); catalogConfig.put("cache-enabled", "false"); + catalog = new SparkCatalog(); - catalog.initialize(catalogName, new CaseInsensitiveStringMap(catalogConfig)); + Configuration conf = new Configuration(); + try (MockedStatic<SparkSession> mockedStaticSparkSession = + Mockito.mockStatic(SparkSession.class); + MockedStatic<SparkUtil> mockedSparkUtil = Mockito.mockStatic(SparkUtil.class)) { + SparkSession mockedSession = Mockito.mock(SparkSession.class); + mockedStaticSparkSession.when(SparkSession::active).thenReturn(mockedSession); + mockedSparkUtil + .when(() -> SparkUtil.hadoopConfCatalogOverrides(mockedSession, catalogName)) + .thenReturn(conf); + SparkContext mockedContext = Mockito.mock(SparkContext.class); + Mockito.when(mockedSession.sparkContext()).thenReturn(mockedContext); + Mockito.when(mockedContext.applicationId()).thenReturn("appId"); + Mockito.when(mockedContext.sparkUser()).thenReturn("test-user"); + Mockito.when(mockedContext.version()).thenReturn("3.5"); + + catalog.initialize(catalogName, new CaseInsensitiveStringMap(catalogConfig)); + } + catalog.createNamespace(defaultNS, Maps.newHashMap()); + } + + @Test + void testCreateAndLoadNamespace() throws Exception { + String[] namespace = new String[] {"ns1"}; + Map<String, String> metadata = Maps.newHashMap(); + metadata.put("key1", "value1"); + + // no namespace can be found + assertThatThrownBy(() -> catalog.loadNamespaceMetadata(namespace)) + .isInstanceOf(NoSuchNamespaceException.class); + + // create the namespace + catalog.createNamespace(namespace, metadata); + + Map<String, String> nsMetadata = catalog.loadNamespaceMetadata(namespace); + assertThat(nsMetadata).contains(Map.entry("key1", "value1")); + } + + @Test + void testDropAndListNamespaces() throws Exception { + String[][] lv1ns = new String[][] {{"l1ns1"}, {"l1ns2"}}; + String[][] lv2ns1 = new String[][] {{"l1ns1", "l2ns1"}, {"l1ns1", "l2ns2"}}; + String[][] lv2ns2 = new String[][] {{"l1ns2", "l2ns3"}}; + + // create the namespaces + for (String[] namespace : lv1ns) { + catalog.createNamespace(namespace, Maps.newHashMap()); + } + for (String[] namespace : lv2ns1) { + catalog.createNamespace(namespace, Maps.newHashMap()); + } + for (String[] namespace : lv2ns2) { + catalog.createNamespace(namespace, Maps.newHashMap()); + } + + // list namespaces under root + String[][] lv1nsResult = catalog.listNamespaces(); + assertThat(lv1nsResult.length).isEqualTo(lv1ns.length + 1); + assertThat(Arrays.asList(lv1nsResult)).contains(defaultNS); + for (String[] namespace : lv1ns) { + assertThat(Arrays.asList(lv1nsResult)).contains(namespace); + } + // list namespace under l1ns1 + String[][] lv2ns1Result = catalog.listNamespaces(lv1ns[0]); + assertThat(lv2ns1Result.length).isEqualTo(lv2ns1.length); + for (String[] namespace : lv2ns1) { + assertThat(Arrays.asList(lv2ns1Result)).contains(namespace); + } + // list namespace under l1ns2 + String[][] lv2ns2Result = catalog.listNamespaces(lv1ns[1]); + assertThat(lv2ns2Result.length).isEqualTo(lv2ns2.length); + for (String[] namespace : lv2ns2) { + assertThat(Arrays.asList(lv2ns2Result)).contains(namespace); + } + // no namespace under l1ns2.l2ns3 + assertThat(catalog.listNamespaces(lv2ns2[0]).length).isEqualTo(0); + + // drop l1ns2 + catalog.dropNamespace(lv2ns2[0], true); + assertThat(catalog.listNamespaces(lv1ns[1]).length).isEqualTo(0); + + catalog.dropNamespace(lv1ns[1], true); + assertThatThrownBy(() -> catalog.listNamespaces(lv1ns[1])) + .isInstanceOf(NoSuchNamespaceException.class); + } + + @Test + void testAlterNamespace() throws Exception { + String[] namespace = new String[] {"ns1"}; + Map<String, String> metadata = Maps.newHashMap(); + metadata.put("orig_key1", "orig_value1"); + + catalog.createNamespace(namespace, metadata); + assertThat(catalog.loadNamespaceMetadata(namespace)) + .contains(Map.entry("orig_key1", "orig_value1")); + + catalog.alterNamespace(namespace, NamespaceChange.setProperty("new_key", "new_value")); + assertThat(catalog.loadNamespaceMetadata(namespace)) + .contains(Map.entry("new_key", "new_value")); + } + + @Test + void testStageOperations() throws Exception { + Identifier createId = Identifier.of(defaultNS, "iceberg-table-create"); + Map<String, String> icebergProperties = Maps.newHashMap(); + icebergProperties.put("provider", "iceberg"); + icebergProperties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/iceberg-table/"); + StructType iceberg_schema = new StructType().add("boolType", "boolean"); + + catalog.stageCreate(createId, iceberg_schema, new Transform[0], icebergProperties); + + catalog.stageCreateOrReplace(createId, iceberg_schema, new Transform[0], icebergProperties); + } + + @Test + void testBasicViewOperations() throws Exception { + Identifier viewIdentifier = Identifier.of(defaultNS, "test-view"); + String viewSql = "select id from test-table where id < 3"; + StructType schema = new StructType().add("id", "long"); + catalog.createView( + viewIdentifier, + viewSql, + catalogName, + defaultNS, + schema, + new String[0], + new String[0], + new String[0], + Maps.newHashMap()); + + // load the view + View view = catalog.loadView(viewIdentifier); + assertThat(view.query()).isEqualTo(viewSql); + assertThat(view.schema()).isEqualTo(schema); + + // alter the view properties + catalog.alterView(viewIdentifier, ViewChange.setProperty("view_key1", "view_value1")); + view = catalog.loadView(viewIdentifier); + assertThat(view.properties()).contains(Map.entry("view_key1", "view_value1")); + + // rename the view + Identifier newIdentifier = Identifier.of(defaultNS, "new-view"); + catalog.renameView(viewIdentifier, newIdentifier); + assertThatThrownBy(() -> catalog.loadView(viewIdentifier)) + .isInstanceOf(NoSuchViewException.class); + view = catalog.loadView(newIdentifier); + assertThat(view.query()).isEqualTo(viewSql); + assertThat(view.schema()).isEqualTo(schema); + + // replace the view + String newSql = "select id from test-table where id == 3"; + Map<String, String> properties = Maps.newHashMap(); + properties.put("key1", "value1"); + catalog.replaceView( + newIdentifier, + newSql, + catalogName, + defaultNS, + schema, + new String[0], + new String[0], + new String[0], + properties); + view = catalog.loadView(newIdentifier); + assertThat(view.query()).isEqualTo(newSql); + assertThat(view.properties()).contains(Map.entry("key1", "value1")); + + // drop the view + catalog.dropView(newIdentifier); + assertThatThrownBy(() -> catalog.loadView(newIdentifier)) + .isInstanceOf(NoSuchViewException.class); + } + + @Test + void testListViews() throws Exception { + // create a new namespace under the default NS + String[] namespace = new String[] {"ns", "nsl2"}; + catalog.createNamespace(namespace, Maps.newHashMap()); + // table schema + StructType schema = new StructType().add("id", "long").add("name", "string"); + // create under defaultNS + String view1Name = "test-view1"; + String view1SQL = "select id from test-table where id >= 3"; + catalog.createView( + Identifier.of(defaultNS, view1Name), + view1SQL, + catalogName, + defaultNS, + schema, + new String[0], + new String[0], + new String[0], + Maps.newHashMap()); + // create two views under ns.nsl2 + String[] nsl2ViewNames = new String[] {"test-view2", "test-view3"}; + String[] nsl2ViewSQLs = + new String[] { + "select id from test-table where id == 3", "select id from test-table where id < 3" + }; + for (int i = 0; i < nsl2ViewNames.length; i++) { + catalog.createView( + Identifier.of(namespace, nsl2ViewNames[i]), + nsl2ViewSQLs[i], + catalogName, + namespace, + schema, + new String[0], + new String[0], + new String[0], + Maps.newHashMap()); + } + // list views under defaultNS + Identifier[] l1Views = catalog.listViews(defaultNS); + assertThat(l1Views.length).isEqualTo(1); + assertThat(l1Views[0].name()).isEqualTo(view1Name); + + // list views under ns1.nsl2 + Identifier[] l2Views = catalog.listViews(namespace); + assertThat(l2Views.length).isEqualTo(nsl2ViewSQLs.length); + for (String name : nsl2ViewNames) { + assertThat(Arrays.asList(l2Views)).contains(Identifier.of(namespace, name)); + } } @Test @@ -64,34 +305,9 @@ public class SparkCatalogTest { .isInstanceOf(UnsupportedOperationException.class); assertThatThrownBy(() -> catalog.listTables(namespace)) .isInstanceOf(UnsupportedOperationException.class); - - // namespace methods - assertThatThrownBy(() -> catalog.loadNamespaceMetadata(namespace)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> catalog.listNamespaces()) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> catalog.listNamespaces(namespace)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> catalog.createNamespace(namespace, null)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> catalog.alterNamespace(namespace)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> catalog.dropNamespace(namespace, false)) - .isInstanceOf(UnsupportedOperationException.class); - - // view methods - assertThatThrownBy(() -> catalog.listViews(namespace)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> catalog.loadView(identifier)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy( - () -> catalog.createView(identifier, null, null, null, null, null, null, null, null)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> catalog.alterView(identifier)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> catalog.dropView(identifier)) + assertThatThrownBy(() -> catalog.invalidateTable(identifier)) .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> catalog.renameView(identifier, new_identifier)) + assertThatThrownBy(() -> catalog.purgeTable(identifier)) .isInstanceOf(UnsupportedOperationException.class); } }