rymurr commented on a change in pull request #1587:
URL: https://github.com/apache/iceberg/pull/1587#discussion_r511959931



##########
File path: build.gradle
##########
@@ -891,6 +924,49 @@ project(':iceberg-pig') {
   }
 }
 
+project(':iceberg-nessie') {
+  apply plugin: 'org.projectnessie'

Review comment:
       It uses the `quarkusAppRunnerConfig` dependencies to discover the Nessie Quarkus server and its dependencies, then uses that to start a server. Resolving the full set of runtime dependencies requires a complete gradle dependency graph, which is why it's hard to do from inside a test suite. I believe the primary reason for all this is to make building graalvm native images easy.
   
   See https://github.com/projectnessie/nessie/tree/main/tools/apprunner-gradle-plugin for the actual code.

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.api.TreeApi;
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.client.NessieClient.AuthType;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.EntriesResponse;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableDelete;
+import com.dremio.nessie.model.ImmutableMultiContents;
+import com.dremio.nessie.model.ImmutablePut;
+import com.dremio.nessie.model.MultiContents;
+import com.dremio.nessie.model.Reference;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+
+/**
+ * Nessie implementation of Iceberg Catalog.
+ */
+public class NessieCatalog extends BaseMetastoreCatalog implements 
AutoCloseable {
+
+  public static final String CONF_NESSIE_URL = "nessie.url";
+  public static final String CONF_NESSIE_USERNAME = "nessie.username";
+  public static final String CONF_NESSIE_PASSWORD = "nessie.password";
+  public static final String CONF_NESSIE_AUTH_TYPE = "nessie.auth_type";
+  public static final String NESSIE_AUTH_TYPE_DEFAULT = "BASIC";
+  public static final String CONF_NESSIE_REF = "nessie.ref";
+
+  private static final Joiner SLASH = Joiner.on("/");
+  private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = 
"iceberg/warehouse";
+  private final NessieClient client;
+  private final String warehouseLocation;
+  private final Configuration config;
+  private final UpdateableReference reference;
+  private final String name;
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config) {
+    this("nessie", config);
+  }
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config, String ref) {
+    this("nessie", config, ref);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config) {
+    this(name, config, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref) {
+    this(name, config, ref, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref, String 
url) {
+    this.config = config;
+    this.name = name;
+    String path = url == null ? config.get(CONF_NESSIE_URL) : url;
+    String username = config.get(CONF_NESSIE_USERNAME);
+    String password = config.get(CONF_NESSIE_PASSWORD);
+    String authTypeStr = config.get(CONF_NESSIE_AUTH_TYPE, 
NESSIE_AUTH_TYPE_DEFAULT);
+    AuthType authType = AuthType.valueOf(authTypeStr);
+    this.client = new NessieClient(authType, path, username, password);
+
+    warehouseLocation = getWarehouseLocation();
+
+    final String requestedRef = ref != null ? ref : 
config.get(CONF_NESSIE_REF);
+    this.reference = get(requestedRef);
+  }
+
+  private String getWarehouseLocation() {
+    String nessieWarehouseDir = config.get("nessie.warehouse.dir");

Review comment:
       I have cleaned this up a bit and tried to follow the pattern you suggested in #1640

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableIcebergTable;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileIO;
+
+/**
+ * Nessie implementation of Iceberg TableOperations.
+ */
+public class NessieTableOperations extends BaseMetastoreTableOperations {
+
+  private static Method sparkConfMethod;
+  private static Method appIdMethod;
+  private static Method sparkEnvMethod;
+
+  private final Configuration conf;
+  private final NessieClient client;
+  private final ContentsKey key;
+  private UpdateableReference reference;
+  private IcebergTable table;
+  private HadoopFileIO fileIO;
+
+  /**
+   * Create a nessie table operations given a table identifier.
+   */
+  public NessieTableOperations(
+      Configuration conf,
+      ContentsKey key,
+      UpdateableReference reference,
+      NessieClient client) {
+    this.conf = conf;
+    this.key = key;
+    this.reference = reference;
+    this.client = client;
+  }
+
+  @Override
+  protected void doRefresh() {
+    // break reference with parent (to avoid cross-over refresh)
+    // TODO, confirm this is correct behavior.
+    // reference = reference.copy();
+
+    reference.refresh();
+    String metadataLocation = null;
+    try {
+      Contents contents = client.getContentsApi().getContents(key, reference.getHash());
+      this.table = contents.unwrap(IcebergTable.class)
+          .orElseThrow(() -> new IllegalStateException("Nessie points to a non-Iceberg object for that path."));

Review comment:
       fixed

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.api.TreeApi;
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.EntriesResponse;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableDelete;
+import com.dremio.nessie.model.ImmutableMultiContents;
+import com.dremio.nessie.model.ImmutablePut;
+import com.dremio.nessie.model.MultiContents;
+import com.dremio.nessie.model.Reference;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+
+/**
+ * Nessie implementation of Iceberg Catalog.
+ */
+public class NessieCatalog extends BaseMetastoreCatalog implements 
AutoCloseable {
+
+  private static final Joiner SLASH = Joiner.on("/");
+  private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = 
"iceberg/warehouse";
+  private final NessieClient client;
+  private final String warehouseLocation;
+  private final Configuration config;
+  private final UpdateableReference reference;
+  private final String name;
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config) {
+    this("nessie", config);
+  }
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config, String ref) {
+    this("nessie", config, ref);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config) {
+    this(name, config, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref) {
+    this(name, config, ref, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref, String 
url) {
+    this.config = config;
+    this.name = name;
+
+    this.client = NessieClient.withConfig(s -> {
+      if (s.equals(NessieClient.CONF_NESSIE_URL)) {
+        return url == null ? config.get(s) : url;
+      }
+      return config.get(s);
+    });
+
+    warehouseLocation = getWarehouseLocation();
+
+    final String requestedRef = ref != null ? ref : 
config.get(NessieClient.CONF_NESSIE_REF);
+    this.reference = get(requestedRef);
+  }
+
+  private String getWarehouseLocation() {
+    String nessieWarehouseDir = config.get("nessie.warehouse.dir");
+    if (nessieWarehouseDir != null) {
+      return nessieWarehouseDir;
+    }
+    String hiveWarehouseDir = config.get("hive.metastore.warehouse.dir");
+    if (hiveWarehouseDir != null) {
+      return hiveWarehouseDir;
+    }
+    String defaultFS = config.get("fs.defaultFS");
+    if (defaultFS != null) {
+      return defaultFS + "/" + ICEBERG_HADOOP_WAREHOUSE_BASE;
+    }
+    throw new IllegalStateException("Don't know where to put the nessie 
iceberg data. " +
+        "Please set one of the following:\n" +
+        "nessie.warehouse.dir\n" +
+        "hive.metastore.warehouse.dir\n" +
+        "fs.defaultFS.");
+  }
+
+  private UpdateableReference get(String requestedRef) {
+    try {
+      Reference ref = requestedRef == null ? 
client.getTreeApi().getDefaultBranch()
+          : client.getTreeApi().getReferenceByName(requestedRef);
+      return new UpdateableReference(ref, client.getTreeApi());
+    } catch (NessieNotFoundException ex) {
+      if (requestedRef != null) {
+        throw new IllegalArgumentException(String.format("Nessie ref '%s' 
provided via %s does not exist. " +
+          "This ref must exist before creating a NessieCatalog.", 
requestedRef, NessieClient.CONF_NESSIE_REF), ex);
+      }
+
+      throw new IllegalArgumentException(String.format("Nessie does not have 
an existing default branch." +
+        "Either configure an alternative ref via %s or create the default 
branch on the server.",
+          NessieClient.CONF_NESSIE_REF), ex);
+    }
+  }
+
+  @Override
+  public void close() {
+    client.close();
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  private static ContentsKey toKey(TableIdentifier tableIdentifier) {
+    List<String> identifiers = new ArrayList<>();
+    if (tableIdentifier.hasNamespace()) {
+      identifiers.addAll(Arrays.asList(tableIdentifier.namespace().levels()));
+    }
+    identifiers.add(tableIdentifier.name());
+
+    ContentsKey key = new ContentsKey(identifiers);
+    return key;
+  }
+
+  private IcebergTable table(TableIdentifier tableIdentifier) {
+    try {
+      Contents table = 
client.getContentsApi().getContents(toKey(tableIdentifier), 
reference.getHash());
+      if (table instanceof IcebergTable) {
+        return (IcebergTable) table;
+      }
+    } catch (NessieNotFoundException e) {
+      // ignore
+    }
+    return null;
+  }
+
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    ParsedTableIdentifier pti = 
ParsedTableIdentifier.getParsedTableIdentifier(tableIdentifier, new 
HashMap<>());
+    UpdateableReference newReference = this.reference;
+    if (pti.getReference() != null) {
+      newReference = get(pti.getReference());
+    }
+    return new NessieTableOperations(config,
+                                     toKey(pti.getTableIdentifier()),
+                                     newReference,
+                                     client);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier table) {
+    if (table.hasNamespace()) {
+      return SLASH.join(warehouseLocation, table.namespace().toString(), 
table.name());
+    }
+    return SLASH.join(warehouseLocation, table.name());
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    try {
+      return client.getTreeApi()
+          .getEntries(reference.getHash())
+          .getEntries()
+          .stream()
+          .filter(namespacePredicate(namespace))
+          .map(NessieCatalog::toIdentifier)
+          .collect(Collectors.toList());
+    } catch (NessieNotFoundException ex) {
+      throw new RuntimeException("Unable to list tables due to missing ref.", ex);

Review comment:
       :+1: 

##########
File path: build.gradle
##########
@@ -693,6 +709,8 @@ if (jdkVersion == '8') {
       // Vectorized reads need more memory
       maxHeapSize '2500m'
     }
+    // start and stop quarkus for nessie tests
+    tasks.test.dependsOn("quarkus-start").finalizedBy("quarkus-stop")

Review comment:
       The behaviour of Quarkus (the underlying http framework) is slightly counterintuitive in that it doesn't offer a way to start Nessie from within a test the way you can start the hive metastore. Hence we start it once per module, and the test suites are responsible for cleanup.

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.api.TreeApi;
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.client.NessieClient.AuthType;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.EntriesResponse;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableDelete;
+import com.dremio.nessie.model.ImmutableMultiContents;
+import com.dremio.nessie.model.ImmutablePut;
+import com.dremio.nessie.model.MultiContents;
+import com.dremio.nessie.model.Reference;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+
+/**
+ * Nessie implementation of Iceberg Catalog.
+ */
+public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable {

Review comment:
       I will take another pass at this today. I can see totally valid reasons to support listing namespaces if they have tables in them. The problem as I see it comes from creating or deleting namespaces, and from storing namespace metadata.
   
   * create/delete: in Nessie (similar to git) a namespace is created implicitly with the first table in that namespace tree and deleted with the last table in that namespace tree. Separate create/delete operations in Nessie are either no-ops or require a dummy object to be placed in that namespace, both of which are odd. E.g. if create is a no-op, then creating namespace `foo.bar` and then asking whether `foo.bar` exists will return `false`.
   
   * namespace metadata: what is the use case envisioned for those operations? I think for Nessie we would start with the same behaviour as the hdfs catalog, but I am curious to know the benefit of supporting those apis.

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
##########
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.api.TreeApi;
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.client.NessieClient.AuthType;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.EntriesResponse;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableDelete;
+import com.dremio.nessie.model.ImmutableMultiContents;
+import com.dremio.nessie.model.ImmutablePut;
+import com.dremio.nessie.model.MultiContents;
+import com.dremio.nessie.model.Reference;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+
+/**
+ * Nessie implementation of Iceberg Catalog.
+ */
+public class NessieCatalog extends BaseMetastoreCatalog implements AutoCloseable {

Review comment:
       Having another look, we could add valid impls for `namespaceExists` and `listNamespaces` and make the others no-ops or throw. Then clients can still navigate namespaces. Thoughts?
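
   For illustration, here is a minimal sketch of what implicit `namespaceExists` / `listNamespaces` could look like when namespaces are derived purely from the table entries under a ref (the `ImplicitNamespaces` class and its helpers are hypothetical, not part of this PR):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

public class ImplicitNamespaces {

  private ImplicitNamespaces() {
  }

  /** A namespace "exists" if at least one table lives in it or below it. */
  public static boolean namespaceExists(List<TableIdentifier> tables, Namespace namespace) {
    return tables.stream().anyMatch(t -> isInOrBelow(t, namespace));
  }

  /** List the direct child namespaces of the given (possibly empty) parent namespace. */
  public static Set<Namespace> listNamespaces(List<TableIdentifier> tables, Namespace parent) {
    int depth = parent.levels().length;
    return tables.stream()
        .filter(t -> isInOrBelow(t, parent))
        .filter(t -> t.namespace().levels().length > depth) // keep only tables nested below the parent
        .map(t -> Namespace.of(Arrays.copyOfRange(t.namespace().levels(), 0, depth + 1)))
        .collect(Collectors.toSet());
  }

  private static boolean isInOrBelow(TableIdentifier table, Namespace namespace) {
    String[] want = namespace.levels();
    String[] have = table.namespace().levels();
    return have.length >= want.length
        && Arrays.equals(want, Arrays.copyOfRange(have, 0, want.length));
  }
}
```

   With this approach a namespace "exists" exactly when something is stored under it, which matches the git-like behaviour described above.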

##########
File path: spark3/src/test/java/org/apache/iceberg/spark/sql/TestNamespaceSQL.java
##########
@@ -56,6 +56,8 @@ public void cleanNamespaces() {
 
   @Test
   public void testCreateNamespace() {
+    // Nessie namespaces are implicit and do not need to be explicitly managed
+    Assume.assumeFalse(catalogName.endsWith("testnessie"));

Review comment:
       Sure, the hadoop catalog is also skipped for most of these. It makes sense to have separate tests.

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.api.TreeApi;
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.EntriesResponse;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableDelete;
+import com.dremio.nessie.model.ImmutableMultiContents;
+import com.dremio.nessie.model.ImmutablePut;
+import com.dremio.nessie.model.MultiContents;
+import com.dremio.nessie.model.Reference;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+
+/**
+ * Nessie implementation of Iceberg Catalog.
+ */
+public class NessieCatalog extends BaseMetastoreCatalog implements 
AutoCloseable {
+
+  private static final Joiner SLASH = Joiner.on("/");
+  private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = 
"iceberg/warehouse";
+  private final NessieClient client;
+  private final String warehouseLocation;
+  private final Configuration config;
+  private final UpdateableReference reference;
+  private final String name;
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config) {
+    this("nessie", config);
+  }
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config, String ref) {
+    this("nessie", config, ref);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config) {
+    this(name, config, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref) {
+    this(name, config, ref, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref, String 
url) {
+    this.config = config;
+    this.name = name;
+
+    this.client = NessieClient.withConfig(s -> {
+      if (s.equals(NessieClient.CONF_NESSIE_URL)) {
+        return url == null ? config.get(s) : url;
+      }
+      return config.get(s);
+    });
+
+    warehouseLocation = getWarehouseLocation();
+
+    final String requestedRef = ref != null ? ref : 
config.get(NessieClient.CONF_NESSIE_REF);
+    this.reference = get(requestedRef);
+  }
+
+  private String getWarehouseLocation() {
+    String nessieWarehouseDir = config.get("nessie.warehouse.dir");
+    if (nessieWarehouseDir != null) {
+      return nessieWarehouseDir;
+    }
+    String hiveWarehouseDir = config.get("hive.metastore.warehouse.dir");
+    if (hiveWarehouseDir != null) {
+      return hiveWarehouseDir;
+    }
+    String defaultFS = config.get("fs.defaultFS");
+    if (defaultFS != null) {
+      return defaultFS + "/" + ICEBERG_HADOOP_WAREHOUSE_BASE;
+    }
+    throw new IllegalStateException("Don't know where to put the nessie 
iceberg data. " +
+        "Please set one of the following:\n" +
+        "nessie.warehouse.dir\n" +
+        "hive.metastore.warehouse.dir\n" +
+        "fs.defaultFS.");
+  }
+
+  private UpdateableReference get(String requestedRef) {
+    try {
+      Reference ref = requestedRef == null ? 
client.getTreeApi().getDefaultBranch()
+          : client.getTreeApi().getReferenceByName(requestedRef);
+      return new UpdateableReference(ref, client.getTreeApi());
+    } catch (NessieNotFoundException ex) {
+      if (requestedRef != null) {
+        throw new IllegalArgumentException(String.format("Nessie ref '%s' 
provided via %s does not exist. " +
+          "This ref must exist before creating a NessieCatalog.", 
requestedRef, NessieClient.CONF_NESSIE_REF), ex);
+      }
+
+      throw new IllegalArgumentException(String.format("Nessie does not have 
an existing default branch." +
+        "Either configure an alternative ref via %s or create the default 
branch on the server.",
+          NessieClient.CONF_NESSIE_REF), ex);
+    }
+  }
+
+  @Override
+  public void close() {
+    client.close();
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  private static ContentsKey toKey(TableIdentifier tableIdentifier) {
+    List<String> identifiers = new ArrayList<>();
+    if (tableIdentifier.hasNamespace()) {
+      identifiers.addAll(Arrays.asList(tableIdentifier.namespace().levels()));
+    }
+    identifiers.add(tableIdentifier.name());
+
+    ContentsKey key = new ContentsKey(identifiers);
+    return key;
+  }
+
+  private IcebergTable table(TableIdentifier tableIdentifier) {
+    try {
+      Contents table = 
client.getContentsApi().getContents(toKey(tableIdentifier), 
reference.getHash());
+      if (table instanceof IcebergTable) {
+        return (IcebergTable) table;
+      }
+    } catch (NessieNotFoundException e) {
+      // ignore
+    }
+    return null;
+  }
+
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    ParsedTableIdentifier pti = 
ParsedTableIdentifier.getParsedTableIdentifier(tableIdentifier, new 
HashMap<>());
+    UpdateableReference newReference = this.reference;
+    if (pti.getReference() != null) {
+      newReference = get(pti.getReference());
+    }
+    return new NessieTableOperations(config,
+                                     toKey(pti.getTableIdentifier()),
+                                     newReference,
+                                     client);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier table) {
+    if (table.hasNamespace()) {
+      return SLASH.join(warehouseLocation, table.namespace().toString(), 
table.name());
+    }
+    return SLASH.join(warehouseLocation, table.name());
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    try {
+      return client.getTreeApi()
+          .getEntries(reference.getHash())
+          .getEntries()
+          .stream()
+          .filter(namespacePredicate(namespace))
+          .map(NessieCatalog::toIdentifier)
+          .collect(Collectors.toList());

Review comment:
       Just checked, and the contract is `Return all the identifiers under this namespace.` I took this to mean everything under this namespace and all sub-namespaces. If that was not the intention of the method, I will fix the predicate.
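
   For reference, a small sketch of the two possible readings of that contract, written as predicates over `TableIdentifier` (the `NamespacePredicates` class is hypothetical and only illustrates the difference; it is not the `namespacePredicate` used above):

```java
import java.util.function.Predicate;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;

public class NamespacePredicates {

  private NamespacePredicates() {
  }

  /** Only tables whose namespace is exactly the requested one. */
  public static Predicate<TableIdentifier> directlyIn(Namespace namespace) {
    return table -> table.namespace().equals(namespace);
  }

  /** Tables in the requested namespace or in any namespace nested below it. */
  public static Predicate<TableIdentifier> inOrBelow(Namespace namespace) {
    return table -> {
      String[] want = namespace.levels();
      String[] have = table.namespace().levels();
      if (have.length < want.length) {
        return false;
      }
      for (int i = 0; i < want.length; i++) {
        if (!want[i].equals(have[i])) {
          return false;
        }
      }
      return true;
    };
  }
}
```

   The current behaviour corresponds to `inOrBelow`; restricting the listing to direct children would mean switching to something like `directlyIn`.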

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieCatalog.java
##########
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.api.TreeApi;
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.EntriesResponse;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableDelete;
+import com.dremio.nessie.model.ImmutableMultiContents;
+import com.dremio.nessie.model.ImmutablePut;
+import com.dremio.nessie.model.MultiContents;
+import com.dremio.nessie.model.Reference;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.relocated.com.google.common.base.Joiner;
+
+/**
+ * Nessie implementation of Iceberg Catalog.
+ */
+public class NessieCatalog extends BaseMetastoreCatalog implements 
AutoCloseable {
+
+  private static final Joiner SLASH = Joiner.on("/");
+  private static final String ICEBERG_HADOOP_WAREHOUSE_BASE = 
"iceberg/warehouse";
+  private final NessieClient client;
+  private final String warehouseLocation;
+  private final Configuration config;
+  private final UpdateableReference reference;
+  private final String name;
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config) {
+    this("nessie", config);
+  }
+
+  /**
+   * create a catalog from a hadoop configuration.
+   */
+  public NessieCatalog(Configuration config, String ref) {
+    this("nessie", config, ref);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config) {
+    this(name, config, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref) {
+    this(name, config, ref, null);
+  }
+
+  /**
+   * Create a catalog with a known name from a hadoop configuration.
+   */
+  public NessieCatalog(String name, Configuration config, String ref, String 
url) {
+    this.config = config;
+    this.name = name;
+
+    this.client = NessieClient.withConfig(s -> {
+      if (s.equals(NessieClient.CONF_NESSIE_URL)) {
+        return url == null ? config.get(s) : url;
+      }
+      return config.get(s);
+    });
+
+    warehouseLocation = getWarehouseLocation();
+
+    final String requestedRef = ref != null ? ref : 
config.get(NessieClient.CONF_NESSIE_REF);
+    this.reference = get(requestedRef);
+  }
+
+  private String getWarehouseLocation() {
+    String nessieWarehouseDir = config.get("nessie.warehouse.dir");
+    if (nessieWarehouseDir != null) {
+      return nessieWarehouseDir;
+    }
+    String hiveWarehouseDir = config.get("hive.metastore.warehouse.dir");
+    if (hiveWarehouseDir != null) {
+      return hiveWarehouseDir;
+    }
+    String defaultFS = config.get("fs.defaultFS");
+    if (defaultFS != null) {
+      return defaultFS + "/" + ICEBERG_HADOOP_WAREHOUSE_BASE;
+    }
+    throw new IllegalStateException("Don't know where to put the nessie 
iceberg data. " +
+        "Please set one of the following:\n" +
+        "nessie.warehouse.dir\n" +
+        "hive.metastore.warehouse.dir\n" +
+        "fs.defaultFS.");
+  }
+
+  private UpdateableReference get(String requestedRef) {
+    try {
+      Reference ref = requestedRef == null ? 
client.getTreeApi().getDefaultBranch()
+          : client.getTreeApi().getReferenceByName(requestedRef);
+      return new UpdateableReference(ref, client.getTreeApi());
+    } catch (NessieNotFoundException ex) {
+      if (requestedRef != null) {
+        throw new IllegalArgumentException(String.format("Nessie ref '%s' 
provided via %s does not exist. " +
+          "This ref must exist before creating a NessieCatalog.", 
requestedRef, NessieClient.CONF_NESSIE_REF), ex);
+      }
+
+      throw new IllegalArgumentException(String.format("Nessie does not have 
an existing default branch." +
+        "Either configure an alternative ref via %s or create the default 
branch on the server.",
+          NessieClient.CONF_NESSIE_REF), ex);
+    }
+  }
+
+  @Override
+  public void close() {
+    client.close();
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  private static ContentsKey toKey(TableIdentifier tableIdentifier) {
+    List<String> identifiers = new ArrayList<>();
+    if (tableIdentifier.hasNamespace()) {
+      identifiers.addAll(Arrays.asList(tableIdentifier.namespace().levels()));
+    }
+    identifiers.add(tableIdentifier.name());
+
+    ContentsKey key = new ContentsKey(identifiers);
+    return key;
+  }
+
+  private IcebergTable table(TableIdentifier tableIdentifier) {
+    try {
+      Contents table = 
client.getContentsApi().getContents(toKey(tableIdentifier), 
reference.getHash());
+      if (table instanceof IcebergTable) {
+        return (IcebergTable) table;
+      }
+    } catch (NessieNotFoundException e) {
+      // ignore
+    }
+    return null;
+  }
+
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier tableIdentifier) {
+    ParsedTableIdentifier pti = 
ParsedTableIdentifier.getParsedTableIdentifier(tableIdentifier, new 
HashMap<>());
+    UpdateableReference newReference = this.reference;
+    if (pti.getReference() != null) {
+      newReference = get(pti.getReference());
+    }
+    return new NessieTableOperations(config,
+                                     toKey(pti.getTableIdentifier()),
+                                     newReference,
+                                     client);
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier table) {
+    if (table.hasNamespace()) {
+      return SLASH.join(warehouseLocation, table.namespace().toString(), 
table.name());
+    }
+    return SLASH.join(warehouseLocation, table.name());
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    try {
+      return client.getTreeApi()
+          .getEntries(reference.getHash())
+          .getEntries()
+          .stream()
+          .filter(namespacePredicate(namespace))
+          .map(NessieCatalog::toIdentifier)
+          .collect(Collectors.toList());

Review comment:
       You are correct, it will return everything in and below `namespace`. What is the contract supposed to be? Only tables in this namespace?

##########
File path: spark3/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
##########
@@ -50,6 +50,7 @@ public TestCreateTableAsSelect(String catalogName, String implementation, Map<St
   @After
   public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
+    sql("DROP TABLE IF EXISTS %s", sourceName);

Review comment:
       The way I was running it in the test meant the table got deleted on the backend nessie server but not in the cached spark context. I will clean this up as part of the Spark rework.

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableIcebergTable;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileIO;
+
+/**
+ * Nessie implementation of Iceberg TableOperations.
+ */
+public class NessieTableOperations extends BaseMetastoreTableOperations {
+
+  private static Method sparkConfMethod;
+  private static Method appIdMethod;
+  private static Method sparkEnvMethod;
+
+  private final Configuration conf;
+  private final NessieClient client;
+  private final ContentsKey key;
+  private UpdateableReference reference;
+  private IcebergTable table;
+  private HadoopFileIO fileIO;
+
+  /**
+   * Create a nessie table operations given a table identifier.
+   */
+  public NessieTableOperations(
+      Configuration conf,
+      ContentsKey key,
+      UpdateableReference reference,
+      NessieClient client) {
+    this.conf = conf;
+    this.key = key;
+    this.reference = reference;
+    this.client = client;
+  }
+
+  @Override
+  protected void doRefresh() {
+    // break reference with parent (to avoid cross-over refresh)
+    // TODO, confirm this is correct behavior.
+    // reference = reference.copy();
+
+    reference.refresh();
+    String metadataLocation = null;
+    try {
+      Contents contents = client.getContentsApi().getContents(key, 
reference.getHash());
+      this.table = contents.unwrap(IcebergTable.class)
+          .orElseThrow(() -> new IllegalStateException("Nessie points to a 
non-Iceberg object for that path."));
+      metadataLocation = table.getMetadataLocation();
+    } catch (NessieNotFoundException ex) {
+      this.table = null;
+    }
+    refreshFromMetadataLocation(metadataLocation, 2);
+  }
+
+  @Override
+  protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    reference.checkMutable();
+
+    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 
1);
+
+    try {
+      IcebergTable newTable = 
ImmutableIcebergTable.builder().metadataLocation(newMetadataLocation).build();
+      client.getContentsApi().setContents(key,
+                                          reference.getAsBranch().getName(),
+                                          reference.getHash(),
+                                          String.format("iceberg commit%s", 
applicationId()),
+                                          newTable);
+    } catch (NessieNotFoundException | NessieConflictException ex) {
+      io().deleteFile(newMetadataLocation);
+      throw new CommitFailedException(ex, "failed");
+    } catch (Throwable e) {
+      io().deleteFile(newMetadataLocation);
+      throw new RuntimeException("Unexpected commit exception", e);
+    }
+  }
+
+  @Override
+  public FileIO io() {
+    if (fileIO == null) {
+      fileIO = new HadoopFileIO(conf);
+    }
+
+    return fileIO;
+  }
+
+  /**
+   * try and get a Spark application id if one exists.
+   *
+   * <p>
+   *   We haven't figured out a general way to pass commit messages through to 
the Nessie committer yet.
+   *   This is hacky but gets the job done until we can have a more complete 
commit/audit log.
+   * </p>
+   */
+  private static String applicationId() {
+    try {
+      if (sparkConfMethod == null) {
+        Class sparkEnvClazz = Class.forName("org.apache.spark.SparkEnv");
+        sparkEnvMethod = sparkEnvClazz.getMethod("get");
+        Class sparkConfClazz = Class.forName("org.apache.spark.SparkConf");
+        sparkConfMethod = sparkEnvClazz.getMethod("conf");
+        appIdMethod = sparkConfClazz.getMethod("getAppId");

Review comment:
       :+1: 
   

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableIcebergTable;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileIO;
+
+/**
+ * Nessie implementation of Iceberg TableOperations.
+ */
+public class NessieTableOperations extends BaseMetastoreTableOperations {
+
+  private static Method sparkConfMethod;
+  private static Method appIdMethod;
+  private static Method sparkEnvMethod;
+
+  private final Configuration conf;
+  private final NessieClient client;
+  private final ContentsKey key;
+  private UpdateableReference reference;
+  private IcebergTable table;
+  private HadoopFileIO fileIO;
+
+  /**
+   * Create a nessie table operations given a table identifier.
+   */
+  public NessieTableOperations(
+      Configuration conf,
+      ContentsKey key,
+      UpdateableReference reference,
+      NessieClient client) {
+    this.conf = conf;
+    this.key = key;
+    this.reference = reference;
+    this.client = client;
+  }
+
+  @Override
+  protected void doRefresh() {
+    // break reference with parent (to avoid cross-over refresh)
+    // TODO, confirm this is correct behavior.
+    // reference = reference.copy();
+
+    reference.refresh();
+    String metadataLocation = null;
+    try {
+      Contents contents = client.getContentsApi().getContents(key, 
reference.getHash());
+      this.table = contents.unwrap(IcebergTable.class)
+          .orElseThrow(() -> new IllegalStateException("Nessie points to a 
non-Iceberg object for that path."));
+      metadataLocation = table.getMetadataLocation();
+    } catch (NessieNotFoundException ex) {
+      this.table = null;
+    }
+    refreshFromMetadataLocation(metadataLocation, 2);
+  }
+
+  @Override
+  protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    reference.checkMutable();
+
+    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 
1);
+
+    try {
+      IcebergTable newTable = 
ImmutableIcebergTable.builder().metadataLocation(newMetadataLocation).build();
+      client.getContentsApi().setContents(key,
+                                          reference.getAsBranch().getName(),
+                                          reference.getHash(),
+                                          String.format("iceberg commit%s", applicationId()),

Review comment:
       Good eye, the first char of the `applicationId` is a newline. I've put no space between `commit` and `%s` so the message doesn't end up with extra trailing whitespace.
   
   Also note that the handling of commit messages in nessie is still fairly primitive. This should get replaced by a structured object in the near future.
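
   A tiny illustration of the resulting message layout (the application id value below is made up; only the `"iceberg commit%s"` format string comes from the code above, and the empty-string case assumes `applicationId()` returns `""` when Spark is not available):

```java
public class CommitMessageLayout {

  public static void main(String[] args) {
    // with Spark on the classpath: applicationId() starts with a newline, so the
    // app id lands on its own line and "commit" has no trailing space
    String withAppId = String.format("iceberg commit%s", "\napp id: application_1603700000000_0001");
    // without Spark: an empty suffix, again leaving no trailing space
    String withoutAppId = String.format("iceberg commit%s", "");
    System.out.println(withAppId);    // "iceberg commit" on one line, the app id on the next
    System.out.println(withoutAppId); // just "iceberg commit"
  }
}
```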

##########
File path: nessie/src/test/java/org/apache/iceberg/nessie/TestParsedTableIdentifier.java
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.client.NessieClient;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestParsedTableIdentifier {
+
+
+  @Test
+  public void noMarkings() {
+    String path = "foo";
+    ParsedTableIdentifier pti = 
ParsedTableIdentifier.getParsedTableIdentifier(path, new HashMap<>());
+    Assert.assertEquals(path, pti.getTableIdentifier().name());
+    Assert.assertNull(pti.getReference());
+    Assert.assertNull(pti.getTimestamp());
+  }
+
+  @Test
+  public void branchOnly() {
+    String path = "foo@bar";
+    ParsedTableIdentifier pti = 
ParsedTableIdentifier.getParsedTableIdentifier(path, new HashMap<>());
+    Assert.assertEquals("foo", pti.getTableIdentifier().name());
+    Assert.assertEquals("bar", pti.getReference());
+    Assert.assertNull(pti.getTimestamp());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void timestampOnly() {
+    String path = "foo#baz";
+    ParsedTableIdentifier.getParsedTableIdentifier(path, new HashMap<>());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void branchAndTimestamp() {
+    String path = "foo@bar#baz";
+    ParsedTableIdentifier.getParsedTableIdentifier(path, new HashMap<>());
+  }
+
+  @Test(expected = IllegalArgumentException.class)

Review comment:
       fixed

##########
File path: nessie/src/main/java/org/apache/iceberg/nessie/NessieTableOperations.java
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.nessie;
+
+import com.dremio.nessie.client.NessieClient;
+import com.dremio.nessie.error.NessieConflictException;
+import com.dremio.nessie.error.NessieNotFoundException;
+import com.dremio.nessie.model.Contents;
+import com.dremio.nessie.model.ContentsKey;
+import com.dremio.nessie.model.IcebergTable;
+import com.dremio.nessie.model.ImmutableIcebergTable;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.hadoop.HadoopFileIO;
+import org.apache.iceberg.io.FileIO;
+
+/**
+ * Nessie implementation of Iceberg TableOperations.
+ */
+public class NessieTableOperations extends BaseMetastoreTableOperations {
+
+  private static Method sparkConfMethod;
+  private static Method appIdMethod;
+  private static Method sparkEnvMethod;
+
+  private final Configuration conf;
+  private final NessieClient client;
+  private final ContentsKey key;
+  private UpdateableReference reference;
+  private IcebergTable table;
+  private HadoopFileIO fileIO;
+
+  /**
+   * Create a nessie table operations given a table identifier.
+   */
+  public NessieTableOperations(
+      Configuration conf,
+      ContentsKey key,
+      UpdateableReference reference,
+      NessieClient client) {
+    this.conf = conf;
+    this.key = key;
+    this.reference = reference;
+    this.client = client;
+  }
+
+  @Override
+  protected void doRefresh() {
+    // break reference with parent (to avoid cross-over refresh)
+    // TODO, confirm this is correct behavior.
+    // reference = reference.copy();
+
+    reference.refresh();
+    String metadataLocation = null;
+    try {
+      Contents contents = client.getContentsApi().getContents(key, 
reference.getHash());
+      this.table = contents.unwrap(IcebergTable.class)
+          .orElseThrow(() -> new IllegalStateException("Nessie points to a 
non-Iceberg object for that path."));
+      metadataLocation = table.getMetadataLocation();
+    } catch (NessieNotFoundException ex) {
+      this.table = null;
+    }
+    refreshFromMetadataLocation(metadataLocation, 2);
+  }
+
+  @Override
+  protected void doCommit(TableMetadata base, TableMetadata metadata) {
+    reference.checkMutable();
+
+    String newMetadataLocation = writeNewMetadata(metadata, currentVersion() + 
1);
+
+    try {
+      IcebergTable newTable = 
ImmutableIcebergTable.builder().metadataLocation(newMetadataLocation).build();
+      client.getContentsApi().setContents(key,
+                                          reference.getAsBranch().getName(),
+                                          reference.getHash(),
+                                          String.format("iceberg commit%s", applicationId()),
+                                          newTable);
+    } catch (NessieNotFoundException | NessieConflictException ex) {

Review comment:
       Good eye, I have cleaned up the exception message and improved how the exception is thrown.
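
   As an illustration of the direction, a minimal sketch of a more descriptive failure path for `doCommit`, reusing the exception types already imported above (the `NessieCommitFailures` class, its method name, and the exact message wording are assumptions for this sketch, not the actual follow-up change):

```java
import com.dremio.nessie.error.NessieConflictException;
import com.dremio.nessie.error.NessieNotFoundException;
import com.dremio.nessie.model.ContentsKey;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.io.FileIO;

public class NessieCommitFailures {

  private NessieCommitFailures() {
  }

  /** Delete the metadata file written for the failed attempt and convert the error. */
  public static RuntimeException cleanUpAndConvert(
      FileIO io, String newMetadataLocation, ContentsKey key, String branch, Exception failure) {
    // the new metadata file is unreachable after a failed commit, so remove it
    io.deleteFile(newMetadataLocation);
    if (failure instanceof NessieConflictException) {
      // someone else committed to the branch since our hash was read: retryable
      return new CommitFailedException(failure,
          "Commit failed: branch %s changed since this commit started (key %s)", branch, key);
    }
    if (failure instanceof NessieNotFoundException) {
      return new CommitFailedException(failure,
          "Commit failed: branch %s no longer exists (key %s)", branch, key);
    }
    return new RuntimeException("Unexpected exception while committing " + key, failure);
  }
}
```

   In `doCommit` the first catch block could then become `throw NessieCommitFailures.cleanUpAndConvert(io(), newMetadataLocation, key, reference.getAsBranch().getName(), ex);`.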




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


