This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new ab799a8ac Spark: Add Namespaces and View support for SparkCatalog 
(#1332)
ab799a8ac is described below

commit ab799a8ac8099f137216e2675a0be19cac0901fd
Author: gh-yzou <167037035+gh-y...@users.noreply.github.com>
AuthorDate: Wed Apr 9 11:16:29 2025 -0700

    Spark: Add Namespaces and View support for SparkCatalog (#1332)
---
 .../org/apache/polaris/spark/SparkCatalog.java     | 120 +++++++--
 .../org/apache/polaris/spark/SparkCatalogTest.java | 278 ++++++++++++++++++---
 2 files changed, 350 insertions(+), 48 deletions(-)

diff --git 
a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
index 2ec0450a0..e38bbe1ad 100644
--- 
a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
+++ 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
@@ -21,14 +21,35 @@ package org.apache.polaris.spark;
 import com.google.common.collect.ImmutableSet;
 import java.util.Map;
 import java.util.Set;
-import org.apache.iceberg.exceptions.NoSuchTableException;
-import org.apache.spark.sql.catalyst.analysis.*;
-import org.apache.spark.sql.connector.catalog.*;
+import org.apache.iceberg.spark.SupportsReplaceView;
+import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.catalyst.analysis.ViewAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.StagedTable;
+import org.apache.spark.sql.connector.catalog.StagingTableCatalog;
+import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.catalog.View;
+import org.apache.spark.sql.connector.catalog.ViewCatalog;
+import org.apache.spark.sql.connector.catalog.ViewChange;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 
-public class SparkCatalog implements TableCatalog, SupportsNamespaces, 
ViewCatalog {
+public class SparkCatalog
+    implements StagingTableCatalog,
+        TableCatalog,
+        SupportsNamespaces,
+        ViewCatalog,
+        SupportsReplaceView {
+
   private static final Set<String> DEFAULT_NS_KEYS = 
ImmutableSet.of(TableCatalog.PROP_OWNER);
   private String catalogName = null;
   private org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
@@ -43,6 +64,8 @@ public class SparkCatalog implements TableCatalog, 
SupportsNamespaces, ViewCatal
   @Override
   public void initialize(String name, CaseInsensitiveStringMap options) {
     this.catalogName = name;
+    this.icebergsSparkCatalog = new org.apache.iceberg.spark.SparkCatalog();
+    this.icebergsSparkCatalog.initialize(name, options);
   }
 
   @Override
@@ -73,58 +96,88 @@ public class SparkCatalog implements TableCatalog, 
SupportsNamespaces, ViewCatal
     throw new UnsupportedOperationException("renameTable");
   }
 
+  @Override
+  public void invalidateTable(Identifier ident) {
+    throw new UnsupportedOperationException("invalidateTable");
+  }
+
+  @Override
+  public boolean purgeTable(Identifier ident) {
+    throw new UnsupportedOperationException("purgeTable");
+  }
+
   @Override
   public Identifier[] listTables(String[] namespace) {
     throw new UnsupportedOperationException("listTables");
   }
 
+  @Override
+  public StagedTable stageCreate(
+      Identifier ident, StructType schema, Transform[] transforms, Map<String, 
String> properties)
+      throws TableAlreadyExistsException {
+    return this.icebergsSparkCatalog.stageCreate(ident, schema, transforms, 
properties);
+  }
+
+  @Override
+  public StagedTable stageReplace(
+      Identifier ident, StructType schema, Transform[] transforms, Map<String, 
String> properties)
+      throws NoSuchTableException {
+    return this.icebergsSparkCatalog.stageReplace(ident, schema, transforms, 
properties);
+  }
+
+  @Override
+  public StagedTable stageCreateOrReplace(
+      Identifier ident, StructType schema, Transform[] transforms, Map<String, 
String> properties) {
+    return this.icebergsSparkCatalog.stageCreateOrReplace(ident, schema, 
transforms, properties);
+  }
+
   @Override
   public String[] defaultNamespace() {
-    throw new UnsupportedOperationException("defaultNamespace");
+    return this.icebergsSparkCatalog.defaultNamespace();
   }
 
   @Override
   public String[][] listNamespaces() {
-    throw new UnsupportedOperationException("listNamespaces");
+    return this.icebergsSparkCatalog.listNamespaces();
   }
 
   @Override
   public String[][] listNamespaces(String[] namespace) throws 
NoSuchNamespaceException {
-    throw new UnsupportedOperationException("listNamespaces");
+    return this.icebergsSparkCatalog.listNamespaces(namespace);
   }
 
   @Override
   public Map<String, String> loadNamespaceMetadata(String[] namespace)
       throws NoSuchNamespaceException {
-    throw new UnsupportedOperationException("loadNamespaceMetadata");
+    return this.icebergsSparkCatalog.loadNamespaceMetadata(namespace);
   }
 
   @Override
   public void createNamespace(String[] namespace, Map<String, String> metadata)
       throws NamespaceAlreadyExistsException {
-    throw new UnsupportedOperationException("createNamespace");
+    this.icebergsSparkCatalog.createNamespace(namespace, metadata);
   }
 
   @Override
   public void alterNamespace(String[] namespace, NamespaceChange... changes)
       throws NoSuchNamespaceException {
-    throw new UnsupportedOperationException("alterNamespace");
+    this.icebergsSparkCatalog.alterNamespace(namespace, changes);
   }
 
   @Override
   public boolean dropNamespace(String[] namespace, boolean cascade)
       throws NoSuchNamespaceException {
-    throw new UnsupportedOperationException("dropNamespace");
+    return this.icebergsSparkCatalog.dropNamespace(namespace, cascade);
   }
 
   @Override
   public Identifier[] listViews(String... namespace) {
-    throw new UnsupportedOperationException("listViews");
+    return this.icebergsSparkCatalog.listViews(namespace);
   }
 
   @Override
   public View loadView(Identifier ident) throws NoSuchViewException {
-    throw new UnsupportedOperationException("loadView");
+    return this.icebergsSparkCatalog.loadView(ident);
   }
 
   @Override
@@ -139,23 +192,56 @@ public class SparkCatalog implements TableCatalog, 
SupportsNamespaces, ViewCatal
       String[] columnComments,
       Map<String, String> properties)
       throws ViewAlreadyExistsException, NoSuchNamespaceException {
-    throw new UnsupportedOperationException("createView");
+    return this.icebergsSparkCatalog.createView(
+        ident,
+        sql,
+        currentCatalog,
+        currentNamespace,
+        schema,
+        queryColumnNames,
+        columnAliases,
+        columnComments,
+        properties);
   }
 
   @Override
   public View alterView(Identifier ident, ViewChange... changes)
       throws NoSuchViewException, IllegalArgumentException {
-    throw new UnsupportedOperationException("alterView");
+    return this.icebergsSparkCatalog.alterView(ident, changes);
   }
 
   @Override
   public boolean dropView(Identifier ident) {
-    throw new UnsupportedOperationException("dropView");
+    return this.icebergsSparkCatalog.dropView(ident);
   }
 
   @Override
   public void renameView(Identifier fromIdentifier, Identifier toIdentifier)
       throws NoSuchViewException, ViewAlreadyExistsException {
-    throw new UnsupportedOperationException("renameView");
+    this.icebergsSparkCatalog.renameView(fromIdentifier, toIdentifier);
+  }
+
+  @Override
+  public View replaceView(
+      Identifier ident,
+      String sql,
+      String currentCatalog,
+      String[] currentNamespace,
+      StructType schema,
+      String[] queryColumnNames,
+      String[] columnAliases,
+      String[] columnComments,
+      Map<String, String> properties)
+      throws NoSuchNamespaceException, NoSuchViewException {
+    return this.icebergsSparkCatalog.replaceView(
+        ident,
+        sql,
+        currentCatalog,
+        currentNamespace,
+        schema,
+        queryColumnNames,
+        columnAliases,
+        columnComments,
+        properties);
   }
 }
diff --git 
a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
 
b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
index 50c1e645a..70e9b00c5 100644
--- 
a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
+++ 
b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
@@ -19,30 +19,271 @@
 package org.apache.polaris.spark;
 
 import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import com.google.common.collect.Maps;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.UUID;
-import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.spark.SparkUtil;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkContext;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
+import org.apache.spark.sql.connector.catalog.*;
+import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
 import org.mockito.Mockito;
 
 public class SparkCatalogTest {
   private SparkCatalog catalog;
   private String catalogName;
 
+  private static final String[] defaultNS = new String[] {"ns"};
+  private static final Schema defaultSchema =
+      new Schema(
+          5,
+          required(3, "id", Types.IntegerType.get(), "unique ID"),
+          required(4, "data", Types.StringType.get()));
+
   @BeforeEach
-  public void setup() {
+  public void setup() throws Exception {
     catalogName = "test_" + UUID.randomUUID();
     Map<String, String> catalogConfig = Maps.newHashMap();
     catalogConfig.put(CATALOG_IMPL, 
"org.apache.iceberg.inmemory.InMemoryCatalog");
     catalogConfig.put("cache-enabled", "false");
+
     catalog = new SparkCatalog();
-    catalog.initialize(catalogName, new 
CaseInsensitiveStringMap(catalogConfig));
+    Configuration conf = new Configuration();
+    try (MockedStatic<SparkSession> mockedStaticSparkSession =
+            Mockito.mockStatic(SparkSession.class);
+        MockedStatic<SparkUtil> mockedSparkUtil = 
Mockito.mockStatic(SparkUtil.class)) {
+      SparkSession mockedSession = Mockito.mock(SparkSession.class);
+      
mockedStaticSparkSession.when(SparkSession::active).thenReturn(mockedSession);
+      mockedSparkUtil
+          .when(() -> SparkUtil.hadoopConfCatalogOverrides(mockedSession, 
catalogName))
+          .thenReturn(conf);
+      SparkContext mockedContext = Mockito.mock(SparkContext.class);
+      Mockito.when(mockedSession.sparkContext()).thenReturn(mockedContext);
+      Mockito.when(mockedContext.applicationId()).thenReturn("appId");
+      Mockito.when(mockedContext.sparkUser()).thenReturn("test-user");
+      Mockito.when(mockedContext.version()).thenReturn("3.5");
+
+      catalog.initialize(catalogName, new 
CaseInsensitiveStringMap(catalogConfig));
+    }
+    catalog.createNamespace(defaultNS, Maps.newHashMap());
+  }
+
+  @Test
+  void testCreateAndLoadNamespace() throws Exception {
+    String[] namespace = new String[] {"ns1"};
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put("key1", "value1");
+
+    // no namespace can be found
+    assertThatThrownBy(() -> catalog.loadNamespaceMetadata(namespace))
+        .isInstanceOf(NoSuchNamespaceException.class);
+
+    // create the namespace
+    catalog.createNamespace(namespace, metadata);
+
+    Map<String, String> nsMetadata = catalog.loadNamespaceMetadata(namespace);
+    assertThat(nsMetadata).contains(Map.entry("key1", "value1"));
+  }
+
+  @Test
+  void testDropAndListNamespaces() throws Exception {
+    String[][] lv1ns = new String[][] {{"l1ns1"}, {"l1ns2"}};
+    String[][] lv2ns1 = new String[][] {{"l1ns1", "l2ns1"}, {"l1ns1", 
"l2ns2"}};
+    String[][] lv2ns2 = new String[][] {{"l1ns2", "l2ns3"}};
+
+    // create the namespaces
+    for (String[] namespace : lv1ns) {
+      catalog.createNamespace(namespace, Maps.newHashMap());
+    }
+    for (String[] namespace : lv2ns1) {
+      catalog.createNamespace(namespace, Maps.newHashMap());
+    }
+    for (String[] namespace : lv2ns2) {
+      catalog.createNamespace(namespace, Maps.newHashMap());
+    }
+
+    // list namespaces under root
+    String[][] lv1nsResult = catalog.listNamespaces();
+    assertThat(lv1nsResult.length).isEqualTo(lv1ns.length + 1);
+    assertThat(Arrays.asList(lv1nsResult)).contains(defaultNS);
+    for (String[] namespace : lv1ns) {
+      assertThat(Arrays.asList(lv1nsResult)).contains(namespace);
+    }
+    // list namespace under l1ns1
+    String[][] lv2ns1Result = catalog.listNamespaces(lv1ns[0]);
+    assertThat(lv2ns1Result.length).isEqualTo(lv2ns1.length);
+    for (String[] namespace : lv2ns1) {
+      assertThat(Arrays.asList(lv2ns1Result)).contains(namespace);
+    }
+    // list namespace under l1ns2
+    String[][] lv2ns2Result = catalog.listNamespaces(lv1ns[1]);
+    assertThat(lv2ns2Result.length).isEqualTo(lv2ns2.length);
+    for (String[] namespace : lv2ns2) {
+      assertThat(Arrays.asList(lv2ns2Result)).contains(namespace);
+    }
+    // no namespace under l1ns2.l2ns3
+    assertThat(catalog.listNamespaces(lv2ns2[0]).length).isEqualTo(0);
+
+    // drop l1ns2
+    catalog.dropNamespace(lv2ns2[0], true);
+    assertThat(catalog.listNamespaces(lv1ns[1]).length).isEqualTo(0);
+
+    catalog.dropNamespace(lv1ns[1], true);
+    assertThatThrownBy(() -> catalog.listNamespaces(lv1ns[1]))
+        .isInstanceOf(NoSuchNamespaceException.class);
+  }
+
+  @Test
+  void testAlterNamespace() throws Exception {
+    String[] namespace = new String[] {"ns1"};
+    Map<String, String> metadata = Maps.newHashMap();
+    metadata.put("orig_key1", "orig_value1");
+
+    catalog.createNamespace(namespace, metadata);
+    assertThat(catalog.loadNamespaceMetadata(namespace))
+        .contains(Map.entry("orig_key1", "orig_value1"));
+
+    catalog.alterNamespace(namespace, NamespaceChange.setProperty("new_key", 
"new_value"));
+    assertThat(catalog.loadNamespaceMetadata(namespace))
+        .contains(Map.entry("new_key", "new_value"));
+  }
+
+  @Test
+  void testStageOperations() throws Exception {
+    Identifier createId = Identifier.of(defaultNS, "iceberg-table-create");
+    Map<String, String> icebergProperties = Maps.newHashMap();
+    icebergProperties.put("provider", "iceberg");
+    icebergProperties.put(TableCatalog.PROP_LOCATION, 
"file:///tmp/path/to/iceberg-table/");
+    StructType iceberg_schema = new StructType().add("boolType", "boolean");
+
+    catalog.stageCreate(createId, iceberg_schema, new Transform[0], 
icebergProperties);
+
+    catalog.stageCreateOrReplace(createId, iceberg_schema, new Transform[0], 
icebergProperties);
+  }
+
+  @Test
+  void testBasicViewOperations() throws Exception {
+    Identifier viewIdentifier = Identifier.of(defaultNS, "test-view");
+    String viewSql = "select id from test-table where id < 3";
+    StructType schema = new StructType().add("id", "long");
+    catalog.createView(
+        viewIdentifier,
+        viewSql,
+        catalogName,
+        defaultNS,
+        schema,
+        new String[0],
+        new String[0],
+        new String[0],
+        Maps.newHashMap());
+
+    // load the view
+    View view = catalog.loadView(viewIdentifier);
+    assertThat(view.query()).isEqualTo(viewSql);
+    assertThat(view.schema()).isEqualTo(schema);
+
+    // alter the view properties
+    catalog.alterView(viewIdentifier, ViewChange.setProperty("view_key1", 
"view_value1"));
+    view = catalog.loadView(viewIdentifier);
+    assertThat(view.properties()).contains(Map.entry("view_key1", 
"view_value1"));
+
+    // rename the view
+    Identifier newIdentifier = Identifier.of(defaultNS, "new-view");
+    catalog.renameView(viewIdentifier, newIdentifier);
+    assertThatThrownBy(() -> catalog.loadView(viewIdentifier))
+        .isInstanceOf(NoSuchViewException.class);
+    view = catalog.loadView(newIdentifier);
+    assertThat(view.query()).isEqualTo(viewSql);
+    assertThat(view.schema()).isEqualTo(schema);
+
+    // replace the view
+    String newSql = "select id from test-table where id == 3";
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("key1", "value1");
+    catalog.replaceView(
+        newIdentifier,
+        newSql,
+        catalogName,
+        defaultNS,
+        schema,
+        new String[0],
+        new String[0],
+        new String[0],
+        properties);
+    view = catalog.loadView(newIdentifier);
+    assertThat(view.query()).isEqualTo(newSql);
+    assertThat(view.properties()).contains(Map.entry("key1", "value1"));
+
+    // drop the view
+    catalog.dropView(newIdentifier);
+    assertThatThrownBy(() -> catalog.loadView(newIdentifier))
+        .isInstanceOf(NoSuchViewException.class);
+  }
+
+  @Test
+  void testListViews() throws Exception {
+    // create a new namespace under the default NS
+    String[] namespace = new String[] {"ns", "nsl2"};
+    catalog.createNamespace(namespace, Maps.newHashMap());
+    // table schema
+    StructType schema = new StructType().add("id", "long").add("name", 
"string");
+    // create one view under defaultNS
+    String view1Name = "test-view1";
+    String view1SQL = "select id from test-table where id >= 3";
+    catalog.createView(
+        Identifier.of(defaultNS, view1Name),
+        view1SQL,
+        catalogName,
+        defaultNS,
+        schema,
+        new String[0],
+        new String[0],
+        new String[0],
+        Maps.newHashMap());
+    // create two views under ns.nsl2
+    String[] nsl2ViewNames = new String[] {"test-view2", "test-view3"};
+    String[] nsl2ViewSQLs =
+        new String[] {
+          "select id from test-table where id == 3", "select id from 
test-table where id < 3"
+        };
+    for (int i = 0; i < nsl2ViewNames.length; i++) {
+      catalog.createView(
+          Identifier.of(namespace, nsl2ViewNames[i]),
+          nsl2ViewSQLs[i],
+          catalogName,
+          namespace,
+          schema,
+          new String[0],
+          new String[0],
+          new String[0],
+          Maps.newHashMap());
+    }
+    // list views under defaultNS
+    Identifier[] l1Views = catalog.listViews(defaultNS);
+    assertThat(l1Views.length).isEqualTo(1);
+    assertThat(l1Views[0].name()).isEqualTo(view1Name);
+
+    // list views under ns.nsl2
+    Identifier[] l2Views = catalog.listViews(namespace);
+    assertThat(l2Views.length).isEqualTo(nsl2ViewSQLs.length);
+    for (String name : nsl2ViewNames) {
+      assertThat(Arrays.asList(l2Views)).contains(Identifier.of(namespace, 
name));
+    }
   }
 
   @Test
@@ -64,34 +305,9 @@ public class SparkCatalogTest {
         .isInstanceOf(UnsupportedOperationException.class);
     assertThatThrownBy(() -> catalog.listTables(namespace))
         .isInstanceOf(UnsupportedOperationException.class);
-
-    // namespace methods
-    assertThatThrownBy(() -> catalog.loadNamespaceMetadata(namespace))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.listNamespaces())
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.listNamespaces(namespace))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.createNamespace(namespace, null))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.alterNamespace(namespace))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.dropNamespace(namespace, false))
-        .isInstanceOf(UnsupportedOperationException.class);
-
-    // view methods
-    assertThatThrownBy(() -> catalog.listViews(namespace))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.loadView(identifier))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(
-            () -> catalog.createView(identifier, null, null, null, null, null, 
null, null, null))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.alterView(identifier))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.dropView(identifier))
+    assertThatThrownBy(() -> catalog.invalidateTable(identifier))
         .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(() -> catalog.renameView(identifier, new_identifier))
+    assertThatThrownBy(() -> catalog.purgeTable(identifier))
         .isInstanceOf(UnsupportedOperationException.class);
   }
 }

Reply via email to