This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push: new 97a49e0ec Spark: Add CreateTable and LoadTable implementation for SparkCatalog (#1303) 97a49e0ec is described below commit 97a49e0ec3910e61d1d55d3a06fc01c3c5724bf7 Author: gh-yzou <167037035+gh-y...@users.noreply.github.com> AuthorDate: Fri Apr 11 10:56:37 2025 -0700 Spark: Add CreateTable and LoadTable implementation for SparkCatalog (#1303) --- plugins/pluginlibs.versions.toml | 3 + plugins/spark/v3.5/build.gradle.kts | 73 +++++++- .../org/apache/polaris/spark/PolarisCatalog.java | 36 ++++ .../apache/polaris/spark/PolarisRESTCatalog.java | 184 ++++++++++++++++++++ .../apache/polaris/spark/PolarisSparkCatalog.java | 111 +++++++++++++ .../org/apache/polaris/spark/SparkCatalog.java | 103 ++++++++++-- .../spark/rest/CreateGenericTableRESTRequest.java | 46 +++++ .../spark/rest/LoadGenericTableRESTResponse.java | 42 +++++ .../apache/polaris/spark/utils/DeltaHelper.java | 107 ++++++++++++ .../polaris/spark/utils/PolarisCatalogUtils.java | 112 +++++++++++++ .../org/apache/polaris/spark/NoopDeltaCatalog.java | 32 ++++ .../polaris/spark/PolarisInMemoryCatalog.java | 90 ++++++++++ .../org/apache/polaris/spark/SparkCatalogTest.java | 185 +++++++++++++++++++-- .../polaris/spark/rest/DeserializationTest.java | 88 ++++++++++ 14 files changed, 1175 insertions(+), 37 deletions(-) diff --git a/plugins/pluginlibs.versions.toml b/plugins/pluginlibs.versions.toml index 0a4a515e5..e48f6ef45 100644 --- a/plugins/pluginlibs.versions.toml +++ b/plugins/pluginlibs.versions.toml @@ -20,3 +20,6 @@ [versions] iceberg = "1.8.1" spark35 = "3.5.5" +scala212 = "2.12.19" +scala213 = "2.13.15" + diff --git a/plugins/spark/v3.5/build.gradle.kts b/plugins/spark/v3.5/build.gradle.kts index 36ca6d528..df37fa229 100644 --- a/plugins/spark/v3.5/build.gradle.kts +++ b/plugins/spark/v3.5/build.gradle.kts @@ -41,18 +41,34 @@ val scalaVersion = getAndUseScalaVersionForProject() val icebergVersion = pluginlibs.versions.iceberg.get() val spark35Version = pluginlibs.versions.spark35.get() +val scalaLibraryVersion = + if (scalaVersion == "2.12") { + pluginlibs.versions.scala212.get() + } else { + pluginlibs.versions.scala213.get() + } + dependencies { implementation(project(":polaris-api-iceberg-service")) { - // exclude the iceberg and jackson dependencies, use the - // dependencies packed in the iceberg-spark dependency + // exclude the iceberg dependencies, use the ones pulled + // by iceberg-core exclude("org.apache.iceberg", "*") - exclude("com.fasterxml.jackson.core", "*") } + implementation(project(":polaris-api-catalog-service")) + implementation(project(":polaris-core")) { exclude("org.apache.iceberg", "*") } + + implementation("org.apache.iceberg:iceberg-core:${icebergVersion}") implementation( "org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}" - ) + ) { + // exclude the iceberg rest dependencies, use the ones pulled + // with iceberg-core dependency + exclude("org.apache.iceberg", "iceberg-core") + } + compileOnly("org.scala-lang:scala-library:${scalaLibraryVersion}") + compileOnly("org.scala-lang:scala-reflect:${scalaLibraryVersion}") compileOnly("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") { // exclude log4j dependencies exclude("org.apache.logging.log4j", "log4j-slf4j2-impl") @@ -78,24 +94,65 @@ dependencies { } } +// TODO: replace the check using gradlew checkstyle plugin +tasks.register("checkNoDisallowedImports") { + doLast { + // List of disallowed imports. 
Right now, we disallow usage of shaded or + // relocated libraries in the iceberg spark runtime jar. + val disallowedImports = + listOf("import org.apache.iceberg.shaded.", "org.apache.iceberg.relocated.") + + // Directory to scan for Java files + val sourceDirs = listOf(file("src/main/java"), file("src/test/java")) + + val violations = mutableListOf<String>() + // Scan Java files in each directory + sourceDirs.forEach { sourceDir -> + fileTree(sourceDir) + .matching { + include("**/*.java") // Only include Java files + } + .forEach { file -> + val content = file.readText() + disallowedImports.forEach { importStatement -> + if (content.contains(importStatement)) { + violations.add( + "Disallowed import found in ${file.relativeTo(projectDir)}: $importStatement" + ) + } + } + } + } + + if (violations.isNotEmpty()) { + throw GradleException("Disallowed imports found! $violations") + } + } +} + +tasks.named("check") { dependsOn("checkNoDisallowedImports") } + tasks.register<ShadowJar>("createPolarisSparkJar") { archiveClassifier = null archiveBaseName = "polaris-iceberg-${icebergVersion}-spark-runtime-${sparkMajorVersion}_${scalaVersion}" isZip64 = true - dependencies { exclude("META-INF/**") } + mergeServiceFiles() // pack both the source code and dependencies from(sourceSets.main.get().output) configurations = listOf(project.configurations.runtimeClasspath.get()) - mergeServiceFiles() - // Optimization: Minimize the JAR (remove unused classes from dependencies) // The iceberg-spark-runtime plugin is always packaged along with our polaris-spark plugin, // therefore excluded from the optimization. - minimize { exclude(dependency("org.apache.iceberg:iceberg-spark-runtime-*.*")) } + minimize { + exclude(dependency("org.apache.iceberg:iceberg-spark-runtime-*.*")) + exclude(dependency("org.apache.iceberg:iceberg-core*.*")) + } + + relocate("com.fasterxml", "org.apache.polaris.shaded.com.fasterxml.jackson") } tasks.withType(Jar::class).named("sourcesJar") { dependsOn("createPolarisSparkJar") } diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisCatalog.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisCatalog.java new file mode 100644 index 000000000..31a6ac189 --- /dev/null +++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisCatalog.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.spark; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.polaris.service.types.GenericTable; + +public interface PolarisCatalog { + List<TableIdentifier> listGenericTables(Namespace ns); + + GenericTable loadGenericTable(TableIdentifier identifier); + + boolean dropGenericTable(TableIdentifier identifier); + + GenericTable createGenericTable( + TableIdentifier identifier, String format, String doc, Map<String, String> props); +} diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java new file mode 100644 index 000000000..0b8743132 --- /dev/null +++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.spark; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.CloseableGroup; +import org.apache.iceberg.rest.Endpoint; +import org.apache.iceberg.rest.ErrorHandlers; +import org.apache.iceberg.rest.HTTPClient; +import org.apache.iceberg.rest.RESTClient; +import org.apache.iceberg.rest.ResourcePaths; +import org.apache.iceberg.rest.auth.OAuth2Util; +import org.apache.iceberg.rest.responses.ConfigResponse; +import org.apache.iceberg.util.EnvironmentUtil; +import org.apache.polaris.core.rest.PolarisEndpoints; +import org.apache.polaris.core.rest.PolarisResourcePaths; +import org.apache.polaris.service.types.GenericTable; +import org.apache.polaris.spark.rest.CreateGenericTableRESTRequest; +import org.apache.polaris.spark.rest.LoadGenericTableRESTResponse; + +/** + * [[PolarisRESTCatalog]] talks to Polaris REST APIs, and implements the PolarisCatalog interfaces, + * which are generic table related APIs at this moment. This class doesn't interact with any Spark + * objects. 
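For orientation, a minimal sketch of driving the PolarisCatalog interface above, assuming an already-initialized instance (the namespace, table name, and location are illustrative, not part of this commit):

    import java.util.Map;
    import org.apache.iceberg.catalog.Namespace;
    import org.apache.iceberg.catalog.TableIdentifier;
    import org.apache.polaris.service.types.GenericTable;
    import org.apache.polaris.spark.PolarisCatalog;

    public class GenericTableRoundTrip {
      static void roundTrip(PolarisCatalog catalog) {
        TableIdentifier id = TableIdentifier.of(Namespace.of("ns"), "events");
        // register a non-Iceberg table; "format" is the Spark provider name
        GenericTable created =
            catalog.createGenericTable(
                id, "delta", null, Map.of("location", "s3://bucket/ns/events"));
        // load it back and inspect the recorded format
        GenericTable loaded = catalog.loadGenericTable(id);
        System.out.println(loaded.getFormat());
        // drop reports whether the table existed
        boolean dropped = catalog.dropGenericTable(id);
      }
    }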
+ */ +public class PolarisRESTCatalog implements PolarisCatalog, Closeable { + private final Function<Map<String, String>, RESTClient> clientBuilder; + + private RESTClient restClient = null; + private CloseableGroup closeables = null; + private Set<Endpoint> endpoints; + private OAuth2Util.AuthSession catalogAuth = null; + private PolarisResourcePaths pathGenerator = null; + + // the default endpoints to config if server doesn't specify the 'endpoints' configuration. + private static final Set<Endpoint> DEFAULT_ENDPOINTS = PolarisEndpoints.GENERIC_TABLE_ENDPOINTS; + + public PolarisRESTCatalog() { + this(config -> HTTPClient.builder(config).uri(config.get(CatalogProperties.URI)).build()); + } + + public PolarisRESTCatalog(Function<Map<String, String>, RESTClient> clientBuilder) { + this.clientBuilder = clientBuilder; + } + + public void initialize(Map<String, String> unresolved, OAuth2Util.AuthSession catalogAuth) { + Preconditions.checkArgument(unresolved != null, "Invalid configuration: null"); + + // Resolve any configuration that is supplied by environment variables. + // For example: if we have an entity ("key", "env:envVar") in the unresolved, + // and envVar is configured to envValue in system env. After resolve, we got + // entity ("key", "envValue"). + Map<String, String> props = EnvironmentUtil.resolveAll(unresolved); + + // TODO: switch to use authManager once iceberg dependency is updated to 1.9.0 + this.catalogAuth = catalogAuth; + + ConfigResponse config; + try (RESTClient initClient = clientBuilder.apply(props).withAuthSession(catalogAuth)) { + config = fetchConfig(initClient, catalogAuth.headers(), props); + } catch (IOException e) { + throw new UncheckedIOException("Failed to close HTTP client", e); + } + + // call getConfig to get the server configurations + Map<String, String> mergedProps = config.merge(props); + if (config.endpoints().isEmpty()) { + this.endpoints = DEFAULT_ENDPOINTS; + } else { + this.endpoints = ImmutableSet.copyOf(config.endpoints()); + } + + this.pathGenerator = PolarisResourcePaths.forCatalogProperties(mergedProps); + this.restClient = clientBuilder.apply(mergedProps).withAuthSession(catalogAuth); + + this.closeables = new CloseableGroup(); + this.closeables.addCloseable(this.restClient); + this.closeables.setSuppressCloseFailure(true); + } + + protected static ConfigResponse fetchConfig( + RESTClient client, Map<String, String> headers, Map<String, String> properties) { + // send the client's warehouse location to the service to keep in sync + // this is needed for cases where the warehouse is configured at client side, + // and used by Polaris server as catalog name. 
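The environment resolution mentioned in initialize() replaces "env:"-prefixed values before anything else happens; a simplified stand-in for EnvironmentUtil.resolveAll, to make that behavior concrete (this is not the Iceberg implementation):

    import java.util.HashMap;
    import java.util.Map;

    public class EnvResolveSketch {
      // {"credential" -> "env:POLARIS_CREDENTIAL"} becomes
      // {"credential" -> <value of $POLARIS_CREDENTIAL>}
      static Map<String, String> resolveAll(Map<String, String> unresolved) {
        Map<String, String> resolved = new HashMap<>();
        unresolved.forEach(
            (key, value) ->
                resolved.put(
                    key,
                    value != null && value.startsWith("env:")
                        ? System.getenv(value.substring("env:".length()))
                        : value));
        return resolved;
      }
    }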
+ ImmutableMap.Builder<String, String> queryParams = ImmutableMap.builder(); + if (properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION)) { + queryParams.put( + CatalogProperties.WAREHOUSE_LOCATION, + properties.get(CatalogProperties.WAREHOUSE_LOCATION)); + } + + ConfigResponse configResponse = + client.get( + ResourcePaths.config(), + queryParams.build(), + ConfigResponse.class, + headers, + ErrorHandlers.defaultErrorHandler()); + configResponse.validate(); + return configResponse; + } + + @Override + public void close() throws IOException { + if (closeables != null) { + closeables.close(); + } + } + + @Override + public List<TableIdentifier> listGenericTables(Namespace ns) { + throw new UnsupportedOperationException("listTables not supported"); + } + + @Override + public boolean dropGenericTable(TableIdentifier identifier) { + throw new UnsupportedOperationException("dropTable not supported"); + } + + @Override + public GenericTable createGenericTable( + TableIdentifier identifier, String format, String doc, Map<String, String> props) { + Endpoint.check(endpoints, PolarisEndpoints.V1_CREATE_GENERIC_TABLE); + CreateGenericTableRESTRequest request = + new CreateGenericTableRESTRequest(identifier.name(), format, doc, props); + + LoadGenericTableRESTResponse response = + restClient + .withAuthSession(this.catalogAuth) + .post( + pathGenerator.genericTables(identifier.namespace()), + request, + LoadGenericTableRESTResponse.class, + Map.of(), + ErrorHandlers.tableErrorHandler()); + + return response.getTable(); + } + + @Override + public GenericTable loadGenericTable(TableIdentifier identifier) { + Endpoint.check(endpoints, PolarisEndpoints.V1_LOAD_GENERIC_TABLE); + LoadGenericTableRESTResponse response = + restClient + .withAuthSession(this.catalogAuth) + .get( + pathGenerator.genericTable(identifier), + null, + LoadGenericTableRESTResponse.class, + Map.of(), + ErrorHandlers.tableErrorHandler()); + + return response.getTable(); + } +} diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java new file mode 100644 index 000000000..8f8c07fba --- /dev/null +++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.spark; + +import java.util.Map; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.spark.Spark3Util; +import org.apache.polaris.service.types.GenericTable; +import org.apache.polaris.spark.utils.PolarisCatalogUtils; +import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.TableChange; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +/** + * A spark TableCatalog Implementation interacts with Polaris specific APIs only. The APIs it + * interacts with is generic table APIs, and all table operations performed in this class are + * expected to be for non-iceberg tables. + */ +public class PolarisSparkCatalog implements TableCatalog { + + private PolarisCatalog polarisCatalog = null; + private String catalogName = null; + + public PolarisSparkCatalog(PolarisCatalog polarisCatalog) { + this.polarisCatalog = polarisCatalog; + } + + @Override + public void initialize(String name, CaseInsensitiveStringMap options) { + this.catalogName = name; + } + + @Override + public String name() { + return catalogName; + } + + @Override + public Table loadTable(Identifier identifier) throws NoSuchTableException { + try { + GenericTable genericTable = + this.polarisCatalog.loadGenericTable(Spark3Util.identifierToTableIdentifier(identifier)); + return PolarisCatalogUtils.loadSparkTable(genericTable); + } catch (org.apache.iceberg.exceptions.NoSuchTableException e) { + throw new NoSuchTableException(identifier); + } + } + + @Override + public Table createTable( + Identifier identifier, + StructType schema, + Transform[] transforms, + Map<String, String> properties) + throws TableAlreadyExistsException, NoSuchNamespaceException { + try { + String format = properties.get(PolarisCatalogUtils.TABLE_PROVIDER_KEY); + GenericTable genericTable = + this.polarisCatalog.createGenericTable( + Spark3Util.identifierToTableIdentifier(identifier), format, null, properties); + return PolarisCatalogUtils.loadSparkTable(genericTable); + } catch (AlreadyExistsException e) { + throw new TableAlreadyExistsException(identifier); + } + } + + @Override + public Table alterTable(Identifier identifier, TableChange... 
changes) + throws NoSuchTableException { + throw new NoSuchTableException(identifier); + } + + @Override + public boolean dropTable(Identifier identifier) { + return false; + } + + @Override + public void renameTable(Identifier from, Identifier to) + throws NoSuchTableException, TableAlreadyExistsException { + throw new UnsupportedOperationException("renameTable operation is not supported"); + } + + @Override + public Identifier[] listTables(String[] namespace) { + throw new UnsupportedOperationException("listTables operation is not supported"); + } +} diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java index e38bbe1ad..cf46d9a15 100644 --- a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java +++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/SparkCatalog.java @@ -18,10 +18,17 @@ */ package org.apache.polaris.spark; -import com.google.common.collect.ImmutableSet; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; import java.util.Map; -import java.util.Set; +import org.apache.arrow.util.VisibleForTesting; +import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.rest.auth.OAuth2Util; import org.apache.iceberg.spark.SupportsReplaceView; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.polaris.spark.utils.DeltaHelper; +import org.apache.polaris.spark.utils.PolarisCatalogUtils; import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; @@ -42,42 +49,114 @@ import org.apache.spark.sql.connector.catalog.ViewChange; import org.apache.spark.sql.connector.expressions.Transform; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * SparkCatalog Implementation that is able to interact with both Iceberg SparkCatalog and Polaris + * SparkCatalog. All namespaces and view related operations continue goes through the Iceberg + * SparkCatalog. For table operations, depends on the table format, the operation can be achieved + * with interaction with both Iceberg and Polaris SparkCatalog. + */ public class SparkCatalog implements StagingTableCatalog, TableCatalog, SupportsNamespaces, ViewCatalog, SupportsReplaceView { + private static final Logger LOG = LoggerFactory.getLogger(SparkCatalog.class); - private static final Set<String> DEFAULT_NS_KEYS = ImmutableSet.of(TableCatalog.PROP_OWNER); - private String catalogName = null; - private org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null; - - // TODO: Add Polaris Specific REST Catalog + @VisibleForTesting protected String catalogName = null; + @VisibleForTesting protected org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null; + @VisibleForTesting protected PolarisSparkCatalog polarisSparkCatalog = null; + @VisibleForTesting protected DeltaHelper deltaHelper = null; @Override public String name() { return catalogName; } + /** + * Check whether invalid catalog configuration is provided, and return an option map with catalog + * type configured correctly. This function mainly validates two parts: 1) No customized catalog + * implementation is provided. 2) No non-rest catalog type is configured. 
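These options arrive from the Spark session configuration; a hypothetical setup that passes this validation (the catalog name "polaris", the URI, and the warehouse are placeholders, and credential options are omitted):

    import org.apache.spark.sql.SparkSession;

    public class PolarisSessionExample {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder()
                .appName("polaris-example")
                .config("spark.sql.catalog.polaris", "org.apache.polaris.spark.SparkCatalog")
                .config("spark.sql.catalog.polaris.uri", "http://localhost:8181/api/catalog")
                .config("spark.sql.catalog.polaris.warehouse", "my_catalog")
                // "type" may be omitted entirely; it then defaults to "rest".
                // Any other value, e.g. "hive", is rejected by the validation.
                .config("spark.sql.catalog.polaris.type", "rest")
                .getOrCreate();
      }
    }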
+ */ + @VisibleForTesting + public CaseInsensitiveStringMap validateAndResolveCatalogOptions( + CaseInsensitiveStringMap options) { + Preconditions.checkArgument( + options.get(CatalogProperties.CATALOG_IMPL) == null, + "Customized catalog implementation is not supported and not needed, please remove the configuration!"); + + String catalogType = + PropertyUtil.propertyAsString( + options, CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_REST); + Preconditions.checkArgument( + catalogType.equals(CatalogUtil.ICEBERG_CATALOG_TYPE_REST), + "Only rest catalog type is allowed, but got catalog type: " + + catalogType + + ". Either configure the type to rest or remove the config"); + + Map<String, String> resolvedOptions = Maps.newHashMap(); + resolvedOptions.putAll(options); + // when no catalog type is configured, iceberg uses hive by default. Here, we make sure the + // type is set to rest since we only support rest catalog. + resolvedOptions.put(CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_REST); + + return new CaseInsensitiveStringMap(resolvedOptions); + } + + /** + * Initialize REST Catalog for Iceberg and Polaris, this is the only catalog type supported by + * Polaris at this moment. + */ + private void initRESTCatalog(String name, CaseInsensitiveStringMap options) { + CaseInsensitiveStringMap resolvedOptions = validateAndResolveCatalogOptions(options); + + // initialize the icebergSparkCatalog + this.icebergsSparkCatalog = new org.apache.iceberg.spark.SparkCatalog(); + this.icebergsSparkCatalog.initialize(name, resolvedOptions); + + // initialize the polaris spark catalog + OAuth2Util.AuthSession catalogAuth = + PolarisCatalogUtils.getAuthSession(this.icebergsSparkCatalog); + PolarisRESTCatalog restCatalog = new PolarisRESTCatalog(); + restCatalog.initialize(options, catalogAuth); + this.polarisSparkCatalog = new PolarisSparkCatalog(restCatalog); + this.polarisSparkCatalog.initialize(name, resolvedOptions); + } + @Override public void initialize(String name, CaseInsensitiveStringMap options) { this.catalogName = name; - this.icebergsSparkCatalog = new org.apache.iceberg.spark.SparkCatalog(); - this.icebergsSparkCatalog.initialize(name, options); + initRESTCatalog(name, options); + this.deltaHelper = new DeltaHelper(options); } @Override public Table loadTable(Identifier ident) throws NoSuchTableException { - throw new UnsupportedOperationException("loadTable"); + try { + return this.icebergsSparkCatalog.loadTable(ident); + } catch (NoSuchTableException e) { + return this.polarisSparkCatalog.loadTable(ident); + } } @Override public Table createTable( Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties) - throws TableAlreadyExistsException { - throw new UnsupportedOperationException("createTable"); + throws TableAlreadyExistsException, NoSuchNamespaceException { + String provider = properties.get(PolarisCatalogUtils.TABLE_PROVIDER_KEY); + if (PolarisCatalogUtils.useIceberg(provider)) { + return this.icebergsSparkCatalog.createTable(ident, schema, transforms, properties); + } else if (PolarisCatalogUtils.useDelta(provider)) { + // For delta table, we load the delta catalog to help dealing with the + // delta log creation. 
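The provider routing in createTable is easiest to see from SQL; with a session configured as in the earlier sketch, each USING clause exercises a different branch (table names and locations are illustrative):

    // Iceberg branch: handled by the embedded Iceberg SparkCatalog
    spark.sql("CREATE TABLE polaris.ns.ice_t (id INT) USING iceberg");
    // Delta branch: routed through the loaded DeltaCatalog for delta-log handling
    spark.sql(
        "CREATE TABLE polaris.ns.delta_t (id INT) USING delta"
            + " LOCATION 's3://bucket/ns/delta_t'");
    // Generic branch: any other provider goes straight to the Polaris generic-table API
    spark.sql(
        "CREATE TABLE polaris.ns.csv_t (id INT) USING csv"
            + " LOCATION 's3://bucket/ns/csv_t'");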
+ TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog); + return deltaCatalog.createTable(ident, schema, transforms, properties); + } else { + return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties); + } } @Override diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java new file mode 100644 index 000000000..4ec348a80 --- /dev/null +++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/CreateGenericTableRESTRequest.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.spark.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.Map; +import org.apache.iceberg.rest.RESTRequest; +import org.apache.polaris.service.types.CreateGenericTableRequest; + +/** + * RESTRequest definition for CreateGenericTable which extends the iceberg RESTRequest. This is + * currently required because the Iceberg HTTPClient requires the request and response to be a class + * of RESTRequest and RESTResponse. + */ +public class CreateGenericTableRESTRequest extends CreateGenericTableRequest + implements RESTRequest { + + @JsonCreator + public CreateGenericTableRESTRequest( + @JsonProperty(value = "name", required = true) String name, + @JsonProperty(value = "format", required = true) String format, + @JsonProperty(value = "doc") String doc, + @JsonProperty(value = "properties") Map<String, String> properties) { + super(name, format, doc, properties); + } + + @Override + public void validate() {} +} diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java new file mode 100644 index 000000000..68c738dae --- /dev/null +++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/rest/LoadGenericTableRESTResponse.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.spark.rest; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.iceberg.rest.RESTResponse; +import org.apache.polaris.service.types.GenericTable; +import org.apache.polaris.service.types.LoadGenericTableResponse; + +/** + * RESTResponse definition for LoadGenericTable which extends the iceberg RESTResponse. This is + * currently required because the Iceberg HTTPClient requires the request and response to be a class + * of RESTRequest and RESTResponse. + */ +public class LoadGenericTableRESTResponse extends LoadGenericTableResponse implements RESTResponse { + + @JsonCreator + public LoadGenericTableRESTResponse( + @JsonProperty(value = "table", required = true) GenericTable table) { + super(table); + } + + @Override + public void validate() {} +} diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java new file mode 100644 index 000000000..297438424 --- /dev/null +++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/DeltaHelper.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.spark.utils; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import org.apache.iceberg.common.DynConstructors; +import org.apache.polaris.spark.PolarisSparkCatalog; +import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DeltaHelper { + private static final Logger LOG = LoggerFactory.getLogger(DeltaHelper.class); + + public static final String DELTA_CATALOG_IMPL_KEY = "delta-catalog-impl"; + private static final String DEFAULT_DELTA_CATALOG_CLASS = + "org.apache.spark.sql.delta.catalog.DeltaCatalog"; + + private TableCatalog deltaCatalog = null; + private String deltaCatalogImpl = DEFAULT_DELTA_CATALOG_CLASS; + + public DeltaHelper(CaseInsensitiveStringMap options) { + if (options.get(DELTA_CATALOG_IMPL_KEY) != null) { + this.deltaCatalogImpl = options.get(DELTA_CATALOG_IMPL_KEY); + } + } + + public TableCatalog loadDeltaCatalog(PolarisSparkCatalog polarisSparkCatalog) { + if (this.deltaCatalog != null) { + return this.deltaCatalog; + } + + DynConstructors.Ctor<TableCatalog> ctor; + try { + ctor = DynConstructors.builder(TableCatalog.class).impl(deltaCatalogImpl).buildChecked(); + } catch (NoSuchMethodException e) { + throw new IllegalArgumentException( + String.format("Cannot initialize Delta Catalog %s: %s", deltaCatalogImpl, e.getMessage()), + e); + } + + try { + this.deltaCatalog = ctor.newInstance(); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + String.format( + "Cannot initialize Delta Catalog, %s does not implement Table Catalog.", + deltaCatalogImpl), + e); + } + + // set the polaris spark catalog as the delegate catalog of delta catalog + ((DelegatingCatalogExtension) this.deltaCatalog).setDelegateCatalog(polarisSparkCatalog); + + // We want to behave exactly the same as unity catalog for Delta. However, DeltaCatalog + // implementation today is hard coded for unity catalog. Following issue is used to track + // the extension of the usage https://github.com/delta-io/delta/issues/4306. + // Here, we use reflection to set the isUnityCatalog to true for exactly same behavior as + // unity catalog for now. + try { + // isUnityCatalog is a lazy val, access the compute method for the lazy val + // make sure the method is triggered before the value is set, otherwise, the + // value will be overwritten later when the method is triggered. 
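To make the lazy-val comment in loadDeltaCatalog concrete: for a Scala lazy val, scalac emits approximately the following Java shape inside the class (names simplified), which is why the compute method must be invoked before the field is written:

    // Approximate desugaring of "lazy val isUnityCatalog" (simplified):
    private boolean isUnityCatalog;
    private volatile boolean bitmap$0;

    private boolean isUnityCatalog$lzycompute() {
      synchronized (this) {
        if (!bitmap$0) {
          isUnityCatalog = false; // the real initializer runs here
          bitmap$0 = true;
        }
      }
      return isUnityCatalog;
    }

    public boolean isUnityCatalog() {
      return bitmap$0 ? isUnityCatalog : isUnityCatalog$lzycompute();
    }

Forcing isUnityCatalog$lzycompute() first flips bitmap$0, so the reflective write to the backing field is not clobbered by a pending lazy initialization.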
+ String methodGetName = "isUnityCatalog" + "$lzycompute"; + Method method = this.deltaCatalog.getClass().getDeclaredMethod(methodGetName); + method.setAccessible(true); + // invoke the lazy methods before it is set + method.invoke(this.deltaCatalog); + } catch (NoSuchMethodException e) { + LOG.warn("No lazy compute method found for variable isUnityCatalog"); + } catch (Exception e) { + throw new RuntimeException("Failed to invoke the lazy compute methods for isUnityCatalog", e); + } + + try { + Field field = this.deltaCatalog.getClass().getDeclaredField("isUnityCatalog"); + field.setAccessible(true); + field.set(this.deltaCatalog, true); + } catch (NoSuchFieldException e) { + throw new RuntimeException( + "Failed find the isUnityCatalog field, delta-spark version >= 3.2.1 is required", e); + } catch (IllegalAccessException e) { + throw new RuntimeException("Failed to set the isUnityCatalog field", e); + } + + return this.deltaCatalog; + } +} diff --git a/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java new file mode 100644 index 000000000..01a4af45d --- /dev/null +++ b/plugins/spark/v3.5/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.spark.utils; + +import com.google.common.collect.Maps; +import java.lang.reflect.Field; +import java.util.Map; +import org.apache.iceberg.CachingCatalog; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.rest.RESTCatalog; +import org.apache.iceberg.rest.RESTSessionCatalog; +import org.apache.iceberg.rest.auth.OAuth2Util; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.polaris.service.types.GenericTable; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.TableProvider; +import org.apache.spark.sql.execution.datasources.DataSource; +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils; +import org.apache.spark.sql.util.CaseInsensitiveStringMap; + +public class PolarisCatalogUtils { + public static final String TABLE_PROVIDER_KEY = "provider"; + public static final String TABLE_PATH_KEY = "path"; + + /** Check whether the table provider is iceberg. */ + public static boolean useIceberg(String provider) { + return provider == null || "iceberg".equalsIgnoreCase(provider); + } + + /** Check whether the table provider is delta. 
*/ + public static boolean useDelta(String provider) { + return "delta".equalsIgnoreCase(provider); + } + + /** + * Load spark table using DataSourceV2. + * + * @return V2Table if DataSourceV2 is available for the table format. For delta table, it returns + * DeltaTableV2. + */ + public static Table loadSparkTable(GenericTable genericTable) { + SparkSession sparkSession = SparkSession.active(); + TableProvider provider = + DataSource.lookupDataSourceV2(genericTable.getFormat(), sparkSession.sessionState().conf()) + .get(); + Map<String, String> properties = genericTable.getProperties(); + boolean hasLocationClause = properties.get(TableCatalog.PROP_LOCATION) != null; + boolean hasPathClause = properties.get(TABLE_PATH_KEY) != null; + Map<String, String> tableProperties = Maps.newHashMap(); + tableProperties.putAll(properties); + if (!hasPathClause && hasLocationClause) { + // DataSourceV2 requires the path property on table loading. However, spark today + // doesn't create the corresponding path property if the path keyword is not + // provided by user when location is provided. Here, we duplicate the location + // property as path to make sure the table can be loaded. + tableProperties.put(TABLE_PATH_KEY, properties.get(TableCatalog.PROP_LOCATION)); + } + return DataSourceV2Utils.getTableFromProvider( + provider, new CaseInsensitiveStringMap(tableProperties), scala.Option.empty()); + } + + /** + * Get the catalogAuth field inside the RESTSessionCatalog used by Iceberg Spark Catalog use + * reflection. TODO: Deprecate this function once the iceberg client is updated to 1.9.0 to use + * AuthManager and the capability of injecting an AuthManger is available. Related iceberg PR: + * https://github.com/apache/iceberg/pull/12655 + */ + public static OAuth2Util.AuthSession getAuthSession(SparkCatalog sparkCatalog) { + try { + Field icebergCatalogField = sparkCatalog.getClass().getDeclaredField("icebergCatalog"); + icebergCatalogField.setAccessible(true); + Catalog icebergCatalog = (Catalog) icebergCatalogField.get(sparkCatalog); + RESTCatalog icebergRestCatalog; + if (icebergCatalog instanceof CachingCatalog) { + Field catalogField = icebergCatalog.getClass().getDeclaredField("catalog"); + catalogField.setAccessible(true); + icebergRestCatalog = (RESTCatalog) catalogField.get(icebergCatalog); + } else { + icebergRestCatalog = (RESTCatalog) icebergCatalog; + } + + Field sessionCatalogField = icebergRestCatalog.getClass().getDeclaredField("sessionCatalog"); + sessionCatalogField.setAccessible(true); + RESTSessionCatalog sessionCatalog = + (RESTSessionCatalog) sessionCatalogField.get(icebergRestCatalog); + + Field authField = sessionCatalog.getClass().getDeclaredField("catalogAuth"); + authField.setAccessible(true); + return (OAuth2Util.AuthSession) authField.get(sessionCatalog); + } catch (Exception e) { + throw new RuntimeException("Failed to get the catalogAuth from the Iceberg SparkCatalog", e); + } + } +} diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java new file mode 100644 index 000000000..c11e8de3b --- /dev/null +++ b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/NoopDeltaCatalog.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.spark; + +import org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension; + +/** + * This is a fake delta catalog class that is used for testing. This class is a noop class that + * directly passes all calls to the delegate CatalogPlugin configured as part of + * DelegatingCatalogExtension. + */ +public class NoopDeltaCatalog extends DelegatingCatalogExtension { + // This is a mock of isUnityCatalog scala val in + // org.apache.spark.sql.delta.catalog.DeltaCatalog. + private boolean isUnityCatalog = false; +} diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java new file mode 100644 index 000000000..5c3d59710 --- /dev/null +++ b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/PolarisInMemoryCatalog.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.polaris.spark; + +import com.google.common.collect.Maps; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentMap; +import java.util.stream.Collectors; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.exceptions.NoSuchTableException; +import org.apache.iceberg.inmemory.InMemoryCatalog; +import org.apache.polaris.service.types.GenericTable; + +/** InMemory implementation for the Polaris Catalog. This class is mainly used by testing. 
*/ +public class PolarisInMemoryCatalog extends InMemoryCatalog implements PolarisCatalog { + private final ConcurrentMap<TableIdentifier, GenericTable> genericTables; + + public PolarisInMemoryCatalog() { + this.genericTables = Maps.newConcurrentMap(); + } + + @Override + public List<TableIdentifier> listGenericTables(Namespace ns) { + return this.genericTables.keySet().stream() + .filter(t -> t.namespace().equals(ns)) + .sorted(Comparator.comparing(TableIdentifier::toString)) + .collect(Collectors.toList()); + } + + @Override + public GenericTable loadGenericTable(TableIdentifier identifier) { + GenericTable table = this.genericTables.get(identifier); + if (table == null) { + throw new NoSuchTableException("Generic table does not exist: %s", identifier); + } + + return table; + } + + @Override + public boolean dropGenericTable(TableIdentifier identifier) { + return null != this.genericTables.remove(identifier); + } + + @Override + public GenericTable createGenericTable( + TableIdentifier identifier, String format, String doc, Map<String, String> props) { + if (!namespaceExists(identifier.namespace())) { + throw new NoSuchNamespaceException( + "Cannot create generic table %s. Namespace does not exist: %s", + identifier, identifier.namespace()); + } + + GenericTable previous = + this.genericTables.putIfAbsent( + identifier, + GenericTable.builder() + .setName(identifier.name()) + .setFormat(format) + .setProperties(props) + .build()); + + if (previous != null) { + throw new AlreadyExistsException("Generic table already exists: %s", identifier); + } + + return this.genericTables.get(identifier); + } +} diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java index 70e9b00c5..0d142cbcb 100644 --- a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java +++ b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/SparkCatalogTest.java @@ -19,7 +19,6 @@ package org.apache.polaris.spark; import static org.apache.iceberg.CatalogProperties.CATALOG_IMPL; -import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -28,32 +27,85 @@ import java.util.Arrays; import java.util.Map; import java.util.UUID; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.Schema; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.spark.SparkUtil; -import org.apache.iceberg.types.Types; +import org.apache.iceberg.spark.source.SparkTable; +import org.apache.polaris.spark.utils.DeltaHelper; +import org.apache.polaris.spark.utils.PolarisCatalogUtils; import org.apache.spark.SparkContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException; import org.apache.spark.sql.catalyst.analysis.NoSuchViewException; -import org.apache.spark.sql.connector.catalog.*; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.NamespaceChange; +import org.apache.spark.sql.connector.catalog.Table; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.connector.catalog.TableProvider; +import org.apache.spark.sql.connector.catalog.V1Table; +import 
org.apache.spark.sql.connector.catalog.View; +import org.apache.spark.sql.connector.catalog.ViewChange; import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.execution.datasources.DataSource; +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.internal.SessionState; import org.apache.spark.sql.types.StructType; import org.apache.spark.sql.util.CaseInsensitiveStringMap; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.MockedStatic; import org.mockito.Mockito; +import scala.Option; public class SparkCatalogTest { - private SparkCatalog catalog; + private static class InMemoryIcebergSparkCatalog extends org.apache.iceberg.spark.SparkCatalog { + private PolarisInMemoryCatalog inMemoryCatalog = null; + + @Override + protected Catalog buildIcebergCatalog(String name, CaseInsensitiveStringMap options) { + PolarisInMemoryCatalog inMemoryCatalog = new PolarisInMemoryCatalog(); + inMemoryCatalog.initialize(name, options); + + this.inMemoryCatalog = inMemoryCatalog; + + return inMemoryCatalog; + } + + public PolarisInMemoryCatalog getInMemoryCatalog() { + return this.inMemoryCatalog; + } + } + + /** + * And SparkCatalog implementation that uses InMemory catalog implementation for both Iceberg and + * Polaris + */ + private static class InMemorySparkCatalog extends SparkCatalog { + @Override + public void initialize(String name, CaseInsensitiveStringMap options) { + this.catalogName = name; + // initialize the InMemory icebergSparkCatalog + this.icebergsSparkCatalog = new InMemoryIcebergSparkCatalog(); + this.icebergsSparkCatalog.initialize(name, options); + + // initialize the polarisSparkCatalog with PolarisSparkCatalog + this.polarisSparkCatalog = + new PolarisSparkCatalog( + ((InMemoryIcebergSparkCatalog) this.icebergsSparkCatalog).getInMemoryCatalog()); + this.polarisSparkCatalog.initialize(name, options); + + this.deltaHelper = new DeltaHelper(options); + } + } + + private InMemorySparkCatalog catalog; private String catalogName; private static final String[] defaultNS = new String[] {"ns"}; - private static final Schema defaultSchema = - new Schema( - 5, - required(3, "id", Types.IntegerType.get(), "unique ID"), - required(4, "data", Types.StringType.get())); @BeforeEach public void setup() throws Exception { @@ -61,8 +113,9 @@ public class SparkCatalogTest { Map<String, String> catalogConfig = Maps.newHashMap(); catalogConfig.put(CATALOG_IMPL, "org.apache.iceberg.inmemory.InMemoryCatalog"); catalogConfig.put("cache-enabled", "false"); - - catalog = new SparkCatalog(); + catalogConfig.put( + DeltaHelper.DELTA_CATALOG_IMPL_KEY, "org.apache.polaris.spark.NoopDeltaCatalog"); + catalog = new InMemorySparkCatalog(); Configuration conf = new Configuration(); try (MockedStatic<SparkSession> mockedStaticSparkSession = Mockito.mockStatic(SparkSession.class); @@ -83,6 +136,34 @@ public class SparkCatalogTest { catalog.createNamespace(defaultNS, Maps.newHashMap()); } + @Test + void testCatalogValidation() { + Map<String, String> catalogConfigWithImpl = Maps.newHashMap(); + catalogConfigWithImpl.put(CATALOG_IMPL, "org.apache.iceberg.inmemory.InMemoryCatalog"); + catalogConfigWithImpl.put("cache-enabled", "false"); + SparkCatalog testCatalog = new SparkCatalog(); + assertThatThrownBy( + () -> + 
testCatalog.validateAndResolveCatalogOptions( + new CaseInsensitiveStringMap(catalogConfigWithImpl))) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Customized catalog implementation is not supported and not needed"); + + Map<String, String> catalogConfigInvalidType = Maps.newHashMap(); + catalogConfigInvalidType.put(CatalogUtil.ICEBERG_CATALOG_TYPE, "hive"); + assertThatThrownBy( + () -> + testCatalog.validateAndResolveCatalogOptions( + new CaseInsensitiveStringMap(catalogConfigInvalidType))) + .isInstanceOf(IllegalArgumentException.class); + + CaseInsensitiveStringMap resolvedMap = + testCatalog.validateAndResolveCatalogOptions( + new CaseInsensitiveStringMap(Maps.newHashMap())); + assertThat(resolvedMap.get(CatalogUtil.ICEBERG_CATALOG_TYPE)) + .isEqualTo(CatalogUtil.ICEBERG_CATALOG_TYPE_REST); + } + @Test void testCreateAndLoadNamespace() throws Exception { String[] namespace = new String[] {"ns1"}; @@ -286,17 +367,87 @@ public class SparkCatalogTest { } } + @Test + void testCreateAndLoadIcebergTable() throws Exception { + Identifier identifier = Identifier.of(defaultNS, "iceberg-table"); + Map<String, String> properties = Maps.newHashMap(); + properties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "iceberg"); + properties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/"); + StructType schema = new StructType().add("boolType", "boolean"); + + Table createdTable = catalog.createTable(identifier, schema, new Transform[0], properties); + assertThat(createdTable).isInstanceOf(SparkTable.class); + + // load the table + Table table = catalog.loadTable(identifier); + // verify iceberg SparkTable is loaded + assertThat(table).isInstanceOf(SparkTable.class); + + // verify create table with the same identifier fails with spark TableAlreadyExistsException + StructType newSchema = new StructType().add("LongType", "Long"); + Map<String, String> newProperties = Maps.newHashMap(); + newProperties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "iceberg"); + newProperties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/"); + assertThatThrownBy( + () -> catalog.createTable(identifier, newSchema, new Transform[0], newProperties)) + .isInstanceOf(TableAlreadyExistsException.class); + } + + @ParameterizedTest + @ValueSource(strings = {"delta", "csv"}) + void testCreateAndLoadGenericTable(String format) throws Exception { + Identifier identifier = Identifier.of(defaultNS, "generic-test-table"); + Map<String, String> properties = Maps.newHashMap(); + properties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, format); + properties.put(TableCatalog.PROP_LOCATION, "file:///tmp/delta/path/to/table/"); + StructType schema = new StructType().add("boolType", "boolean"); + + SQLConf conf = new SQLConf(); + try (MockedStatic<SparkSession> mockedStaticSparkSession = + Mockito.mockStatic(SparkSession.class); + MockedStatic<DataSource> mockedStaticDS = Mockito.mockStatic(DataSource.class); + MockedStatic<DataSourceV2Utils> mockedStaticDSV2 = + Mockito.mockStatic(DataSourceV2Utils.class)) { + SparkSession mockedSession = Mockito.mock(SparkSession.class); + mockedStaticSparkSession.when(SparkSession::active).thenReturn(mockedSession); + SessionState mockedState = Mockito.mock(SessionState.class); + Mockito.when(mockedSession.sessionState()).thenReturn(mockedState); + Mockito.when(mockedState.conf()).thenReturn(conf); + + TableProvider provider = Mockito.mock(TableProvider.class); + mockedStaticDS + .when(() -> DataSource.lookupDataSourceV2(Mockito.eq(format), Mockito.any())) + 
.thenReturn(Option.apply(provider)); + V1Table table = Mockito.mock(V1Table.class); + mockedStaticDSV2 + .when( + () -> + DataSourceV2Utils.getTableFromProvider( + Mockito.eq(provider), Mockito.any(), Mockito.any())) + .thenReturn(table); + Table createdTable = catalog.createTable(identifier, schema, new Transform[0], properties); + assertThat(createdTable).isInstanceOf(V1Table.class); + + // load the table + Table loadedTable = catalog.loadTable(identifier); + assertThat(loadedTable).isInstanceOf(V1Table.class); + } + + StructType newSchema = new StructType().add("LongType", "Long"); + Map<String, String> newProperties = Maps.newHashMap(); + newProperties.put(PolarisCatalogUtils.TABLE_PROVIDER_KEY, "parquet"); + newProperties.put(TableCatalog.PROP_LOCATION, "file:///tmp/path/to/table/"); + assertThatThrownBy( + () -> catalog.createTable(identifier, newSchema, new Transform[0], newProperties)) + .isInstanceOf(TableAlreadyExistsException.class); + } + @Test public void testUnsupportedOperations() { String[] namespace = new String[] {"ns1"}; Identifier identifier = Identifier.of(namespace, "table1"); Identifier new_identifier = Identifier.of(namespace, "table2"); // table methods - assertThatThrownBy(() -> catalog.loadTable(identifier)) - .isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy( - () -> catalog.createTable(identifier, Mockito.mock(StructType.class), null, null)) - .isInstanceOf(UnsupportedOperationException.class); assertThatThrownBy(() -> catalog.alterTable(identifier)) .isInstanceOf(UnsupportedOperationException.class); assertThatThrownBy(() -> catalog.dropTable(identifier)) diff --git a/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java new file mode 100644 index 000000000..542fd05d8 --- /dev/null +++ b/plugins/spark/v3.5/src/test/java/org/apache/polaris/spark/rest/DeserializationTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.polaris.spark.rest; + +import static org.assertj.core.api.Assertions.assertThat; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; +import java.util.Map; +import java.util.stream.Stream; +import org.apache.polaris.service.types.GenericTable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +public class DeserializationTest { + private ObjectMapper mapper; + + @BeforeEach + public void setUp() { + mapper = new ObjectMapper(); + } + + @ParameterizedTest + @MethodSource("genericTableTestCases") + public void testLoadGenericTableRESTResponse(String doc, Map<String, String> properties) + throws JsonProcessingException { + GenericTable table = + GenericTable.builder() + .setFormat("delta") + .setName("test-table") + .setProperties(properties) + .setDoc(doc) + .build(); + LoadGenericTableRESTResponse response = new LoadGenericTableRESTResponse(table); + String json = mapper.writeValueAsString(response); + LoadGenericTableRESTResponse deserializedResponse = + mapper.readValue(json, LoadGenericTableRESTResponse.class); + assertThat(deserializedResponse.getTable().getFormat()).isEqualTo("delta"); + assertThat(deserializedResponse.getTable().getName()).isEqualTo("test-table"); + assertThat(deserializedResponse.getTable().getDoc()).isEqualTo(doc); + assertThat(deserializedResponse.getTable().getProperties().size()).isEqualTo(properties.size()); + } + + @ParameterizedTest + @MethodSource("genericTableTestCases") + public void testCreateGenericTableRESTRequest(String doc, Map<String, String> properties) + throws JsonProcessingException { + CreateGenericTableRESTRequest request = + new CreateGenericTableRESTRequest("test-table", "delta", doc, properties); + String json = mapper.writeValueAsString(request); + CreateGenericTableRESTRequest deserializedRequest = + mapper.readValue(json, CreateGenericTableRESTRequest.class); + assertThat(deserializedRequest.getName()).isEqualTo("test-table"); + assertThat(deserializedRequest.getFormat()).isEqualTo("delta"); + assertThat(deserializedRequest.getDoc()).isEqualTo(doc); + assertThat(deserializedRequest.getProperties().size()).isEqualTo(properties.size()); + } + + private static Stream<Arguments> genericTableTestCases() { + var doc = "table for testing"; + var properties = Maps.newHashMap(); + properties.put("location", "s3://path/to/table/"); + return Stream.of( + Arguments.of(doc, properties), + Arguments.of(null, Maps.newHashMap()), + Arguments.of(doc, Maps.newHashMap()), + Arguments.of(null, properties)); + } +}
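For reference, the JSON shape these REST wrappers round-trip can be printed directly; the expected output below is inferred from the @JsonProperty annotations on the request and the round-trip assertions in DeserializationTest, not from a separate spec:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import java.util.Map;
    import org.apache.polaris.spark.rest.CreateGenericTableRESTRequest;

    public class WireFormatSketch {
      public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper();
        CreateGenericTableRESTRequest request =
            new CreateGenericTableRESTRequest(
                "test-table", "delta", "table for testing",
                Map.of("location", "s3://path/to/table/"));
        System.out.println(mapper.writeValueAsString(request));
        // Expected shape (field order may vary):
        // {"name":"test-table","format":"delta","doc":"table for testing",
        //  "properties":{"location":"s3://path/to/table/"}}
      }
    }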