This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new ea7665e788 Nessie: Reimplement namespace operations (#8857)
ea7665e788 is described below
commit ea7665e788cb33507e6eb19eddd4f0c24eb92983
Author: Alexandre Dutra <[email protected]>
AuthorDate: Thu Dec 7 12:34:21 2023 +0100
Nessie: Reimplement namespace operations (#8857)
This change enhances the process of creating new namespaces by
retaining commit authorship information when committing the new
namespace.
---
.../apache/iceberg/nessie/NessieIcebergClient.java | 371 +++++++++++------
.../iceberg/nessie/NessieTableOperations.java | 60 ++-
.../java/org/apache/iceberg/nessie/NessieUtil.java | 25 ++
.../apache/iceberg/nessie/TestMultipleClients.java | 80 +++-
.../org/apache/iceberg/nessie/TestNamespace.java | 2 +-
.../iceberg/nessie/TestNessieIcebergClient.java | 458 ++++++++++++++++++++-
6 files changed, 819 insertions(+), 177 deletions(-)
diff --git
a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
index 26b3701816..4cbbe4a562 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
@@ -19,12 +19,17 @@
package org.apache.iceberg.nessie;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
+import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.Namespace;
@@ -36,24 +41,25 @@ import
org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.base.Suppliers;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Tasks;
import org.projectnessie.client.NessieConfigConstants;
import org.projectnessie.client.api.CommitMultipleOperationsBuilder;
+import org.projectnessie.client.api.GetContentBuilder;
import org.projectnessie.client.api.NessieApiV1;
import org.projectnessie.client.api.OnReferenceBuilder;
import org.projectnessie.client.http.HttpClientException;
import org.projectnessie.error.BaseNessieClientServerException;
import org.projectnessie.error.NessieConflictException;
-import org.projectnessie.error.NessieNamespaceAlreadyExistsException;
-import org.projectnessie.error.NessieNamespaceNotEmptyException;
-import org.projectnessie.error.NessieNamespaceNotFoundException;
+import org.projectnessie.error.NessieContentNotFoundException;
import org.projectnessie.error.NessieNotFoundException;
+import org.projectnessie.error.NessieReferenceConflictException;
import org.projectnessie.error.NessieReferenceNotFoundException;
import org.projectnessie.model.Branch;
+import org.projectnessie.model.Conflict;
import org.projectnessie.model.Content;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.EntriesResponse;
-import org.projectnessie.model.GetNamespacesResponse;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.ImmutableCommitMeta;
import org.projectnessie.model.ImmutableIcebergTable;
@@ -181,133 +187,214 @@ public class NessieIcebergClient implements
AutoCloseable {
}
public void createNamespace(Namespace namespace, Map<String, String>
metadata) {
+ getRef().checkMutable();
+ if (namespace.isEmpty()) {
+ throw new IllegalArgumentException("Creating empty namespaces is not
supported");
+ }
+ ContentKey key = ContentKey.of(namespace.levels());
+ org.projectnessie.model.Namespace content =
+ org.projectnessie.model.Namespace.of(key.getElements(), metadata);
try {
- getRef().checkMutable();
- withReference(
- getApi()
- .createNamespace()
-
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
- .properties(metadata))
- .create();
- refresh();
- } catch (NessieNamespaceAlreadyExistsException e) {
- throw new AlreadyExistsException(e, "Namespace already exists: %s",
namespace);
+ Content existing =
api.getContent().reference(getReference()).key(key).get().get(key);
+ if (existing != null) {
+ throw namespaceAlreadyExists(key, existing, null);
+ }
+ try {
+ commitRetry("create namespace " + key, Operation.Put.of(key, content));
+ } catch (NessieReferenceConflictException e) {
+ Optional<Conflict> conflict =
+ NessieUtil.extractSingleConflict(
+ e,
+ EnumSet.of(
+ Conflict.ConflictType.KEY_EXISTS,
Conflict.ConflictType.NAMESPACE_ABSENT));
+ if (conflict.isPresent()) {
+ switch (conflict.get().conflictType()) {
+ case KEY_EXISTS:
+ Content conflicting =
withReference(api.getContent()).key(key).get().get(key);
+ throw namespaceAlreadyExists(key, conflicting, e);
+ case NAMESPACE_ABSENT:
+ throw new NoSuchNamespaceException(
+ e,
+ "Cannot create namespace '%s': parent namespace '%s' does
not exist",
+ namespace,
+ conflict.get().key());
+ }
+ }
+ throw new RuntimeException(
+ String.format("Cannot create namespace '%s': %s", namespace,
e.getMessage()));
+ }
} catch (NessieNotFoundException e) {
throw new RuntimeException(
String.format(
- "Cannot create Namespace '%s': " + "ref '%s' is no longer
valid.",
+ "Cannot create namespace '%s': ref '%s' is no longer valid.",
namespace, getRef().getName()),
e);
+ } catch (BaseNessieClientServerException e) {
+ throw new RuntimeException(
+ String.format("Cannot create namespace '%s': %s", namespace,
e.getMessage()), e);
}
}
public List<Namespace> listNamespaces(Namespace namespace) throws
NoSuchNamespaceException {
try {
- GetNamespacesResponse response =
- withReference(
- getApi()
- .getMultipleNamespaces()
-
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
- .get();
- return response.getNamespaces().stream()
- .map(ns -> Namespace.of(ns.getElements().toArray(new String[0])))
- .filter(ns -> ns.length() == namespace.length() + 1)
+ String filter = "entry.contentType == 'NAMESPACE' && ";
+ if (namespace.isEmpty()) {
+ filter += "size(entry.keyElements) == 1";
+ } else {
+ org.projectnessie.model.Namespace root =
+ org.projectnessie.model.Namespace.of(namespace.levels());
+ filter +=
+ String.format(
+ "size(entry.keyElements) == %d &&
entry.encodedKey.startsWith('%s.')",
+ root.getElementCount() + 1, root.name());
+ }
+ List<ContentKey> entries =
+ withReference(api.getEntries()).filter(filter).stream()
+ .map(EntriesResponse.Entry::getName)
+ .collect(Collectors.toList());
+ if (entries.isEmpty()) {
+ return Collections.emptyList();
+ }
+ GetContentBuilder getContent = withReference(api.getContent());
+ entries.forEach(getContent::key);
+ return getContent.get().values().stream()
+ .map(v -> v.unwrap(org.projectnessie.model.Namespace.class))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .map(v -> Namespace.of(v.getElements().toArray(new String[0])))
.collect(Collectors.toList());
- } catch (NessieReferenceNotFoundException e) {
- throw new RuntimeException(
- String.format(
- "Cannot list Namespaces starting from '%s': " + "ref '%s' is no
longer valid.",
- namespace, getRef().getName()),
- e);
+ } catch (NessieNotFoundException e) {
+ if (namespace.isEmpty()) {
+ throw new NoSuchNamespaceException(
+ e,
+ "Cannot list top-level namespaces: ref '%s' is no longer valid.",
+ getRef().getName());
+ }
+ throw new NoSuchNamespaceException(
+ e,
+ "Cannot list child namespaces from '%s': ref '%s' is no longer
valid.",
+ namespace,
+ getRef().getName());
}
}
public boolean dropNamespace(Namespace namespace) throws
NamespaceNotEmptyException {
+ getRef().checkMutable();
+ ContentKey key = ContentKey.of(namespace.levels());
try {
- getRef().checkMutable();
- withReference(
- getApi()
- .deleteNamespace()
-
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
- .delete();
- refresh();
- return true;
- } catch (NessieNamespaceNotFoundException e) {
- return false;
+ Map<ContentKey, Content> contentMap =
+ api.getContent().reference(getReference()).key(key).get();
+ Content existing = contentMap.get(key);
+ if (existing != null &&
!existing.getType().equals(Content.Type.NAMESPACE)) {
+ throw new NoSuchNamespaceException(
+ "Content object with name '%s' is not a namespace.", namespace);
+ }
+ try {
+ commitRetry("drop namespace " + key, Operation.Delete.of(key));
+ return true;
+ } catch (NessieReferenceConflictException e) {
+ Optional<Conflict> conflict =
+ NessieUtil.extractSingleConflict(
+ e,
+ EnumSet.of(
+ Conflict.ConflictType.KEY_DOES_NOT_EXIST,
+ Conflict.ConflictType.NAMESPACE_NOT_EMPTY));
+ if (conflict.isPresent()) {
+ Conflict.ConflictType conflictType = conflict.get().conflictType();
+ switch (conflictType) {
+ case KEY_DOES_NOT_EXIST:
+ return false;
+ case NAMESPACE_NOT_EMPTY:
+ throw new NamespaceNotEmptyException(e, "Namespace '%s' is not
empty.", namespace);
+ }
+ }
+ throw new RuntimeException(
+ String.format("Cannot drop namespace '%s': %s", namespace,
e.getMessage()));
+ }
} catch (NessieNotFoundException e) {
LOG.error(
- "Cannot drop Namespace '{}': ref '{}' is no longer valid.",
+ "Cannot drop namespace '{}': ref '{}' is no longer valid.",
namespace,
getRef().getName(),
e);
- return false;
- } catch (NessieNamespaceNotEmptyException e) {
- throw new NamespaceNotEmptyException(
- e, "Namespace '%s' is not empty. One or more tables exist.",
namespace);
+ } catch (BaseNessieClientServerException e) {
+ throw new RuntimeException(
+ String.format("Cannot drop namespace '%s': %s", namespace,
e.getMessage()), e);
}
+ return false;
}
public Map<String, String> loadNamespaceMetadata(Namespace namespace)
throws NoSuchNamespaceException {
+ ContentKey key = ContentKey.of(namespace.levels());
try {
- return withReference(
- getApi()
- .getNamespace()
-
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
- .get()
+ Map<ContentKey, Content> contentMap =
withReference(api.getContent()).key(key).get();
+ return unwrapNamespace(contentMap.get(key))
+ .orElseThrow(
+ () -> new NoSuchNamespaceException("Namespace does not exist:
%s", namespace))
.getProperties();
- } catch (NessieNamespaceNotFoundException e) {
- throw new NoSuchNamespaceException(e, "Namespace does not exist: %s",
namespace);
- } catch (NessieReferenceNotFoundException e) {
+ } catch (NessieNotFoundException e) {
throw new RuntimeException(
String.format(
- "Cannot load Namespace '%s': " + "ref '%s' is no longer valid.",
+ "Cannot load namespace '%s': ref '%s' is no longer valid.",
namespace, getRef().getName()),
e);
}
}
public boolean setProperties(Namespace namespace, Map<String, String>
properties) {
- try {
- withReference(
- getApi()
- .updateProperties()
-
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
- .updateProperties(properties))
- .update();
- refresh();
- // always successful, otherwise an exception is thrown
- return true;
- } catch (NessieNamespaceNotFoundException e) {
- throw new NoSuchNamespaceException(e, "Namespace does not exist: %s",
namespace);
- } catch (NessieNotFoundException e) {
- throw new RuntimeException(
- String.format(
- "Cannot update properties on Namespace '%s': ref '%s' is no
longer valid.",
- namespace, getRef().getName()),
- e);
- }
+ return updateProperties(namespace, props -> props.putAll(properties));
}
public boolean removeProperties(Namespace namespace, Set<String> properties)
{
+ return updateProperties(namespace, props ->
props.keySet().removeAll(properties));
+ }
+
+ private boolean updateProperties(Namespace namespace, Consumer<Map<String,
String>> action) {
+ getRef().checkMutable();
+ ContentKey key = ContentKey.of(namespace.levels());
try {
- withReference(
- getApi()
- .updateProperties()
-
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
- .removeProperties(properties))
- .update();
- refresh();
+ commitRetry(
+ "update namespace " + key,
+ true,
+ commitBuilder -> {
+ org.projectnessie.model.Namespace oldNamespace =
+
unwrapNamespace(api.getContent().reference(getReference()).key(key).get().get(key))
+ .orElseThrow(
+ () -> new NessieContentNotFoundException(key,
getReference().getName()));
+ Map<String, String> newProperties =
Maps.newHashMap(oldNamespace.getProperties());
+ action.accept(newProperties);
+ org.projectnessie.model.Namespace updatedNamespace =
+ org.projectnessie.model.Namespace.builder()
+ .from(oldNamespace)
+ .properties(newProperties)
+ .build();
+ commitBuilder.operation(Operation.Put.of(key, updatedNamespace));
+ return commitBuilder;
+ });
// always successful, otherwise an exception is thrown
return true;
- } catch (NessieNamespaceNotFoundException e) {
- throw new NoSuchNamespaceException(e, "Namespace does not exist: %s",
namespace);
- } catch (NessieNotFoundException e) {
+ } catch (NessieReferenceConflictException e) {
+ Optional<Conflict> conflict =
+ NessieUtil.extractSingleConflict(e,
EnumSet.of(Conflict.ConflictType.KEY_DOES_NOT_EXIST));
+ if (conflict.isPresent()
+ && conflict.get().conflictType() ==
Conflict.ConflictType.KEY_DOES_NOT_EXIST) {
+ throw new NoSuchNamespaceException(e, "Namespace does not exist: %s",
namespace);
+ }
+ throw new RuntimeException(
+ String.format(
+ "Cannot update properties on namespace '%s': %s", namespace,
e.getMessage()));
+ } catch (NessieContentNotFoundException e) {
+ throw new NoSuchNamespaceException("Namespace does not exist: %s",
namespace);
+ } catch (NessieReferenceNotFoundException e) {
throw new RuntimeException(
String.format(
- "Cannot remove properties from Namespace '%s': ref '%s' is no
longer valid.",
+ "Cannot update properties on namespace '%s': ref '%s' is no
longer valid.",
namespace, getRef().getName()),
e);
+ } catch (BaseNessieClientServerException e) {
+ throw new RuntimeException(
+ String.format("Cannot update namespace '%s': %s", namespace,
e.getMessage()), e);
}
}
@@ -323,28 +410,11 @@ public class NessieIcebergClient implements AutoCloseable
{
throw new AlreadyExistsException("Table already exists: %s", to.name());
}
- CommitMultipleOperationsBuilder operations =
- getApi()
- .commitMultipleOperations()
- .commitMeta(
- NessieUtil.buildCommitMetadata(
- String.format("Iceberg rename table from '%s' to '%s'",
from, to),
- catalogOptions))
- .operation(Operation.Delete.of(NessieUtil.toKey(from)))
- .operation(Operation.Put.of(NessieUtil.toKey(to),
existingFromTable));
-
try {
- Tasks.foreach(operations)
- .retry(5)
- .stopRetryOn(NessieNotFoundException.class)
- .throwFailureWhenFinished()
- .onFailure((o, exception) -> refresh())
- .run(
- ops -> {
- Branch branch = ops.branch((Branch)
getRef().getReference()).commit();
- getRef().updateReference(branch);
- },
- BaseNessieClientServerException.class);
+ commitRetry(
+ String.format("Iceberg rename table from '%s' to '%s'", from, to),
+ Operation.Delete.of(NessieUtil.toKey(from)),
+ Operation.Put.of(NessieUtil.toKey(to), existingFromTable));
} catch (NessieNotFoundException e) {
// important note: the NotFoundException refers to the ref only. If a
table was not found it
// would imply that the
@@ -355,13 +425,13 @@ public class NessieIcebergClient implements AutoCloseable
{
// and removed by another.
throw new RuntimeException(
String.format(
- "Cannot rename table '%s' to '%s': " + "ref '%s' no longer
exists.",
+ "Cannot rename table '%s' to '%s': ref '%s' no longer exists.",
from.name(), to.name(), getRef().getName()),
e);
} catch (BaseNessieClientServerException e) {
throw new CommitFailedException(
e,
- "Cannot rename table '%s' to '%s': " + "the current reference is not
up to date.",
+ "Cannot rename table '%s' to '%s': the current reference is not up
to date.",
from.name(),
to.name());
} catch (HttpClientException ex) {
@@ -390,29 +460,12 @@ public class NessieIcebergClient implements AutoCloseable
{
LOG.info("Purging data for table {} was set to true but is ignored",
identifier.toString());
}
- CommitMultipleOperationsBuilder commitBuilderBase =
- getApi()
- .commitMultipleOperations()
- .commitMeta(
- NessieUtil.buildCommitMetadata(
- String.format("Iceberg delete table %s", identifier),
catalogOptions))
- .operation(Operation.Delete.of(NessieUtil.toKey(identifier)));
-
// We try to drop the table. Simple retry after ref update.
- boolean threw = true;
try {
- Tasks.foreach(commitBuilderBase)
- .retry(5)
- .stopRetryOn(NessieNotFoundException.class)
- .throwFailureWhenFinished()
- .onFailure((o, exception) -> refresh())
- .run(
- commitBuilder -> {
- Branch branch = commitBuilder.branch((Branch)
getRef().getReference()).commit();
- getRef().updateReference(branch);
- },
- BaseNessieClientServerException.class);
- threw = false;
+ commitRetry(
+ String.format("Iceberg delete table %s", identifier),
+ Operation.Delete.of(NessieUtil.toKey(identifier)));
+ return true;
} catch (NessieConflictException e) {
LOG.error(
"Cannot drop table: failed after retry (update ref '{}' and retry)",
@@ -423,7 +476,7 @@ public class NessieIcebergClient implements AutoCloseable {
} catch (BaseNessieClientServerException e) {
LOG.error("Cannot drop table: unknown error", e);
}
- return !threw;
+ return false;
}
/** @deprecated will be removed after 1.5.0 */
@@ -540,4 +593,64 @@ public class NessieIcebergClient implements AutoCloseable {
api.close();
}
}
+
+ private void commitRetry(String message, Operation... ops)
+ throws BaseNessieClientServerException {
+ commitRetry(message, false, builder ->
builder.operations(Arrays.asList(ops)));
+ }
+
+ private void commitRetry(String message, boolean retryConflicts,
CommitEnhancer commitEnhancer)
+ throws BaseNessieClientServerException {
+ // Retry all errors except for NessieNotFoundException and also
NessieConflictException, unless
+ // retryConflicts is set to true.
+ Predicate<Exception> shouldRetry =
+ e ->
+ !(e instanceof NessieNotFoundException)
+ && (!(e instanceof NessieConflictException) || retryConflicts);
+ Tasks.range(1)
+ .retry(5)
+ .shouldRetryTest(shouldRetry)
+ .throwFailureWhenFinished()
+ .onFailure((o, exception) -> refresh())
+ .run(
+ i -> {
+ try {
+ Branch branch =
+ commitEnhancer
+ .enhance(api.commitMultipleOperations())
+ .commitMeta(NessieUtil.buildCommitMetadata(message,
catalogOptions))
+ .branch((Branch) getReference())
+ .commit();
+ getRef().updateReference(branch);
+ } catch (NessieConflictException e) {
+ if (retryConflicts) {
+ refresh(); // otherwise retrying a conflict doesn't make
sense
+ }
+ throw e;
+ }
+ },
+ BaseNessieClientServerException.class);
+ }
+
+ private static AlreadyExistsException namespaceAlreadyExists(
+ ContentKey key, @Nullable Content existing, @Nullable Exception ex) {
+ if (existing instanceof org.projectnessie.model.Namespace) {
+ return new AlreadyExistsException(ex, "Namespace already exists: %s",
key);
+ } else {
+ return new AlreadyExistsException(
+ ex, "Another content object with name '%s' already exists", key);
+ }
+ }
+
+ private static Optional<org.projectnessie.model.Namespace>
unwrapNamespace(Content content) {
+ return content == null
+ ? Optional.empty()
+ : content.unwrap(org.projectnessie.model.Namespace.class);
+ }
+
+ private interface CommitEnhancer {
+
+ CommitMultipleOperationsBuilder enhance(CommitMultipleOperationsBuilder
builder)
+ throws BaseNessieClientServerException;
+ }
}
diff --git
a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
index 0b4c293f86..a5d7e7b214 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
@@ -18,7 +18,7 @@
*/
package org.apache.iceberg.nessie;
-import java.util.List;
+import java.util.EnumSet;
import java.util.Map;
import org.apache.iceberg.BaseMetastoreTableOperations;
import org.apache.iceberg.TableMetadata;
@@ -34,9 +34,7 @@ import org.projectnessie.client.http.HttpClientException;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.error.NessieReferenceConflictException;
-import org.projectnessie.error.ReferenceConflicts;
import org.projectnessie.model.Conflict;
-import org.projectnessie.model.Conflict.ConflictType;
import org.projectnessie.model.Content;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.IcebergTable;
@@ -169,35 +167,33 @@ public class NessieTableOperations extends
BaseMetastoreTableOperations {
}
private static void
maybeThrowSpecializedException(NessieReferenceConflictException ex) {
- // Check if the server returned 'ReferenceConflicts' information
- ReferenceConflicts referenceConflicts = ex.getErrorDetails();
- if (referenceConflicts == null) {
- return;
- }
-
- // Can only narrow down to a single exception, if there is only one
conflict.
- List<Conflict> conflicts = referenceConflicts.conflicts();
- if (conflicts.size() != 1) {
- return;
- }
-
- Conflict conflict = conflicts.get(0);
- ConflictType conflictType = conflict.conflictType();
- if (conflictType != null) {
- switch (conflictType) {
- case NAMESPACE_ABSENT:
- throw new NoSuchNamespaceException(ex, "Namespace does not exist:
%s", conflict.key());
- case NAMESPACE_NOT_EMPTY:
- throw new NamespaceNotEmptyException(ex, "Namespace not empty: %s",
conflict.key());
- case KEY_DOES_NOT_EXIST:
- throw new NoSuchTableException(ex, "Table or view does not exist:
%s", conflict.key());
- case KEY_EXISTS:
- throw new AlreadyExistsException(ex, "Table or view already exists:
%s", conflict.key());
- default:
- // Explicit fall-through
- break;
- }
- }
+ NessieUtil.extractSingleConflict(
+ ex,
+ EnumSet.of(
+ Conflict.ConflictType.NAMESPACE_ABSENT,
+ Conflict.ConflictType.NAMESPACE_NOT_EMPTY,
+ Conflict.ConflictType.KEY_DOES_NOT_EXIST,
+ Conflict.ConflictType.KEY_EXISTS))
+ .ifPresent(
+ conflict -> {
+ switch (conflict.conflictType()) {
+ case NAMESPACE_ABSENT:
+ throw new NoSuchNamespaceException(
+ ex, "Namespace does not exist: %s", conflict.key());
+ case NAMESPACE_NOT_EMPTY:
+ throw new NamespaceNotEmptyException(
+ ex, "Namespace not empty: %s", conflict.key());
+ case KEY_DOES_NOT_EXIST:
+ throw new NoSuchTableException(
+ ex, "Table or view does not exist: %s", conflict.key());
+ case KEY_EXISTS:
+ throw new AlreadyExistsException(
+ ex, "Table or view already exists: %s", conflict.key());
+ default:
+ // Explicit fall-through
+ break;
+ }
+ });
}
@Override
diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java
b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java
index 8c69f844db..3c3b0afd64 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java
@@ -19,9 +19,11 @@
package org.apache.iceberg.nessie;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.SnapshotRef;
@@ -32,7 +34,10 @@ import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.projectnessie.error.NessieReferenceConflictException;
+import org.projectnessie.error.ReferenceConflicts;
import org.projectnessie.model.CommitMeta;
+import org.projectnessie.model.Conflict;
import org.projectnessie.model.ContentKey;
import org.projectnessie.model.IcebergTable;
import org.projectnessie.model.ImmutableCommitMeta;
@@ -165,4 +170,24 @@ public final class NessieUtil {
return builder.discardChanges().build();
}
+
+ public static Optional<Conflict> extractSingleConflict(
+ NessieReferenceConflictException ex, Collection<Conflict.ConflictType>
handledConflictTypes) {
+ // Check if the server returned 'ReferenceConflicts' information
+ ReferenceConflicts referenceConflicts = ex.getErrorDetails();
+ if (referenceConflicts == null) {
+ return Optional.empty();
+ }
+
+ List<Conflict> conflicts =
+ referenceConflicts.conflicts().stream()
+ .filter(c -> handledConflictTypes.contains(c.conflictType()))
+ .collect(Collectors.toList());
+ if (conflicts.size() != 1) {
+ return Optional.empty();
+ }
+
+ Conflict conflict = conflicts.get(0);
+ return Optional.of(conflict);
+ }
}
diff --git
a/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java
b/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java
index fb7f4ea309..b6ae90650e 100644
--- a/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java
+++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java
@@ -22,8 +22,8 @@ import static
org.apache.iceberg.types.Types.NestedField.required;
import java.io.IOException;
import java.net.URI;
-import java.util.AbstractMap;
import java.util.Collections;
+import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
@@ -31,6 +31,7 @@ import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
@@ -39,6 +40,9 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.projectnessie.client.ext.NessieClientFactory;
import org.projectnessie.client.ext.NessieClientUri;
+import org.projectnessie.error.NessieConflictException;
+import org.projectnessie.error.NessieNotFoundException;
+import org.projectnessie.model.Branch;
public class TestMultipleClients extends BaseTestIceberg {
@@ -67,33 +71,87 @@ public class TestMultipleClients extends BaseTestIceberg {
}
@Test
- public void testListNamespaces() {
+ public void testListNamespaces() throws NessieConflictException,
NessieNotFoundException {
+ Assertions.assertThat(catalog.listNamespaces()).isEmpty();
+ Assertions.assertThat(anotherCatalog.listNamespaces()).isEmpty();
+
+ // listing a non-existent namespace should return empty
+
Assertions.assertThat(catalog.listNamespaces(Namespace.of("db1"))).isEmpty();
+
Assertions.assertThat(anotherCatalog.listNamespaces(Namespace.of("db1"))).isEmpty();
+
catalog.createNamespace(Namespace.of("db1"), Collections.emptyMap());
+
Assertions.assertThat(catalog.listNamespaces()).containsExactlyInAnyOrder(Namespace.of("db1"));
+ Assertions.assertThat(anotherCatalog.listNamespaces())
+ .containsExactlyInAnyOrder(Namespace.of("db1"));
// another client creates a namespace with the same nessie server
anotherCatalog.createNamespace(Namespace.of("db2"),
Collections.emptyMap());
- Assertions.assertThat(anotherCatalog.listNamespaces())
- .containsExactlyInAnyOrder(Namespace.of("db1"), Namespace.of("db2"));
Assertions.assertThat(catalog.listNamespaces())
.containsExactlyInAnyOrder(Namespace.of("db1"), Namespace.of("db2"));
+ Assertions.assertThat(anotherCatalog.listNamespaces())
+ .containsExactlyInAnyOrder(Namespace.of("db1"), Namespace.of("db2"));
+
+ api.deleteBranch().branch((Branch)
api.getReference().refName(branch).get()).delete();
+
+ Assertions.assertThatThrownBy(() -> catalog.listNamespaces())
+ .isInstanceOf(NoSuchNamespaceException.class)
+ .hasMessageContaining(
+ "Cannot list top-level namespaces: ref '%s' is no longer valid",
branch);
+ Assertions.assertThatThrownBy(() ->
anotherCatalog.listNamespaces(Namespace.of("db1")))
+ .isInstanceOf(NoSuchNamespaceException.class)
+ .hasMessageContaining(
+ "Cannot list child namespaces from 'db1': ref '%s' is no longer
valid", branch);
}
@Test
- public void testLoadNamespaceMetadata() {
+ public void testLoadNamespaceMetadata() throws NessieConflictException,
NessieNotFoundException {
+ Assertions.assertThatThrownBy(() ->
catalog.loadNamespaceMetadata(Namespace.of("namespace1")))
+ .isInstanceOf(NoSuchNamespaceException.class)
+ .hasMessageContaining("Namespace does not exist: namespace1");
+ Assertions.assertThatThrownBy(
+ () ->
anotherCatalog.loadNamespaceMetadata(Namespace.of("namespace1")))
+ .isInstanceOf(NoSuchNamespaceException.class)
+ .hasMessageContaining("Namespace does not exist: namespace1");
+
catalog.createNamespace(Namespace.of("namespace1"),
Collections.emptyMap());
+
+ // both clients should see the namespace because we read the HEAD of the
ref
Assertions.assertThat(catalog.listNamespaces())
.containsExactlyInAnyOrder(Namespace.of("namespace1"));
+ Assertions.assertThat(anotherCatalog.listNamespaces())
+ .containsExactlyInAnyOrder(Namespace.of("namespace1"));
- // another client adds a metadata to the same namespace
- anotherCatalog.setProperties(Namespace.of("namespace1"),
Collections.singletonMap("k1", "v1"));
- AbstractMap.SimpleEntry<String, String> entry = new
AbstractMap.SimpleEntry<>("k1", "v1");
+ // the other client should not be able to update the namespace
+ // because it is still on the old ref hash
+ Assertions.assertThatThrownBy(
+ () ->
+ anotherCatalog.setProperties(
+ Namespace.of("namespace1"), Collections.singletonMap("k1",
"v1")))
+ .isInstanceOf(NoSuchNamespaceException.class)
+ .hasMessageContaining("Namespace does not exist: namespace1");
+ // the same client adds a metadata to the namespace: expect success
+ catalog.setProperties(Namespace.of("namespace1"),
Collections.singletonMap("k1", "v1"));
+
+ // load metadata from the same client and another client both should work
fine
+ // because we read the HEAD of the ref
Assertions.assertThat(anotherCatalog.loadNamespaceMetadata(Namespace.of("namespace1")))
- .containsExactly(entry);
-
+ .containsExactly(Map.entry("k1", "v1"));
Assertions.assertThat(catalog.loadNamespaceMetadata(Namespace.of("namespace1")))
- .containsExactly(entry);
+ .containsExactly(Map.entry("k1", "v1"));
+
+ api.deleteBranch().branch((Branch)
api.getReference().refName(branch).get()).delete();
+
+ Assertions.assertThatThrownBy(() ->
catalog.loadNamespaceMetadata(Namespace.of("namespace1")))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining(
+ "Cannot load namespace 'namespace1': ref '%s' is no longer valid",
branch);
+ Assertions.assertThatThrownBy(
+ () ->
anotherCatalog.loadNamespaceMetadata(Namespace.of("namespace1")))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining(
+ "Cannot load namespace 'namespace1': ref '%s' is no longer valid",
branch);
}
@Test
diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNamespace.java
b/nessie/src/test/java/org/apache/iceberg/nessie/TestNamespace.java
index 8267329678..0b1af9763d 100644
--- a/nessie/src/test/java/org/apache/iceberg/nessie/TestNamespace.java
+++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNamespace.java
@@ -106,7 +106,7 @@ public class TestNamespace extends BaseTestIceberg {
Assertions.assertThatThrownBy(() -> catalog.dropNamespace(namespace))
.isInstanceOf(NamespaceNotEmptyException.class)
- .hasMessage("Namespace 'test' is not empty. One or more tables
exist.");
+ .hasMessageContaining("Namespace 'test' is not empty");
catalog.dropTable(identifier, true);
catalog.dropNamespace(namespace);
diff --git
a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java
b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java
index f1d6159d93..d4e079ad26 100644
---
a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java
+++
b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java
@@ -18,14 +18,36 @@
*/
package org.apache.iceberg.nessie;
+import static org.apache.iceberg.types.Types.NestedField.required;
+
import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
+import org.assertj.core.data.Index;
import org.junit.jupiter.api.Test;
import org.projectnessie.error.NessieConflictException;
import org.projectnessie.error.NessieNotFoundException;
import org.projectnessie.model.Branch;
+import org.projectnessie.model.CommitMeta;
+import org.projectnessie.model.Content;
+import org.projectnessie.model.ContentKey;
+import org.projectnessie.model.IcebergTable;
+import org.projectnessie.model.LogResponse;
+import org.projectnessie.model.Operation;
import org.projectnessie.model.Reference;
public class TestNessieIcebergClient extends BaseTestIceberg {
@@ -80,7 +102,7 @@ public class TestNessieIcebergClient extends BaseTestIceberg
{
client
.getApi()
.deleteBranch()
- .branch((Branch) client.getApi().getReference().refName(branch).get())
+ .branch((Branch) api.getReference().refName(branch).get())
.delete();
createBranch(branch);
@@ -91,15 +113,443 @@ public class TestNessieIcebergClient extends
BaseTestIceberg {
Assertions.assertThat(client.withReference(branch,
null)).isNotEqualTo(client);
}
+ @Test
+ public void testCreateNamespace() throws NessieConflictException,
NessieNotFoundException {
+ String branch = "createNamespaceBranch";
+ createBranch(branch);
+ Map<String, String> catalogOptions =
+ Map.of(
+ CatalogProperties.USER, "iceberg-user",
+ CatalogProperties.APP_ID, "iceberg-nessie");
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
catalogOptions);
+
+ client.createNamespace(Namespace.of("a"), Map.of());
+
Assertions.assertThat(client.listNamespaces(Namespace.of("a"))).isNotNull();
+
+ List<LogResponse.LogEntry> entries =
api.getCommitLog().refName(branch).get().getLogEntries();
+ Assertions.assertThat(entries)
+ .isNotEmpty()
+ .first()
+ .extracting(LogResponse.LogEntry::getCommitMeta)
+ .extracting(CommitMeta::getMessage, CommitMeta::getAuthor,
CommitMeta::getProperties)
+ .containsExactly(
+ "create namespace a",
+ "iceberg-user",
+ ImmutableMap.of(
+ "application-type", "iceberg",
+ "app-id", "iceberg-nessie"));
+ }
+
+ @Test
+ public void testCreateNamespaceInvalid() throws NessieConflictException,
NessieNotFoundException {
+ String branch = "createNamespaceInvalidBranch";
+ createBranch(branch);
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
Map.of());
+
+ Assertions.assertThatIllegalArgumentException()
+ .isThrownBy(() -> client.createNamespace(Namespace.empty(), Map.of()))
+ .withMessageContaining("Creating empty namespaces is not supported");
+
+ Assertions.assertThatThrownBy(() ->
client.createNamespace(Namespace.of("a", "b"), Map.of()))
+ .isInstanceOf(NoSuchNamespaceException.class)
+ .hasMessageContaining("Cannot create namespace 'a.b': parent namespace
'a' does not exist");
+ }
+
+ @Test
+ public void testCreateNamespaceConflict()
+ throws NessieConflictException, NessieNotFoundException {
+ String branch = "createNamespaceConflictBranch";
+ createBranch(branch);
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
Map.of());
+
+ client.createNamespace(Namespace.of("a"), Map.of());
+
+ Assertions.assertThatThrownBy(() ->
client.createNamespace(Namespace.of("a"), Map.of()))
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageContaining("Namespace already exists: a");
+
+ client.commitTable(
+ null, newTableMetadata(), "file:///tmp/iceberg", (String) null,
ContentKey.of("a", "tbl"));
+
+ Assertions.assertThatThrownBy(() ->
client.createNamespace(Namespace.of("a", "tbl"), Map.of()))
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageContaining("Another content object with name 'a.tbl'
already exists");
+ }
+
+ @Test
+ public void testCreateNamespaceExternalConflict()
+ throws NessieConflictException, NessieNotFoundException {
+ String branch = "createNamespaceExternalConflictBranch";
+ createBranch(branch);
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
Map.of());
+
+ org.projectnessie.model.Namespace nessieNs =
+ org.projectnessie.model.Namespace.of(ContentKey.of("a"));
+ commit(branch, "create namespace a", Operation.Put.of(ContentKey.of("a"),
nessieNs));
+
+ Assertions.assertThatThrownBy(() ->
client.createNamespace(Namespace.of("a"), Map.of()))
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageContaining("Namespace already exists: a");
+
+ IcebergTable table = IcebergTable.of("file:///tmp/iceberg", 1, 1, 1, 1);
+ commit(branch, "create table a.tbl2", Operation.Put.of(ContentKey.of("a",
"tbl"), table));
+
+ Assertions.assertThatThrownBy(() ->
client.createNamespace(Namespace.of("a", "tbl"), Map.of()))
+ .isInstanceOf(AlreadyExistsException.class)
+ .hasMessageContaining("Another content object with name 'a.tbl'
already exists");
+ }
+
+ @Test
+ public void testCreateNamespaceNonExistingRef()
+ throws NessieConflictException, NessieNotFoundException {
+ String branch = "createNamespaceNonExistingRefBranch";
+ createBranch(branch);
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
Map.of());
+
+ client.createNamespace(Namespace.of("a"), Map.of());
+
+ api.deleteBranch().branch((Branch)
api.getReference().refName(branch).get()).delete();
+
+ Assertions.assertThatThrownBy(() ->
client.createNamespace(Namespace.of("b"), Map.of()))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining(
+ "Cannot create namespace 'b': ref
'createNamespaceNonExistingRefBranch' is no longer valid");
+ }
+
+ @Test
+ public void testDropNamespace() throws NessieConflictException,
NessieNotFoundException {
+ String branch = "dropNamespaceBranch";
+ createBranch(branch);
+ Map<String, String> catalogOptions =
+ Map.of(
+ CatalogProperties.USER, "iceberg-user",
+ CatalogProperties.APP_ID, "iceberg-nessie");
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
catalogOptions);
+
+ Namespace parent = Namespace.of("a");
+ Namespace child = Namespace.of("a", "b");
+
+ Assertions.assertThat(client.dropNamespace(parent)).isFalse();
+ Assertions.assertThat(client.dropNamespace(child)).isFalse();
+
+ client.createNamespace(parent, Map.of());
+ client.createNamespace(child, Map.of());
+
+ Assertions.assertThat(client.dropNamespace(child)).isTrue();
+ Assertions.assertThat(client.dropNamespace(parent)).isTrue();
+
+ List<LogResponse.LogEntry> entries =
api.getCommitLog().refName(branch).get().getLogEntries();
+ Assertions.assertThat(entries)
+ .isNotEmpty()
+ .extracting(LogResponse.LogEntry::getCommitMeta)
+ .satisfies(
+ meta -> {
+ Assertions.assertThat(meta.getMessage()).contains("drop
namespace a");
+
Assertions.assertThat(meta.getAuthor()).isEqualTo("iceberg-user");
+ Assertions.assertThat(meta.getProperties())
+ .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+ .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+ },
+ Index.atIndex(0))
+ .satisfies(
+ meta -> {
+ Assertions.assertThat(meta.getMessage()).contains("drop
namespace a.b");
+
Assertions.assertThat(meta.getAuthor()).isEqualTo("iceberg-user");
+ Assertions.assertThat(meta.getProperties())
+ .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+ .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+ },
+ Index.atIndex(1));
+ }
+
+ @Test
+ public void testDropNamespaceNotEmpty() throws NessieConflictException,
NessieNotFoundException {
+ String branch = "dropNamespaceInvalidBranch";
+ createBranch(branch);
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
Map.of());
+
+ client.createNamespace(Namespace.of("a"), Map.of());
+ client.createNamespace(Namespace.of("a", "b"), Map.of());
+
+ Assertions.assertThatThrownBy(() ->
client.dropNamespace(Namespace.of("a")))
+ .isInstanceOf(NamespaceNotEmptyException.class)
+ .hasMessageContaining("Namespace 'a' is not empty.");
+ }
+
+ @Test
+ public void testDropNamespaceConflict() throws NessieConflictException,
NessieNotFoundException {
+ String branch = "dropNamespaceConflictBranch";
+ createBranch(branch);
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
Map.of());
+
+ client.createNamespace(Namespace.of("a"), Map.of());
+
+ client.commitTable(
+ null, newTableMetadata(), "file:///tmp/iceberg", (String) null,
ContentKey.of("a", "tbl"));
+
+ Assertions.assertThatThrownBy(() -> client.dropNamespace(Namespace.of("a",
"tbl")))
+ .isInstanceOf(NoSuchNamespaceException.class)
+ .hasMessageContaining("Content object with name 'a.tbl' is not a
namespace.");
+ }
+
+ @Test
+ public void testDropNamespaceExternalConflict()
+ throws NessieConflictException, NessieNotFoundException {
+ String branch = "dropNamespaceExternalConflictBranch";
+ createBranch(branch);
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
Map.of());
+
+ client.createNamespace(Namespace.of("a"), Map.of());
+
+ org.projectnessie.model.Namespace original =
fetchNamespace(ContentKey.of("a"), branch);
+ org.projectnessie.model.Namespace updated =
+ org.projectnessie.model.Namespace.builder()
+ .from(original)
+ .properties(Map.of("k1", "v1"))
+ .build();
+ commit(branch, "update namespace a", Operation.Put.of(ContentKey.of("a"),
updated));
+
+ Assertions.assertThatThrownBy(() ->
client.dropNamespace(Namespace.of("a")))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining(
+ "Cannot drop namespace 'a': Values of existing and expected
content for key 'a' are different.");
+ }
+
+ @Test
+ public void testDropNamespaceNonExistingRef()
+ throws NessieConflictException, NessieNotFoundException {
+ String branch = "dropNamespaceNonExistingRefBranch";
+ createBranch(branch);
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
Map.of());
+
+ client.createNamespace(Namespace.of("a"), Map.of());
+
+ api.deleteBranch().branch((Branch)
api.getReference().refName(branch).get()).delete();
+
+ Assertions.assertThat(client.dropNamespace(Namespace.of("a"))).isFalse();
+ }
+
+ @Test
+ public void testSetProperties() throws NessieConflictException,
NessieNotFoundException {
+ String branch = "setPropertiesBranch";
+ createBranch(branch);
+ Map<String, String> catalogOptions =
+ Map.of(
+ CatalogProperties.USER, "iceberg-user",
+ CatalogProperties.APP_ID, "iceberg-nessie");
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
catalogOptions);
+
+ Namespace ns = Namespace.of("a");
+ client.createNamespace(ns, Map.of("k1", "v1a"));
+
+ Assertions.assertThat(client.setProperties(ns, Map.of("k1", "v1b", "k2",
"v2"))).isTrue();
+
+ Assertions.assertThat(client.loadNamespaceMetadata(ns))
+ .hasSize(2)
+ .containsEntry("k1", "v1b")
+ .containsEntry("k2", "v2");
+
+ List<LogResponse.LogEntry> entries =
api.getCommitLog().refName(branch).get().getLogEntries();
+ Assertions.assertThat(entries)
+ .isNotEmpty()
+ .first()
+ .extracting(LogResponse.LogEntry::getCommitMeta)
+ .satisfies(
+ meta -> {
+ Assertions.assertThat(meta.getMessage()).contains("update
namespace a");
+
Assertions.assertThat(meta.getAuthor()).isEqualTo("iceberg-user");
+ Assertions.assertThat(meta.getProperties())
+ .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+ .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+ });
+ }
+
+ @Test
+ public void testSetPropertiesExternalConflict()
+ throws NessieConflictException, NessieNotFoundException {
+ String branch = "setPropertiesExternalConflictBranch";
+ createBranch(branch);
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
Map.of());
+
+ Namespace ns = Namespace.of("a");
+ client.createNamespace(ns, Map.of("k1", "v1a"));
+
+ ContentKey key = ContentKey.of("a");
+ org.projectnessie.model.Namespace original = fetchNamespace(key, branch);
+ org.projectnessie.model.Namespace updated =
+ org.projectnessie.model.Namespace.builder()
+ .from(original)
+ .properties(Map.of("k1", "v1b", "k2", "v2"))
+ .build();
+ commit(branch, "update namespace a", Operation.Put.of(key, updated));
+
+ // will generate a conflict and a retry
+ Assertions.assertThat(client.setProperties(ns, Map.of("k1", "v1c", "k3",
"v3"))).isTrue();
+
+ Assertions.assertThat(client.loadNamespaceMetadata(ns))
+ .hasSize(3)
+ .containsEntry("k1", "v1c")
+ .containsEntry("k2", "v2")
+ .containsEntry("k3", "v3");
+ }
+
+ @Test
+ public void testSetPropertiesNonExistingNs()
+ throws NessieConflictException, NessieNotFoundException {
+ String branch = "setPropertiesNonExistingNsBranch";
+ createBranch(branch);
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
Map.of());
+
+ client.createNamespace(Namespace.of("a"), Map.of());
+
+ commit(branch, "delete namespace a",
Operation.Delete.of(ContentKey.of("a")));
+
+ Assertions.assertThatThrownBy(
+ () -> client.setProperties(Namespace.of("a"), Map.of("k1", "v1a")))
+ .isInstanceOf(NoSuchNamespaceException.class)
+ .hasMessageContaining("Namespace does not exist: a");
+ }
+
+ @Test
+ public void testSetPropertiesNonExistingRef()
+ throws NessieConflictException, NessieNotFoundException {
+ String branch = "setPropertiesNonExistingRefBranch";
+ createBranch(branch);
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
Map.of());
+
+ client.createNamespace(Namespace.of("a"), Map.of());
+
+ api.deleteBranch().branch((Branch)
api.getReference().refName(branch).get()).delete();
+
+ Assertions.assertThatThrownBy(() ->
client.setProperties(Namespace.of("a"), Map.of("k1", "v1")))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining(
+ "Cannot update properties on namespace 'a': ref
'setPropertiesNonExistingRefBranch' is no longer valid");
+ }
+
+ @Test
+ public void testRemoveProperties() throws NessieConflictException,
NessieNotFoundException {
+ String branch = "removePropertiesBranch";
+ createBranch(branch);
+ Map<String, String> catalogOptions =
+ Map.of(
+ CatalogProperties.USER, "iceberg-user",
+ CatalogProperties.APP_ID, "iceberg-nessie");
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
catalogOptions);
+
+ Namespace ns = Namespace.of("a");
+
+ client.createNamespace(ns, Map.of("k1", "v1", "k2", "v2"));
+
+ Assertions.assertThat(client.removeProperties(ns, Set.of("k1"))).isTrue();
+
+
Assertions.assertThat(client.loadNamespaceMetadata(ns)).hasSize(1).containsOnlyKeys("k2");
+
+ List<LogResponse.LogEntry> entries =
api.getCommitLog().refName(branch).get().getLogEntries();
+ Assertions.assertThat(entries)
+ .isNotEmpty()
+ .first()
+ .extracting(LogResponse.LogEntry::getCommitMeta)
+ .satisfies(
+ meta -> {
+ Assertions.assertThat(meta.getMessage()).contains("update
namespace a");
+
Assertions.assertThat(meta.getAuthor()).isEqualTo("iceberg-user");
+ Assertions.assertThat(meta.getProperties())
+ .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+ .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+ });
+ }
+
+ @Test
+ public void testRemovePropertiesExternalConflict()
+ throws NessieConflictException, NessieNotFoundException {
+ String branch = "removePropertiesExternalConflictBranch";
+ createBranch(branch);
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
Map.of());
+
+ Namespace ns = Namespace.of("a");
+ client.createNamespace(ns, Map.of("k1", "v1"));
+
+ ContentKey key = ContentKey.of("a");
+ org.projectnessie.model.Namespace original = fetchNamespace(key, branch);
+ org.projectnessie.model.Namespace updated =
+ org.projectnessie.model.Namespace.builder()
+ .from(original)
+ .properties(Map.of("k2", "v2", "k3", "v3"))
+ .build();
+ commit(branch, "update namespace a", Operation.Put.of(key, updated));
+
+ // will generate a conflict and a retry
+ Assertions.assertThat(client.removeProperties(ns, Set.of("k2"))).isTrue();
+
+
Assertions.assertThat(client.loadNamespaceMetadata(ns)).hasSize(1).containsOnlyKeys("k3");
+ }
+
+ @Test
+ public void testRemovePropertiesNonExistingNs()
+ throws NessieConflictException, NessieNotFoundException {
+ String branch = "removePropertiesNonExistingNsBranch";
+ createBranch(branch);
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
Map.of());
+
+ client.createNamespace(Namespace.of("a"), Map.of("k1", "v1"));
+
+ commit(branch, "delete namespace a",
Operation.Delete.of(ContentKey.of("a")));
+
+ Assertions.assertThatThrownBy(() ->
client.removeProperties(Namespace.of("a"), Set.of("k1")))
+ .isInstanceOf(NoSuchNamespaceException.class)
+ .hasMessageContaining("Namespace does not exist: a");
+ }
+
+ @Test
+ public void testRemovePropertiesNonExistingRef()
+ throws NessieConflictException, NessieNotFoundException {
+ String branch = "removePropertiesNonExistingRefBranch";
+ createBranch(branch);
+ NessieIcebergClient client = new NessieIcebergClient(api, branch, null,
Map.of());
+
+ client.createNamespace(Namespace.of("a"), Map.of("k1", "v1"));
+
+ api.deleteBranch().branch((Branch)
api.getReference().refName(branch).get()).delete();
+
+ Assertions.assertThatThrownBy(() ->
client.removeProperties(Namespace.of("a"), Set.of("k1")))
+ .isInstanceOf(RuntimeException.class)
+ .hasMessageContaining(
+ "Cannot update properties on namespace 'a': ref
'removePropertiesNonExistingRefBranch' is no longer valid");
+ }
+
@Test
public void testInvalidClientApiVersion() throws IOException {
try (NessieCatalog newCatalog = new NessieCatalog()) {
newCatalog.setConf(hadoopConfig);
ImmutableMap.Builder<String, String> options =
ImmutableMap.<String, String>builder().put("client-api-version",
"3");
- Assertions.assertThatThrownBy(() -> newCatalog.initialize("nessie",
options.buildOrThrow()))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessage("Unsupported client-api-version: 3. Can only be 1 or 2");
+ Assertions.assertThatIllegalArgumentException()
+ .isThrownBy(() -> newCatalog.initialize("nessie",
options.buildOrThrow()))
+ .withMessage("Unsupported client-api-version: 3. Can only be 1 or
2");
}
}
+
+ private void commit(String branch, String message, Operation... operations)
+ throws NessieNotFoundException, NessieConflictException {
+ Branch ref = (Branch) api.getReference().refName(branch).get();
+ api.commitMultipleOperations()
+ .branch(ref)
+ .commitMeta(NessieUtil.buildCommitMetadata(message, Map.of()))
+ .operations(Arrays.asList(operations))
+ .commit();
+ }
+
+ private org.projectnessie.model.Namespace fetchNamespace(ContentKey key,
String branch)
+ throws NessieNotFoundException {
+ Reference reference = api.getReference().refName(branch).get();
+ Content content =
api.getContent().key(key).reference(reference).get().get(key);
+ return
content.unwrap(org.projectnessie.model.Namespace.class).orElseThrow();
+ }
+
+ private static TableMetadata newTableMetadata() {
+ Schema schema = new Schema(required(1, "id", Types.LongType.get()));
+ return TableMetadata.newTableMetadata(
+ schema, PartitionSpec.unpartitioned(), SortOrder.unsorted(), null,
Map.of());
+ }
}