nastra commented on code in PR #7412:
URL: https://github.com/apache/iceberg/pull/7412#discussion_r1266322803


##########
gcp/src/test/java/org/apache/iceberg/gcp/biglake/BigLakeTableOperationsTest.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.biglake;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.AbortedException;
+import com.google.cloud.bigquery.biglake.v1.HiveTableOptions;
+import com.google.cloud.bigquery.biglake.v1.HiveTableOptions.StorageDescriptor;
+import com.google.cloud.bigquery.biglake.v1.Table;
+import com.google.cloud.bigquery.biglake.v1.TableName;
+import io.grpc.Status.Code;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.ArgumentCaptor;
+
+public class BigLakeTableOperationsTest {
+
+  @TempDir private Path temp;
+
+  private static final String CATALOG_NAME = "iceberg";
+
+  private static final String GCP_PROJECT = "my-project";
+  private static final String GCP_REGION = "us";
+  private static final String CATALOG_ID = "biglake";
+  private static final String DB_ID = "db";
+  private static final String TABLE_ID = "tbl";
+  private static final TableName TABLE_NAME =
+      TableName.of(GCP_PROJECT, GCP_REGION, CATALOG_ID, DB_ID, TABLE_ID);
+  private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DB_ID, TABLE_ID);
+
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "id", Types.IntegerType.get(), "unique ID"),
+          required(2, "data", Types.StringType.get()));
+
+  private BigLakeClient bigLakeClient = mock(BigLakeClient.class);
+  private BigLakeCatalog bigLakeCatalog;
+  private BigLakeTableOperations tableOps;
+
+  @BeforeEach
+  public void before() throws Exception {
+    ImmutableMap<String, String> properties =
+        ImmutableMap.of(
+            GCPProperties.PROJECT_ID,
+            GCP_PROJECT,
+            GCPProperties.REGION,
+            GCP_REGION,
+            CatalogProperties.WAREHOUSE_LOCATION,
+            temp.toFile().getAbsolutePath(),
+            GCPProperties.BIGLAKE_CATALOG_ID,
+            CATALOG_ID);
+
+    bigLakeCatalog = new BigLakeCatalog();
+    bigLakeCatalog.setConf(new Configuration());
+    bigLakeCatalog.initialize(CATALOG_NAME, properties, bigLakeClient);
+    tableOps = (BigLakeTableOperations) bigLakeCatalog.newTableOps(TABLE_IDENTIFIER);
+  }
+
+  @AfterEach
+  public void after() throws Exception {
+    bigLakeCatalog.close();
+  }
+
+  @Test
+  public void testDoCommitShouldUseEtagForUpdateTable() throws Exception {
+    when(bigLakeClient.getTable(TABLE_NAME))
+        .thenThrow(new NoSuchTableException("error message getTable"));
+    Table createdTable = createTestTable();
+
+    Table tableWithEtag = createdTable.toBuilder().setEtag("etag").build();
+    reset(bigLakeClient);
+    when(bigLakeClient.getTable(TABLE_NAME)).thenReturn(tableWithEtag, tableWithEtag);
+
+    org.apache.iceberg.Table loadedTable = bigLakeCatalog.loadTable(TABLE_IDENTIFIER);
+
+    when(bigLakeClient.updateTableParameters(any(), any(), any())).thenReturn(tableWithEtag);
+    loadedTable.updateSchema().addColumn("n", Types.IntegerType.get()).commit();
+
+    ArgumentCaptor<TableName> nameCaptor = ArgumentCaptor.forClass(TableName.class);
+    ArgumentCaptor<String> etagCaptor = ArgumentCaptor.forClass(String.class);
+    verify(bigLakeClient, times(1))
+        .updateTableParameters(nameCaptor.capture(), any(), etagCaptor.capture());
+    assertThat(nameCaptor.getValue()).isEqualTo(TABLE_NAME);
+    assertThat(etagCaptor.getValue()).isEqualTo("etag");
+  }
+
+  @Test
+  public void testDoCommitShouldFailWhenEtagMismatch() throws Exception {
+    when(bigLakeClient.getTable(TABLE_NAME))
+        .thenThrow(new NoSuchTableException("error message getTable"));
+    Table createdTable = createTestTable();
+
+    Table tableWithEtag = createdTable.toBuilder().setEtag("etag").build();
+    reset(bigLakeClient);
+    when(bigLakeClient.getTable(TABLE_NAME)).thenReturn(tableWithEtag, tableWithEtag);
+
+    org.apache.iceberg.Table loadedTable = bigLakeCatalog.loadTable(TABLE_IDENTIFIER);
+
+    when(bigLakeClient.updateTableParameters(any(), any(), any()))
+        .thenThrow(
+            new AbortedException(
+                new RuntimeException("error message etag mismatch"),
+                GrpcStatusCode.of(Code.ABORTED),
+                false));
+
+    assertThatThrownBy(
+            () -> loadedTable.updateSchema().addColumn("n", 
Types.IntegerType.get()).commit())
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessage("Updating table failed due to conflict updates (etag 
mismatch)");

Review Comment:
   minor: should this say `conflicting updates` rather than `conflict updates`?
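   e.g.:
   ```suggestion
        .hasMessage("Updating table failed due to conflicting updates (etag mismatch)");
   ```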



##########
gcp/src/test/java/org/apache/iceberg/gcp/biglake/BigLakeCatalogTest.java:
##########
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.biglake;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import com.google.api.gax.grpc.testing.MockGrpcService;
+import com.google.api.gax.grpc.testing.MockServiceHelper;
+import com.google.cloud.bigquery.biglake.v1.Catalog;
+import com.google.cloud.bigquery.biglake.v1.CatalogName;
+import com.google.cloud.bigquery.biglake.v1.Database;
+import com.google.cloud.bigquery.biglake.v1.DatabaseName;
+import com.google.cloud.bigquery.biglake.v1.HiveDatabaseOptions;
+import com.google.cloud.bigquery.biglake.v1.MetastoreServiceSettings;
+import com.google.cloud.bigquery.biglake.v1.Table;
+import com.google.cloud.bigquery.biglake.v1.TableName;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.CatalogTests;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class BigLakeCatalogTest extends CatalogTests<BigLakeCatalog> {
+
+  @TempDir private Path temp;
+
+  private static final String GCP_PROJECT = "my-project";
+  private static final String GCP_REGION = "us";
+  private static final String CATALOG_ID = "biglake";
+
+  // For tests using a BigLake catalog connecting to a mocked service.
+  private MockServiceHelper mockServiceHelper;
+  private BigLakeCatalog bigLakeCatalogUsingMockService;
+
+  // For tests using a BigLake catalog with a mocked client.
+  private BigLakeClient mockBigLakeClient = mock(BigLakeClient.class);
+  private BigLakeCatalog bigLakeCatalogUsingMockClient;
+
+  @BeforeEach
+  public void before() throws Exception {
+    mockServiceHelper =
+        new MockServiceHelper(
+            UUID.randomUUID().toString(),
+            Arrays.<MockGrpcService>asList(new MockMetastoreService()));
+    mockServiceHelper.start();
+
+    ImmutableMap<String, String> properties =
+        ImmutableMap.of(
+            GCPProperties.PROJECT_ID,
+            GCP_PROJECT,
+            GCPProperties.REGION,
+            GCP_REGION,
+            CatalogProperties.WAREHOUSE_LOCATION,
+            temp.toAbsolutePath().toString());
+
+    MetastoreServiceSettings settings =
+        MetastoreServiceSettings.newBuilder()
+            .setTransportChannelProvider(mockServiceHelper.createChannelProvider())
+            .setCredentialsProvider(NoCredentialsProvider.create())
+            .build();
+
+    bigLakeCatalogUsingMockService = new BigLakeCatalog();
+    bigLakeCatalogUsingMockService.setConf(new Configuration());
+    bigLakeCatalogUsingMockService.initialize(CATALOG_ID, properties, new BigLakeClient(settings));
+
+    bigLakeCatalogUsingMockClient = new BigLakeCatalog();
+    bigLakeCatalogUsingMockClient.setConf(new Configuration());
+    bigLakeCatalogUsingMockClient.initialize(CATALOG_ID, properties, mockBigLakeClient);
+  }
+
+  @AfterEach
+  public void after() throws Exception {
+    if (bigLakeCatalogUsingMockService != null) {
+      bigLakeCatalogUsingMockService.close();
+    }
+
+    if (bigLakeCatalogUsingMockClient != null) {
+      bigLakeCatalogUsingMockClient.close();
+    }
+
+    if (mockServiceHelper != null) {
+      mockServiceHelper.stop();
+    }
+  }
+
+  @Override
+  protected boolean requiresNamespaceCreate() {
+    return true;
+  }
+
+  @Override
+  protected BigLakeCatalog catalog() {
+    return bigLakeCatalogUsingMockService;
+  }
+
+  @Override
+  protected boolean supportsNamesWithSlashes() {
+    return false;
+  }
+
+  @Test
+  public void testDefaultWarehouseWithDatabaseLocation() {
+    when(mockBigLakeClient.getDatabase(DatabaseName.of(GCP_PROJECT, GCP_REGION, CATALOG_ID, "db")))
+        .thenReturn(
+            Database.newBuilder()
+                .setHiveOptions(HiveDatabaseOptions.newBuilder().setLocationUri("db_folder"))
+                .build());
+
+    assertThat(
+            bigLakeCatalogUsingMockClient.defaultWarehouseLocation(
+                TableIdentifier.of("db", "table")))
+        .isEqualTo("db_folder/table");
+  }
+
+  @Test
+  public void testDefaultWarehouseWithoutDatabaseLocation() {
+    when(mockBigLakeClient.getDatabase(DatabaseName.of(GCP_PROJECT, "us", CATALOG_ID, "db")))
+        .thenReturn(
+            Database.newBuilder().setHiveOptions(HiveDatabaseOptions.getDefaultInstance()).build());
+
+    assertThat(
+            bigLakeCatalogUsingMockClient.defaultWarehouseLocation(
+                TableIdentifier.of("db", "table")))
+        .isEqualTo(temp.toAbsolutePath().toString() + "/db.db/table");
+  }
+
+  @Test
+  public void testRenameTableToDifferentDatabaseShouldFail() {
+    assertThatThrownBy(
+            () ->
+                bigLakeCatalogUsingMockClient.renameTable(
+                    TableIdentifier.of("db0", "t1"), TableIdentifier.of("db1", 
"t2")))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Cannot rename table db0.t1 to db1.t2: database must 
match");
+  }
+
+  @Test
+  public void testCreateNamespaceShouldCreateCatalogWhenNamespaceIsEmpty() {
+    bigLakeCatalogUsingMockClient.createNamespace(Namespace.empty(), ImmutableMap.of());
+    verify(mockBigLakeClient, times(1))
+        .createCatalog(
+            CatalogName.of(GCP_PROJECT, GCP_REGION, CATALOG_ID), Catalog.getDefaultInstance());
+  }
+
+  @Test
+  public void testCreateNamespaceShouldFailWhenInvalid() {
+    assertThatThrownBy(
+            () ->
+                bigLakeCatalogUsingMockClient.createNamespace(
+                    Namespace.of("n0", "n1"), ImmutableMap.of()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessage("Invalid namespace (too long): n0.n1");
+  }
+
+  @Test
+  public void testListNamespacesShouldReturnEmptyWhenNamespaceIsNotEmpty() {
+    assertThat(bigLakeCatalogUsingMockClient.listNamespaces(Namespace.of("db"))).isEmpty();
+  }
+
+  @Test
+  public void testDropNamespaceShouldDeleteCatalogWhenEmptyNamespace() {
+    bigLakeCatalogUsingMockClient.dropNamespace(Namespace.empty());
+    verify(mockBigLakeClient, times(1))
+        .deleteCatalog(CatalogName.of(GCP_PROJECT, GCP_REGION, CATALOG_ID));
+  }
+
+  @Test
+  public void testListTablesShouldListTablesInAllDbsWhenNamespaceIsEmpty() {
+    DatabaseName db1Name = DatabaseName.of(GCP_PROJECT, GCP_REGION, CATALOG_ID, "db1");
+    DatabaseName db2Name = DatabaseName.of(GCP_PROJECT, GCP_REGION, CATALOG_ID, "db2");
+
+    TableName table1Name = TableName.of(GCP_PROJECT, GCP_REGION, CATALOG_ID, "db1", "tbl1");
+    TableName table2Name = TableName.of(GCP_PROJECT, GCP_REGION, CATALOG_ID, "db1", "tbl2");
+    TableName table3Name = TableName.of(GCP_PROJECT, GCP_REGION, CATALOG_ID, "db2", "tbl3");
+
+    when(mockBigLakeClient.listDatabases(CatalogName.of(GCP_PROJECT, GCP_REGION, CATALOG_ID)))
+        .thenReturn(
+            ImmutableList.of(
+                Database.newBuilder().setName(db1Name.toString()).build(),
+                Database.newBuilder().setName(db2Name.toString()).build()));
+
+    when(mockBigLakeClient.listTables(db1Name))
+        .thenReturn(
+            ImmutableList.of(
+                Table.newBuilder().setName(table1Name.toString()).build(),
+                Table.newBuilder().setName(table2Name.toString()).build()));
+    when(mockBigLakeClient.listTables(db2Name))
+        .thenReturn(ImmutableList.of(Table.newBuilder().setName(table3Name.toString()).build()));
+
+    List<TableIdentifier> result = bigLakeCatalogUsingMockClient.listTables(Namespace.empty());
+    assertThat(result)
+        .containsExactlyInAnyOrder(
+            TableIdentifier.of("db1", "tbl1"),
+            TableIdentifier.of("db1", "tbl2"),
+            TableIdentifier.of("db2", "tbl3"));
+  }
+
+  @Test
+  public void testDropNamespaceShouldFailWhenNamespaceIsTooLong() {

Review Comment:
   I think the name here is misleading, because as a caller of this method, nothing is failing. Maybe `dropTooLongNamespace()`?
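   A sketch of what I mean (the test body is cut off in this hunk; this assumes it asserts the `false` return that `dropNamespace` produces for a namespace with more than one level, per the catalog code below):
   ```java
   @Test
   public void dropTooLongNamespace() {
     // dropNamespace logs a warning and returns false for a too-long namespace
     assertThat(bigLakeCatalogUsingMockClient.dropNamespace(Namespace.of("n0", "n1"))).isFalse();
   }
   ```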



##########
gcp/src/test/java/org/apache/iceberg/gcp/biglake/BigLakeTableOperationsTest.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.biglake;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.AbortedException;
+import com.google.cloud.bigquery.biglake.v1.HiveTableOptions;
+import com.google.cloud.bigquery.biglake.v1.HiveTableOptions.StorageDescriptor;
+import com.google.cloud.bigquery.biglake.v1.Table;
+import com.google.cloud.bigquery.biglake.v1.TableName;
+import io.grpc.Status.Code;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.ArgumentCaptor;
+
+public class BigLakeTableOperationsTest {
+
+  @TempDir private Path temp;
+
+  private static final String CATALOG_NAME = "iceberg";
+
+  private static final String GCP_PROJECT = "my-project";
+  private static final String GCP_REGION = "us";
+  private static final String CATALOG_ID = "biglake";
+  private static final String DB_ID = "db";
+  private static final String TABLE_ID = "tbl";
+  private static final TableName TABLE_NAME =
+      TableName.of(GCP_PROJECT, GCP_REGION, CATALOG_ID, DB_ID, TABLE_ID);
+  private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DB_ID, TABLE_ID);
+
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "id", Types.IntegerType.get(), "unique ID"),
+          required(2, "data", Types.StringType.get()));
+
+  private BigLakeClient bigLakeClient = mock(BigLakeClient.class);
+  private BigLakeCatalog bigLakeCatalog;
+  private BigLakeTableOperations tableOps;
+
+  @BeforeEach
+  public void before() throws Exception {

Review Comment:
   `throws Exception` is redundant here
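   i.e., nothing in the setup body throws a checked exception, so:
   ```suggestion
     public void before() {
   ```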



##########
gcp/src/main/java/org/apache/iceberg/gcp/biglake/BigLakeTableOperations.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.biglake;
+
+import com.google.api.gax.rpc.AbortedException;
+import com.google.cloud.bigquery.biglake.v1.HiveTableOptions;
+import com.google.cloud.bigquery.biglake.v1.HiveTableOptions.SerDeInfo;
+import com.google.cloud.bigquery.biglake.v1.HiveTableOptions.StorageDescriptor;
+import com.google.cloud.bigquery.biglake.v1.Table;
+import com.google.cloud.bigquery.biglake.v1.TableName;
+import java.util.Map;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Handles BigLake table operations. */
+public final class BigLakeTableOperations extends BaseMetastoreTableOperations {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BigLakeTableOperations.class);
+
+  private final BigLakeClient client;
+  private final FileIO io;
+  // The name of this Iceberg catalog plugin.
+  private final String name;
+  private final TableName tableName;
+
+  BigLakeTableOperations(BigLakeClient client, FileIO io, String name, TableName tableName) {
+    this.client = client;
+    this.io = io;
+    this.name = name;
+    this.tableName = tableName;
+  }
+
+  // The doRefresh method should provide implementation on how to get the metadata location
+  @Override
+  public void doRefresh() {
+    // Must default to null.
+    String metadataLocation = null;
+    try {
+      HiveTableOptions hiveOptions = client.getTable(tableName).getHiveOptions();
+      if (!hiveOptions.containsParameters(METADATA_LOCATION_PROP)) {
+        throw new NoSuchIcebergTableException(
+            "Invalid Iceberg table %s: missing metadata location", tableName());
+      }
+
+      metadataLocation = hiveOptions.getParametersOrThrow(METADATA_LOCATION_PROP);
+    } catch (NoSuchTableException e) {
+      if (currentMetadataLocation() != null) {
+        // Re-throws the exception because the table must exist in this case.
+        throw e;
+      }
+    }
+
+    refreshFromMetadataLocation(metadataLocation);
+  }
+
+  // The doCommit method should provide implementation on how to update with metadata location
+  // atomically
+  @Override
+  public void doCommit(TableMetadata base, TableMetadata metadata) {

Review Comment:
   I think this method should stay `protected`
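   e.g., keeping the visibility it has in `BaseMetastoreTableOperations`:
   ```suggestion
     @Override
     protected void doCommit(TableMetadata base, TableMetadata metadata) {
   ```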



##########
gcp/src/main/java/org/apache/iceberg/gcp/biglake/BigLakeTableOperations.java:
##########
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.biglake;
+
+import com.google.api.gax.rpc.AbortedException;
+import com.google.cloud.bigquery.biglake.v1.HiveTableOptions;
+import com.google.cloud.bigquery.biglake.v1.HiveTableOptions.SerDeInfo;
+import com.google.cloud.bigquery.biglake.v1.HiveTableOptions.StorageDescriptor;
+import com.google.cloud.bigquery.biglake.v1.Table;
+import com.google.cloud.bigquery.biglake.v1.TableName;
+import java.util.Map;
+import org.apache.iceberg.BaseMetastoreTableOperations;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.CommitStateUnknownException;
+import org.apache.iceberg.exceptions.NoSuchIcebergTableException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Handles BigLake table operations. */
+public final class BigLakeTableOperations extends BaseMetastoreTableOperations {

Review Comment:
   does this need to be `public final`? I think it can be just package-private
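   e.g.:
   ```suggestion
   final class BigLakeTableOperations extends BaseMetastoreTableOperations {
   ```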



##########
gcp/src/test/java/org/apache/iceberg/gcp/biglake/BigLakeTableOperationsTest.java:
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.biglake;
+
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.api.gax.grpc.GrpcStatusCode;
+import com.google.api.gax.rpc.AbortedException;
+import com.google.cloud.bigquery.biglake.v1.HiveTableOptions;
+import com.google.cloud.bigquery.biglake.v1.HiveTableOptions.StorageDescriptor;
+import com.google.cloud.bigquery.biglake.v1.Table;
+import com.google.cloud.bigquery.biglake.v1.TableName;
+import io.grpc.Status.Code;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Optional;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.filefilter.TrueFileFilter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.CommitFailedException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.ArgumentCaptor;
+
+public class BigLakeTableOperationsTest {
+
+  @TempDir private Path temp;
+
+  private static final String CATALOG_NAME = "iceberg";
+
+  private static final String GCP_PROJECT = "my-project";
+  private static final String GCP_REGION = "us";
+  private static final String CATALOG_ID = "biglake";
+  private static final String DB_ID = "db";
+  private static final String TABLE_ID = "tbl";
+  private static final TableName TABLE_NAME =
+      TableName.of(GCP_PROJECT, GCP_REGION, CATALOG_ID, DB_ID, TABLE_ID);
+  private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(DB_ID, TABLE_ID);
+
+  private static final Schema SCHEMA =
+      new Schema(
+          required(1, "id", Types.IntegerType.get(), "unique ID"),
+          required(2, "data", Types.StringType.get()));
+
+  private BigLakeClient bigLakeClient = mock(BigLakeClient.class);
+  private BigLakeCatalog bigLakeCatalog;
+  private BigLakeTableOperations tableOps;
+
+  @BeforeEach
+  public void before() throws Exception {
+    ImmutableMap<String, String> properties =
+        ImmutableMap.of(
+            GCPProperties.PROJECT_ID,
+            GCP_PROJECT,
+            GCPProperties.REGION,
+            GCP_REGION,
+            CatalogProperties.WAREHOUSE_LOCATION,
+            temp.toFile().getAbsolutePath(),
+            GCPProperties.BIGLAKE_CATALOG_ID,
+            CATALOG_ID);
+
+    bigLakeCatalog = new BigLakeCatalog();
+    bigLakeCatalog.setConf(new Configuration());
+    bigLakeCatalog.initialize(CATALOG_NAME, properties, bigLakeClient);
+    tableOps = (BigLakeTableOperations) bigLakeCatalog.newTableOps(TABLE_IDENTIFIER);
+  }
+
+  @AfterEach
+  public void after() throws Exception {
+    bigLakeCatalog.close();
+  }
+
+  @Test
+  public void testDoCommitShouldUseEtagForUpdateTable() throws Exception {
+    when(bigLakeClient.getTable(TABLE_NAME))
+        .thenThrow(new NoSuchTableException("error message getTable"));
+    Table createdTable = createTestTable();
+
+    Table tableWithEtag = createdTable.toBuilder().setEtag("etag").build();
+    reset(bigLakeClient);
+    when(bigLakeClient.getTable(TABLE_NAME)).thenReturn(tableWithEtag, tableWithEtag);
+
+    org.apache.iceberg.Table loadedTable = bigLakeCatalog.loadTable(TABLE_IDENTIFIER);
+
+    when(bigLakeClient.updateTableParameters(any(), any(), any())).thenReturn(tableWithEtag);
+    loadedTable.updateSchema().addColumn("n", Types.IntegerType.get()).commit();
+
+    ArgumentCaptor<TableName> nameCaptor = ArgumentCaptor.forClass(TableName.class);
+    ArgumentCaptor<String> etagCaptor = ArgumentCaptor.forClass(String.class);
+    verify(bigLakeClient, times(1))
+        .updateTableParameters(nameCaptor.capture(), any(), etagCaptor.capture());
+    assertThat(nameCaptor.getValue()).isEqualTo(TABLE_NAME);
+    assertThat(etagCaptor.getValue()).isEqualTo("etag");
+  }
+
+  @Test
+  public void testDoCommitShouldFailWhenEtagMismatch() throws Exception {
+    when(bigLakeClient.getTable(TABLE_NAME))
+        .thenThrow(new NoSuchTableException("error message getTable"));
+    Table createdTable = createTestTable();
+
+    Table tableWithEtag = createdTable.toBuilder().setEtag("etag").build();
+    reset(bigLakeClient);
+    when(bigLakeClient.getTable(TABLE_NAME)).thenReturn(tableWithEtag, tableWithEtag);
+
+    org.apache.iceberg.Table loadedTable = bigLakeCatalog.loadTable(TABLE_IDENTIFIER);
+
+    when(bigLakeClient.updateTableParameters(any(), any(), any()))
+        .thenThrow(
+            new AbortedException(
+                new RuntimeException("error message etag mismatch"),
+                GrpcStatusCode.of(Code.ABORTED),
+                false));
+
+    assertThatThrownBy(
+            () -> loadedTable.updateSchema().addColumn("n", 
Types.IntegerType.get()).commit())
+        .isInstanceOf(CommitFailedException.class)
+        .hasMessage("Updating table failed due to conflict updates (etag 
mismatch)");
+  }
+
+  @Test
+  public void testInitRefreshShouldReturnNullForNonIcebergTable() {
+    when(bigLakeClient.getTable(TABLE_NAME))
+        .thenReturn(Table.newBuilder().setName(TABLE_NAME.toString()).build());
+
+    assertThat(tableOps.refresh()).isNull();
+  }
+
+  @Test
+  public void testTableName() {
+    assertThat(tableOps.tableName()).isEqualTo("iceberg.db.tbl");
+  }
+
+  private Table createTestTable() throws IOException {
+    TableIdentifier tableIdent =
+        TableIdentifier.of(TABLE_NAME.getDatabase(), TABLE_NAME.getTable());
+    String tableDir =
+        new File(temp.toFile().getAbsolutePath(), TABLE_NAME.getTable()).getAbsolutePath();
+
+    bigLakeCatalog
+        .buildTable(tableIdent, SCHEMA)
+        .withLocation(tableDir)
+        .createTransaction()
+        .commitTransaction();
+
+    Optional<String> metadataLocation = getAnyJsonFilePath(tableDir);
+    assertThat(metadataLocation).isPresent();
+    return Table.newBuilder()
+        .setName(TABLE_NAME.toString())
+        .setHiveOptions(
+            HiveTableOptions.newBuilder()
+                .putParameters("metadata_location", metadataLocation.get())

Review Comment:
   ```suggestion
                .putParameters(BaseMetastoreTableOperations.METADATA_LOCATION_PROP, metadataLocation.get())
   ```



##########
gcp/src/main/java/org/apache/iceberg/gcp/biglake/BigLakeCatalog.java:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.biglake;
+
+import com.google.cloud.bigquery.biglake.v1.Catalog;
+import com.google.cloud.bigquery.biglake.v1.CatalogName;
+import com.google.cloud.bigquery.biglake.v1.Database;
+import com.google.cloud.bigquery.biglake.v1.DatabaseName;
+import com.google.cloud.bigquery.biglake.v1.HiveDatabaseOptions;
+import com.google.cloud.bigquery.biglake.v1.Table;
+import com.google.cloud.bigquery.biglake.v1.TableName;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ServiceFailureException;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.ResolvingFileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.LocationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Iceberg BigLake Metastore (BLMS) Catalog implementation. */
+public final class BigLakeCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  public static final String DEFAULT_BIGLAKE_SERVICE_ENDPOINT = "biglake.googleapis.com:443";
+
+  private static final Logger LOG = LoggerFactory.getLogger(BigLakeCatalog.class);
+
+  // The name of this Iceberg catalog plugin.
+  private String name;
+  private Map<String, String> properties;
+  private FileIO io;
+  private Object conf;
+
+  private String projectId;
+  private String region;
+  // BLMS catalog ID and fully qualified name.
+  private String catalogId;
+  private CatalogName catalogName;
+  private BigLakeClient client;
+
+  private CloseableGroup closeableGroup;
+
+  // Must have a no-arg constructor to be dynamically loaded
+  // initialize(String name, Map<String, String> properties) will be called to complete
+  // initialization
+  public BigLakeCatalog() {}
+
+  @Override
+  public void initialize(String initName, Map<String, String> initProperties) {
+    BigLakeClient initClient;
+    try {
+      // CatalogProperties.URI specifies the endpoint of BigLake API.
+      // TODO: to add more auth options of the client. Currently it uses default auth
+      // (https://github.com/googleapis/google-cloud-java#application-default-credentials)
+      // that works on GCP services (e.g., GCE, GKE, Dataproc).
+      initClient =
+          new BigLakeClient(
+              initProperties.getOrDefault(CatalogProperties.URI, DEFAULT_BIGLAKE_SERVICE_ENDPOINT));
+    } catch (IOException e) {
+      throw new ServiceFailureException(e, "Creating BigLake client failed");
+    }
+
+    initialize(initName, initProperties, initClient);
+  }
+
+  @VisibleForTesting
+  void initialize(String initName, Map<String, String> initProperties, BigLakeClient initClient) {
+    this.name = initName;
+    this.properties = ImmutableMap.copyOf(initProperties);
+
+    Preconditions.checkArgument(initClient != null, "BigLake client must not be null");
+    this.client = initClient;
+
+    Preconditions.checkArgument(
+        properties.containsKey(GCPProperties.PROJECT_ID), "GCP project ID must be specified");
+    this.projectId = properties.get(GCPProperties.PROJECT_ID);
+
+    Preconditions.checkArgument(
+        properties.containsKey(GCPProperties.REGION), "GCP region must be specified");
+    this.region = properties.get(GCPProperties.REGION);
+
+    // Users can specify the BigLake catalog ID, otherwise catalog plugin name will be used.
+    // For example, "spark.sql.catalog.<plugin-name>=org.apache.iceberg.spark.SparkCatalog"
+    // specifies the plugin name "<plugin-name>".
+    this.catalogId = properties.getOrDefault(GCPProperties.BIGLAKE_CATALOG_ID, initName);
+    this.catalogName = CatalogName.of(projectId, region, catalogId);
+
+    String ioImpl =
+        properties.getOrDefault(CatalogProperties.FILE_IO_IMPL, ResolvingFileIO.class.getName());
+    this.io = CatalogUtil.loadFileIO(ioImpl, properties, conf);
+
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(io);
+    closeableGroup.setSuppressCloseFailure(true);

Review Comment:
   should the client also be added to the closeable group? Right now `BigLakeClient` doesn't have a `close()` method but I think it should implement `AutoCloseable`, because the `MetastoreServiceClient` in `BigLakeClient` is also `AutoCloseable`
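   A rough sketch (the wrapped client field name is an assumption; it isn't visible in this hunk):
   ```java
   // BigLakeClient.java (sketch)
   final class BigLakeClient implements AutoCloseable {
     private final MetastoreServiceClient metastoreClient; // assumed field name

     BigLakeClient(MetastoreServiceClient metastoreClient) {
       this.metastoreClient = metastoreClient;
     }

     @Override
     public void close() {
       // MetastoreServiceClient is itself AutoCloseable; closing it shuts down
       // the underlying gRPC channels and background resources.
       metastoreClient.close();
     }
   }
   ```
   and then in `initialize(..)` the client can join the existing group:
   ```java
   closeableGroup.addCloseable(client);
   ```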



##########
gcp/src/main/java/org/apache/iceberg/gcp/biglake/BigLakeCatalog.java:
##########
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.biglake;
+
+import com.google.cloud.bigquery.biglake.v1.Catalog;
+import com.google.cloud.bigquery.biglake.v1.CatalogName;
+import com.google.cloud.bigquery.biglake.v1.Database;
+import com.google.cloud.bigquery.biglake.v1.DatabaseName;
+import com.google.cloud.bigquery.biglake.v1.HiveDatabaseOptions;
+import com.google.cloud.bigquery.biglake.v1.Table;
+import com.google.cloud.bigquery.biglake.v1.TableName;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.TableMetadata;
+import org.apache.iceberg.TableOperations;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.exceptions.ServiceFailureException;
+import org.apache.iceberg.gcp.GCPProperties;
+import org.apache.iceberg.hadoop.Configurable;
+import org.apache.iceberg.io.CloseableGroup;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.ResolvingFileIO;
+import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.base.Strings;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
+import org.apache.iceberg.util.LocationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Iceberg BigLake Metastore (BLMS) Catalog implementation. */
+public final class BigLakeCatalog extends BaseMetastoreCatalog
+    implements Closeable, SupportsNamespaces, Configurable<Object> {
+
+  public static final String DEFAULT_BIGLAKE_SERVICE_ENDPOINT = "biglake.googleapis.com:443";
+
+  private static final Logger LOG = LoggerFactory.getLogger(BigLakeCatalog.class);
+
+  // The name of this Iceberg catalog plugin.
+  private String name;
+  private Map<String, String> properties;
+  private FileIO io;
+  private Object conf;
+
+  private String projectId;
+  private String region;
+  // BLMS catalog ID and fully qualified name.
+  private String catalogId;
+  private CatalogName catalogName;
+  private BigLakeClient client;
+
+  private CloseableGroup closeableGroup;
+
+  // Must have a no-arg constructor to be dynamically loaded
+  // initialize(String name, Map<String, String> properties) will be called to complete
+  // initialization
+  public BigLakeCatalog() {}
+
+  @Override
+  public void initialize(String initName, Map<String, String> initProperties) {
+    BigLakeClient initClient;
+    try {
+      // CatalogProperties.URI specifies the endpoint of BigLake API.
+      // TODO: to add more auth options of the client. Currently it uses default auth
+      // (https://github.com/googleapis/google-cloud-java#application-default-credentials)
+      // that works on GCP services (e.g., GCE, GKE, Dataproc).
+      initClient =
+          new BigLakeClient(
+              initProperties.getOrDefault(CatalogProperties.URI, DEFAULT_BIGLAKE_SERVICE_ENDPOINT));
+    } catch (IOException e) {
+      throw new ServiceFailureException(e, "Creating BigLake client failed");
+    }
+
+    initialize(initName, initProperties, initClient);
+  }
+
+  @VisibleForTesting
+  void initialize(String initName, Map<String, String> initProperties, BigLakeClient initClient) {
+    this.name = initName;
+    this.properties = ImmutableMap.copyOf(initProperties);
+
+    Preconditions.checkArgument(initClient != null, "BigLake client must not be null");
+    this.client = initClient;
+
+    Preconditions.checkArgument(
+        properties.containsKey(GCPProperties.PROJECT_ID), "GCP project ID must be specified");
+    this.projectId = properties.get(GCPProperties.PROJECT_ID);
+
+    Preconditions.checkArgument(
+        properties.containsKey(GCPProperties.REGION), "GCP region must be specified");
+    this.region = properties.get(GCPProperties.REGION);
+
+    // Users can specify the BigLake catalog ID, otherwise catalog plugin name will be used.
+    // For example, "spark.sql.catalog.<plugin-name>=org.apache.iceberg.spark.SparkCatalog"
+    // specifies the plugin name "<plugin-name>".
+    this.catalogId = properties.getOrDefault(GCPProperties.BIGLAKE_CATALOG_ID, initName);
+    this.catalogName = CatalogName.of(projectId, region, catalogId);
+
+    String ioImpl =
+        properties.getOrDefault(CatalogProperties.FILE_IO_IMPL, ResolvingFileIO.class.getName());
+    this.io = CatalogUtil.loadFileIO(ioImpl, properties, conf);
+
+    this.closeableGroup = new CloseableGroup();
+    closeableGroup.addCloseable(io);
+    closeableGroup.setSuppressCloseFailure(true);
+  }
+
+  @Override
+  protected TableOperations newTableOps(TableIdentifier identifier) {
+    String db = databaseId(identifier.namespace());
+    if (db == null) {
+      throwInvalidDbNamespaceError(identifier.namespace());
+    }
+
+    return new BigLakeTableOperations(client, io, name(), tableName(db, identifier.name()));
+  }
+
+  @Override
+  protected String defaultWarehouseLocation(TableIdentifier identifier) {
+    String db = databaseId(identifier.namespace());
+    if (db == null) {
+      throwInvalidDbNamespaceError(identifier.namespace());
+    }
+
+    String locationUri = loadDatabase(identifier.namespace()).getHiveOptions().getLocationUri();
+    return String.format(
+        "%s/%s",
+        Strings.isNullOrEmpty(locationUri) ? databaseLocation(db) : locationUri, identifier.name());
+  }
+
+  @Override
+  public List<TableIdentifier> listTables(Namespace namespace) {
+    ImmutableList<DatabaseName> dbNames = ImmutableList.of();
+    if (namespace.isEmpty()) {
+      dbNames =
+          Streams.stream(client.listDatabases(catalogName))
+              .map(db -> DatabaseName.parse(db.getName()))
+              .collect(ImmutableList.toImmutableList());
+    } else if (namespace.levels().length == 1) {
+      dbNames = ImmutableList.of(databaseName(namespace));
+    } else {
+      throwInvalidDbNamespaceError(namespace);
+    }
+
+    ImmutableList.Builder<TableIdentifier> result = ImmutableList.builder();
+    dbNames.stream()
+        .map(
+            dbName ->
+                Streams.stream(client.listTables(dbName))
+                    .map(BigLakeCatalog::tableIdentifier)
+                    .collect(ImmutableList.toImmutableList()))
+        .forEach(result::addAll);
+    return result.build();
+  }
+
+  @Override
+  public boolean dropTable(TableIdentifier identifier, boolean purge) {
+    String db = databaseId(identifier.namespace());
+    if (db == null) {
+      throwInvalidDbNamespaceError(identifier.namespace());
+    }
+
+    TableOperations ops = null;
+    TableMetadata lastMetadata = null;
+    if (purge) {
+      ops = newTableOps(identifier);
+      // TODO: to catch NotFoundException as in https://github.com/apache/iceberg/pull/5510.
+      lastMetadata = ops.current();
+    }
+
+    try {
+      client.deleteTable(tableName(db, identifier.name()));
+    } catch (NoSuchTableException e) {
+      return false;
+    }
+
+    if (ops != null && lastMetadata != null) {
+      CatalogUtil.dropTableData(ops.io(), lastMetadata);
+    }
+
+    return true;
+  }
+
+  @Override
+  public void renameTable(TableIdentifier from, TableIdentifier to) {
+    String fromDb = databaseId(from.namespace());
+    if (fromDb == null) {
+      throwInvalidDbNamespaceError(from.namespace());
+    }
+
+    String toDb = databaseId(to.namespace());
+    if (toDb == null) {
+      throwInvalidDbNamespaceError(to.namespace());
+    }
+
+    Preconditions.checkArgument(
+        fromDb.equals(toDb),
+        "Cannot rename table %s to %s: database must match",
+        from.toString(),
+        to.toString());
+    client.renameTable(tableName(fromDb, from.name()), tableName(toDb, to.name()));
+  }
+
+  @Override
+  public void createNamespace(Namespace namespace, Map<String, String> metadata) {
+    if (namespace.isEmpty()) {
+      // Used by `CREATE NAMESPACE <catalog>`. Create a BLMS catalog linked with Iceberg catalog.
+      client.createCatalog(catalogName, Catalog.getDefaultInstance());
+      LOG.info("Created BigLake catalog: {}", catalogName.toString());
+    } else if (namespace.levels().length == 1) {
+      // Create a database.
+      String dbId = databaseId(namespace);
+      Database.Builder builder = Database.newBuilder().setType(Database.Type.HIVE);
+      builder
+          .getHiveOptionsBuilder()
+          .putAllParameters(metadata)
+          .setLocationUri(databaseLocation(dbId));
+
+      client.createDatabase(DatabaseName.of(projectId, region, catalogId, dbId), builder.build());
+    } else {
+      throw new IllegalArgumentException(
+          String.format("Invalid namespace (too long): %s", namespace));
+    }
+  }
+
+  @Override
+  public List<Namespace> listNamespaces(Namespace namespace) {
+    if (namespace.isEmpty()) {
+      return Streams.stream(client.listDatabases(catalogName))
+          .map(BigLakeCatalog::namespace)
+          .collect(ImmutableList.toImmutableList());
+    }
+
+    // Database namespace does not have nested namespaces.
+    if (namespace.levels().length == 1) {
+      return ImmutableList.of();
+    }
+
+    throw new NoSuchNamespaceException("Invalid namespace: %s", namespace);
+  }
+
+  @Override
+  public boolean dropNamespace(Namespace namespace) {
+    try {
+      if (namespace.isEmpty()) {
+        // Used by `DROP NAMESPACE <catalog>`. Deletes the BLMS catalog linked by Iceberg catalog.
+        client.deleteCatalog(catalogName);
+        LOG.info("Deleted BigLake catalog: {}", catalogName.toString());
+      } else if (namespace.levels().length == 1) {
+        client.deleteDatabase(databaseName(namespace));
+        // Don't delete the data file folder for safety. It aligns with HMS's default behavior.
+        // To support database or catalog level config controlling file deletion in future.
+      } else {
+        LOG.warn("Invalid namespace (too long): {}", namespace);
+        return false;
+      }
+    } catch (NoSuchNamespaceException e) {
+      LOG.warn("Failed to drop namespace", e);
+      return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public boolean setProperties(Namespace namespace, Map<String, String> props) {
+    DatabaseName dbName = databaseName(namespace);
+    if (dbName == null) {
+      throwInvalidDbNamespaceError(namespace);
+    }
+
+    HiveDatabaseOptions.Builder optionsBuilder =
+        loadDatabase(namespace).toBuilder().getHiveOptionsBuilder();
+    props.forEach(optionsBuilder::putParameters);
+    client.updateDatabaseParameters(dbName, optionsBuilder.getParametersMap());
+    return true;
+  }
+
+  @Override
+  public boolean removeProperties(Namespace namespace, Set<String> props) {
+    DatabaseName dbName = databaseName(namespace);
+    if (dbName == null) {
+      throwInvalidDbNamespaceError(namespace);
+    }
+
+    HiveDatabaseOptions.Builder optionsBuilder =
+        loadDatabase(namespace).toBuilder().getHiveOptionsBuilder();
+    props.forEach(optionsBuilder::removeParameters);
+    client.updateDatabaseParameters(dbName, optionsBuilder.getParametersMap());
+    return true;
+  }
+
+  @Override
+  public Map<String, String> loadNamespaceMetadata(Namespace namespace) {
+    if (namespace.isEmpty()) {
+      // Calls getCatalog to check existence. BLMS catalog has no metadata today.
+      client.getCatalog(catalogName);
+      return ImmutableMap.of();
+    } else if (namespace.levels().length == 1) {
+      return loadDatabase(namespace).getHiveOptions().getParametersMap();
+    } else {
+      throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+    }
+  }
+
+  @Override
+  public String name() {
+    return name;
+  }
+
+  @Override
+  protected Map<String, String> properties() {
+    return properties == null ? ImmutableMap.of() : properties;
+  }
+
+  @Override
+  public void setConf(Object conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closeableGroup != null) {
+      closeableGroup.close();
+    }
+  }
+
+  @Override
+  protected boolean isValidIdentifier(TableIdentifier tableIdentifier) {
+    return tableIdentifier.namespace().levels().length == 1;
+  }
+
+  private String databaseLocation(String dbId) {
+    String warehouseLocation =
+        LocationUtil.stripTrailingSlash(properties.get(CatalogProperties.WAREHOUSE_LOCATION));
+    Preconditions.checkArgument(warehouseLocation != null, "Data warehouse location is not set");

Review Comment:
   I think this would be better if it's checked in `initialize(..)` so that we fail early if this is missing
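   e.g., a sketch next to the existing project/region checks in `initialize(..)`:
   ```java
   Preconditions.checkArgument(
       properties.containsKey(CatalogProperties.WAREHOUSE_LOCATION),
       "Data warehouse location is not set");
   ```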



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

