mayursrivastava commented on a change in pull request #3294: URL: https://github.com/apache/iceberg/pull/3294#discussion_r735204291
########## File path: core/src/main/java/org/apache/iceberg/io/inmemory/InMemoryCatalogDb.java ########## @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.io.inmemory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.CommitFailedException; +import org.apache.iceberg.exceptions.NoSuchNamespaceException; +import org.apache.iceberg.relocated.com.google.common.base.Objects; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; + +final class InMemoryCatalogDb implements AutoCloseable { + + private final ConcurrentMap<Namespace, Map<String, String>> namespaceDb; + private final ConcurrentMap<TableIdentifier, String> tableDb; + + InMemoryCatalogDb() { + this.namespaceDb = new ConcurrentHashMap<>(); + this.tableDb = new ConcurrentHashMap<>(); + } + + /** + * Put the namespace in the namespace db if it does not already exist. + */ + void putNamespaceEntryIfAbsent(Namespace namespace, Map<String, String> properties) { + namespaceDb.computeIfAbsent(namespace, k -> new HashMap<>(properties)); + } + + /** + * Get a copy of the namespace properties. + * This method returns {@code null} if the namespace does not exist. + */ + ImmutableMap<String, String> getNamespaceProperties(Namespace namespace) { + Map<String, String> properties = namespaceDb.get(namespace); + return properties != null ? ImmutableMap.copyOf(properties) : null; + } + + /** + * Put properties in existing namespace, overwriting previous key-values. + */ + void putNamespaceProperties(Namespace namespace, Map<String, String> properties) { + namespaceDb.compute(namespace, (k, v) -> { + if (v == null) { + throw new IllegalStateException("Namespace does not exist: " + namespace); + } else { + v.putAll(properties); + return v; + } + }); + } + + /** + * Remove properties from existing namespace. + */ + void removeNamespaceProperties(Namespace namespace, Set<String> properties) throws NoSuchNamespaceException { + namespaceDb.compute(namespace, (k, v) -> { + if (v == null) { + throw new IllegalStateException("Namespace does not exist: " + namespace); + } else { + Map<String, String> newProperties = new HashMap<>(v); + properties.forEach(newProperties::remove); + return newProperties; + } + }); + } + + /** + * Remove the namespace from the namespace db. + * This method returns {@code true} if the namespace was successfully removed, + * and {@code false} otherwise. + */ + boolean removeNamespace(Namespace namespace) { + return namespaceDb.remove(namespace) != null; + } + + /** + * Get the list of namespaces in the namespace db. + */ + ImmutableSet<Namespace> getNamespaceList() { + return ImmutableSet.copyOf(namespaceDb.keySet()); + } + + /** + * Create the table entry if it does not already exist. + * Otherwise, if the existing location matches the {@code expectedOldLocation}, + * update the entry with {@code newLocation}. + * If the namespace does not exist, + * put the namespace entry with empty properties as well. + */ + void createOrUpdateTableEntry(TableIdentifier tableIdentifier, String expectedOldLocation, String newLocation) { + putNamespaceEntryIfAbsent(tableIdentifier.namespace(), ImmutableMap.of()); + tableDb.compute(tableIdentifier, (k, existingLocation) -> { + if (!Objects.equal(existingLocation, expectedOldLocation)) { + throw new CommitFailedException("Table: %s cannot be updated from '%s' to '%s'" + + " because it has been concurrently modified to '%s'", + tableIdentifier, expectedOldLocation, newLocation, existingLocation); + } + return newLocation; + }); + } + + /** + * Get table location. + * This method returns {@code null} if the table does not exist. + */ + String getTableLocation(TableIdentifier tableIdentifier) { Review comment: will do -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
