This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new ea7665e788 Nessie: Reimplement namespace operations (#8857)
ea7665e788 is described below

commit ea7665e788cb33507e6eb19eddd4f0c24eb92983
Author: Alexandre Dutra <[email protected]>
AuthorDate: Thu Dec 7 12:34:21 2023 +0100

    Nessie: Reimplement namespace operations (#8857)
    
    This change enhances the process of creating new namespaces by
    retaining commit authorship information when committing the new
    namespace.
---
 .../apache/iceberg/nessie/NessieIcebergClient.java | 371 +++++++++++------
 .../iceberg/nessie/NessieTableOperations.java      |  60 ++-
 .../java/org/apache/iceberg/nessie/NessieUtil.java |  25 ++
 .../apache/iceberg/nessie/TestMultipleClients.java |  80 +++-
 .../org/apache/iceberg/nessie/TestNamespace.java   |   2 +-
 .../iceberg/nessie/TestNessieIcebergClient.java    | 458 ++++++++++++++++++++-
 6 files changed, 819 insertions(+), 177 deletions(-)

diff --git 
a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java 
b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
index 26b3701816..4cbbe4a562 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java
@@ -19,12 +19,17 @@
 package org.apache.iceberg.nessie;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.catalog.Namespace;
@@ -36,24 +41,25 @@ import 
org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.base.Suppliers;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.Tasks;
 import org.projectnessie.client.NessieConfigConstants;
 import org.projectnessie.client.api.CommitMultipleOperationsBuilder;
+import org.projectnessie.client.api.GetContentBuilder;
 import org.projectnessie.client.api.NessieApiV1;
 import org.projectnessie.client.api.OnReferenceBuilder;
 import org.projectnessie.client.http.HttpClientException;
 import org.projectnessie.error.BaseNessieClientServerException;
 import org.projectnessie.error.NessieConflictException;
-import org.projectnessie.error.NessieNamespaceAlreadyExistsException;
-import org.projectnessie.error.NessieNamespaceNotEmptyException;
-import org.projectnessie.error.NessieNamespaceNotFoundException;
+import org.projectnessie.error.NessieContentNotFoundException;
 import org.projectnessie.error.NessieNotFoundException;
+import org.projectnessie.error.NessieReferenceConflictException;
 import org.projectnessie.error.NessieReferenceNotFoundException;
 import org.projectnessie.model.Branch;
+import org.projectnessie.model.Conflict;
 import org.projectnessie.model.Content;
 import org.projectnessie.model.ContentKey;
 import org.projectnessie.model.EntriesResponse;
-import org.projectnessie.model.GetNamespacesResponse;
 import org.projectnessie.model.IcebergTable;
 import org.projectnessie.model.ImmutableCommitMeta;
 import org.projectnessie.model.ImmutableIcebergTable;
@@ -181,133 +187,214 @@ public class NessieIcebergClient implements 
AutoCloseable {
   }
 
   public void createNamespace(Namespace namespace, Map<String, String> 
metadata) {
+    getRef().checkMutable();
+    if (namespace.isEmpty()) {
+      throw new IllegalArgumentException("Creating empty namespaces is not 
supported");
+    }
+    ContentKey key = ContentKey.of(namespace.levels());
+    org.projectnessie.model.Namespace content =
+        org.projectnessie.model.Namespace.of(key.getElements(), metadata);
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .createNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .properties(metadata))
-          .create();
-      refresh();
-    } catch (NessieNamespaceAlreadyExistsException e) {
-      throw new AlreadyExistsException(e, "Namespace already exists: %s", 
namespace);
+      Content existing = 
api.getContent().reference(getReference()).key(key).get().get(key);
+      if (existing != null) {
+        throw namespaceAlreadyExists(key, existing, null);
+      }
+      try {
+        commitRetry("create namespace " + key, Operation.Put.of(key, content));
+      } catch (NessieReferenceConflictException e) {
+        Optional<Conflict> conflict =
+            NessieUtil.extractSingleConflict(
+                e,
+                EnumSet.of(
+                    Conflict.ConflictType.KEY_EXISTS, 
Conflict.ConflictType.NAMESPACE_ABSENT));
+        if (conflict.isPresent()) {
+          switch (conflict.get().conflictType()) {
+            case KEY_EXISTS:
+              Content conflicting = 
withReference(api.getContent()).key(key).get().get(key);
+              throw namespaceAlreadyExists(key, conflicting, e);
+            case NAMESPACE_ABSENT:
+              throw new NoSuchNamespaceException(
+                  e,
+                  "Cannot create namespace '%s': parent namespace '%s' does 
not exist",
+                  namespace,
+                  conflict.get().key());
+          }
+        }
+        throw new RuntimeException(
+            String.format("Cannot create namespace '%s': %s", namespace, 
e.getMessage()));
+      }
     } catch (NessieNotFoundException e) {
       throw new RuntimeException(
           String.format(
-              "Cannot create Namespace '%s': " + "ref '%s' is no longer 
valid.",
+              "Cannot create namespace '%s': ref '%s' is no longer valid.",
               namespace, getRef().getName()),
           e);
+    } catch (BaseNessieClientServerException e) {
+      throw new RuntimeException(
+          String.format("Cannot create namespace '%s': %s", namespace, 
e.getMessage()), e);
     }
   }
 
   public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
     try {
-      GetNamespacesResponse response =
-          withReference(
-                  getApi()
-                      .getMultipleNamespaces()
-                      
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-              .get();
-      return response.getNamespaces().stream()
-          .map(ns -> Namespace.of(ns.getElements().toArray(new String[0])))
-          .filter(ns -> ns.length() == namespace.length() + 1)
+      String filter = "entry.contentType == 'NAMESPACE' && ";
+      if (namespace.isEmpty()) {
+        filter += "size(entry.keyElements) == 1";
+      } else {
+        org.projectnessie.model.Namespace root =
+            org.projectnessie.model.Namespace.of(namespace.levels());
+        filter +=
+            String.format(
+                "size(entry.keyElements) == %d && 
entry.encodedKey.startsWith('%s.')",
+                root.getElementCount() + 1, root.name());
+      }
+      List<ContentKey> entries =
+          withReference(api.getEntries()).filter(filter).stream()
+              .map(EntriesResponse.Entry::getName)
+              .collect(Collectors.toList());
+      if (entries.isEmpty()) {
+        return Collections.emptyList();
+      }
+      GetContentBuilder getContent = withReference(api.getContent());
+      entries.forEach(getContent::key);
+      return getContent.get().values().stream()
+          .map(v -> v.unwrap(org.projectnessie.model.Namespace.class))
+          .filter(Optional::isPresent)
+          .map(Optional::get)
+          .map(v -> Namespace.of(v.getElements().toArray(new String[0])))
           .collect(Collectors.toList());
-    } catch (NessieReferenceNotFoundException e) {
-      throw new RuntimeException(
-          String.format(
-              "Cannot list Namespaces starting from '%s': " + "ref '%s' is no 
longer valid.",
-              namespace, getRef().getName()),
-          e);
+    } catch (NessieNotFoundException e) {
+      if (namespace.isEmpty()) {
+        throw new NoSuchNamespaceException(
+            e,
+            "Cannot list top-level namespaces: ref '%s' is no longer valid.",
+            getRef().getName());
+      }
+      throw new NoSuchNamespaceException(
+          e,
+          "Cannot list child namespaces from '%s': ref '%s' is no longer 
valid.",
+          namespace,
+          getRef().getName());
     }
   }
 
   public boolean dropNamespace(Namespace namespace) throws 
NamespaceNotEmptyException {
+    getRef().checkMutable();
+    ContentKey key = ContentKey.of(namespace.levels());
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .deleteNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-          .delete();
-      refresh();
-      return true;
-    } catch (NessieNamespaceNotFoundException e) {
-      return false;
+      Map<ContentKey, Content> contentMap =
+          api.getContent().reference(getReference()).key(key).get();
+      Content existing = contentMap.get(key);
+      if (existing != null && 
!existing.getType().equals(Content.Type.NAMESPACE)) {
+        throw new NoSuchNamespaceException(
+            "Content object with name '%s' is not a namespace.", namespace);
+      }
+      try {
+        commitRetry("drop namespace " + key, Operation.Delete.of(key));
+        return true;
+      } catch (NessieReferenceConflictException e) {
+        Optional<Conflict> conflict =
+            NessieUtil.extractSingleConflict(
+                e,
+                EnumSet.of(
+                    Conflict.ConflictType.KEY_DOES_NOT_EXIST,
+                    Conflict.ConflictType.NAMESPACE_NOT_EMPTY));
+        if (conflict.isPresent()) {
+          Conflict.ConflictType conflictType = conflict.get().conflictType();
+          switch (conflictType) {
+            case KEY_DOES_NOT_EXIST:
+              return false;
+            case NAMESPACE_NOT_EMPTY:
+              throw new NamespaceNotEmptyException(e, "Namespace '%s' is not 
empty.", namespace);
+          }
+        }
+        throw new RuntimeException(
+            String.format("Cannot drop namespace '%s': %s", namespace, 
e.getMessage()));
+      }
     } catch (NessieNotFoundException e) {
       LOG.error(
-          "Cannot drop Namespace '{}': ref '{}' is no longer valid.",
+          "Cannot drop namespace '{}': ref '{}' is no longer valid.",
           namespace,
           getRef().getName(),
           e);
-      return false;
-    } catch (NessieNamespaceNotEmptyException e) {
-      throw new NamespaceNotEmptyException(
-          e, "Namespace '%s' is not empty. One or more tables exist.", 
namespace);
+    } catch (BaseNessieClientServerException e) {
+      throw new RuntimeException(
+          String.format("Cannot drop namespace '%s': %s", namespace, 
e.getMessage()), e);
     }
+    return false;
   }
 
   public Map<String, String> loadNamespaceMetadata(Namespace namespace)
       throws NoSuchNamespaceException {
+    ContentKey key = ContentKey.of(namespace.levels());
     try {
-      return withReference(
-              getApi()
-                  .getNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-          .get()
+      Map<ContentKey, Content> contentMap = 
withReference(api.getContent()).key(key).get();
+      return unwrapNamespace(contentMap.get(key))
+          .orElseThrow(
+              () -> new NoSuchNamespaceException("Namespace does not exist: 
%s", namespace))
           .getProperties();
-    } catch (NessieNamespaceNotFoundException e) {
-      throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", 
namespace);
-    } catch (NessieReferenceNotFoundException e) {
+    } catch (NessieNotFoundException e) {
       throw new RuntimeException(
           String.format(
-              "Cannot load Namespace '%s': " + "ref '%s' is no longer valid.",
+              "Cannot load namespace '%s': ref '%s' is no longer valid.",
               namespace, getRef().getName()),
           e);
     }
   }
 
   public boolean setProperties(Namespace namespace, Map<String, String> 
properties) {
-    try {
-      withReference(
-              getApi()
-                  .updateProperties()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .updateProperties(properties))
-          .update();
-      refresh();
-      // always successful, otherwise an exception is thrown
-      return true;
-    } catch (NessieNamespaceNotFoundException e) {
-      throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", 
namespace);
-    } catch (NessieNotFoundException e) {
-      throw new RuntimeException(
-          String.format(
-              "Cannot update properties on Namespace '%s': ref '%s' is no 
longer valid.",
-              namespace, getRef().getName()),
-          e);
-    }
+    return updateProperties(namespace, props -> props.putAll(properties));
   }
 
   public boolean removeProperties(Namespace namespace, Set<String> properties) 
{
+    return updateProperties(namespace, props -> 
props.keySet().removeAll(properties));
+  }
+
+  private boolean updateProperties(Namespace namespace, Consumer<Map<String, 
String>> action) {
+    getRef().checkMutable();
+    ContentKey key = ContentKey.of(namespace.levels());
     try {
-      withReference(
-              getApi()
-                  .updateProperties()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .removeProperties(properties))
-          .update();
-      refresh();
+      commitRetry(
+          "update namespace " + key,
+          true,
+          commitBuilder -> {
+            org.projectnessie.model.Namespace oldNamespace =
+                
unwrapNamespace(api.getContent().reference(getReference()).key(key).get().get(key))
+                    .orElseThrow(
+                        () -> new NessieContentNotFoundException(key, 
getReference().getName()));
+            Map<String, String> newProperties = 
Maps.newHashMap(oldNamespace.getProperties());
+            action.accept(newProperties);
+            org.projectnessie.model.Namespace updatedNamespace =
+                org.projectnessie.model.Namespace.builder()
+                    .from(oldNamespace)
+                    .properties(newProperties)
+                    .build();
+            commitBuilder.operation(Operation.Put.of(key, updatedNamespace));
+            return commitBuilder;
+          });
       // always successful, otherwise an exception is thrown
       return true;
-    } catch (NessieNamespaceNotFoundException e) {
-      throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", 
namespace);
-    } catch (NessieNotFoundException e) {
+    } catch (NessieReferenceConflictException e) {
+      Optional<Conflict> conflict =
+          NessieUtil.extractSingleConflict(e, 
EnumSet.of(Conflict.ConflictType.KEY_DOES_NOT_EXIST));
+      if (conflict.isPresent()
+          && conflict.get().conflictType() == 
Conflict.ConflictType.KEY_DOES_NOT_EXIST) {
+        throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", 
namespace);
+      }
+      throw new RuntimeException(
+          String.format(
+              "Cannot update properties on namespace '%s': %s", namespace, 
e.getMessage()));
+    } catch (NessieContentNotFoundException e) {
+      throw new NoSuchNamespaceException("Namespace does not exist: %s", 
namespace);
+    } catch (NessieReferenceNotFoundException e) {
       throw new RuntimeException(
           String.format(
-              "Cannot remove properties from Namespace '%s': ref '%s' is no 
longer valid.",
+              "Cannot update properties on namespace '%s': ref '%s' is no 
longer valid.",
               namespace, getRef().getName()),
           e);
+    } catch (BaseNessieClientServerException e) {
+      throw new RuntimeException(
+          String.format("Cannot update namespace '%s': %s", namespace, 
e.getMessage()), e);
     }
   }
 
@@ -323,28 +410,11 @@ public class NessieIcebergClient implements AutoCloseable 
{
       throw new AlreadyExistsException("Table already exists: %s", to.name());
     }
 
-    CommitMultipleOperationsBuilder operations =
-        getApi()
-            .commitMultipleOperations()
-            .commitMeta(
-                NessieUtil.buildCommitMetadata(
-                    String.format("Iceberg rename table from '%s' to '%s'", 
from, to),
-                    catalogOptions))
-            .operation(Operation.Delete.of(NessieUtil.toKey(from)))
-            .operation(Operation.Put.of(NessieUtil.toKey(to), 
existingFromTable));
-
     try {
-      Tasks.foreach(operations)
-          .retry(5)
-          .stopRetryOn(NessieNotFoundException.class)
-          .throwFailureWhenFinished()
-          .onFailure((o, exception) -> refresh())
-          .run(
-              ops -> {
-                Branch branch = ops.branch((Branch) 
getRef().getReference()).commit();
-                getRef().updateReference(branch);
-              },
-              BaseNessieClientServerException.class);
+      commitRetry(
+          String.format("Iceberg rename table from '%s' to '%s'", from, to),
+          Operation.Delete.of(NessieUtil.toKey(from)),
+          Operation.Put.of(NessieUtil.toKey(to), existingFromTable));
     } catch (NessieNotFoundException e) {
       // important note: the NotFoundException refers to the ref only. If a 
table was not found it
       // would imply that the
@@ -355,13 +425,13 @@ public class NessieIcebergClient implements AutoCloseable 
{
       // and removed by another.
       throw new RuntimeException(
           String.format(
-              "Cannot rename table '%s' to '%s': " + "ref '%s' no longer 
exists.",
+              "Cannot rename table '%s' to '%s': ref '%s' no longer exists.",
               from.name(), to.name(), getRef().getName()),
           e);
     } catch (BaseNessieClientServerException e) {
       throw new CommitFailedException(
           e,
-          "Cannot rename table '%s' to '%s': " + "the current reference is not 
up to date.",
+          "Cannot rename table '%s' to '%s': the current reference is not up 
to date.",
           from.name(),
           to.name());
     } catch (HttpClientException ex) {
@@ -390,29 +460,12 @@ public class NessieIcebergClient implements AutoCloseable 
{
       LOG.info("Purging data for table {} was set to true but is ignored", 
identifier.toString());
     }
 
-    CommitMultipleOperationsBuilder commitBuilderBase =
-        getApi()
-            .commitMultipleOperations()
-            .commitMeta(
-                NessieUtil.buildCommitMetadata(
-                    String.format("Iceberg delete table %s", identifier), 
catalogOptions))
-            .operation(Operation.Delete.of(NessieUtil.toKey(identifier)));
-
     // We try to drop the table. Simple retry after ref update.
-    boolean threw = true;
     try {
-      Tasks.foreach(commitBuilderBase)
-          .retry(5)
-          .stopRetryOn(NessieNotFoundException.class)
-          .throwFailureWhenFinished()
-          .onFailure((o, exception) -> refresh())
-          .run(
-              commitBuilder -> {
-                Branch branch = commitBuilder.branch((Branch) 
getRef().getReference()).commit();
-                getRef().updateReference(branch);
-              },
-              BaseNessieClientServerException.class);
-      threw = false;
+      commitRetry(
+          String.format("Iceberg delete table %s", identifier),
+          Operation.Delete.of(NessieUtil.toKey(identifier)));
+      return true;
     } catch (NessieConflictException e) {
       LOG.error(
           "Cannot drop table: failed after retry (update ref '{}' and retry)",
@@ -423,7 +476,7 @@ public class NessieIcebergClient implements AutoCloseable {
     } catch (BaseNessieClientServerException e) {
       LOG.error("Cannot drop table: unknown error", e);
     }
-    return !threw;
+    return false;
   }
 
   /** @deprecated will be removed after 1.5.0 */
@@ -540,4 +593,64 @@ public class NessieIcebergClient implements AutoCloseable {
       api.close();
     }
   }
+
+  private void commitRetry(String message, Operation... ops)
+      throws BaseNessieClientServerException {
+    commitRetry(message, false, builder -> 
builder.operations(Arrays.asList(ops)));
+  }
+
+  private void commitRetry(String message, boolean retryConflicts, 
CommitEnhancer commitEnhancer)
+      throws BaseNessieClientServerException {
+    // Retry all errors except for NessieNotFoundException and also 
NessieConflictException, unless
+    // retryConflicts is set to true.
+    Predicate<Exception> shouldRetry =
+        e ->
+            !(e instanceof NessieNotFoundException)
+                && (!(e instanceof NessieConflictException) || retryConflicts);
+    Tasks.range(1)
+        .retry(5)
+        .shouldRetryTest(shouldRetry)
+        .throwFailureWhenFinished()
+        .onFailure((o, exception) -> refresh())
+        .run(
+            i -> {
+              try {
+                Branch branch =
+                    commitEnhancer
+                        .enhance(api.commitMultipleOperations())
+                        .commitMeta(NessieUtil.buildCommitMetadata(message, 
catalogOptions))
+                        .branch((Branch) getReference())
+                        .commit();
+                getRef().updateReference(branch);
+              } catch (NessieConflictException e) {
+                if (retryConflicts) {
+                  refresh(); // otherwise retrying a conflict doesn't make 
sense
+                }
+                throw e;
+              }
+            },
+            BaseNessieClientServerException.class);
+  }
+
+  private static AlreadyExistsException namespaceAlreadyExists(
+      ContentKey key, @Nullable Content existing, @Nullable Exception ex) {
+    if (existing instanceof org.projectnessie.model.Namespace) {
+      return new AlreadyExistsException(ex, "Namespace already exists: %s", 
key);
+    } else {
+      return new AlreadyExistsException(
+          ex, "Another content object with name '%s' already exists", key);
+    }
+  }
+
+  private static Optional<org.projectnessie.model.Namespace> 
unwrapNamespace(Content content) {
+    return content == null
+        ? Optional.empty()
+        : content.unwrap(org.projectnessie.model.Namespace.class);
+  }
+
+  private interface CommitEnhancer {
+
+    CommitMultipleOperationsBuilder enhance(CommitMultipleOperationsBuilder 
builder)
+        throws BaseNessieClientServerException;
+  }
 }
diff --git 
a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java 
b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
index 0b4c293f86..a5d7e7b214 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
@@ -18,7 +18,7 @@
  */
 package org.apache.iceberg.nessie;
 
-import java.util.List;
+import java.util.EnumSet;
 import java.util.Map;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.TableMetadata;
@@ -34,9 +34,7 @@ import org.projectnessie.client.http.HttpClientException;
 import org.projectnessie.error.NessieConflictException;
 import org.projectnessie.error.NessieNotFoundException;
 import org.projectnessie.error.NessieReferenceConflictException;
-import org.projectnessie.error.ReferenceConflicts;
 import org.projectnessie.model.Conflict;
-import org.projectnessie.model.Conflict.ConflictType;
 import org.projectnessie.model.Content;
 import org.projectnessie.model.ContentKey;
 import org.projectnessie.model.IcebergTable;
@@ -169,35 +167,33 @@ public class NessieTableOperations extends 
BaseMetastoreTableOperations {
   }
 
   private static void 
maybeThrowSpecializedException(NessieReferenceConflictException ex) {
-    // Check if the server returned 'ReferenceConflicts' information
-    ReferenceConflicts referenceConflicts = ex.getErrorDetails();
-    if (referenceConflicts == null) {
-      return;
-    }
-
-    // Can only narrow down to a single exception, if there is only one 
conflict.
-    List<Conflict> conflicts = referenceConflicts.conflicts();
-    if (conflicts.size() != 1) {
-      return;
-    }
-
-    Conflict conflict = conflicts.get(0);
-    ConflictType conflictType = conflict.conflictType();
-    if (conflictType != null) {
-      switch (conflictType) {
-        case NAMESPACE_ABSENT:
-          throw new NoSuchNamespaceException(ex, "Namespace does not exist: 
%s", conflict.key());
-        case NAMESPACE_NOT_EMPTY:
-          throw new NamespaceNotEmptyException(ex, "Namespace not empty: %s", 
conflict.key());
-        case KEY_DOES_NOT_EXIST:
-          throw new NoSuchTableException(ex, "Table or view does not exist: 
%s", conflict.key());
-        case KEY_EXISTS:
-          throw new AlreadyExistsException(ex, "Table or view already exists: 
%s", conflict.key());
-        default:
-          // Explicit fall-through
-          break;
-      }
-    }
+    NessieUtil.extractSingleConflict(
+            ex,
+            EnumSet.of(
+                Conflict.ConflictType.NAMESPACE_ABSENT,
+                Conflict.ConflictType.NAMESPACE_NOT_EMPTY,
+                Conflict.ConflictType.KEY_DOES_NOT_EXIST,
+                Conflict.ConflictType.KEY_EXISTS))
+        .ifPresent(
+            conflict -> {
+              switch (conflict.conflictType()) {
+                case NAMESPACE_ABSENT:
+                  throw new NoSuchNamespaceException(
+                      ex, "Namespace does not exist: %s", conflict.key());
+                case NAMESPACE_NOT_EMPTY:
+                  throw new NamespaceNotEmptyException(
+                      ex, "Namespace not empty: %s", conflict.key());
+                case KEY_DOES_NOT_EXIST:
+                  throw new NoSuchTableException(
+                      ex, "Table or view does not exist: %s", conflict.key());
+                case KEY_EXISTS:
+                  throw new AlreadyExistsException(
+                      ex, "Table or view already exists: %s", conflict.key());
+                default:
+                  // Explicit fall-through
+                  break;
+              }
+            });
   }
 
   @Override
diff --git a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java 
b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java
index 8c69f844db..3c3b0afd64 100644
--- a/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java
+++ b/nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java
@@ -19,9 +19,11 @@
 package org.apache.iceberg.nessie;
 
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.SnapshotRef;
@@ -32,7 +34,10 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.projectnessie.error.NessieReferenceConflictException;
+import org.projectnessie.error.ReferenceConflicts;
 import org.projectnessie.model.CommitMeta;
+import org.projectnessie.model.Conflict;
 import org.projectnessie.model.ContentKey;
 import org.projectnessie.model.IcebergTable;
 import org.projectnessie.model.ImmutableCommitMeta;
@@ -165,4 +170,24 @@ public final class NessieUtil {
 
     return builder.discardChanges().build();
   }
+
+  public static Optional<Conflict> extractSingleConflict(
+      NessieReferenceConflictException ex, Collection<Conflict.ConflictType> 
handledConflictTypes) {
+    // Check if the server returned 'ReferenceConflicts' information
+    ReferenceConflicts referenceConflicts = ex.getErrorDetails();
+    if (referenceConflicts == null) {
+      return Optional.empty();
+    }
+
+    List<Conflict> conflicts =
+        referenceConflicts.conflicts().stream()
+            .filter(c -> handledConflictTypes.contains(c.conflictType()))
+            .collect(Collectors.toList());
+    if (conflicts.size() != 1) {
+      return Optional.empty();
+    }
+
+    Conflict conflict = conflicts.get(0);
+    return Optional.of(conflict);
+  }
 }
diff --git 
a/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java 
b/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java
index fb7f4ea309..b6ae90650e 100644
--- a/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java
+++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java
@@ -22,8 +22,8 @@ import static 
org.apache.iceberg.types.Types.NestedField.required;
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.AbstractMap;
 import java.util.Collections;
+import java.util.Map;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
@@ -31,6 +31,7 @@ import org.apache.iceberg.TableOperations;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
@@ -39,6 +40,9 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.projectnessie.client.ext.NessieClientFactory;
 import org.projectnessie.client.ext.NessieClientUri;
+import org.projectnessie.error.NessieConflictException;
+import org.projectnessie.error.NessieNotFoundException;
+import org.projectnessie.model.Branch;
 
 public class TestMultipleClients extends BaseTestIceberg {
 
@@ -67,33 +71,87 @@ public class TestMultipleClients extends BaseTestIceberg {
   }
 
   @Test
-  public void testListNamespaces() {
+  public void testListNamespaces() throws NessieConflictException, 
NessieNotFoundException {
+    Assertions.assertThat(catalog.listNamespaces()).isEmpty();
+    Assertions.assertThat(anotherCatalog.listNamespaces()).isEmpty();
+
+    // listing a non-existent namespace should return empty
+    
Assertions.assertThat(catalog.listNamespaces(Namespace.of("db1"))).isEmpty();
+    
Assertions.assertThat(anotherCatalog.listNamespaces(Namespace.of("db1"))).isEmpty();
+
     catalog.createNamespace(Namespace.of("db1"), Collections.emptyMap());
+
     
Assertions.assertThat(catalog.listNamespaces()).containsExactlyInAnyOrder(Namespace.of("db1"));
+    Assertions.assertThat(anotherCatalog.listNamespaces())
+        .containsExactlyInAnyOrder(Namespace.of("db1"));
 
     // another client creates a namespace with the same nessie server
     anotherCatalog.createNamespace(Namespace.of("db2"), 
Collections.emptyMap());
-    Assertions.assertThat(anotherCatalog.listNamespaces())
-        .containsExactlyInAnyOrder(Namespace.of("db1"), Namespace.of("db2"));
 
     Assertions.assertThat(catalog.listNamespaces())
         .containsExactlyInAnyOrder(Namespace.of("db1"), Namespace.of("db2"));
+    Assertions.assertThat(anotherCatalog.listNamespaces())
+        .containsExactlyInAnyOrder(Namespace.of("db1"), Namespace.of("db2"));
+
+    api.deleteBranch().branch((Branch) 
api.getReference().refName(branch).get()).delete();
+
+    Assertions.assertThatThrownBy(() -> catalog.listNamespaces())
+        .isInstanceOf(NoSuchNamespaceException.class)
+        .hasMessageContaining(
+            "Cannot list top-level namespaces: ref '%s' is no longer valid", 
branch);
+    Assertions.assertThatThrownBy(() -> 
anotherCatalog.listNamespaces(Namespace.of("db1")))
+        .isInstanceOf(NoSuchNamespaceException.class)
+        .hasMessageContaining(
+            "Cannot list child namespaces from 'db1': ref '%s' is no longer 
valid", branch);
   }
 
   @Test
-  public void testLoadNamespaceMetadata() {
+  public void testLoadNamespaceMetadata() throws NessieConflictException, 
NessieNotFoundException {
+    Assertions.assertThatThrownBy(() -> 
catalog.loadNamespaceMetadata(Namespace.of("namespace1")))
+        .isInstanceOf(NoSuchNamespaceException.class)
+        .hasMessageContaining("Namespace does not exist: namespace1");
+    Assertions.assertThatThrownBy(
+            () -> 
anotherCatalog.loadNamespaceMetadata(Namespace.of("namespace1")))
+        .isInstanceOf(NoSuchNamespaceException.class)
+        .hasMessageContaining("Namespace does not exist: namespace1");
+
     catalog.createNamespace(Namespace.of("namespace1"), 
Collections.emptyMap());
+
+    // both clients should see the namespace because we read the HEAD of the 
ref
     Assertions.assertThat(catalog.listNamespaces())
         .containsExactlyInAnyOrder(Namespace.of("namespace1"));
+    Assertions.assertThat(anotherCatalog.listNamespaces())
+        .containsExactlyInAnyOrder(Namespace.of("namespace1"));
 
-    // another client adds a metadata to the same namespace
-    anotherCatalog.setProperties(Namespace.of("namespace1"), 
Collections.singletonMap("k1", "v1"));
-    AbstractMap.SimpleEntry<String, String> entry = new 
AbstractMap.SimpleEntry<>("k1", "v1");
+    // the other client should not be able to update the namespace
+    // because it is still on the old ref hash
+    Assertions.assertThatThrownBy(
+            () ->
+                anotherCatalog.setProperties(
+                    Namespace.of("namespace1"), Collections.singletonMap("k1", 
"v1")))
+        .isInstanceOf(NoSuchNamespaceException.class)
+        .hasMessageContaining("Namespace does not exist: namespace1");
+    // the same client adds a metadata to the namespace: expect success
+    catalog.setProperties(Namespace.of("namespace1"), 
Collections.singletonMap("k1", "v1"));
+
+    // load metadata from the same client and another client both should work 
fine
+    // because we read the HEAD of the ref
     
Assertions.assertThat(anotherCatalog.loadNamespaceMetadata(Namespace.of("namespace1")))
-        .containsExactly(entry);
-
+        .containsExactly(Map.entry("k1", "v1"));
     
Assertions.assertThat(catalog.loadNamespaceMetadata(Namespace.of("namespace1")))
-        .containsExactly(entry);
+        .containsExactly(Map.entry("k1", "v1"));
+
+    api.deleteBranch().branch((Branch) 
api.getReference().refName(branch).get()).delete();
+
+    Assertions.assertThatThrownBy(() -> 
catalog.loadNamespaceMetadata(Namespace.of("namespace1")))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining(
+            "Cannot load namespace 'namespace1': ref '%s' is no longer valid", 
branch);
+    Assertions.assertThatThrownBy(
+            () -> 
anotherCatalog.loadNamespaceMetadata(Namespace.of("namespace1")))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining(
+            "Cannot load namespace 'namespace1': ref '%s' is no longer valid", 
branch);
   }
 
   @Test
diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNamespace.java 
b/nessie/src/test/java/org/apache/iceberg/nessie/TestNamespace.java
index 8267329678..0b1af9763d 100644
--- a/nessie/src/test/java/org/apache/iceberg/nessie/TestNamespace.java
+++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNamespace.java
@@ -106,7 +106,7 @@ public class TestNamespace extends BaseTestIceberg {
 
     Assertions.assertThatThrownBy(() -> catalog.dropNamespace(namespace))
         .isInstanceOf(NamespaceNotEmptyException.class)
-        .hasMessage("Namespace 'test' is not empty. One or more tables 
exist.");
+        .hasMessageContaining("Namespace 'test' is not empty");
 
     catalog.dropTable(identifier, true);
     catalog.dropNamespace(namespace);
diff --git 
a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java 
b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java
index f1d6159d93..d4e079ad26 100644
--- 
a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java
+++ 
b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java
@@ -18,14 +18,36 @@
  */
 package org.apache.iceberg.nessie;
 
+import static org.apache.iceberg.types.Types.NestedField.required;
+
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
+import org.assertj.core.data.Index;
 import org.junit.jupiter.api.Test;
 import org.projectnessie.error.NessieConflictException;
 import org.projectnessie.error.NessieNotFoundException;
 import org.projectnessie.model.Branch;
+import org.projectnessie.model.CommitMeta;
+import org.projectnessie.model.Content;
+import org.projectnessie.model.ContentKey;
+import org.projectnessie.model.IcebergTable;
+import org.projectnessie.model.LogResponse;
+import org.projectnessie.model.Operation;
 import org.projectnessie.model.Reference;
 
 public class TestNessieIcebergClient extends BaseTestIceberg {
@@ -80,7 +102,7 @@ public class TestNessieIcebergClient extends BaseTestIceberg 
{
     client
         .getApi()
         .deleteBranch()
-        .branch((Branch) client.getApi().getReference().refName(branch).get())
+        .branch((Branch) api.getReference().refName(branch).get())
         .delete();
     createBranch(branch);
 
@@ -91,15 +113,443 @@ public class TestNessieIcebergClient extends 
BaseTestIceberg {
     Assertions.assertThat(client.withReference(branch, 
null)).isNotEqualTo(client);
   }
 
+  @Test
+  public void testCreateNamespace() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "createNamespaceBranch";
+    createBranch(branch);
+    Map<String, String> catalogOptions =
+        Map.of(
+            CatalogProperties.USER, "iceberg-user",
+            CatalogProperties.APP_ID, "iceberg-nessie");
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
catalogOptions);
+
+    client.createNamespace(Namespace.of("a"), Map.of());
+    
Assertions.assertThat(client.listNamespaces(Namespace.of("a"))).isNotNull();
+
+    List<LogResponse.LogEntry> entries = 
api.getCommitLog().refName(branch).get().getLogEntries();
+    Assertions.assertThat(entries)
+        .isNotEmpty()
+        .first()
+        .extracting(LogResponse.LogEntry::getCommitMeta)
+        .extracting(CommitMeta::getMessage, CommitMeta::getAuthor, 
CommitMeta::getProperties)
+        .containsExactly(
+            "create namespace a",
+            "iceberg-user",
+            ImmutableMap.of(
+                "application-type", "iceberg",
+                "app-id", "iceberg-nessie"));
+  }
+
+  @Test
+  public void testCreateNamespaceInvalid() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "createNamespaceInvalidBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    Assertions.assertThatIllegalArgumentException()
+        .isThrownBy(() -> client.createNamespace(Namespace.empty(), Map.of()))
+        .withMessageContaining("Creating empty namespaces is not supported");
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("a", "b"), Map.of()))
+        .isInstanceOf(NoSuchNamespaceException.class)
+        .hasMessageContaining("Cannot create namespace 'a.b': parent namespace 
'a' does not exist");
+  }
+
+  @Test
+  public void testCreateNamespaceConflict()
+      throws NessieConflictException, NessieNotFoundException {
+    String branch = "createNamespaceConflictBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    client.createNamespace(Namespace.of("a"), Map.of());
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("a"), Map.of()))
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining("Namespace already exists: a");
+
+    client.commitTable(
+        null, newTableMetadata(), "file:///tmp/iceberg", (String) null, 
ContentKey.of("a", "tbl"));
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("a", "tbl"), Map.of()))
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining("Another content object with name 'a.tbl' 
already exists");
+  }
+
+  @Test
+  public void testCreateNamespaceExternalConflict()
+      throws NessieConflictException, NessieNotFoundException {
+    String branch = "createNamespaceExternalConflictBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    org.projectnessie.model.Namespace nessieNs =
+        org.projectnessie.model.Namespace.of(ContentKey.of("a"));
+    commit(branch, "create namespace a", Operation.Put.of(ContentKey.of("a"), 
nessieNs));
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("a"), Map.of()))
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining("Namespace already exists: a");
+
+    IcebergTable table = IcebergTable.of("file:///tmp/iceberg", 1, 1, 1, 1);
+    commit(branch, "create table a.tbl2", Operation.Put.of(ContentKey.of("a", 
"tbl"), table));
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("a", "tbl"), Map.of()))
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining("Another content object with name 'a.tbl' 
already exists");
+  }
+
+  @Test
+  public void testCreateNamespaceNonExistingRef()
+      throws NessieConflictException, NessieNotFoundException {
+    String branch = "createNamespaceNonExistingRefBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    client.createNamespace(Namespace.of("a"), Map.of());
+
+    api.deleteBranch().branch((Branch) 
api.getReference().refName(branch).get()).delete();
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("b"), Map.of()))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining(
+            "Cannot create namespace 'b': ref 
'createNamespaceNonExistingRefBranch' is no longer valid");
+  }
+
+  @Test
+  public void testDropNamespace() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "dropNamespaceBranch";
+    createBranch(branch);
+    Map<String, String> catalogOptions =
+        Map.of(
+            CatalogProperties.USER, "iceberg-user",
+            CatalogProperties.APP_ID, "iceberg-nessie");
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
catalogOptions);
+
+    Namespace parent = Namespace.of("a");
+    Namespace child = Namespace.of("a", "b");
+
+    Assertions.assertThat(client.dropNamespace(parent)).isFalse();
+    Assertions.assertThat(client.dropNamespace(child)).isFalse();
+
+    client.createNamespace(parent, Map.of());
+    client.createNamespace(child, Map.of());
+
+    Assertions.assertThat(client.dropNamespace(child)).isTrue();
+    Assertions.assertThat(client.dropNamespace(parent)).isTrue();
+
+    List<LogResponse.LogEntry> entries = 
api.getCommitLog().refName(branch).get().getLogEntries();
+    Assertions.assertThat(entries)
+        .isNotEmpty()
+        .extracting(LogResponse.LogEntry::getCommitMeta)
+        .satisfies(
+            meta -> {
+              Assertions.assertThat(meta.getMessage()).contains("drop 
namespace a");
+              
Assertions.assertThat(meta.getAuthor()).isEqualTo("iceberg-user");
+              Assertions.assertThat(meta.getProperties())
+                  .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+                  .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+            },
+            Index.atIndex(0))
+        .satisfies(
+            meta -> {
+              Assertions.assertThat(meta.getMessage()).contains("drop 
namespace a.b");
+              
Assertions.assertThat(meta.getAuthor()).isEqualTo("iceberg-user");
+              Assertions.assertThat(meta.getProperties())
+                  .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+                  .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+            },
+            Index.atIndex(1));
+  }
+
+  @Test
+  public void testDropNamespaceNotEmpty() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "dropNamespaceInvalidBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    client.createNamespace(Namespace.of("a"), Map.of());
+    client.createNamespace(Namespace.of("a", "b"), Map.of());
+
+    Assertions.assertThatThrownBy(() -> 
client.dropNamespace(Namespace.of("a")))
+        .isInstanceOf(NamespaceNotEmptyException.class)
+        .hasMessageContaining("Namespace 'a' is not empty.");
+  }
+
+  @Test
+  public void testDropNamespaceConflict() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "dropNamespaceConflictBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    client.createNamespace(Namespace.of("a"), Map.of());
+
+    client.commitTable(
+        null, newTableMetadata(), "file:///tmp/iceberg", (String) null, 
ContentKey.of("a", "tbl"));
+
+    Assertions.assertThatThrownBy(() -> client.dropNamespace(Namespace.of("a", 
"tbl")))
+        .isInstanceOf(NoSuchNamespaceException.class)
+        .hasMessageContaining("Content object with name 'a.tbl' is not a 
namespace.");
+  }
+
+  @Test
+  public void testDropNamespaceExternalConflict()
+      throws NessieConflictException, NessieNotFoundException {
+    String branch = "dropNamespaceExternalConflictBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    client.createNamespace(Namespace.of("a"), Map.of());
+
+    org.projectnessie.model.Namespace original = 
fetchNamespace(ContentKey.of("a"), branch);
+    org.projectnessie.model.Namespace updated =
+        org.projectnessie.model.Namespace.builder()
+            .from(original)
+            .properties(Map.of("k1", "v1"))
+            .build();
+    commit(branch, "update namespace a", Operation.Put.of(ContentKey.of("a"), 
updated));
+
+    Assertions.assertThatThrownBy(() -> 
client.dropNamespace(Namespace.of("a")))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining(
+            "Cannot drop namespace 'a': Values of existing and expected 
content for key 'a' are different.");
+  }
+
+  @Test
+  public void testDropNamespaceNonExistingRef()
+      throws NessieConflictException, NessieNotFoundException {
+    String branch = "dropNamespaceNonExistingRefBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    client.createNamespace(Namespace.of("a"), Map.of());
+
+    api.deleteBranch().branch((Branch) 
api.getReference().refName(branch).get()).delete();
+
+    Assertions.assertThat(client.dropNamespace(Namespace.of("a"))).isFalse();
+  }
+
+  @Test
+  public void testSetProperties() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "setPropertiesBranch";
+    createBranch(branch);
+    Map<String, String> catalogOptions =
+        Map.of(
+            CatalogProperties.USER, "iceberg-user",
+            CatalogProperties.APP_ID, "iceberg-nessie");
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
catalogOptions);
+
+    Namespace ns = Namespace.of("a");
+    client.createNamespace(ns, Map.of("k1", "v1a"));
+
+    Assertions.assertThat(client.setProperties(ns, Map.of("k1", "v1b", "k2", 
"v2"))).isTrue();
+
+    Assertions.assertThat(client.loadNamespaceMetadata(ns))
+        .hasSize(2)
+        .containsEntry("k1", "v1b")
+        .containsEntry("k2", "v2");
+
+    List<LogResponse.LogEntry> entries = 
api.getCommitLog().refName(branch).get().getLogEntries();
+    Assertions.assertThat(entries)
+        .isNotEmpty()
+        .first()
+        .extracting(LogResponse.LogEntry::getCommitMeta)
+        .satisfies(
+            meta -> {
+              Assertions.assertThat(meta.getMessage()).contains("update 
namespace a");
+              
Assertions.assertThat(meta.getAuthor()).isEqualTo("iceberg-user");
+              Assertions.assertThat(meta.getProperties())
+                  .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+                  .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+            });
+  }
+
+  @Test
+  public void testSetPropertiesExternalConflict()
+      throws NessieConflictException, NessieNotFoundException {
+    String branch = "setPropertiesExternalConflictBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    Namespace ns = Namespace.of("a");
+    client.createNamespace(ns, Map.of("k1", "v1a"));
+
+    ContentKey key = ContentKey.of("a");
+    org.projectnessie.model.Namespace original = fetchNamespace(key, branch);
+    org.projectnessie.model.Namespace updated =
+        org.projectnessie.model.Namespace.builder()
+            .from(original)
+            .properties(Map.of("k1", "v1b", "k2", "v2"))
+            .build();
+    commit(branch, "update namespace a", Operation.Put.of(key, updated));
+
+    // will generate a conflict and a retry
+    Assertions.assertThat(client.setProperties(ns, Map.of("k1", "v1c", "k3", 
"v3"))).isTrue();
+
+    Assertions.assertThat(client.loadNamespaceMetadata(ns))
+        .hasSize(3)
+        .containsEntry("k1", "v1c")
+        .containsEntry("k2", "v2")
+        .containsEntry("k3", "v3");
+  }
+
+  @Test
+  public void testSetPropertiesNonExistingNs()
+      throws NessieConflictException, NessieNotFoundException {
+    String branch = "setPropertiesNonExistingNsBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    client.createNamespace(Namespace.of("a"), Map.of());
+
+    commit(branch, "delete namespace a", 
Operation.Delete.of(ContentKey.of("a")));
+
+    Assertions.assertThatThrownBy(
+            () -> client.setProperties(Namespace.of("a"), Map.of("k1", "v1a")))
+        .isInstanceOf(NoSuchNamespaceException.class)
+        .hasMessageContaining("Namespace does not exist: a");
+  }
+
+  @Test
+  public void testSetPropertiesNonExistingRef()
+      throws NessieConflictException, NessieNotFoundException {
+    String branch = "setPropertiesNonExistingRefBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    client.createNamespace(Namespace.of("a"), Map.of());
+
+    api.deleteBranch().branch((Branch) 
api.getReference().refName(branch).get()).delete();
+
+    Assertions.assertThatThrownBy(() -> 
client.setProperties(Namespace.of("a"), Map.of("k1", "v1")))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining(
+            "Cannot update properties on namespace 'a': ref 
'setPropertiesNonExistingRefBranch' is no longer valid");
+  }
+
+  @Test
+  public void testRemoveProperties() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "removePropertiesBranch";
+    createBranch(branch);
+    Map<String, String> catalogOptions =
+        Map.of(
+            CatalogProperties.USER, "iceberg-user",
+            CatalogProperties.APP_ID, "iceberg-nessie");
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
catalogOptions);
+
+    Namespace ns = Namespace.of("a");
+
+    client.createNamespace(ns, Map.of("k1", "v1", "k2", "v2"));
+
+    Assertions.assertThat(client.removeProperties(ns, Set.of("k1"))).isTrue();
+
+    
Assertions.assertThat(client.loadNamespaceMetadata(ns)).hasSize(1).containsOnlyKeys("k2");
+
+    List<LogResponse.LogEntry> entries = 
api.getCommitLog().refName(branch).get().getLogEntries();
+    Assertions.assertThat(entries)
+        .isNotEmpty()
+        .first()
+        .extracting(LogResponse.LogEntry::getCommitMeta)
+        .satisfies(
+            meta -> {
+              Assertions.assertThat(meta.getMessage()).contains("update 
namespace a");
+              
Assertions.assertThat(meta.getAuthor()).isEqualTo("iceberg-user");
+              Assertions.assertThat(meta.getProperties())
+                  .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+                  .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+            });
+  }
+
+  @Test
+  public void testRemovePropertiesExternalConflict()
+      throws NessieConflictException, NessieNotFoundException {
+    String branch = "removePropertiesExternalConflictBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    Namespace ns = Namespace.of("a");
+    client.createNamespace(ns, Map.of("k1", "v1"));
+
+    ContentKey key = ContentKey.of("a");
+    org.projectnessie.model.Namespace original = fetchNamespace(key, branch);
+    org.projectnessie.model.Namespace updated =
+        org.projectnessie.model.Namespace.builder()
+            .from(original)
+            .properties(Map.of("k2", "v2", "k3", "v3"))
+            .build();
+    commit(branch, "update namespace a", Operation.Put.of(key, updated));
+
+    // will generate a conflict and a retry
+    Assertions.assertThat(client.removeProperties(ns, Set.of("k2"))).isTrue();
+
+    
Assertions.assertThat(client.loadNamespaceMetadata(ns)).hasSize(1).containsOnlyKeys("k3");
+  }
+
+  @Test
+  public void testRemovePropertiesNonExistingNs()
+      throws NessieConflictException, NessieNotFoundException {
+    String branch = "removePropertiesNonExistingNsBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    client.createNamespace(Namespace.of("a"), Map.of("k1", "v1"));
+
+    commit(branch, "delete namespace a", 
Operation.Delete.of(ContentKey.of("a")));
+
+    Assertions.assertThatThrownBy(() -> 
client.removeProperties(Namespace.of("a"), Set.of("k1")))
+        .isInstanceOf(NoSuchNamespaceException.class)
+        .hasMessageContaining("Namespace does not exist: a");
+  }
+
+  @Test
+  public void testRemovePropertiesNonExistingRef()
+      throws NessieConflictException, NessieNotFoundException {
+    String branch = "removePropertiesNonExistingRefBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    client.createNamespace(Namespace.of("a"), Map.of("k1", "v1"));
+
+    api.deleteBranch().branch((Branch) 
api.getReference().refName(branch).get()).delete();
+
+    Assertions.assertThatThrownBy(() -> 
client.removeProperties(Namespace.of("a"), Set.of("k1")))
+        .isInstanceOf(RuntimeException.class)
+        .hasMessageContaining(
+            "Cannot update properties on namespace 'a': ref 
'removePropertiesNonExistingRefBranch' is no longer valid");
+  }
+
   @Test
   public void testInvalidClientApiVersion() throws IOException {
     try (NessieCatalog newCatalog = new NessieCatalog()) {
       newCatalog.setConf(hadoopConfig);
       ImmutableMap.Builder<String, String> options =
           ImmutableMap.<String, String>builder().put("client-api-version", 
"3");
-      Assertions.assertThatThrownBy(() -> newCatalog.initialize("nessie", 
options.buildOrThrow()))
-          .isInstanceOf(IllegalArgumentException.class)
-          .hasMessage("Unsupported client-api-version: 3. Can only be 1 or 2");
+      Assertions.assertThatIllegalArgumentException()
+          .isThrownBy(() -> newCatalog.initialize("nessie", 
options.buildOrThrow()))
+          .withMessage("Unsupported client-api-version: 3. Can only be 1 or 
2");
     }
   }
+
+  private void commit(String branch, String message, Operation... operations)
+      throws NessieNotFoundException, NessieConflictException {
+    Branch ref = (Branch) api.getReference().refName(branch).get();
+    api.commitMultipleOperations()
+        .branch(ref)
+        .commitMeta(NessieUtil.buildCommitMetadata(message, Map.of()))
+        .operations(Arrays.asList(operations))
+        .commit();
+  }
+
+  private org.projectnessie.model.Namespace fetchNamespace(ContentKey key, 
String branch)
+      throws NessieNotFoundException {
+    Reference reference = api.getReference().refName(branch).get();
+    Content content = 
api.getContent().key(key).reference(reference).get().get(key);
+    return 
content.unwrap(org.projectnessie.model.Namespace.class).orElseThrow();
+  }
+
+  private static TableMetadata newTableMetadata() {
+    Schema schema = new Schema(required(1, "id", Types.LongType.get()));
+    return TableMetadata.newTableMetadata(
+        schema, PartitionSpec.unpartitioned(), SortOrder.unsorted(), null, 
Map.of());
+  }
 }

Reply via email to