This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 97a49e0ec Spark: Add CreateTable and LoadTable implementation for 
SparkCatalog (#1303)
97a49e0ec is described below

commit 97a49e0ec3910e61d1d55d3a06fc01c3c5724bf7
Author: gh-yzou <167037035+gh-y...@users.noreply.github.com>
AuthorDate: Fri Apr 11 10:56:37 2025 -0700

    Spark: Add CreateTable and LoadTable implementation for SparkCatalog (#1303)
---
 plugins/pluginlibs.versions.toml                   |   3 +
 plugins/spark/v3.5/build.gradle.kts                |  73 +++++++-
 .../org/apache/polaris/spark/PolarisCatalog.java   |  36 ++++
 .../apache/polaris/spark/PolarisRESTCatalog.java   | 184 ++++++++++++++++++++
 .../apache/polaris/spark/PolarisSparkCatalog.java  | 111 +++++++++++++
 .../org/apache/polaris/spark/SparkCatalog.java     | 103 ++++++++++--
 .../spark/rest/CreateGenericTableRESTRequest.java  |  46 +++++
 .../spark/rest/LoadGenericTableRESTResponse.java   |  42 +++++
 .../apache/polaris/spark/utils/DeltaHelper.java    | 107 ++++++++++++
 .../polaris/spark/utils/PolarisCatalogUtils.java   | 112 +++++++++++++
 .../org/apache/polaris/spark/NoopDeltaCatalog.java |  32 ++++
 .../polaris/spark/PolarisInMemoryCatalog.java      |  90 ++++++++++
 .../org/apache/polaris/spark/SparkCatalogTest.java | 185 +++++++++++++++++++--
 .../polaris/spark/rest/DeserializationTest.java    |  88 ++++++++++
 14 files changed, 1175 insertions(+), 37 deletions(-)

diff --git a/plugins/pluginlibs.versions.toml b/plugins/pluginlibs.versions.toml
index 0a4a515e5..e48f6ef45 100644
--- a/plugins/pluginlibs.versions.toml
+++ b/plugins/pluginlibs.versions.toml
@@ -20,3 +20,6 @@
 [versions]
 iceberg = "1.8.1"
 spark35 = "3.5.5"
+scala212 = "2.12.19"
+scala213 = "2.13.15"
+
diff --git a/plugins/spark/v3.5/build.gradle.kts 
b/plugins/spark/v3.5/build.gradle.kts
index 36ca6d528..df37fa229 100644
--- a/plugins/spark/v3.5/build.gradle.kts
+++ b/plugins/spark/v3.5/build.gradle.kts
@@ -41,18 +41,34 @@ val scalaVersion = getAndUseScalaVersionForProject()
 val icebergVersion = pluginlibs.versions.iceberg.get()
 val spark35Version = pluginlibs.versions.spark35.get()
 
+val scalaLibraryVersion =
+  if (scalaVersion == "2.12") {
+    pluginlibs.versions.scala212.get()
+  } else {
+    pluginlibs.versions.scala213.get()
+  }
+
 dependencies {
   implementation(project(":polaris-api-iceberg-service")) {
-    // exclude the iceberg and jackson dependencies, use the
-    // dependencies packed in the iceberg-spark dependency
+    // exclude the iceberg dependencies, use the ones pulled
+    // by iceberg-core
     exclude("org.apache.iceberg", "*")
-    exclude("com.fasterxml.jackson.core", "*")
   }
+  implementation(project(":polaris-api-catalog-service"))
+  implementation(project(":polaris-core")) { exclude("org.apache.iceberg", 
"*") }
+
+  implementation("org.apache.iceberg:iceberg-core:${icebergVersion}")
 
   implementation(
     
"org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"
-  )
+  ) {
+    // exclude the iceberg rest dependencies, use the ones pulled
+    // with iceberg-core dependency
+    exclude("org.apache.iceberg", "iceberg-core")
+  }
 
+  compileOnly("org.scala-lang:scala-library:${scalaLibraryVersion}")
+  compileOnly("org.scala-lang:scala-reflect:${scalaLibraryVersion}")
   compileOnly("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
     // exclude log4j dependencies
     exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
@@ -78,24 +94,65 @@ dependencies {
   }
 }
 
+// TODO: replace the check using gradlew checkstyle plugin
+tasks.register("checkNoDisallowedImports") {
+  doLast {
+    // List of disallowed imports. Right now, we disallow usage of shaded or
+    // relocated libraries in the iceberg spark runtime jar.
+    val disallowedImports =
+      listOf("import org.apache.iceberg.shaded.", 
"org.apache.iceberg.relocated.")
+
+    // Directory to scan for Java files
+    val sourceDirs = listOf(file("src/main/java"), file("src/test/java"))
+
+    val violations = mutableListOf<String>()
+    // Scan Java files in each directory
+    sourceDirs.forEach { sourceDir ->
+      fileTree(sourceDir)
+        .matching {
+          include("**/*.java") // Only include Java files
+        }
+        .forEach { file ->
+          val content = file.readText()
+          disallowedImports.forEach { importStatement ->
+            if (content.contains(importStatement)) {
+              violations.add(
+                "Disallowed import found in ${file.relativeTo(projectDir)}: 
$importStatement"
+              )
+            }
+          }
+        }
+    }
+
+    if (violations.isNotEmpty()) {
+      throw GradleException("Disallowed imports found! $violations")
+    }
+  }
+}
+
+tasks.named("check") { dependsOn("checkNoDisallowedImports") }
+
 tasks.register<ShadowJar>("createPolarisSparkJar") {
   archiveClassifier = null
   archiveBaseName =
     
"polaris-iceberg-${icebergVersion}-spark-runtime-${sparkMajorVersion}_${scalaVersion}"
   isZip64 = true
 
-  dependencies { exclude("META-INF/**") }
+  mergeServiceFiles()
 
   // pack both the source code and dependencies
   from(sourceSets.main.get().output)
   configurations = listOf(project.configurations.runtimeClasspath.get())
 
-  mergeServiceFiles()
-
   // Optimization: Minimize the JAR (remove unused classes from dependencies)
   // The iceberg-spark-runtime plugin is always packaged along with our 
polaris-spark plugin,
   // therefore excluded from the optimization.
-  minimize { 
exclude(dependency("org.apache.iceberg:iceberg-spark-runtime-*.*")) }
+  minimize {
+    exclude(dependency("org.apache.iceberg:iceberg-spark-runtime-*.*"))
+    exclude(dependency("org.apache.iceberg:iceberg-core*.*"))
+  }
+
+  relocate("com.fasterxml", "org.apache.polaris.shaded.com.fasterxml.jackson")
 }
 
 tasks.withType(Jar::class).named("sourcesJar") { 
dependsOn("createPolarisSparkJar") }
diff --git 
a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisCatalog.java 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisCatalog.java
new file mode 100644
index 000000000..31a6ac189
--- /dev/null
+++ 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisCatalog.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.polaris.service.types.GenericTable;
+
+public interface PolarisCatalog {
+  List<TableIdentifier> listGenericTables(Namespace ns);
+
+  GenericTable loadGenericTable(TableIdentifier identifier);
+
+  boolean dropGenericTable(TableIdentifier identifier);
+
+  GenericTable createGenericTable(
+      TableIdentifier identifier, String format, String doc, Map<String, 
String> props);
+}
diff --git 
a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
new file mode 100644
index 000000000..0b8743132
--- /dev/null
+++ 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.rest.Endpoint;
+import org.apache.iceberg.rest.ErrorHandlers;
+import org.apache.iceberg.rest.HTTPClient;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.ResourcePaths;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.rest.responses.ConfigResponse;
+import org.apache.iceberg.util.EnvironmentUtil;
+import org.apache.polaris.core.rest.PolarisEndpoints;
+import org.apache.polaris.core.rest.PolarisResourcePaths;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.polaris.spark.rest.CreateGenericTableRESTRequest;
+import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse;
+
+/**
+ * {@link PolarisRESTCatalog} talks to the Polaris REST APIs and implements the PolarisCatalog
+ * interface, which currently covers only the generic table APIs. This class doesn't interact
+ * with any Spark objects.
+ */
+public class PolarisRESTCatalog implements PolarisCatalog, Closeable {
+  private final Function<Map<String, String>, RESTClient> clientBuilder;
+
+  private RESTClient restClient = null;
+  private CloseableGroup closeables = null;
+  private Set<Endpoint> endpoints;
+  private OAuth2Util.AuthSession catalogAuth = null;
+  private PolarisResourcePaths pathGenerator = null;
+
+  // the default endpoints to config if server doesn't specify the 'endpoints' 
configuration.
+  private static final Set<Endpoint> DEFAULT_ENDPOINTS = 
PolarisEndpoints.GENERIC_TABLE_ENDPOINTS;
+
+  public PolarisRESTCatalog() {
+    this(config -> 
HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build());
+  }
+
+  public PolarisRESTCatalog(Function<Map<String, String>, RESTClient> 
clientBuilder) {
+    this.clientBuilder = clientBuilder;
+  }
+
+  public void initialize(Map<String, String> unresolved, 
OAuth2Util.AuthSession catalogAuth) {
+    Preconditions.checkArgument(unresolved != null, "Invalid configuration: 
null");
+
+    // Resolve any configuration that is supplied by environment variables.
+    // For example: if the unresolved map contains an entry ("key", "env:envVar")
+    // and envVar is set to envValue in the system environment, then after resolution
+    // the entry becomes ("key", "envValue").
+    Map<String, String> props = EnvironmentUtil.resolveAll(unresolved);
+
+    // TODO: switch to use authManager once iceberg dependency is updated to 
1.9.0
+    this.catalogAuth = catalogAuth;
+
+    ConfigResponse config;
+    try (RESTClient initClient = 
clientBuilder.apply(props).withAuthSession(catalogAuth)) {
+      config = fetchConfig(initClient, catalogAuth.headers(), props);
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close HTTP client", e);
+    }
+
+    // merge the server-provided configuration (fetched above) with the client properties
+    Map<String, String> mergedProps = config.merge(props);
+    if (config.endpoints().isEmpty()) {
+      this.endpoints = DEFAULT_ENDPOINTS;
+    } else {
+      this.endpoints = ImmutableSet.copyOf(config.endpoints());
+    }
+
+    this.pathGenerator = 
PolarisResourcePaths.forCatalogProperties(mergedProps);
+    this.restClient = 
clientBuilder.apply(mergedProps).withAuthSession(catalogAuth);
+
+    this.closeables = new CloseableGroup();
+    this.closeables.addCloseable(this.restClient);
+    this.closeables.setSuppressCloseFailure(true);
+  }
+
+  protected static ConfigResponse fetchConfig(
+      RESTClient client, Map<String, String> headers, Map<String, String> 
properties) {
+    // send the client's warehouse location to the service to keep in sync
+    // this is needed for cases where the warehouse is configured at client 
side,
+    // and used by Polaris server as catalog name.
+    ImmutableMap.Builder<String, String> queryParams = ImmutableMap.builder();
+    if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) {
+      queryParams.put(
+          CatalogProperties.WAREHOUSE_LOCATION,
+          properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+    }
+
+    ConfigResponse configResponse =
+        client.get(
+            ResourcePaths.config(),
+            queryParams.build(),
+            ConfigResponse.class,
+            headers,
+            ErrorHandlers.defaultErrorHandler());
+    configResponse.validate();
+    return configResponse;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closeables != null) {
+      closeables.close();
+    }
+  }
+
+  @Override
+  public List<TableIdentifier> listGenericTables(Namespace ns) {
+    throw new UnsupportedOperationException("listTables not supported");
+  }
+
+  @Override
+  public boolean dropGenericTable(TableIdentifier identifier) {
+    throw new UnsupportedOperationException("dropTable not supported");
+  }
+
+  @Override
+  public GenericTable createGenericTable(
+      TableIdentifier identifier, String format, String doc, Map<String, 
String> props) {
+    Endpoint.check(endpoints, PolarisEndpoints.V1_CREATE_GENERIC_TABLE);
+    CreateGenericTableRESTRequest request =
+        new CreateGenericTableRESTRequest(identifier.name(), format, doc, 
props);
+
+    LoadGenericTableRESTResponse response =
+        restClient
+            .withAuthSession(this.catalogAuth)
+            .post(
+                pathGenerator.genericTables(identifier.namespace()),
+                request,
+                LoadGenericTableRESTResponse.class,
+                Map.of(),
+                ErrorHandlers.tableErrorHandler());
+
+    return response.getTable();
+  }
+
+  @Override
+  public GenericTable loadGenericTable(TableIdentifier identifier) {
+    Endpoint.check(endpoints, PolarisEndpoints.V1_LOAD_GENERIC_TABLE);
+    LoadGenericTableRESTResponse response =
+        restClient
+            .withAuthSession(this.catalogAuth)
+            .get(
+                pathGenerator.genericTable(identifier),
+                null,
+                LoadGenericTableRESTResponse.class,
+                Map.of(),
+                ErrorHandlers.tableErrorHandler());
+
+    return response.getTable();
+  }
+}
diff --git 
a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
new file mode 100644
index 000000000..8f8c07fba
--- /dev/null
+++ 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import java.util.Map;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.polaris.spark.utils.PolarisCatalogUtils;
+import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableChange;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/**
+ * A Spark TableCatalog implementation that interacts with Polaris-specific APIs only. The APIs
+ * it interacts with are the generic table APIs, and all table operations performed in this
+ * class are expected to be for non-Iceberg tables.
+ */
+public class PolarisSparkCatalog implements TableCatalog {
+
+  private PolarisCatalog polarisCatalog = null;
+  private String catalogName = null;
+
+  public PolarisSparkCatalog(PolarisCatalog polarisCatalog) {
+    this.polarisCatalog = polarisCatalog;
+  }
+
+  @Override
+  public void initialize(String name, CaseInsensitiveStringMap options) {
+    this.catalogName = name;
+  }
+
+  @Override
+  public String name() {
+    return catalogName;
+  }
+
+  @Override
+  public Table loadTable(Identifier identifier) throws NoSuchTableException {
+    try {
+      GenericTable genericTable =
+          
this.polarisCatalog.loadGenericTable(Spark3Util.identifierToTableIdentifier(identifier));
+      return PolarisCatalogUtils.loadSparkTable(genericTable);
+    } catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
+      throw new NoSuchTableException(identifier);
+    }
+  }
+
+  @Override
+  public Table createTable(
+      Identifier identifier,
+      StructType schema,
+      Transform[] transforms,
+      Map<String, String> properties)
+      throws TableAlreadyExistsException, NoSuchNamespaceException {
+    try {
+      String format = properties.get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);
+      GenericTable genericTable =
+          this.polarisCatalog.createGenericTable(
+              Spark3Util.identifierToTableIdentifier(identifier), format, 
null, properties);
+      return PolarisCatalogUtils.loadSparkTable(genericTable);
+    } catch (AlreadyExistsException e) {
+      throw new TableAlreadyExistsException(identifier);
+    }
+  }
+
+  @Override
+  public Table alterTable(Identifier identifier, TableChange... changes)
+      throws NoSuchTableException {
+    throw new NoSuchTableException(identifier);
+  }
+
+  @Override
+  public boolean dropTable(Identifier identifier) {
+    return false;
+  }
+
+  @Override
+  public void renameTable(Identifier from, Identifier to)
+      throws NoSuchTableException, TableAlreadyExistsException {
+    throw new UnsupportedOperationException("renameTable operation is not 
supported");
+  }
+
+  @Override
+  public Identifier[] listTables(String[] namespace) {
+    throw new UnsupportedOperationException("listTables operation is not 
supported");
+  }
+}
diff --git 
a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
index e38bbe1ad..cf46d9a15 100644
--- 
a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
+++ 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java
@@ -18,10 +18,17 @@
  */
 package org.apache.polaris.spark;
 
-import com.google.common.collect.ImmutableSet;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
 import java.util.Map;
-import java.util.Set;
+import org.apache.arrow.util.VisibleForTesting;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.rest.auth.OAuth2Util;
 import org.apache.iceberg.spark.SupportsReplaceView;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.polaris.spark.utils.DeltaHelper;
+import org.apache.polaris.spark.utils.PolarisCatalogUtils;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -42,42 +49,114 @@ import org.apache.spark.sql.connector.catalog.ViewChange;
 import org.apache.spark.sql.connector.expressions.Transform;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * SparkCatalog implementation that is able to interact with both the Iceberg SparkCatalog and
+ * the Polaris SparkCatalog. All namespace and view related operations continue to go through
+ * the Iceberg SparkCatalog. For table operations, depending on the table format, the operation
+ * may involve the Iceberg SparkCatalog, the Polaris SparkCatalog, or both.
+ */
 public class SparkCatalog
     implements StagingTableCatalog,
         TableCatalog,
         SupportsNamespaces,
         ViewCatalog,
         SupportsReplaceView {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkCatalog.class);
 
-  private static final Set<String> DEFAULT_NS_KEYS = 
ImmutableSet.of(TableCatalog.PROP_OWNER);
-  private String catalogName = null;
-  private org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
-
-  // TODO: Add Polaris Specific REST Catalog
+  @VisibleForTesting protected String catalogName = null;
+  @VisibleForTesting protected org.apache.iceberg.spark.SparkCatalog 
icebergsSparkCatalog = null;
+  @VisibleForTesting protected PolarisSparkCatalog polarisSparkCatalog = null;
+  @VisibleForTesting protected DeltaHelper deltaHelper = null;
 
   @Override
   public String name() {
     return catalogName;
   }
 
+  /**
+   * Check whether invalid catalog configuration is provided, and return an 
option map with catalog
+   * type configured correctly. This function mainly validates two parts: 1) 
No customized catalog
+   * implementation is provided. 2) No non-rest catalog type is configured.
+   */
+  @VisibleForTesting
+  public CaseInsensitiveStringMap validateAndResolveCatalogOptions(
+      CaseInsensitiveStringMap options) {
+    Preconditions.checkArgument(
+        options.get(CatalogProperties.CATALOG_IMPL) == null,
+        "Customized catalog implementation is not supported and not needed, 
please remove the configuration!");
+
+    String catalogType =
+        PropertyUtil.propertyAsString(
+            options, CatalogUtil.ICEBERG_CATALOG_TYPE, 
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+    Preconditions.checkArgument(
+        catalogType.equals(CatalogUtil.ICEBERG_CATALOG_TYPE_REST),
+        "Only rest catalog type is allowed, but got catalog type: "
+            + catalogType
+            + ". Either configure the type to rest or remove the config");
+
+    Map<String, String> resolvedOptions = Maps.newHashMap();
+    resolvedOptions.putAll(options);
+    // when no catalog type is configured, iceberg uses hive by default. Here, 
we make sure the
+    // type is set to rest since we only support rest catalog.
+    resolvedOptions.put(CatalogUtil.ICEBERG_CATALOG_TYPE, 
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+
+    return new CaseInsensitiveStringMap(resolvedOptions);
+  }
+
+  /**
+   * Initialize REST Catalog for Iceberg and Polaris, this is the only catalog 
type supported by
+   * Polaris at this moment.
+   */
+  private void initRESTCatalog(String name, CaseInsensitiveStringMap options) {
+    CaseInsensitiveStringMap resolvedOptions = 
validateAndResolveCatalogOptions(options);
+
+    // initialize the icebergSparkCatalog
+    this.icebergsSparkCatalog = new org.apache.iceberg.spark.SparkCatalog();
+    this.icebergsSparkCatalog.initialize(name, resolvedOptions);
+
+    // initialize the polaris spark catalog
+    OAuth2Util.AuthSession catalogAuth =
+        PolarisCatalogUtils.getAuthSession(this.icebergsSparkCatalog);
+    PolarisRESTCatalog restCatalog = new PolarisRESTCatalog();
+    restCatalog.initialize(options, catalogAuth);
+    this.polarisSparkCatalog = new PolarisSparkCatalog(restCatalog);
+    this.polarisSparkCatalog.initialize(name, resolvedOptions);
+  }
+
   @Override
   public void initialize(String name, CaseInsensitiveStringMap options) {
     this.catalogName = name;
-    this.icebergsSparkCatalog = new org.apache.iceberg.spark.SparkCatalog();
-    this.icebergsSparkCatalog.initialize(name, options);
+    initRESTCatalog(name, options);
+    this.deltaHelper = new DeltaHelper(options);
   }
 
   @Override
   public Table loadTable(Identifier ident) throws NoSuchTableException {
-    throw new UnsupportedOperationException("loadTable");
+    try {
+      return this.icebergsSparkCatalog.loadTable(ident);
+    } catch (NoSuchTableException e) {
+      return this.polarisSparkCatalog.loadTable(ident);
+    }
   }
 
   @Override
   public Table createTable(
       Identifier ident, StructType schema, Transform[] transforms, Map<String, 
String> properties)
-      throws TableAlreadyExistsException {
-    throw new UnsupportedOperationException("createTable");
+      throws TableAlreadyExistsException, NoSuchNamespaceException {
+    String provider = properties.get(PolarisCatalogUtils.TABLE_PROVIDER_KEY);
+    if (PolarisCatalogUtils.useIceberg(provider)) {
+      return this.icebergsSparkCatalog.createTable(ident, schema, transforms, 
properties);
+    } else if (PolarisCatalogUtils.useDelta(provider)) {
+      // For delta table, we load the delta catalog to help dealing with the
+      // delta log creation.
+      TableCatalog deltaCatalog = 
deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
+      return deltaCatalog.createTable(ident, schema, transforms, properties);
+    } else {
+      return this.polarisSparkCatalog.createTable(ident, schema, transforms, 
properties);
+    }
   }
 
   @Override
diff --git 
a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java
 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java
new file mode 100644
index 000000000..4ec348a80
--- /dev/null
+++ 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.Map;
+import org.apache.iceberg.rest.RESTRequest;
+import org.apache.polaris.service.types.CreateGenericTableRequest;
+
+/**
+ * RESTRequest definition for CreateGenericTable which extends the iceberg 
RESTRequest. This is
+ * currently required because the Iceberg HTTPClient requires the request and 
response to be a class
+ * of RESTRequest and RESTResponse.
+ */
+public class CreateGenericTableRESTRequest extends CreateGenericTableRequest
+    implements RESTRequest {
+
+  @JsonCreator
+  public CreateGenericTableRESTRequest(
+      @JsonProperty(value = "name", required = true) String name,
+      @JsonProperty(value = "format", required = true) String format,
+      @JsonProperty(value = "doc") String doc,
+      @JsonProperty(value = "properties") Map<String, String> properties) {
+    super(name, format, doc, properties);
+  }
+
+  @Override
+  public void validate() {}
+}
diff --git 
a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java
 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java
new file mode 100644
index 000000000..68c738dae
--- /dev/null
+++ 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.rest;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.iceberg.rest.RESTResponse;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.polaris.service.types.LoadGenericTableResponse;
+
+/**
+ * RESTResponse definition for LoadGenericTable which extends the iceberg 
RESTResponse. This is
+ * currently required because the Iceberg HTTPClient requires the request and 
response to be a class
+ * of RESTRequest and RESTResponse.
+ */
+public class LoadGenericTableRESTResponse extends LoadGenericTableResponse 
implements RESTResponse {
+
+  @JsonCreator
+  public LoadGenericTableRESTResponse(
+      @JsonProperty(value = "table", required = true) GenericTable table) {
+    super(table);
+  }
+
+  @Override
+  public void validate() {}
+}
diff --git 
a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java
 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java
new file mode 100644
index 000000000..297438424
--- /dev/null
+++ 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.utils;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import org.apache.iceberg.common.DynConstructors;
+import org.apache.polaris.spark.PolarisSparkCatalog;
+import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates, configures and caches the Delta catalog used for Delta tables.
+ *
+ * <p>The catalog implementation defaults to {@code org.apache.spark.sql.delta.catalog.DeltaCatalog}
+ * and can be overridden through the {@link #DELTA_CATALOG_IMPL_KEY} catalog option.
+ */
+public class DeltaHelper {
+  private static final Logger LOG = LoggerFactory.getLogger(DeltaHelper.class);
+
+  public static final String DELTA_CATALOG_IMPL_KEY = "delta-catalog-impl";
+  private static final String DEFAULT_DELTA_CATALOG_CLASS =
+      "org.apache.spark.sql.delta.catalog.DeltaCatalog";
+
+  private TableCatalog deltaCatalog = null;
+  private String deltaCatalogImpl = DEFAULT_DELTA_CATALOG_CLASS;
+
+  /**
+   * @param options catalog options; when {@link #DELTA_CATALOG_IMPL_KEY} is present its value
+   *     replaces the default Delta catalog class name
+   */
+  public DeltaHelper(CaseInsensitiveStringMap options) {
+    String configuredImpl = options.get(DELTA_CATALOG_IMPL_KEY);
+    if (configuredImpl != null) {
+      this.deltaCatalogImpl = configuredImpl;
+    }
+  }
+
+  /**
+   * Returns the Delta catalog, creating and configuring it on the first successful call.
+   *
+   * <p>The new catalog is only cached once every configuration step has succeeded, so a failed
+   * attempt never leaves a half-configured catalog behind for later calls.
+   *
+   * @param polarisSparkCatalog catalog installed as the delegate of the Delta catalog
+   * @return the fully configured (and cached) Delta catalog
+   * @throws IllegalArgumentException if the configured class cannot be constructed or is not a
+   *     {@link TableCatalog}
+   * @throws RuntimeException if the {@code isUnityCatalog} flag cannot be set through reflection
+   */
+  public TableCatalog loadDeltaCatalog(PolarisSparkCatalog polarisSparkCatalog) {
+    if (this.deltaCatalog != null) {
+      return this.deltaCatalog;
+    }
+
+    TableCatalog catalog = instantiateDeltaCatalog();
+
+    // set the polaris spark catalog as the delegate catalog of delta catalog
+    ((DelegatingCatalogExtension) catalog).setDelegateCatalog(polarisSparkCatalog);
+
+    forceUnityCatalogBehavior(catalog);
+
+    // publish the instance only after it has been completely set up
+    this.deltaCatalog = catalog;
+    return catalog;
+  }
+
+  /** Instantiates the configured Delta catalog class. */
+  private TableCatalog instantiateDeltaCatalog() {
+    DynConstructors.Ctor<TableCatalog> ctor;
+    try {
+      ctor = DynConstructors.builder(TableCatalog.class).impl(deltaCatalogImpl).buildChecked();
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(
+          String.format("Cannot initialize Delta Catalog %s: %s", deltaCatalogImpl, e.getMessage()),
+          e);
+    }
+
+    try {
+      TableCatalog catalog = ctor.newInstance();
+      return catalog;
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot initialize Delta Catalog, %s does not implement Table Catalog.",
+              deltaCatalogImpl),
+          e);
+    }
+  }
+
+  /**
+   * Sets the {@code isUnityCatalog} field of the given catalog to true through reflection.
+   *
+   * <p>We want to behave exactly the same as unity catalog for Delta. However, the DeltaCatalog
+   * implementation today is hard coded for unity catalog. The following issue tracks the extension
+   * of the usage: https://github.com/delta-io/delta/issues/4306.
+   */
+  private static void forceUnityCatalogBehavior(TableCatalog catalog) {
+    try {
+      // isUnityCatalog is a lazy val. Trigger its compute method before the value is set,
+      // otherwise the value would be overwritten later when the method is triggered.
+      String methodGetName = "isUnityCatalog" + "$lzycompute";
+      Method method = catalog.getClass().getDeclaredMethod(methodGetName);
+      method.setAccessible(true);
+      method.invoke(catalog);
+    } catch (NoSuchMethodException e) {
+      LOG.warn("No lazy compute method found for variable isUnityCatalog");
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to invoke the lazy compute methods for isUnityCatalog", e);
+    }
+
+    try {
+      Field field = catalog.getClass().getDeclaredField("isUnityCatalog");
+      field.setAccessible(true);
+      field.set(catalog, true);
+    } catch (NoSuchFieldException e) {
+      throw new RuntimeException(
+          "Failed find the isUnityCatalog field, delta-spark version >= 3.2.1 is required", e);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException("Failed to set the isUnityCatalog field", e);
+    }
+  }
+}
diff --git 
a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java
 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java
new file mode 100644
index 000000000..01a4af45d
--- /dev/null
+++ 
b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.utils;
+
+import com.google.common.collect.Maps;
+import java.lang.reflect.Field;
+import java.util.Map;
+import org.apache.iceberg.CachingCatalog;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.rest.RESTSessionCatalog;
+import org.apache.iceberg.rest.auth.OAuth2Util;
+import org.apache.iceberg.spark.SparkCatalog;
+import org.apache.polaris.service.types.GenericTable;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.execution.datasources.DataSource;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
+
+/** Static helpers shared by the Polaris Spark catalog implementation. */
+public class PolarisCatalogUtils {
+  public static final String TABLE_PROVIDER_KEY = "provider";
+  public static final String TABLE_PATH_KEY = "path";
+
+  /** Check whether the table provider is iceberg. A missing (null) provider counts as iceberg. */
+  public static boolean useIceberg(String provider) {
+    return provider == null || "iceberg".equalsIgnoreCase(provider);
+  }
+
+  /** Check whether the table provider is delta. */
+  public static boolean useDelta(String provider) {
+    return "delta".equalsIgnoreCase(provider);
+  }
+
+  /**
+   * Load spark table using DataSourceV2.
+   *
+   * @param genericTable generic table whose format selects the DataSourceV2 provider and whose
+   *     properties are handed to that provider
+   * @return V2Table if DataSourceV2 is available for the table format. For delta table, it returns
+   *     DeltaTableV2.
+   */
+  public static Table loadSparkTable(GenericTable genericTable) {
+    SparkSession sparkSession = SparkSession.active();
+    TableProvider provider =
+        DataSource.lookupDataSourceV2(genericTable.getFormat(), sparkSession.sessionState().conf())
+            .get();
+    Map<String, String> properties = genericTable.getProperties();
+    boolean hasLocationClause = properties.get(TableCatalog.PROP_LOCATION) != null;
+    boolean hasPathClause = properties.get(TABLE_PATH_KEY) != null;
+    Map<String, String> tableProperties = Maps.newHashMap();
+    tableProperties.putAll(properties);
+    if (!hasPathClause && hasLocationClause) {
+      // DataSourceV2 requires the path property on table loading. However, spark today
+      // doesn't create the corresponding path property if the path keyword is not
+      // provided by user when location is provided. Here, we duplicate the location
+      // property as path to make sure the table can be loaded.
+      tableProperties.put(TABLE_PATH_KEY, properties.get(TableCatalog.PROP_LOCATION));
+    }
+    return DataSourceV2Utils.getTableFromProvider(
+        provider, new CaseInsensitiveStringMap(tableProperties), scala.Option.empty());
+  }
+
+  /**
+   * Get the catalogAuth field inside the RESTSessionCatalog used by the Iceberg Spark Catalog
+   * using reflection.
+   *
+   * <p>Each private field is looked up on the class that declares it rather than on the runtime
+   * class of the object, because {@code getDeclaredField} does not see fields declared in a
+   * superclass. This keeps the lookup working when a subclass instance is passed in.
+   *
+   * <p>TODO: Deprecate this function once the iceberg client is updated to 1.9.0 to use
+   * AuthManager and the capability of injecting an AuthManager is available. Related iceberg PR:
+   * https://github.com/apache/iceberg/pull/12655
+   *
+   * @param sparkCatalog the Iceberg Spark catalog to read the auth session from
+   * @return the value of the {@code catalogAuth} field of the underlying RESTSessionCatalog
+   * @throws RuntimeException if any field is missing or holds an unexpected type
+   */
+  public static OAuth2Util.AuthSession getAuthSession(SparkCatalog sparkCatalog) {
+    try {
+      Catalog icebergCatalog =
+          (Catalog) readField(SparkCatalog.class, "icebergCatalog", sparkCatalog);
+
+      RESTCatalog icebergRestCatalog;
+      if (icebergCatalog instanceof CachingCatalog) {
+        // unwrap the catalog that the caching layer decorates
+        icebergRestCatalog =
+            (RESTCatalog) readField(CachingCatalog.class, "catalog", icebergCatalog);
+      } else {
+        icebergRestCatalog = (RESTCatalog) icebergCatalog;
+      }
+
+      RESTSessionCatalog sessionCatalog =
+          (RESTSessionCatalog) readField(RESTCatalog.class, "sessionCatalog", icebergRestCatalog);
+
+      return (OAuth2Util.AuthSession)
+          readField(RESTSessionCatalog.class, "catalogAuth", sessionCatalog);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to get the catalogAuth from the Iceberg SparkCatalog", e);
+    }
+  }
+
+  /** Reads the field {@code name} declared by {@code owner} from {@code target}. */
+  private static Object readField(Class<?> owner, String name, Object target)
+      throws ReflectiveOperationException {
+    Field field = owner.getDeclaredField(name);
+    field.setAccessible(true);
+    return field.get(target);
+  }
+}
diff --git 
a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java
 
b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java
new file mode 100644
index 000000000..c11e8de3b
--- /dev/null
+++ 
b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension;
+
+/**
+ * Fake delta catalog used for testing only. It adds no behavior of its own: every call is passed
+ * straight to the delegate CatalogPlugin configured as part of DelegatingCatalogExtension.
+ */
+public class NoopDeltaCatalog extends DelegatingCatalogExtension {
+  // Mock of the isUnityCatalog scala val in
+  // org.apache.spark.sql.delta.catalog.DeltaCatalog; starts out as false.
+  private boolean isUnityCatalog;
+}
diff --git 
a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java
 
b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java
new file mode 100644
index 000000000..5c3d59710
--- /dev/null
+++ 
b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark;
+
+import com.google.common.collect.Maps;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.inmemory.InMemoryCatalog;
+import org.apache.polaris.service.types.GenericTable;
+
+/**
+ * InMemory implementation for the Polaris Catalog. This class is mainly used by testing.
+ *
+ * <p>Iceberg entities are handled by the inherited {@link InMemoryCatalog}; generic tables are
+ * kept in a separate concurrent map keyed by table identifier.
+ */
+public class PolarisInMemoryCatalog extends InMemoryCatalog implements PolarisCatalog {
+  private final ConcurrentMap<TableIdentifier, GenericTable> genericTables;
+
+  public PolarisInMemoryCatalog() {
+    this.genericTables = Maps.newConcurrentMap();
+  }
+
+  /** Lists the generic tables stored directly under {@code ns}, ordered by identifier string. */
+  @Override
+  public List<TableIdentifier> listGenericTables(Namespace ns) {
+    return this.genericTables.keySet().stream()
+        .filter(t -> t.namespace().equals(ns))
+        .sorted(Comparator.comparing(TableIdentifier::toString))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns the generic table registered under {@code identifier}.
+   *
+   * @throws NoSuchTableException if no generic table is registered under the identifier
+   */
+  @Override
+  public GenericTable loadGenericTable(TableIdentifier identifier) {
+    GenericTable table = this.genericTables.get(identifier);
+    if (table == null) {
+      throw new NoSuchTableException("Generic table does not exist: %s", identifier);
+    }
+
+    return table;
+  }
+
+  /** Removes the generic table; returns true only if a table was actually removed. */
+  @Override
+  public boolean dropGenericTable(TableIdentifier identifier) {
+    return null != this.genericTables.remove(identifier);
+  }
+
+  /**
+   * Registers a new generic table.
+   *
+   * @param identifier identifier of the table; its namespace must already exist
+   * @param format table format, e.g. delta
+   * @param doc optional description stored on the table
+   * @param props table properties
+   * @return the table that was stored
+   * @throws NoSuchNamespaceException if the namespace of the identifier does not exist
+   * @throws AlreadyExistsException if a generic table with the identifier already exists
+   */
+  @Override
+  public GenericTable createGenericTable(
+      TableIdentifier identifier, String format, String doc, Map<String, String> props) {
+    if (!namespaceExists(identifier.namespace())) {
+      throw new NoSuchNamespaceException(
+          "Cannot create generic table %s. Namespace does not exist: %s",
+          identifier, identifier.namespace());
+    }
+
+    GenericTable created =
+        GenericTable.builder()
+            .setName(identifier.name())
+            .setFormat(format)
+            .setDoc(doc)
+            .setProperties(props)
+            .build();
+
+    GenericTable previous = this.genericTables.putIfAbsent(identifier, created);
+    if (previous != null) {
+      throw new AlreadyExistsException("Generic table already exists: %s", identifier);
+    }
+
+    // return the instance that was inserted instead of re-reading the map, which could
+    // yield null if the entry is dropped concurrently
+    return created;
+  }
+}
diff --git 
a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
 
b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
index 70e9b00c5..0d142cbcb 100644
--- 
a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
+++ 
b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java
@@ -19,7 +19,6 @@
 package org.apache.polaris.spark;
 
 import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL;
-import static org.apache.iceberg.types.Types.NestedField.required;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -28,32 +27,85 @@ import java.util.Arrays;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.Schema;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.spark.SparkUtil;
-import org.apache.iceberg.types.Types;
+import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.polaris.spark.utils.DeltaHelper;
+import org.apache.polaris.spark.utils.PolarisCatalogUtils;
 import org.apache.spark.SparkContext;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
 import org.apache.spark.sql.catalyst.analysis.NoSuchViewException;
-import org.apache.spark.sql.connector.catalog.*;
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.NamespaceChange;
+import org.apache.spark.sql.connector.catalog.Table;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableProvider;
+import org.apache.spark.sql.connector.catalog.V1Table;
+import org.apache.spark.sql.connector.catalog.View;
+import org.apache.spark.sql.connector.catalog.ViewChange;
 import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.execution.datasources.DataSource;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.internal.SessionState;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.MockedStatic;
 import org.mockito.Mockito;
+import scala.Option;
 
 public class SparkCatalogTest {
-  private SparkCatalog catalog;
+  /** Iceberg SparkCatalog whose backing Iceberg catalog is a {@link PolarisInMemoryCatalog}. */
+  private static class InMemoryIcebergSparkCatalog extends org.apache.iceberg.spark.SparkCatalog {
+    private PolarisInMemoryCatalog inMemoryCatalog = null;
+
+    @Override
+    protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap options) {
+      PolarisInMemoryCatalog created = new PolarisInMemoryCatalog();
+      created.initialize(name, options);
+
+      // keep a handle on the instance so that it can be fetched via getInMemoryCatalog()
+      this.inMemoryCatalog = created;
+      return created;
+    }
+
+    /** Returns the catalog built by buildIcebergCatalog, or null if it has not been built. */
+    public PolarisInMemoryCatalog getInMemoryCatalog() {
+      return inMemoryCatalog;
+    }
+  }
+
+  /**
+   * A SparkCatalog implementation that uses the InMemory catalog implementation for both Iceberg
+   * and Polaris.
+   */
+  private static class InMemorySparkCatalog extends SparkCatalog {
+    @Override
+    public void initialize(String name, CaseInsensitiveStringMap options) {
+      this.catalogName = name;
+
+      // the Iceberg side is served by the in-memory Iceberg spark catalog
+      InMemoryIcebergSparkCatalog inMemoryIceberg = new InMemoryIcebergSparkCatalog();
+      this.icebergsSparkCatalog = inMemoryIceberg;
+      inMemoryIceberg.initialize(name, options);
+
+      // the Polaris side shares the in-memory catalog built by the Iceberg spark catalog
+      this.polarisSparkCatalog = new PolarisSparkCatalog(inMemoryIceberg.getInMemoryCatalog());
+      this.polarisSparkCatalog.initialize(name, options);
+
+      this.deltaHelper = new DeltaHelper(options);
+    }
+  }
+
+  private InMemorySparkCatalog catalog;
   private String catalogName;
 
   private static final String[] defaultNS = new String[] {"ns"};
-  private static final Schema defaultSchema =
-      new Schema(
-          5,
-          required(3, "id", Types.IntegerType.get(), "unique ID"),
-          required(4, "data", Types.StringType.get()));
 
   @BeforeEach
   public void setup() throws Exception {
@@ -61,8 +113,9 @@ public class SparkCatalogTest {
     Map<String, String> catalogConfig = Maps.newHashMap();
     catalogConfig.put(CATALOG_IMPL, 
"org.apache.iceberg.inmemory.InMemoryCatalog");
     catalogConfig.put("cache-enabled", "false");
-
-    catalog = new SparkCatalog();
+    catalogConfig.put(
+        DeltaHelper.DELTA_CATALOG_IMPL_KEY, 
"org.apache.polaris.spark.NoopDeltaCatalog");
+    catalog = new InMemorySparkCatalog();
     Configuration conf = new Configuration();
     try (MockedStatic<SparkSession> mockedStaticSparkSession =
             Mockito.mockStatic(SparkSession.class);
@@ -83,6 +136,34 @@ public class SparkCatalogTest {
     catalog.createNamespace(defaultNS, Maps.newHashMap());
   }
 
+  @Test
+  void testCatalogValidation() {
+    SparkCatalog testCatalog = new SparkCatalog();
+
+    // an explicitly configured catalog implementation is rejected
+    Map<String, String> optionsWithImpl = Maps.newHashMap();
+    optionsWithImpl.put(CATALOG_IMPL, "org.apache.iceberg.inmemory.InMemoryCatalog");
+    optionsWithImpl.put("cache-enabled", "false");
+    CaseInsensitiveStringMap implOptions = new CaseInsensitiveStringMap(optionsWithImpl);
+    assertThatThrownBy(() -> testCatalog.validateAndResolveCatalogOptions(implOptions))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Customized catalog implementation is not supported and not needed");
+
+    // the hive catalog type is rejected
+    Map<String, String> optionsWithHiveType = Maps.newHashMap();
+    optionsWithHiveType.put(CatalogUtil.ICEBERG_CATALOG_TYPE, "hive");
+    CaseInsensitiveStringMap hiveTypeOptions = new CaseInsensitiveStringMap(optionsWithHiveType);
+    assertThatThrownBy(() -> testCatalog.validateAndResolveCatalogOptions(hiveTypeOptions))
+        .isInstanceOf(IllegalArgumentException.class);
+
+    // with nothing configured, the resolved options carry the rest catalog type
+    CaseInsensitiveStringMap emptyOptions = new CaseInsensitiveStringMap(Maps.newHashMap());
+    CaseInsensitiveStringMap resolved = testCatalog.validateAndResolveCatalogOptions(emptyOptions);
+    assertThat(resolved.get(CatalogUtil.ICEBERG_CATALOG_TYPE))
+        .isEqualTo(CatalogUtil.ICEBERG_CATALOG_TYPE_REST);
+  }
+
   @Test
   void testCreateAndLoadNamespace() throws Exception {
     String[] namespace = new String[] {"ns1"};
@@ -286,17 +367,87 @@ public class SparkCatalogTest {
     }
   }
 
+  @Test
+  void testCreateAndLoadIcebergTable() throws Exception {
+    Identifier tableIdent = Identifier.of(defaultNS, "iceberg-table");
+    StructType tableSchema = new StructType().add("boolType", "boolean");
+    Map<String, String> tableProps = Maps.newHashMap();
+    tableProps.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "iceberg");
+    tableProps.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
+
+    // creating the table yields an iceberg SparkTable
+    Table created = catalog.createTable(tableIdent, tableSchema, new Transform[0], tableProps);
+    assertThat(created).isInstanceOf(SparkTable.class);
+
+    // loading it again yields an iceberg SparkTable as well
+    Table loaded = catalog.loadTable(tableIdent);
+    assertThat(loaded).isInstanceOf(SparkTable.class);
+
+    // a second create with the same identifier fails with spark TableAlreadyExistsException
+    StructType otherSchema = new StructType().add("LongType", "Long");
+    Map<String, String> otherProps = Maps.newHashMap();
+    otherProps.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "iceberg");
+    otherProps.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
+    assertThatThrownBy(
+            () -> catalog.createTable(tableIdent, otherSchema, new Transform[0], otherProps))
+        .isInstanceOf(TableAlreadyExistsException.class);
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {"delta", "csv"})
+  void testCreateAndLoadGenericTable(String format) throws Exception {
+    Identifier tableIdent = Identifier.of(defaultNS, "generic-test-table");
+    StructType tableSchema = new StructType().add("boolType", "boolean");
+    Map<String, String> tableProps = Maps.newHashMap();
+    tableProps.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, format);
+    tableProps.put(TableCatalog.PROP_LOCATION, "file:///tmp/delta/path/to/table/");
+
+    SQLConf sqlConf = new SQLConf();
+    try (MockedStatic<SparkSession> sessionStatics = Mockito.mockStatic(SparkSession.class);
+        MockedStatic<DataSource> dataSourceStatics = Mockito.mockStatic(DataSource.class);
+        MockedStatic<DataSourceV2Utils> dsV2Statics = Mockito.mockStatic(DataSourceV2Utils.class)) {
+      // SparkSession.active() -> mocked session -> mocked session state -> sqlConf
+      SparkSession session = Mockito.mock(SparkSession.class);
+      sessionStatics.when(SparkSession::active).thenReturn(session);
+      SessionState sessionState = Mockito.mock(SessionState.class);
+      Mockito.when(session.sessionState()).thenReturn(sessionState);
+      Mockito.when(sessionState.conf()).thenReturn(sqlConf);
+
+      // the DataSourceV2 lookup for the tested format resolves to a mocked provider
+      TableProvider tableProvider = Mockito.mock(TableProvider.class);
+      dataSourceStatics
+          .when(() -> DataSource.lookupDataSourceV2(Mockito.eq(format), Mockito.any()))
+          .thenReturn(Option.apply(tableProvider));
+
+      // and that provider hands back a mocked V1Table
+      V1Table v1Table = Mockito.mock(V1Table.class);
+      dsV2Statics
+          .when(
+              () ->
+                  DataSourceV2Utils.getTableFromProvider(
+                      Mockito.eq(tableProvider), Mockito.any(), Mockito.any()))
+          .thenReturn(v1Table);
+
+      Table created = catalog.createTable(tableIdent, tableSchema, new Transform[0], tableProps);
+      assertThat(created).isInstanceOf(V1Table.class);
+
+      // load the table
+      Table loaded = catalog.loadTable(tableIdent);
+      assertThat(loaded).isInstanceOf(V1Table.class);
+    }
+
+    // creating another table under the same identifier must fail
+    StructType otherSchema = new StructType().add("LongType", "Long");
+    Map<String, String> otherProps = Maps.newHashMap();
+    otherProps.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "parquet");
+    otherProps.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/");
+    assertThatThrownBy(
+            () -> catalog.createTable(tableIdent, otherSchema, new Transform[0], otherProps))
+        .isInstanceOf(TableAlreadyExistsException.class);
+  }
+
   @Test
   public void testUnsupportedOperations() {
     String[] namespace = new String[] {"ns1"};
     Identifier identifier = Identifier.of(namespace, "table1");
     Identifier new_identifier = Identifier.of(namespace, "table2");
     // table methods
-    assertThatThrownBy(() -> catalog.loadTable(identifier))
-        .isInstanceOf(UnsupportedOperationException.class);
-    assertThatThrownBy(
-            () -> catalog.createTable(identifier, 
Mockito.mock(StructType.class), null, null))
-        .isInstanceOf(UnsupportedOperationException.class);
     assertThatThrownBy(() -> catalog.alterTable(identifier))
         .isInstanceOf(UnsupportedOperationException.class);
     assertThatThrownBy(() -> catalog.dropTable(identifier))
diff --git 
a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
 
b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
new file mode 100644
index 000000000..542fd05d8
--- /dev/null
+++ 
b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.spark.rest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.polaris.service.types.GenericTable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Round-trip (serialize, then deserialize) tests for the REST request and response wrappers used
+ * for generic tables.
+ */
+public class DeserializationTest {
+  private ObjectMapper mapper;
+
+  @BeforeEach
+  public void setUp() {
+    mapper = new ObjectMapper();
+  }
+
+  /** A LoadGenericTableRESTResponse must survive a JSON round trip with all fields intact. */
+  @ParameterizedTest
+  @MethodSource("genericTableTestCases")
+  public void testLoadGenericTableRESTResponse(String doc, Map<String, String> properties)
+      throws JsonProcessingException {
+    GenericTable table =
+        GenericTable.builder()
+            .setFormat("delta")
+            .setName("test-table")
+            .setProperties(properties)
+            .setDoc(doc)
+            .build();
+    LoadGenericTableRESTResponse response = new LoadGenericTableRESTResponse(table);
+    String json = mapper.writeValueAsString(response);
+    LoadGenericTableRESTResponse deserializedResponse =
+        mapper.readValue(json, LoadGenericTableRESTResponse.class);
+    assertThat(deserializedResponse.getTable().getFormat()).isEqualTo("delta");
+    assertThat(deserializedResponse.getTable().getName()).isEqualTo("test-table");
+    assertThat(deserializedResponse.getTable().getDoc()).isEqualTo(doc);
+    // compare the content, not just the size, so that a mangled key or value is detected
+    assertThat(deserializedResponse.getTable().getProperties()).isEqualTo(properties);
+  }
+
+  /** A CreateGenericTableRESTRequest must survive a JSON round trip with all fields intact. */
+  @ParameterizedTest
+  @MethodSource("genericTableTestCases")
+  public void testCreateGenericTableRESTRequest(String doc, Map<String, String> properties)
+      throws JsonProcessingException {
+    CreateGenericTableRESTRequest request =
+        new CreateGenericTableRESTRequest("test-table", "delta", doc, properties);
+    String json = mapper.writeValueAsString(request);
+    CreateGenericTableRESTRequest deserializedRequest =
+        mapper.readValue(json, CreateGenericTableRESTRequest.class);
+    assertThat(deserializedRequest.getName()).isEqualTo("test-table");
+    assertThat(deserializedRequest.getFormat()).isEqualTo("delta");
+    assertThat(deserializedRequest.getDoc()).isEqualTo(doc);
+    // compare the content, not just the size, so that a mangled key or value is detected
+    assertThat(deserializedRequest.getProperties()).isEqualTo(properties);
+  }
+
+  /** Combinations of present/absent doc with populated/empty properties. */
+  private static Stream<Arguments> genericTableTestCases() {
+    String doc = "table for testing";
+    // typed explicitly: `var` with a bare Maps.newHashMap() would infer HashMap<Object, Object>
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("location", "s3://path/to/table/");
+    Map<String, String> noProperties = Maps.newHashMap();
+    return Stream.of(
+        Arguments.of(doc, properties),
+        Arguments.of(null, noProperties),
+        Arguments.of(doc, noProperties),
+        Arguments.of(null, properties));
+  }
+}

Reply via email to