nastra commented on code in PR #4491:
URL: https://github.com/apache/iceberg/pull/4491#discussion_r847553234


##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java:
##########
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.util.Tasks;
+import org.projectnessie.client.NessieConfigConstants;
+import org.projectnessie.client.api.CommitMultipleOperationsBuilder;
+import org.projectnessie.client.api.NessieApiV1;
+import org.projectnessie.client.http.HttpClientException;
+import org.projectnessie.error.BaseNessieClientServerException;
+import org.projectnessie.error.NessieConflictException;
+import org.projectnessie.error.NessieNamespaceAlreadyExistsException;
+import org.projectnessie.error.NessieNamespaceNotEmptyException;
+import org.projectnessie.error.NessieNamespaceNotFoundException;
+import org.projectnessie.error.NessieNotFoundException;
+import org.projectnessie.error.NessieReferenceNotFoundException;
+import org.projectnessie.model.Branch;
+import org.projectnessie.model.Content;
+import org.projectnessie.model.ContentKey;
+import org.projectnessie.model.EntriesResponse;
+import org.projectnessie.model.GetNamespacesResponse;
+import org.projectnessie.model.IcebergTable;
+import org.projectnessie.model.Operation;
+import org.projectnessie.model.Reference;
+import org.projectnessie.model.Tag;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NessieIcebergClient implements AutoCloseable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(NessieIcebergClient.class);
+
+  private final NessieApiV1 api;
+  private final Supplier<UpdateableReference> reference;
+  private final Map<String, String> catalogOptions;
+
+  public NessieIcebergClient(
+      NessieApiV1 api, String requestedRef, String requestedHash, Map<String, 
String> catalogOptions) {
+    this.api = api;
+    this.catalogOptions = catalogOptions;
+    this.reference = () -> loadReference(requestedRef, requestedHash);
+  }
+
+  public NessieApiV1 getApi() {
+    return api;
+  }
+
+  public UpdateableReference getRef() {
+    return reference.get();
+  }
+
+  public void refresh() throws NessieNotFoundException {
+    getRef().refresh(api);
+  }
+
+  public NessieIcebergClient withReference(String requestedRef, String hash) {
+    if (null == requestedRef) {
+      return this;
+    }
+    return new NessieIcebergClient(getApi(), requestedRef, hash, 
catalogOptions);
+  }
+
+  private UpdateableReference loadReference(String requestedRef, String hash) {
+    try {
+      Reference ref =
+          requestedRef == null ? api.getDefaultBranch() : 
api.getReference().refName(requestedRef).get();
+      if (hash != null) {
+        if (ref instanceof Branch) {
+          ref = Branch.of(ref.getName(), hash);
+        } else {
+          ref = Tag.of(ref.getName(), hash);
+        }
+      }
+      return new UpdateableReference(ref, hash != null);
+    } catch (NessieNotFoundException ex) {
+      if (requestedRef != null) {
+        throw new IllegalArgumentException(String.format("Nessie ref '%s' does 
not exist. This ref must exist " +
+            "before creating a NessieCatalog.", requestedRef), ex);
+      }
+
+      throw new IllegalArgumentException(String.format("Nessie does not have 
an existing default branch. " +
+              "Either configure an alternative ref via '%s' or create the 
default branch on the server.",
+              NessieConfigConstants.CONF_NESSIE_REF), ex);
+    }
+  }
+
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    try {
+      return api.getEntries()
+          .reference(getRef().getReference())
+          .get()
+          .getEntries()
+          .stream()
+          .filter(namespacePredicate(namespace))
+          .filter(e -> Content.Type.ICEBERG_TABLE == e.getType())
+          .map(this::toIdentifier)
+          .collect(Collectors.toList());
+    } catch (NessieNotFoundException ex) {
+      throw new NoSuchNamespaceException(ex, "Unable to list tables due to 
missing ref '%s'", getRef().getName());
+    }
+  }
+
+  private Predicate<EntriesResponse.Entry> namespacePredicate(Namespace ns) {
+    if (ns == null) {
+      return e -> true;
+    }
+
+    final List<String> namespace = Arrays.asList(ns.levels());
+    return e -> {
+      List<String> names = e.getName().getElements();
+
+      if (names.size() <= namespace.size()) {
+        return false;
+      }
+
+      return namespace.equals(names.subList(0, namespace.size()));
+    };
+  }
+
+  private TableIdentifier toIdentifier(EntriesResponse.Entry entry) {
+    List<String> elements = entry.getName().getElements();
+    return TableIdentifier.of(elements.toArray(new String[elements.size()]));
+  }
+
+  public IcebergTable table(TableIdentifier tableIdentifier) {
+    try {
+      ContentKey key = NessieUtil.toKey(tableIdentifier);
+      Content table = 
api.getContent().key(key).reference(getRef().getReference()).get().get(key);
+      return table != null ? table.unwrap(IcebergTable.class).orElse(null) : 
null;
+    } catch (NessieNotFoundException e) {
+      return null;
+    }
+  }
+
+  public void createNamespace(Namespace namespace, Map<String, String> 
metadata) {
+    try {
+      getRef().checkMutable();
+      getApi().createNamespace()
+          .reference(getRef().getReference())
+          .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+          .create();
+      refresh();
+    } catch (NessieNamespaceAlreadyExistsException e) {
+      throw new AlreadyExistsException(e, "Namespace already exists: %s", 
namespace);
+    } catch (NessieNotFoundException e) {
+      throw new RuntimeException(String.format("Cannot create Namespace '%s': 
" +
+          "ref '%s' is no longer valid.", namespace, getRef().getName()), e);
+    }
+  }
+
+  public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
+    try {
+      GetNamespacesResponse response = getApi().getMultipleNamespaces()
+          .reference(getRef().getReference())
+          .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+          .get();
+      return response.getNamespaces().stream()
+          .map(ns -> Namespace.of(ns.getElements().toArray(new String[0])))
+          .collect(Collectors.toList());
+    } catch (NessieReferenceNotFoundException e) {
+      throw new RuntimeException(
+          String.format("Cannot list Namespaces starting from '%s': " +
+              "ref '%s' is no longer valid.", namespace, getRef().getName()), 
e);
+    }
+  }
+
+  public boolean dropNamespace(Namespace namespace) throws 
NamespaceNotEmptyException {
+    try {
+      getRef().checkMutable();
+      getApi().deleteNamespace()
+          .reference(getRef().getReference())
+          .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+          .delete();
+      refresh();
+      return true;
+    } catch (NessieNamespaceNotFoundException e) {
+      return false;
+    } catch (NessieNotFoundException e) {
+      LOG.error("Cannot drop Namespace '{}': ref '{}' is no longer valid.", 
namespace, getRef().getName(), e);
+      return false;
+    } catch (NessieNamespaceNotEmptyException e) {
+      throw new NamespaceNotEmptyException(e, "Namespace '%s' is not empty. 
One or more tables exist.", namespace);
+    }
+  }
+
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) throws 
NoSuchNamespaceException {
+    try {
+      getApi().getNamespace()
+          .reference(getRef().getReference())
+          .namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
+          .get();
+    } catch (NessieNamespaceNotFoundException e) {
+      throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", 
namespace);
+    } catch (NessieReferenceNotFoundException e) {
+      throw new RuntimeException(String.format("Cannot load Namespace '%s': " +
+          "ref '%s' is no longer valid.", namespace, getRef().getName()), e);
+    }
+    return ImmutableMap.of();
+  }
+
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    getRef().checkMutable();
+
+    IcebergTable existingFromTable = table(from);
+    if (existingFromTable == null) {
+      throw new NoSuchTableException("Table does not exist: %s", from.name());
+    }
+    IcebergTable existingToTable = table(to);
+    if (existingToTable != null) {
+      throw new AlreadyExistsException("Table already exists: %s", to.name());
+    }
+
+    CommitMultipleOperationsBuilder operations = 
getApi().commitMultipleOperations()
+        .commitMeta(NessieUtil.buildCommitMetadata(String.format("Iceberg 
rename table from '%s' to '%s'",
+            from, to), catalogOptions))
+        .operation(Operation.Put.of(NessieUtil.toKey(to), existingFromTable, 
existingFromTable))
+        .operation(Operation.Delete.of(NessieUtil.toKey(from)));
+
+    try {
+      Tasks.foreach(operations)
+          .retry(5)
+          .stopRetryOn(NessieNotFoundException.class)
+          .throwFailureWhenFinished()
+          .onFailure((o, exception) -> refresh())
+          .run(ops -> {
+            Branch branch = ops
+                .branch(getRef().getAsBranch())
+                .commit();
+            getRef().updateReference(branch);
+          }, BaseNessieClientServerException.class);
+    } catch (NessieNotFoundException e) {
+      // important note: the NotFoundException refers to the ref only. If a 
table was not found it would imply that the
+      // another commit has deleted the table from underneath us. This would 
arise as a Conflict exception as opposed to
+      // a not found exception. This is analogous to a merge conflict in git 
when a table has been changed by one user
+      // and removed by another.
+      throw new RuntimeException(String.format("Cannot rename table '%s' to 
'%s': " +
+          "ref '%s' no longer exists.", from.name(), to.name(), 
getRef().getName()), e);
+    } catch (BaseNessieClientServerException e) {
+      throw new CommitFailedException(e, "Cannot rename table '%s' to '%s': " +
+          "the current reference is not up to date.", from.name(), to.name());
+    } catch (HttpClientException ex) {
+      // Intentionally catch all nessie-client-exceptions here and not just 
the "timeout" variant
+      // to catch all kinds of network errors (e.g. connection reset). Network 
code implementation
+      // details and all kinds of network devices can induce unexpected 
behavior. So better be
+      // safe than sorry.
+      throw new CommitStateUnknownException(ex);

Review Comment:
   We can probably change this in a follow-up. This is just code that was 
moved here from `NessieCatalog`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to