This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch branch-lance-namepspace-dev
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-lance-namepspace-dev by 
this push:
     new 32d9642e8a [#9015] improvement(lance-rest): Add UTs and ITs for Lance 
rest table operations (#9016)
32d9642e8a is described below

commit 32d9642e8a656f1a8e7b0f7da71201951a28b865
Author: Mini Yu <[email protected]>
AuthorDate: Wed Nov 5 21:47:05 2025 +0800

    [#9015] improvement(lance-rest): Add UTs and ITs for Lance rest table 
operations (#9016)
    
    ### What changes were proposed in this pull request?
    
    Add ITs for the Lance REST service and test Lance table operations.
    
    ### Why are the changes needed?
    
    To make the code more robust.
    
    Fix: #9015
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A.
    
    ### How was this patch tested?
    
    This change itself consists of tests (new UTs and ITs).
---
 .../GenericLakehouseCatalogOperations.java         |  15 +-
 .../lakehouse/lance/LanceCatalogOperations.java    |  29 +-
 .../lance/common/ops/LanceTableOperations.java     |  60 +-
 ...java => GravitinoLanceNameSpaceOperations.java} | 345 +----------
 .../gravitino/GravitinoLanceNamespaceWrapper.java  | 633 +--------------------
 .../gravitino/GravitinoLanceTableOperations.java   | 293 ++++++++++
 .../gravitino/lance/common/utils/ArrowUtils.java}  |  44 +-
 .../lance/common/utils/LanceConstants.java}        |   9 +-
 .../lance/common/utils/LancePropertiesUtils.java}  |  30 +-
 .../lance/common/utils/SerializationUtils.java     |  53 ++
 .../lance/common/utils/TestArrowUtils.java}        |  27 +-
 .../lance/service/rest/LanceTableOperations.java   |  83 ++-
 .../lance/integration/test/LanceRESTServiceIT.java | 303 ++++++++++
 .../service/rest/TestLanceNamespaceOperations.java | 301 ++++++++++
 .../src/test/resources/log4j2.properties           |  73 +++
 15 files changed, 1273 insertions(+), 1025 deletions(-)

diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
index 60c0958c14..3860b52d57 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
@@ -33,6 +33,7 @@ import org.apache.commons.collections4.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
@@ -304,6 +305,8 @@ public class GenericLakehouseCatalogOperations
           getLakehouseCatalogOperations(newProperties);
       return lanceCatalogOperations.createTable(
           ident, columns, comment, newProperties, partitions, distribution, 
sortOrders, indexes);
+    } catch (EntityAlreadyExistsException e) {
+      throw new TableAlreadyExistsException(e, "Table %s already exists", 
ident);
     } catch (IOException e) {
       throw new RuntimeException("Failed to create table " + ident, e);
     }
@@ -366,22 +369,22 @@ public class GenericLakehouseCatalogOperations
   }
 
   @Override
-  public boolean dropTable(NameIdentifier ident) {
+  public boolean purgeTable(NameIdentifier ident) {
     try {
       TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, 
TableEntity.class);
       LakehouseCatalogOperations lakehouseCatalogOperations =
           getLakehouseCatalogOperations(tableEntity.getProperties());
-      return lakehouseCatalogOperations.dropTable(ident);
+      return lakehouseCatalogOperations.purgeTable(ident);
     } catch (NoSuchTableException e) {
-      LOG.warn("Table {} does not exist, skip dropping it.", ident);
+      LOG.warn("Table {} does not exist, skip purging it.", ident);
       return false;
     } catch (IOException e) {
-      throw new RuntimeException("Failed to drop table: " + ident, e);
+      throw new RuntimeException("Failed to purge table: " + ident, e);
     }
   }
 
   @Override
-  public boolean purgeTable(NameIdentifier ident) throws 
UnsupportedOperationException {
+  public boolean dropTable(NameIdentifier ident) throws 
UnsupportedOperationException {
     try {
       // Only delete the metadata entry here. The physical data will not be 
deleted.
       if (!tableExists(ident)) {
@@ -389,7 +392,7 @@ public class GenericLakehouseCatalogOperations
       }
       return store.delete(ident, Entity.EntityType.TABLE);
     } catch (IOException e) {
-      throw new RuntimeException("Failed to purge table " + ident, e);
+      throw new RuntimeException("Failed to drop table " + ident, e);
     }
   }
 
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
index 0ed83457a4..16bef5565f 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
@@ -20,9 +20,7 @@
 package org.apache.gravitino.catalog.lakehouse.lance;
 
 import static org.apache.gravitino.Entity.EntityType.TABLE;
-import static 
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.lancedb.lance.Dataset;
 import com.lancedb.lance.WriteParams;
@@ -58,6 +56,7 @@ import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
 import org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter;
+import org.apache.gravitino.lance.common.utils.LancePropertiesUtils;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.TableEntity;
 import org.apache.gravitino.rel.Column;
@@ -120,13 +119,7 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
       throws NoSuchSchemaException, TableAlreadyExistsException {
     // Ignore partitions, distributions, sortOrders, and indexes for Lance 
tables;
     String location = 
properties.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
-    Map<String, String> storageProps =
-        properties.entrySet().stream()
-            .filter(e -> 
e.getKey().startsWith(LANCE_TABLE_STORAGE_OPTION_PREFIX))
-            .collect(
-                Collectors.toMap(
-                    e -> 
e.getKey().substring(LANCE_TABLE_STORAGE_OPTION_PREFIX.length()),
-                    Map.Entry::getValue));
+    Map<String, String> storageProps = 
LancePropertiesUtils.getLanceStorageOptions(properties);
 
     try (Dataset ignored =
         Dataset.create(
@@ -280,7 +273,7 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
   }
 
   @Override
-  public boolean dropTable(NameIdentifier ident) {
+  public boolean purgeTable(NameIdentifier ident) {
     try {
       TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE, 
TableEntity.class);
       Map<String, String> lancePropertiesMap = tableEntity.getProperties();
@@ -288,14 +281,24 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
           
lancePropertiesMap.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
 
       if (!store.delete(ident, Entity.EntityType.TABLE)) {
-        throw new RuntimeException("Failed to drop Lance table: " + 
ident.name());
+        throw new RuntimeException("Failed to purge Lance table: " + 
ident.name());
       }
 
       // Drop the Lance dataset from cloud storage.
-      Dataset.drop(location, ImmutableMap.of());
+      Dataset.drop(location, 
LancePropertiesUtils.getLanceStorageOptions(lancePropertiesMap));
       return true;
     } catch (IOException e) {
-      throw new RuntimeException("Failed to drop Lance table: " + 
ident.name(), e);
+      throw new RuntimeException("Failed to purge Lance table: " + 
ident.name(), e);
     }
   }
+
+  @Override
+  public boolean dropTable(NameIdentifier ident) {
+    // TODO We will handle it in GenericLakehouseCatalogOperations. However, 
we need
+    //  to figure out it's a external table or not first. we will introduce a 
property
+    //  to indicate that. Currently, all Lance tables are external tables. 
`drop` will
+    //  just remove the metadata in metastore and will not delete data in 
storage.
+    throw new UnsupportedOperationException(
+        "LanceCatalogOperations does not support dropTable operation.");
+  }
 }
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
index 8a356fb135..7f2f1df52f 100644
--- 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
@@ -18,26 +18,80 @@
  */
 package org.apache.gravitino.lance.common.ops;
 
+import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
+import com.lancedb.lance.namespace.model.CreateTableRequest;
 import com.lancedb.lance.namespace.model.CreateTableResponse;
 import com.lancedb.lance.namespace.model.DeregisterTableResponse;
 import com.lancedb.lance.namespace.model.DescribeTableResponse;
+import com.lancedb.lance.namespace.model.RegisterTableRequest;
 import com.lancedb.lance.namespace.model.RegisterTableResponse;
 import java.util.Map;
+import java.util.Optional;
 
 public interface LanceTableOperations {
 
-  DescribeTableResponse describeTable(String tableId, String delimiter);
+  /**
+   * Describe the details of a table.
+   *
+   * @param tableId table ids are in the format of 
"{namespace}{delimiter}{table_name}"
+   * @param delimiter the delimiter used in the namespace
+   * @param version the version of the table to describe, if null, describe 
the latest version
+   * @return the table description
+   */
+  DescribeTableResponse describeTable(String tableId, String delimiter, 
Optional<Long> version);
 
+  /**
+   * Create a new table.
+   *
+   * @param tableId table ids are in the format of 
"{namespace}{delimiter}{table_name}"
+   * @param mode it can be CREATE, OVERWRITE, or EXIST_OK
+   * @param delimiter the delimiter used in the namespace
+   * @param tableLocation the location where the table data will be stored
+   * @param tableProperties the properties of the table
+   * @param arrowStreamBody the arrow stream bytes containing the schema and 
data
+   * @return the response of the create table operation
+   */
   CreateTableResponse createTable(
       String tableId,
-      String mode,
+      CreateTableRequest.ModeEnum mode,
       String delimiter,
       String tableLocation,
       Map<String, String> tableProperties,
       byte[] arrowStreamBody);
 
+  /**
+   * Create an new table without schema.
+   *
+   * @param tableId table ids are in the format of 
"{namespace}{delimiter}{table_name}"
+   * @param delimiter the delimiter used in the namespace
+   * @param tableLocation the location where the table data will be stored
+   * @param tableProperties the properties of the table
+   * @return the response of the create table operation
+   */
+  CreateEmptyTableResponse createEmptyTable(
+      String tableId, String delimiter, String tableLocation, Map<String, 
String> tableProperties);
+
+  /**
+   * Register an existing table.
+   *
+   * @param tableId table ids are in the format of 
"{namespace}{delimiter}{table_name}"
+   * @param mode it can be REGISTER or OVERWRITE.
+   * @param delimiter the delimiter used in the namespace
+   * @param tableProperties the properties of the table, it should contain the 
table location
+   * @return the response of the register table operation
+   */
   RegisterTableResponse registerTable(
-      String tableId, String mode, String delimiter, Map<String, String> 
tableProperties);
+      String tableId,
+      RegisterTableRequest.ModeEnum mode,
+      String delimiter,
+      Map<String, String> tableProperties);
 
+  /**
+   * Deregister a table. It will not delete the underlying lance data.
+   *
+   * @param tableId table ids are in the format of 
"{namespace}{delimiter}{table_name}"
+   * @param delimiter the delimiter used in the namespace
+   * @return the response of the deregister table operation
+   */
   DeregisterTableResponse deregisterTable(String tableId, String delimiter);
 }
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java
similarity index 54%
copy from 
lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
copy to 
lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java
index dd2c4629b4..961134ec66 100644
--- 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java
@@ -1,29 +1,24 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
  *
- *  http://www.apache.org/licenses/LICENSE-2.0
+ *      http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
  */
-package org.apache.gravitino.lance.common.ops.gravitino;
 
-import static 
org.apache.gravitino.lance.common.config.LanceConfig.METALAKE_NAME;
-import static 
org.apache.gravitino.lance.common.config.LanceConfig.NAMESPACE_BACKEND_URI;
-import static 
org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter.CONVERTER;
-import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+package org.apache.gravitino.lance.common.ops.gravitino;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
@@ -32,24 +27,14 @@ import com.google.common.collect.Sets;
 import com.lancedb.lance.namespace.LanceNamespaceException;
 import com.lancedb.lance.namespace.ObjectIdentifier;
 import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
-import com.lancedb.lance.namespace.model.CreateNamespaceRequest.ModeEnum;
 import com.lancedb.lance.namespace.model.CreateNamespaceResponse;
-import com.lancedb.lance.namespace.model.CreateTableResponse;
-import com.lancedb.lance.namespace.model.DeregisterTableResponse;
 import com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
-import com.lancedb.lance.namespace.model.DescribeTableResponse;
 import com.lancedb.lance.namespace.model.DropNamespaceRequest;
 import com.lancedb.lance.namespace.model.DropNamespaceResponse;
-import com.lancedb.lance.namespace.model.JsonArrowSchema;
 import com.lancedb.lance.namespace.model.ListNamespacesResponse;
 import com.lancedb.lance.namespace.model.ListTablesResponse;
-import com.lancedb.lance.namespace.model.RegisterTableRequest;
-import com.lancedb.lance.namespace.model.RegisterTableResponse;
 import com.lancedb.lance.namespace.util.CommonUtil;
-import com.lancedb.lance.namespace.util.JsonArrowSchemaConverter;
 import com.lancedb.lance.namespace.util.PageUtil;
-import java.io.ByteArrayInputStream;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -61,14 +46,8 @@ import java.util.function.IntFunction;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.CatalogChange;
-import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
 import org.apache.gravitino.SchemaChange;
@@ -80,60 +59,16 @@ import 
org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NonEmptyCatalogException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
-import org.apache.gravitino.lance.common.config.LanceConfig;
 import org.apache.gravitino.lance.common.ops.LanceNamespaceOperations;
-import org.apache.gravitino.lance.common.ops.LanceTableOperations;
-import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
-import org.apache.gravitino.rel.Column;
-import org.apache.gravitino.rel.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GravitinoLanceNamespaceWrapper extends NamespaceWrapper
-    implements LanceNamespaceOperations, LanceTableOperations {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoLanceNamespaceWrapper.class);
-  private GravitinoClient client;
-
-  @VisibleForTesting
-  GravitinoLanceNamespaceWrapper() {
-    super(null);
-  }
-
-  public GravitinoLanceNamespaceWrapper(LanceConfig config) {
-    super(config);
-  }
-
-  @Override
-  protected void initialize() {
-    String uri = config().get(NAMESPACE_BACKEND_URI);
-    String metalakeName = config().get(METALAKE_NAME);
-    Preconditions.checkArgument(
-        StringUtils.isNotBlank(metalakeName),
-        "Metalake name must be provided for Lance Gravitino namespace 
backend");
 
-    this.client = 
GravitinoClient.builder(uri).withMetalake(metalakeName).build();
-  }
+public class GravitinoLanceNameSpaceOperations implements 
LanceNamespaceOperations {
 
-  @Override
-  public LanceNamespaceOperations newNamespaceOps() {
-    return this;
-  }
+  private final GravitinoLanceNamespaceWrapper namespaceWrapper;
+  private final GravitinoClient client;
 
-  @Override
-  protected LanceTableOperations newTableOps() {
-    return this;
-  }
-
-  @Override
-  public void close() {
-    if (client != null) {
-      try {
-        client.close();
-      } catch (Exception e) {
-        LOG.warn("Error closing Gravitino client", e);
-      }
-    }
+  public GravitinoLanceNameSpaceOperations(GravitinoLanceNamespaceWrapper 
namespaceWrapper) {
+    this.namespaceWrapper = namespaceWrapper;
+    this.client = namespaceWrapper.getClient();
   }
 
   @Override
@@ -148,13 +83,13 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
       case 0:
         namespaces =
             Arrays.stream(client.listCatalogsInfo())
-                .filter(this::isLakehouseCatalog)
+                .filter(namespaceWrapper::isLakehouseCatalog)
                 .map(Catalog::name)
                 .collect(Collectors.toList());
         break;
 
       case 1:
-        Catalog catalog = 
loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
+        Catalog catalog = 
namespaceWrapper.loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
         namespaces = Lists.newArrayList(catalog.asSchemas().listSchemas());
         break;
 
@@ -184,7 +119,7 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
         "Expected at most 2-level and at least 1-level namespace but got: %s",
         namespaceId);
 
-    Catalog catalog = loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
+    Catalog catalog = 
namespaceWrapper.loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
     Map<String, String> properties = Maps.newHashMap();
 
     switch (nsId.levels()) {
@@ -261,7 +196,7 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
         "Expected at most 2-level and at least 1-level namespace but got: %s",
         namespaceId);
 
-    Catalog catalog = loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
+    Catalog catalog = 
namespaceWrapper.loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
     if (nsId.levels() == 2) {
       String schemaName = nsId.levelAtListPos(1);
       if (!catalog.asSchemas().schemaExists(schemaName)) {
@@ -274,32 +209,6 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
     }
   }
 
-  private boolean isLakehouseCatalog(Catalog catalog) {
-    return catalog.type().equals(Catalog.Type.RELATIONAL)
-        && "generic-lakehouse".equals(catalog.provider());
-  }
-
-  private Catalog loadAndValidateLakehouseCatalog(String catalogName) {
-    Catalog catalog;
-    try {
-      catalog = client.loadCatalog(catalogName);
-    } catch (NoSuchCatalogException e) {
-      throw LanceNamespaceException.notFound(
-          "Catalog not found: " + catalogName,
-          NoSuchCatalogException.class.getSimpleName(),
-          catalogName,
-          CommonUtil.formatCurrentStackTrace());
-    }
-    if (!isLakehouseCatalog(catalog)) {
-      throw LanceNamespaceException.notFound(
-          "Catalog is not a lakehouse catalog: " + catalogName,
-          NoSuchCatalogException.class.getSimpleName(),
-          catalogName,
-          CommonUtil.formatCurrentStackTrace());
-    }
-    return catalog;
-  }
-
   private CreateNamespaceResponse createOrUpdateCatalog(
       String catalogName, CreateNamespaceRequest.ModeEnum mode, Map<String, 
String> properties) {
     CreateNamespaceResponse response = new CreateNamespaceResponse();
@@ -322,7 +231,7 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
     }
 
     // Catalog exists, validate type
-    if (!isLakehouseCatalog(catalog)) {
+    if (!namespaceWrapper.isLakehouseCatalog(catalog)) {
       throw LanceNamespaceException.conflict(
           "Catalog already exists but is not a lakehouse catalog: " + 
catalogName,
           CatalogAlreadyExistsException.class.getSimpleName(),
@@ -370,7 +279,7 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
       CreateNamespaceRequest.ModeEnum mode,
       Map<String, String> properties) {
     CreateNamespaceResponse response = new CreateNamespaceResponse();
-    Catalog loadedCatalog = loadAndValidateLakehouseCatalog(catalogName);
+    Catalog loadedCatalog = 
namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName);
 
     Schema schema;
     try {
@@ -506,7 +415,7 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
     Preconditions.checkArgument(
         nsId.levels() == 2, "Expected 2-level namespace but got: %s", 
nsId.levels());
     String catalogName = nsId.levelAtListPos(0);
-    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
+    Catalog catalog = 
namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName);
     String schemaName = nsId.levelAtListPos(1);
     List<String> tables =
         
Arrays.stream(catalog.asTableCatalog().listTables(Namespace.of(schemaName)))
@@ -523,202 +432,4 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
         .tables(response.getNamespaces())
         .pageToken(response.getPageToken());
   }
-
-  @Override
-  public DescribeTableResponse describeTable(String tableId, String delimiter) 
{
-    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
-    Preconditions.checkArgument(
-        nsId.levels() == 3, "Expected at 3-level namespace but got: %s", 
nsId.levels());
-
-    String catalogName = nsId.levelAtListPos(0);
-    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
-    NameIdentifier tableIdentifier =
-        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
-
-    Table table = catalog.asTableCatalog().loadTable(tableIdentifier);
-    DescribeTableResponse response = new DescribeTableResponse();
-    response.setProperties(table.properties());
-    response.setLocation(table.properties().get("location"));
-    response.setSchema(toJsonArrowSchema(table.columns()));
-    return response;
-  }
-
-  @Override
-  public CreateTableResponse createTable(
-      String tableId,
-      String mode,
-      String delimiter,
-      String tableLocation,
-      Map<String, String> tableProperties,
-      byte[] arrowStreamBody) {
-    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
-    Preconditions.checkArgument(
-        nsId.levels() == 3, "Expected at 3-level namespace but got: %s", 
nsId.levels());
-
-    // Parser column information.
-    List<Column> columns = Lists.newArrayList();
-    if (arrowStreamBody != null) {
-      org.apache.arrow.vector.types.pojo.Schema schema = 
parseArrowIpcStream(arrowStreamBody);
-      columns = extractColumns(schema);
-    }
-
-    String catalogName = nsId.levelAtListPos(0);
-    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
-
-    NameIdentifier tableIdentifier =
-        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
-
-    Map<String, String> createTableProperties = 
Maps.newHashMap(tableProperties);
-    createTableProperties.put("location", tableLocation);
-    createTableProperties.put("mode", mode);
-    // TODO considering the mode (create, exist_ok, overwrite)
-
-    ModeEnum createMode = ModeEnum.fromValue(mode.toLowerCase());
-    switch (createMode) {
-      case EXIST_OK:
-        if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
-          CreateTableResponse response = new CreateTableResponse();
-          Table existingTable = 
catalog.asTableCatalog().loadTable(tableIdentifier);
-          response.setProperties(existingTable.properties());
-          response.setLocation(existingTable.properties().get("location"));
-          response.setVersion(0L);
-          return response;
-        }
-        break;
-      case CREATE:
-        if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
-          throw LanceNamespaceException.conflict(
-              "Table already exists: " + tableId,
-              SchemaAlreadyExistsException.class.getSimpleName(),
-              tableId,
-              CommonUtil.formatCurrentStackTrace());
-        }
-        break;
-      case OVERWRITE:
-        if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
-          catalog.asTableCatalog().dropTable(tableIdentifier);
-        }
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown mode: " + mode);
-    }
-
-    Table t =
-        catalog
-            .asTableCatalog()
-            .createTable(
-                tableIdentifier,
-                columns.toArray(new Column[0]),
-                tableLocation,
-                createTableProperties);
-
-    CreateTableResponse response = new CreateTableResponse();
-    response.setProperties(t.properties());
-    response.setLocation(tableLocation);
-    response.setVersion(0L);
-    return response;
-  }
-
-  @Override
-  public RegisterTableResponse registerTable(
-      String tableId, String mode, String delimiter, Map<String, String> 
tableProperties) {
-    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
-    Preconditions.checkArgument(
-        nsId.levels() == 3, "Expected at 3-level namespace but got: %s", 
nsId.levels());
-
-    String catalogName = nsId.levelAtListPos(0);
-    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
-    NameIdentifier tableIdentifier =
-        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
-
-    // TODO Support real register API
-    RegisterTableRequest.ModeEnum createMode =
-        RegisterTableRequest.ModeEnum.fromValue(mode.toUpperCase());
-    if (createMode == RegisterTableRequest.ModeEnum.CREATE
-        && catalog.asTableCatalog().tableExists(tableIdentifier)) {
-      throw LanceNamespaceException.conflict(
-          "Table already exists: " + tableId,
-          SchemaAlreadyExistsException.class.getSimpleName(),
-          tableId,
-          CommonUtil.formatCurrentStackTrace());
-    }
-
-    if (createMode == RegisterTableRequest.ModeEnum.OVERWRITE
-        && catalog.asTableCatalog().tableExists(tableIdentifier)) {
-      LOG.info("Overwriting existing table: {}", tableId);
-      catalog.asTableCatalog().dropTable(tableIdentifier);
-    }
-
-    Table t =
-        catalog.asTableCatalog().createTable(tableIdentifier, new Column[] {}, 
"", tableProperties);
-
-    RegisterTableResponse response = new RegisterTableResponse();
-    response.setProperties(t.properties());
-    response.setLocation(t.properties().get("location"));
-    return response;
-  }
-
-  @Override
-  public DeregisterTableResponse deregisterTable(String tableId, String 
delimiter) {
-
-    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
-    Preconditions.checkArgument(
-        nsId.levels() == 3, "Expected at 3-level namespace but got: %s", 
nsId.levels());
-
-    String catalogName = nsId.levelAtListPos(0);
-    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
-
-    NameIdentifier tableIdentifier =
-        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
-    Table t = catalog.asTableCatalog().loadTable(tableIdentifier);
-    Map<String, String> properties = t.properties();
-    // TODO Support real deregister API.
-    catalog.asTableCatalog().purgeTable(tableIdentifier);
-
-    DeregisterTableResponse response = new DeregisterTableResponse();
-    response.setProperties(properties);
-    response.setLocation(properties.get("location"));
-    return response;
-  }
-
-  private JsonArrowSchema toJsonArrowSchema(Column[] columns) {
-    List<Field> fields =
-        Arrays.stream(columns)
-            .map(col -> CONVERTER.toArrowField(col.name(), col.dataType(), 
col.nullable()))
-            .collect(Collectors.toList());
-
-    return JsonArrowSchemaConverter.convertToJsonArrowSchema(
-        new org.apache.arrow.vector.types.pojo.Schema(fields));
-  }
-
-  @VisibleForTesting
-  org.apache.arrow.vector.types.pojo.Schema parseArrowIpcStream(byte[] stream) 
{
-    org.apache.arrow.vector.types.pojo.Schema schema;
-    try (BufferAllocator allocator = new RootAllocator();
-        ByteArrayInputStream bais = new ByteArrayInputStream(stream);
-        ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) {
-      schema = reader.getVectorSchemaRoot().getSchema();
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to parse Arrow IPC stream", e);
-    }
-
-    Preconditions.checkArgument(schema != null, "No schema found in Arrow IPC 
stream");
-    return schema;
-  }
-
-  private List<Column> 
extractColumns(org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
-    List<Column> columns = new ArrayList<>();
-
-    for (org.apache.arrow.vector.types.pojo.Field field : 
arrowSchema.getFields()) {
-      columns.add(
-          Column.of(
-              field.getName(),
-              CONVERTER.toGravitino(field),
-              null,
-              field.isNullable(),
-              false,
-              DEFAULT_VALUE_NOT_SET));
-    }
-    return columns;
-  }
 }
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
index dd2c4629b4..a1fee0a73e 100644
--- 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
@@ -20,81 +20,30 @@ package org.apache.gravitino.lance.common.ops.gravitino;
 
 import static 
org.apache.gravitino.lance.common.config.LanceConfig.METALAKE_NAME;
 import static 
org.apache.gravitino.lance.common.config.LanceConfig.NAMESPACE_BACKEND_URI;
-import static 
org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter.CONVERTER;
-import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import com.lancedb.lance.namespace.LanceNamespaceException;
-import com.lancedb.lance.namespace.ObjectIdentifier;
-import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
-import com.lancedb.lance.namespace.model.CreateNamespaceRequest.ModeEnum;
-import com.lancedb.lance.namespace.model.CreateNamespaceResponse;
-import com.lancedb.lance.namespace.model.CreateTableResponse;
-import com.lancedb.lance.namespace.model.DeregisterTableResponse;
-import com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
-import com.lancedb.lance.namespace.model.DescribeTableResponse;
-import com.lancedb.lance.namespace.model.DropNamespaceRequest;
-import com.lancedb.lance.namespace.model.DropNamespaceResponse;
-import com.lancedb.lance.namespace.model.JsonArrowSchema;
-import com.lancedb.lance.namespace.model.ListNamespacesResponse;
-import com.lancedb.lance.namespace.model.ListTablesResponse;
-import com.lancedb.lance.namespace.model.RegisterTableRequest;
-import com.lancedb.lance.namespace.model.RegisterTableResponse;
 import com.lancedb.lance.namespace.util.CommonUtil;
-import com.lancedb.lance.namespace.util.JsonArrowSchemaConverter;
-import com.lancedb.lance.namespace.util.PageUtil;
-import java.io.ByteArrayInputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.function.IntFunction;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
-import org.apache.gravitino.CatalogChange;
-import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.Namespace;
-import org.apache.gravitino.Schema;
-import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.client.GravitinoClient;
-import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
-import org.apache.gravitino.exceptions.CatalogInUseException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
-import org.apache.gravitino.exceptions.NoSuchSchemaException;
-import org.apache.gravitino.exceptions.NonEmptyCatalogException;
-import org.apache.gravitino.exceptions.NonEmptySchemaException;
-import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
 import org.apache.gravitino.lance.common.config.LanceConfig;
 import org.apache.gravitino.lance.common.ops.LanceNamespaceOperations;
 import org.apache.gravitino.lance.common.ops.LanceTableOperations;
 import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
-import org.apache.gravitino.rel.Column;
-import org.apache.gravitino.rel.Table;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class GravitinoLanceNamespaceWrapper extends NamespaceWrapper
-    implements LanceNamespaceOperations, LanceTableOperations {
+public class GravitinoLanceNamespaceWrapper extends NamespaceWrapper {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoLanceNamespaceWrapper.class);
   private GravitinoClient client;
 
+  private LanceNamespaceOperations namespaceOperations;
+  private LanceTableOperations tableOperations;
+
   @VisibleForTesting
   GravitinoLanceNamespaceWrapper() {
     super(null);
@@ -104,6 +53,10 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
     super(config);
   }
 
+  public GravitinoClient getClient() {
+    return client;
+  }
+
   @Override
   protected void initialize() {
     String uri = config().get(NAMESPACE_BACKEND_URI);
@@ -113,16 +66,18 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
         "Metalake name must be provided for Lance Gravitino namespace 
backend");
 
     this.client = 
GravitinoClient.builder(uri).withMetalake(metalakeName).build();
+    this.namespaceOperations = new GravitinoLanceNameSpaceOperations(this);
+    this.tableOperations = new GravitinoLanceTableOperations(this);
   }
 
   @Override
   public LanceNamespaceOperations newNamespaceOps() {
-    return this;
+    return namespaceOperations;
   }
 
   @Override
   protected LanceTableOperations newTableOps() {
-    return this;
+    return tableOperations;
   }
 
   @Override
@@ -136,150 +91,12 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
     }
   }
 
-  @Override
-  public ListNamespacesResponse listNamespaces(
-      String namespaceId, String delimiter, String pageToken, Integer limit) {
-    ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter);
-    Preconditions.checkArgument(
-        nsId.levels() <= 2, "Expected at most 2-level namespace but got: %s", 
namespaceId);
-
-    List<String> namespaces;
-    switch (nsId.levels()) {
-      case 0:
-        namespaces =
-            Arrays.stream(client.listCatalogsInfo())
-                .filter(this::isLakehouseCatalog)
-                .map(Catalog::name)
-                .collect(Collectors.toList());
-        break;
-
-      case 1:
-        Catalog catalog = 
loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
-        namespaces = Lists.newArrayList(catalog.asSchemas().listSchemas());
-        break;
-
-      case 2:
-        namespaces = Lists.newArrayList();
-        break;
-
-      default:
-        throw new IllegalArgumentException(
-            "Expected at most 2-level namespace but got: " + namespaceId);
-    }
-
-    Collections.sort(namespaces);
-    PageUtil.Page page =
-        PageUtil.splitPage(namespaces, pageToken, 
PageUtil.normalizePageSize(limit));
-    ListNamespacesResponse response = new ListNamespacesResponse();
-    response.setNamespaces(Sets.newHashSet(page.items()));
-    response.setPageToken(page.nextPageToken());
-    return response;
-  }
-
-  @Override
-  public DescribeNamespaceResponse describeNamespace(String namespaceId, 
String delimiter) {
-    ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter);
-    Preconditions.checkArgument(
-        nsId.levels() <= 2 && nsId.levels() > 0,
-        "Expected at most 2-level and at least 1-level namespace but got: %s",
-        namespaceId);
-
-    Catalog catalog = loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
-    Map<String, String> properties = Maps.newHashMap();
-
-    switch (nsId.levels()) {
-      case 1:
-        
Optional.ofNullable(catalog.properties()).ifPresent(properties::putAll);
-        break;
-      case 2:
-        String schemaName = nsId.levelAtListPos(1);
-        Schema schema = catalog.asSchemas().loadSchema(schemaName);
-        Optional.ofNullable(schema.properties()).ifPresent(properties::putAll);
-        break;
-      default:
-        throw new IllegalArgumentException(
-            "Expected at most 2-level and at least 1-level namespace but got: 
" + namespaceId);
-    }
-
-    DescribeNamespaceResponse response = new DescribeNamespaceResponse();
-    response.setProperties(properties);
-    return response;
-  }
-
-  @Override
-  public CreateNamespaceResponse createNamespace(
-      String namespaceId,
-      String delimiter,
-      CreateNamespaceRequest.ModeEnum mode,
-      Map<String, String> properties) {
-    ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter);
-    Preconditions.checkArgument(
-        nsId.levels() <= 2 && nsId.levels() > 0,
-        "Expected at most 2-level and at least 1-level namespace but got: %s",
-        namespaceId);
-
-    switch (nsId.levels()) {
-      case 1:
-        return createOrUpdateCatalog(nsId.levelAtListPos(0), mode, properties);
-      case 2:
-        return createOrUpdateSchema(
-            nsId.levelAtListPos(0), nsId.levelAtListPos(1), mode, properties);
-      default:
-        throw new IllegalArgumentException(
-            "Expected at most 2-level and at least 1-level namespace but got: 
" + namespaceId);
-    }
-  }
-
-  @Override
-  public DropNamespaceResponse dropNamespace(
-      String namespaceId,
-      String delimiter,
-      DropNamespaceRequest.ModeEnum mode,
-      DropNamespaceRequest.BehaviorEnum behavior) {
-    ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter);
-    Preconditions.checkArgument(
-        nsId.levels() <= 2 && nsId.levels() > 0,
-        "Expected at most 2-level and at least 1-level namespace but got: %s",
-        namespaceId);
-
-    switch (nsId.levels()) {
-      case 1:
-        return dropCatalog(nsId.levelAtListPos(0), mode, behavior);
-      case 2:
-        return dropSchema(nsId.levelAtListPos(0), nsId.levelAtListPos(1), 
mode, behavior);
-      default:
-        throw new IllegalArgumentException(
-            "Expected at most 2-level and at least 1-level namespace but got: 
" + namespaceId);
-    }
-  }
-
-  @Override
-  public void namespaceExists(String namespaceId, String delimiter) throws 
LanceNamespaceException {
-    ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter);
-    Preconditions.checkArgument(
-        nsId.levels() <= 2 && nsId.levels() > 0,
-        "Expected at most 2-level and at least 1-level namespace but got: %s",
-        namespaceId);
-
-    Catalog catalog = loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
-    if (nsId.levels() == 2) {
-      String schemaName = nsId.levelAtListPos(1);
-      if (!catalog.asSchemas().schemaExists(schemaName)) {
-        throw LanceNamespaceException.notFound(
-            "Schema not found: " + schemaName,
-            NoSuchSchemaException.class.getSimpleName(),
-            schemaName,
-            CommonUtil.formatCurrentStackTrace());
-      }
-    }
-  }
-
-  private boolean isLakehouseCatalog(Catalog catalog) {
+  public boolean isLakehouseCatalog(Catalog catalog) {
     return catalog.type().equals(Catalog.Type.RELATIONAL)
         && "generic-lakehouse".equals(catalog.provider());
   }
 
-  private Catalog loadAndValidateLakehouseCatalog(String catalogName) {
+  public Catalog loadAndValidateLakehouseCatalog(String catalogName) {
     Catalog catalog;
     try {
       catalog = client.loadCatalog(catalogName);
@@ -299,426 +116,4 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
     }
     return catalog;
   }
-
-  private CreateNamespaceResponse createOrUpdateCatalog(
-      String catalogName, CreateNamespaceRequest.ModeEnum mode, Map<String, 
String> properties) {
-    CreateNamespaceResponse response = new CreateNamespaceResponse();
-
-    Catalog catalog;
-    try {
-      catalog = client.loadCatalog(catalogName);
-    } catch (NoSuchCatalogException e) {
-      // Catalog does not exist, create it
-      Catalog createdCatalog =
-          client.createCatalog(
-              catalogName,
-              Catalog.Type.RELATIONAL,
-              "generic-lakehouse",
-              "created by Lance REST server",
-              properties);
-      response.setProperties(
-          createdCatalog.properties() == null ? Maps.newHashMap() : 
createdCatalog.properties());
-      return response;
-    }
-
-    // Catalog exists, validate type
-    if (!isLakehouseCatalog(catalog)) {
-      throw LanceNamespaceException.conflict(
-          "Catalog already exists but is not a lakehouse catalog: " + 
catalogName,
-          CatalogAlreadyExistsException.class.getSimpleName(),
-          catalogName,
-          CommonUtil.formatCurrentStackTrace());
-    }
-
-    // Catalog exists, handle based on mode
-    switch (mode) {
-      case EXIST_OK:
-        response.setProperties(
-            
Optional.ofNullable(catalog.properties()).orElse(Collections.emptyMap()));
-        return response;
-      case CREATE:
-        throw LanceNamespaceException.conflict(
-            "Catalog already exists: " + catalogName,
-            CatalogAlreadyExistsException.class.getSimpleName(),
-            catalogName,
-            CommonUtil.formatCurrentStackTrace());
-      case OVERWRITE:
-        CatalogChange[] changes =
-            buildChanges(
-                properties,
-                removeInUseProperty(catalog.properties()),
-                CatalogChange::setProperty,
-                CatalogChange::removeProperty,
-                CatalogChange[]::new);
-        Catalog alteredCatalog = client.alterCatalog(catalogName, changes);
-        
Optional.ofNullable(alteredCatalog.properties()).ifPresent(response::setProperties);
-        return response;
-      default:
-        throw new IllegalArgumentException("Unknown mode: " + mode);
-    }
-  }
-
-  private Map<String, String> removeInUseProperty(Map<String, String> 
properties) {
-    return properties.entrySet().stream()
-        .filter(e -> !e.getKey().equalsIgnoreCase(Catalog.PROPERTY_IN_USE))
-        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-  }
-
-  private CreateNamespaceResponse createOrUpdateSchema(
-      String catalogName,
-      String schemaName,
-      CreateNamespaceRequest.ModeEnum mode,
-      Map<String, String> properties) {
-    CreateNamespaceResponse response = new CreateNamespaceResponse();
-    Catalog loadedCatalog = loadAndValidateLakehouseCatalog(catalogName);
-
-    Schema schema;
-    try {
-      schema = loadedCatalog.asSchemas().loadSchema(schemaName);
-    } catch (NoSuchSchemaException e) {
-      // Schema does not exist, create it
-      Schema createdSchema = 
loadedCatalog.asSchemas().createSchema(schemaName, null, properties);
-      response.setProperties(
-          createdSchema.properties() == null ? Maps.newHashMap() : 
createdSchema.properties());
-      return response;
-    }
-
-    // Schema exists, handle based on mode
-    switch (mode) {
-      case EXIST_OK:
-        response.setProperties(
-            
Optional.ofNullable(schema.properties()).orElse(Collections.emptyMap()));
-        return response;
-      case CREATE:
-        throw LanceNamespaceException.conflict(
-            "Schema already exists: " + schemaName,
-            SchemaAlreadyExistsException.class.getSimpleName(),
-            schemaName,
-            CommonUtil.formatCurrentStackTrace());
-      case OVERWRITE:
-        SchemaChange[] changes =
-            buildChanges(
-                properties,
-                schema.properties(),
-                SchemaChange::setProperty,
-                SchemaChange::removeProperty,
-                SchemaChange[]::new);
-        Schema alteredSchema = 
loadedCatalog.asSchemas().alterSchema(schemaName, changes);
-        
Optional.ofNullable(alteredSchema.properties()).ifPresent(response::setProperties);
-        return response;
-      default:
-        throw new IllegalArgumentException("Unknown mode: " + mode);
-    }
-  }
-
-  private DropNamespaceResponse dropCatalog(
-      String catalogName,
-      DropNamespaceRequest.ModeEnum mode,
-      DropNamespaceRequest.BehaviorEnum behavior) {
-    try {
-      boolean dropped =
-          client.dropCatalog(catalogName, behavior == 
DropNamespaceRequest.BehaviorEnum.CASCADE);
-      if (dropped) {
-        return new DropNamespaceResponse();
-      } else {
-        // Catalog did not exist
-        if (mode == DropNamespaceRequest.ModeEnum.FAIL) {
-          throw LanceNamespaceException.notFound(
-              "Catalog not found: " + catalogName,
-              NoSuchCatalogException.class.getSimpleName(),
-              catalogName,
-              CommonUtil.formatCurrentStackTrace());
-        }
-        return new DropNamespaceResponse(); // SKIP mode
-      }
-    } catch (NonEmptyCatalogException | CatalogInUseException e) {
-      throw LanceNamespaceException.badRequest(
-          String.format("Catalog %s is not empty or in used", catalogName),
-          NonEmptyCatalogException.class.getSimpleName(),
-          catalogName,
-          CommonUtil.formatCurrentStackTrace());
-    }
-  }
-
-  private DropNamespaceResponse dropSchema(
-      String catalogName,
-      String schemaName,
-      DropNamespaceRequest.ModeEnum mode,
-      DropNamespaceRequest.BehaviorEnum behavior) {
-    try {
-      boolean dropped =
-          client
-              .loadCatalog(catalogName)
-              .asSchemas()
-              .dropSchema(schemaName, behavior == 
DropNamespaceRequest.BehaviorEnum.CASCADE);
-      if (dropped) {
-        return new DropNamespaceResponse();
-      } else {
-        // Schema did not exist
-        if (mode == DropNamespaceRequest.ModeEnum.FAIL) {
-          throw LanceNamespaceException.notFound(
-              "Schema not found: " + schemaName,
-              NoSuchSchemaException.class.getSimpleName(),
-              schemaName,
-              CommonUtil.formatCurrentStackTrace());
-        }
-        return new DropNamespaceResponse(); // SKIP mode
-      }
-    } catch (NoSuchCatalogException e) {
-      throw LanceNamespaceException.notFound(
-          "Catalog not found: " + catalogName,
-          NoSuchCatalogException.class.getSimpleName(),
-          catalogName,
-          CommonUtil.formatCurrentStackTrace());
-    } catch (NonEmptySchemaException e) {
-      throw LanceNamespaceException.badRequest(
-          String.format("Schema %s is not empty.", schemaName),
-          NonEmptySchemaException.class.getSimpleName(),
-          schemaName,
-          CommonUtil.formatCurrentStackTrace());
-    }
-  }
-
-  private <T> T[] buildChanges(
-      Map<String, String> newProps,
-      Map<String, String> oldProps,
-      BiFunction<String, String, T> setPropertyFunc,
-      Function<String, T> removePropertyFunc,
-      IntFunction<T[]> arrayCreator) {
-    Stream<T> setPropertiesStream =
-        newProps.entrySet().stream()
-            .map(entry -> setPropertyFunc.apply(entry.getKey(), 
entry.getValue()));
-
-    Stream<T> removePropertiesStream =
-        oldProps == null
-            ? Stream.empty()
-            : oldProps.keySet().stream()
-                .filter(key -> !newProps.containsKey(key))
-                .map(removePropertyFunc);
-
-    return Stream.concat(setPropertiesStream, 
removePropertiesStream).toArray(arrayCreator);
-  }
-
-  @Override
-  public ListTablesResponse listTables(
-      String namespaceId, String delimiter, String pageToken, Integer limit) {
-    ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, 
Pattern.quote(delimiter));
-    Preconditions.checkArgument(
-        nsId.levels() == 2, "Expected 2-level namespace but got: %s", 
nsId.levels());
-    String catalogName = nsId.levelAtListPos(0);
-    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
-    String schemaName = nsId.levelAtListPos(1);
-    List<String> tables =
-        
Arrays.stream(catalog.asTableCatalog().listTables(Namespace.of(schemaName)))
-            .map(ident -> Joiner.on(delimiter).join(catalogName, schemaName, 
ident.name()))
-            .sorted()
-            .collect(Collectors.toList());
-
-    PageUtil.Page page = PageUtil.splitPage(tables, pageToken, 
PageUtil.normalizePageSize(limit));
-    ListNamespacesResponse response = new ListNamespacesResponse();
-    response.setNamespaces(Sets.newHashSet(page.items()));
-    response.setPageToken(page.nextPageToken());
-
-    return new ListTablesResponse()
-        .tables(response.getNamespaces())
-        .pageToken(response.getPageToken());
-  }
-
-  @Override
-  public DescribeTableResponse describeTable(String tableId, String delimiter) 
{
-    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
-    Preconditions.checkArgument(
-        nsId.levels() == 3, "Expected at 3-level namespace but got: %s", 
nsId.levels());
-
-    String catalogName = nsId.levelAtListPos(0);
-    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
-    NameIdentifier tableIdentifier =
-        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
-
-    Table table = catalog.asTableCatalog().loadTable(tableIdentifier);
-    DescribeTableResponse response = new DescribeTableResponse();
-    response.setProperties(table.properties());
-    response.setLocation(table.properties().get("location"));
-    response.setSchema(toJsonArrowSchema(table.columns()));
-    return response;
-  }
-
-  @Override
-  public CreateTableResponse createTable(
-      String tableId,
-      String mode,
-      String delimiter,
-      String tableLocation,
-      Map<String, String> tableProperties,
-      byte[] arrowStreamBody) {
-    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
-    Preconditions.checkArgument(
-        nsId.levels() == 3, "Expected at 3-level namespace but got: %s", 
nsId.levels());
-
-    // Parser column information.
-    List<Column> columns = Lists.newArrayList();
-    if (arrowStreamBody != null) {
-      org.apache.arrow.vector.types.pojo.Schema schema = 
parseArrowIpcStream(arrowStreamBody);
-      columns = extractColumns(schema);
-    }
-
-    String catalogName = nsId.levelAtListPos(0);
-    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
-
-    NameIdentifier tableIdentifier =
-        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
-
-    Map<String, String> createTableProperties = 
Maps.newHashMap(tableProperties);
-    createTableProperties.put("location", tableLocation);
-    createTableProperties.put("mode", mode);
-    // TODO considering the mode (create, exist_ok, overwrite)
-
-    ModeEnum createMode = ModeEnum.fromValue(mode.toLowerCase());
-    switch (createMode) {
-      case EXIST_OK:
-        if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
-          CreateTableResponse response = new CreateTableResponse();
-          Table existingTable = 
catalog.asTableCatalog().loadTable(tableIdentifier);
-          response.setProperties(existingTable.properties());
-          response.setLocation(existingTable.properties().get("location"));
-          response.setVersion(0L);
-          return response;
-        }
-        break;
-      case CREATE:
-        if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
-          throw LanceNamespaceException.conflict(
-              "Table already exists: " + tableId,
-              SchemaAlreadyExistsException.class.getSimpleName(),
-              tableId,
-              CommonUtil.formatCurrentStackTrace());
-        }
-        break;
-      case OVERWRITE:
-        if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
-          catalog.asTableCatalog().dropTable(tableIdentifier);
-        }
-        break;
-      default:
-        throw new IllegalArgumentException("Unknown mode: " + mode);
-    }
-
-    Table t =
-        catalog
-            .asTableCatalog()
-            .createTable(
-                tableIdentifier,
-                columns.toArray(new Column[0]),
-                tableLocation,
-                createTableProperties);
-
-    CreateTableResponse response = new CreateTableResponse();
-    response.setProperties(t.properties());
-    response.setLocation(tableLocation);
-    response.setVersion(0L);
-    return response;
-  }
-
-  @Override
-  public RegisterTableResponse registerTable(
-      String tableId, String mode, String delimiter, Map<String, String> 
tableProperties) {
-    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
-    Preconditions.checkArgument(
-        nsId.levels() == 3, "Expected at 3-level namespace but got: %s", 
nsId.levels());
-
-    String catalogName = nsId.levelAtListPos(0);
-    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
-    NameIdentifier tableIdentifier =
-        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
-
-    // TODO Support real register API
-    RegisterTableRequest.ModeEnum createMode =
-        RegisterTableRequest.ModeEnum.fromValue(mode.toUpperCase());
-    if (createMode == RegisterTableRequest.ModeEnum.CREATE
-        && catalog.asTableCatalog().tableExists(tableIdentifier)) {
-      throw LanceNamespaceException.conflict(
-          "Table already exists: " + tableId,
-          SchemaAlreadyExistsException.class.getSimpleName(),
-          tableId,
-          CommonUtil.formatCurrentStackTrace());
-    }
-
-    if (createMode == RegisterTableRequest.ModeEnum.OVERWRITE
-        && catalog.asTableCatalog().tableExists(tableIdentifier)) {
-      LOG.info("Overwriting existing table: {}", tableId);
-      catalog.asTableCatalog().dropTable(tableIdentifier);
-    }
-
-    Table t =
-        catalog.asTableCatalog().createTable(tableIdentifier, new Column[] {}, 
"", tableProperties);
-
-    RegisterTableResponse response = new RegisterTableResponse();
-    response.setProperties(t.properties());
-    response.setLocation(t.properties().get("location"));
-    return response;
-  }
-
-  @Override
-  public DeregisterTableResponse deregisterTable(String tableId, String 
delimiter) {
-
-    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
-    Preconditions.checkArgument(
-        nsId.levels() == 3, "Expected at 3-level namespace but got: %s", 
nsId.levels());
-
-    String catalogName = nsId.levelAtListPos(0);
-    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
-
-    NameIdentifier tableIdentifier =
-        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
-    Table t = catalog.asTableCatalog().loadTable(tableIdentifier);
-    Map<String, String> properties = t.properties();
-    // TODO Support real deregister API.
-    catalog.asTableCatalog().purgeTable(tableIdentifier);
-
-    DeregisterTableResponse response = new DeregisterTableResponse();
-    response.setProperties(properties);
-    response.setLocation(properties.get("location"));
-    return response;
-  }
-
-  private JsonArrowSchema toJsonArrowSchema(Column[] columns) {
-    List<Field> fields =
-        Arrays.stream(columns)
-            .map(col -> CONVERTER.toArrowField(col.name(), col.dataType(), 
col.nullable()))
-            .collect(Collectors.toList());
-
-    return JsonArrowSchemaConverter.convertToJsonArrowSchema(
-        new org.apache.arrow.vector.types.pojo.Schema(fields));
-  }
-
-  @VisibleForTesting
-  org.apache.arrow.vector.types.pojo.Schema parseArrowIpcStream(byte[] stream) 
{
-    org.apache.arrow.vector.types.pojo.Schema schema;
-    try (BufferAllocator allocator = new RootAllocator();
-        ByteArrayInputStream bais = new ByteArrayInputStream(stream);
-        ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) {
-      schema = reader.getVectorSchemaRoot().getSchema();
-    } catch (Exception e) {
-      throw new RuntimeException("Failed to parse Arrow IPC stream", e);
-    }
-
-    Preconditions.checkArgument(schema != null, "No schema found in Arrow IPC 
stream");
-    return schema;
-  }
-
-  private List<Column> 
extractColumns(org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
-    List<Column> columns = new ArrayList<>();
-
-    for (org.apache.arrow.vector.types.pojo.Field field : 
arrowSchema.getFields()) {
-      columns.add(
-          Column.of(
-              field.getName(),
-              CONVERTER.toGravitino(field),
-              null,
-              field.isNullable(),
-              false,
-              DEFAULT_VALUE_NOT_SET));
-    }
-    return columns;
-  }
 }
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java
new file mode 100644
index 0000000000..d298dbe5e5
--- /dev/null
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java
@@ -0,0 +1,293 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lance.common.ops.gravitino;
+
+import static 
org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter.CONVERTER;
+import static 
org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_LOCATION;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.lancedb.lance.namespace.LanceNamespaceException;
+import com.lancedb.lance.namespace.ObjectIdentifier;
+import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
+import com.lancedb.lance.namespace.model.CreateTableRequest;
+import com.lancedb.lance.namespace.model.CreateTableRequest.ModeEnum;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
+import com.lancedb.lance.namespace.model.DeregisterTableResponse;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
+import com.lancedb.lance.namespace.model.JsonArrowSchema;
+import com.lancedb.lance.namespace.model.RegisterTableRequest;
+import com.lancedb.lance.namespace.model.RegisterTableResponse;
+import com.lancedb.lance.namespace.util.CommonUtil;
+import com.lancedb.lance.namespace.util.JsonArrowSchemaConverter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.lance.common.ops.LanceTableOperations;
+import org.apache.gravitino.lance.common.utils.ArrowUtils;
+import org.apache.gravitino.lance.common.utils.LancePropertiesUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GravitinoLanceTableOperations implements LanceTableOperations {
+  private static final Logger LOG = 
LoggerFactory.getLogger(GravitinoLanceTableOperations.class);
+
+  private final GravitinoLanceNamespaceWrapper namespaceWrapper;
+
+  public GravitinoLanceTableOperations(GravitinoLanceNamespaceWrapper 
namespaceWrapper) {
+    this.namespaceWrapper = namespaceWrapper;
+  }
+
+  @Override
+  public DescribeTableResponse describeTable(
+      String tableId, String delimiter, Optional<Long> version) {
+    if (!version.isEmpty()) {
+      throw new UnsupportedOperationException(
+          "Describing specific table version is not supported. It should be 
null to indicate the"
+              + " latest version.");
+    }
+
+    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
+    Preconditions.checkArgument(
+        nsId.levels() == 3, "Expected at 3-level namespace but got: %s", 
nsId.levels());
+
+    String catalogName = nsId.levelAtListPos(0);
+    Catalog catalog = 
namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName);
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
+
+    Table table = catalog.asTableCatalog().loadTable(tableIdentifier);
+    DescribeTableResponse response = new DescribeTableResponse();
+    response.setProperties(table.properties());
+    response.setLocation(table.properties().get(LANCE_LOCATION));
+    response.setSchema(toJsonArrowSchema(table.columns()));
+    response.setVersion(null);
+    
response.setStorageOptions(LancePropertiesUtils.getLanceStorageOptions(table.properties()));
+    return response;
+  }
+
+  @Override
+  public CreateTableResponse createTable(
+      String tableId,
+      CreateTableRequest.ModeEnum mode,
+      String delimiter,
+      String tableLocation,
+      Map<String, String> tableProperties,
+      byte[] arrowStreamBody) {
+    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
+    Preconditions.checkArgument(
+        nsId.levels() == 3, "Expected at 3-level namespace but got: %s", 
nsId.levels());
+
+    // Parse column information.
+    List<Column> columns = Lists.newArrayList();
+    if (arrowStreamBody != null) {
+      org.apache.arrow.vector.types.pojo.Schema schema =
+          ArrowUtils.parseArrowIpcStream(arrowStreamBody);
+      columns = extractColumns(schema);
+    }
+
+    String catalogName = nsId.levelAtListPos(0);
+    Catalog catalog = 
namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName);
+
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
+
+    Map<String, String> createTableProperties = 
Maps.newHashMap(tableProperties);
+    if (tableLocation != null) {
+      createTableProperties.put(LANCE_LOCATION, tableLocation);
+    }
+    // The format is defined in GenericLakehouseCatalog
+    createTableProperties.put("format", "lance");
+
+    Table t;
+    try {
+      t =
+          catalog
+              .asTableCatalog()
+              .createTable(
+                  tableIdentifier, columns.toArray(new Column[0]), null, 
createTableProperties);
+    } catch (TableAlreadyExistsException exception) {
+      if (mode == CreateTableRequest.ModeEnum.CREATE) {
+        throw LanceNamespaceException.conflict(
+            "Table already exists: " + tableId,
+            TableAlreadyExistsException.class.getSimpleName(),
+            tableId,
+            CommonUtil.formatCurrentStackTrace());
+      } else if (mode == CreateTableRequest.ModeEnum.OVERWRITE) {
+        LOG.info("Overwriting existing table: {}", tableId);
+        catalog.asTableCatalog().purgeTable(tableIdentifier);
+
+        t =
+            catalog
+                .asTableCatalog()
+                .createTable(
+                    tableIdentifier, columns.toArray(new Column[0]), null, 
createTableProperties);
+      } else { // EXIST_OK
+        CreateTableResponse response = new CreateTableResponse();
+        Table existingTable = 
catalog.asTableCatalog().loadTable(tableIdentifier);
+        response.setProperties(existingTable.properties());
+        response.setLocation(existingTable.properties().get(LANCE_LOCATION));
+        response.setVersion(null);
+        response.setStorageOptions(
+            
LancePropertiesUtils.getLanceStorageOptions(existingTable.properties()));
+        return response;
+      }
+    }
+
+    CreateTableResponse response = new CreateTableResponse();
+    response.setProperties(t.properties());
+    response.setLocation(tableLocation);
+    // Extract storage options from table properties. All storage options 
are stored in table
+    // properties.
+    
response.setStorageOptions(LancePropertiesUtils.getLanceStorageOptions(t.properties()));
+    response.setVersion(null);
+    response.setLocation(t.properties().get(LANCE_LOCATION));
+    response.setProperties(t.properties());
+    return response;
+  }
+
+  @Override
+  public CreateEmptyTableResponse createEmptyTable(
+      String tableId, String delimiter, String tableLocation, Map<String, 
String> tableProperties) {
+    CreateTableResponse response =
+        createTable(tableId, ModeEnum.CREATE, delimiter, tableLocation, 
tableProperties, null);
+    CreateEmptyTableResponse emptyTableResponse = new 
CreateEmptyTableResponse();
+    emptyTableResponse.setProperties(response.getProperties());
+    emptyTableResponse.setLocation(response.getLocation());
+    emptyTableResponse.setStorageOptions(response.getStorageOptions());
+    return emptyTableResponse;
+  }
+
+  @Override
+  public RegisterTableResponse registerTable(
+      String tableId,
+      RegisterTableRequest.ModeEnum mode,
+      String delimiter,
+      Map<String, String> tableProperties) {
+    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
+    Preconditions.checkArgument(
+        nsId.levels() == 3, "Expected at 3-level namespace but got: %s", 
nsId.levels());
+
+    String catalogName = nsId.levelAtListPos(0);
+    Catalog catalog = 
namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName);
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
+
+    Map<String, String> copiedTableProperties = 
Maps.newHashMap(tableProperties);
+    copiedTableProperties.put("format", "lance");
+    Table t = null;
+    try {
+      t =
+          catalog
+              .asTableCatalog()
+              .createTable(tableIdentifier, new Column[] {}, null, 
copiedTableProperties);
+    } catch (TableAlreadyExistsException exception) {
+      if (mode == RegisterTableRequest.ModeEnum.CREATE) {
+        throw LanceNamespaceException.conflict(
+            "Table already exists: " + tableId,
+            TableAlreadyExistsException.class.getSimpleName(),
+            tableId,
+            CommonUtil.formatCurrentStackTrace());
+      } else if (mode == RegisterTableRequest.ModeEnum.OVERWRITE) {
+        LOG.info("Overwriting existing table: {}", tableId);
+        catalog.asTableCatalog().dropTable(tableIdentifier);
+
+        t =
+            catalog
+                .asTableCatalog()
+                .createTable(tableIdentifier, new Column[] {}, null, 
copiedTableProperties);
+      }
+    }
+
+    RegisterTableResponse response = new RegisterTableResponse();
+    response.setProperties(t.properties());
+    response.setLocation(t.properties().get(LANCE_LOCATION));
+    return response;
+  }
+
+  @Override
+  public DeregisterTableResponse deregisterTable(String tableId, String 
delimiter) {
+    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
+    Preconditions.checkArgument(
+        nsId.levels() == 3, "Expected at 3-level namespace but got: %s", 
nsId.levels());
+
+    String catalogName = nsId.levelAtListPos(0);
+    Catalog catalog = 
namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName);
+
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
+    Table t = catalog.asTableCatalog().loadTable(tableIdentifier);
+    Map<String, String> properties = t.properties();
+    // TODO Support real deregister API.
+    boolean result = catalog.asTableCatalog().dropTable(tableIdentifier);
+    if (!result) {
+      throw LanceNamespaceException.notFound(
+          "Table not found: " + tableId,
+          NoSuchTableException.class.getSimpleName(),
+          tableId,
+          CommonUtil.formatCurrentStackTrace());
+    }
+
+    DeregisterTableResponse response = new DeregisterTableResponse();
+    response.setProperties(properties);
+    response.setLocation(properties.get(LANCE_LOCATION));
+    response.setId(nsId.listStyleId());
+    return response;
+  }
+
+  private List<Column> 
extractColumns(org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
+    List<Column> columns = new ArrayList<>();
+
+    for (org.apache.arrow.vector.types.pojo.Field field : 
arrowSchema.getFields()) {
+      columns.add(
+          Column.of(
+              field.getName(),
+              CONVERTER.toGravitino(field),
+              null,
+              field.isNullable(),
+              false,
+              DEFAULT_VALUE_NOT_SET));
+    }
+    return columns;
+  }
+
+  private JsonArrowSchema toJsonArrowSchema(Column[] columns) {
+    List<Field> fields =
+        Arrays.stream(columns)
+            .map(col -> CONVERTER.toArrowField(col.name(), col.dataType(), 
col.nullable()))
+            .collect(Collectors.toList());
+
+    return JsonArrowSchemaConverter.convertToJsonArrowSchema(
+        new org.apache.arrow.vector.types.pojo.Schema(fields));
+  }
+}
diff --git 
a/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceNamespaceWrapper.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/ArrowUtils.java
similarity index 68%
rename from 
lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceNamespaceWrapper.java
rename to 
lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/ArrowUtils.java
index b0ddb980ab..5d8508ee45 100644
--- 
a/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceNamespaceWrapper.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/ArrowUtils.java
@@ -16,40 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.gravitino.lance.common.ops.gravitino;
+package org.apache.gravitino.lance.common.utils;
 
+import com.google.common.base.Preconditions;
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.channels.Channels;
-import java.util.Arrays;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.arrow.vector.ipc.ArrowStreamWriter;
-import org.apache.arrow.vector.types.pojo.ArrowType;
-import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
 
-public class TestGravitinoLanceNamespaceWrapper {
-
-  @Test
-  public void testParseArrowIpcStream() throws Exception {
-    Schema schema =
-        new Schema(
-            Arrays.asList(
-                Field.nullable("id", new ArrowType.Int(32, true)),
-                Field.nullable("value", new ArrowType.Utf8())));
-
-    GravitinoLanceNamespaceWrapper wrapper = new 
GravitinoLanceNamespaceWrapper();
-    byte[] ipcStream = generateIpcStream(schema);
-    Schema parsedSchema = wrapper.parseArrowIpcStream(ipcStream);
-
-    Assertions.assertEquals(schema, parsedSchema);
-  }
-
-  private byte[] generateIpcStream(Schema arrowSchema) throws IOException {
+public class ArrowUtils {
+  public static byte[] generateIpcStream(Schema arrowSchema) throws 
IOException {
     try (BufferAllocator allocator = new RootAllocator()) {
 
       // Create an empty VectorSchemaRoot with the schema
@@ -73,4 +55,18 @@ public class TestGravitinoLanceNamespaceWrapper {
       throw new IOException("Failed to create empty Arrow IPC stream: " + 
e.getMessage(), e);
     }
   }
+
+  public static Schema parseArrowIpcStream(byte[] stream) {
+    Schema schema;
+    try (BufferAllocator allocator = new RootAllocator();
+        ByteArrayInputStream bais = new ByteArrayInputStream(stream);
+        ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) {
+      schema = reader.getVectorSchemaRoot().getSchema();
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Failed to parse Arrow IPC stream", 
e);
+    }
+
+    Preconditions.checkArgument(schema != null, "No schema found in Arrow IPC 
stream");
+    return schema;
+  }
 }
diff --git 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LanceConstants.java
similarity index 77%
copy from 
lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
copy to 
lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LanceConstants.java
index f39ea2e684..c34a7be58a 100644
--- 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LanceConstants.java
@@ -17,13 +17,18 @@
  * under the License.
  */
 
-package org.apache.gravitino.lance.service;
+package org.apache.gravitino.lance.common.utils;
 
-public class ServiceConstants {
+public class LanceConstants {
   public static final String LANCE_HTTP_HEADER_PREFIX = "x-lance-";
 
   public static final String LANCE_TABLE_LOCATION_HEADER =
       LANCE_HTTP_HEADER_PREFIX + "table-location";
   public static final String LANCE_TABLE_PROPERTIES_PREFIX_HEADER =
       LANCE_HTTP_HEADER_PREFIX + "table-properties";
+  // Key for table location in table properties map
+  public static final String LANCE_LOCATION = "location";
+
+  // Prefix for storage options in LanceConfig
+  public static final String LANCE_STORAGE_OPTIONS_PREFIX = "lance.storage.";
 }
diff --git 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LancePropertiesUtils.java
similarity index 50%
copy from 
lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
copy to 
lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LancePropertiesUtils.java
index f39ea2e684..e674a7266a 100644
--- 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LancePropertiesUtils.java
@@ -15,15 +15,31 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
+ *
  */
+package org.apache.gravitino.lance.common.utils;
+
+import static 
org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_STORAGE_OPTIONS_PREFIX;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class LancePropertiesUtils {
 
-package org.apache.gravitino.lance.service;
+  private LancePropertiesUtils() {
+    // Private constructor to prevent instantiation
+  }
 
-public class ServiceConstants {
-  public static final String LANCE_HTTP_HEADER_PREFIX = "x-lance-";
+  public static Map<String, String> getLanceStorageOptions(Map<String, String> 
tableProperties) {
+    if (tableProperties == null) {
+      return Map.of();
+    }
 
-  public static final String LANCE_TABLE_LOCATION_HEADER =
-      LANCE_HTTP_HEADER_PREFIX + "table-location";
-  public static final String LANCE_TABLE_PROPERTIES_PREFIX_HEADER =
-      LANCE_HTTP_HEADER_PREFIX + "table-properties";
+    return tableProperties.entrySet().stream()
+        .filter(e -> e.getKey().startsWith(LANCE_STORAGE_OPTIONS_PREFIX))
+        .collect(
+            Collectors.toMap(
+                e -> 
e.getKey().substring(LANCE_STORAGE_OPTIONS_PREFIX.length()),
+                Map.Entry::getValue));
+  }
 }
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/SerializationUtils.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/SerializationUtils.java
new file mode 100644
index 0000000000..8e5fb2494b
--- /dev/null
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/SerializationUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.gravitino.lance.common.utils;
+
+import com.google.common.collect.ImmutableMap;
+import com.lancedb.lance.namespace.util.JsonUtil;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+
+public class SerializationUtils {
+
+  private SerializationUtils() {
+    // Utility class
+  }
+
+  // Lance REST serializes and deserializes table properties in its own way,
+  // see 
https://github.com/lancedb/lance-namespace/blob/2033b2fca126e87e56ba0d5ec19c5ec010c7a98f/
+  // 
java/lance-namespace-core/src/main/java/com/lancedb/lance/namespace/rest/RestNamespace.java#L207-L208
+  public static Map<String, String> deserializeProperties(String 
serializedProperties) {
+    return StringUtils.isBlank(serializedProperties)
+        ? ImmutableMap.of()
+        : JsonUtil.parse(
+            serializedProperties,
+            jsonNode -> {
+              Map<String, String> map = new HashMap<>();
+              jsonNode
+                  .fields()
+                  .forEachRemaining(
+                      entry -> {
+                        map.put(entry.getKey(), entry.getValue().asText());
+                      });
+              return map;
+            });
+  }
+}
diff --git 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
 
b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/utils/TestArrowUtils.java
similarity index 50%
rename from 
lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
rename to 
lance/lance-common/src/test/java/org/apache/gravitino/lance/common/utils/TestArrowUtils.java
index f39ea2e684..43f0bf6ec6 100644
--- 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
+++ 
b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/utils/TestArrowUtils.java
@@ -16,14 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.gravitino.lance.common.utils;
 
-package org.apache.gravitino.lance.service;
+import java.util.Arrays;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
 
-public class ServiceConstants {
-  public static final String LANCE_HTTP_HEADER_PREFIX = "x-lance-";
+public class TestArrowUtils {
 
-  public static final String LANCE_TABLE_LOCATION_HEADER =
-      LANCE_HTTP_HEADER_PREFIX + "table-location";
-  public static final String LANCE_TABLE_PROPERTIES_PREFIX_HEADER =
-      LANCE_HTTP_HEADER_PREFIX + "table-properties";
+  @Test
+  public void testParseArrowIpcStream() throws Exception {
+    Schema schema =
+        new Schema(
+            Arrays.asList(
+                Field.nullable("id", new ArrowType.Int(32, true)),
+                Field.nullable("value", new ArrowType.Utf8())));
+    byte[] ipcStream = ArrowUtils.generateIpcStream(schema);
+    Schema parsedSchema = ArrowUtils.parseArrowIpcStream(ipcStream);
+
+    Assertions.assertEquals(schema, parsedSchema);
+  }
 }
diff --git 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
 
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
index 5590eef9bd..290730f39a 100644
--- 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
+++ 
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
@@ -19,21 +19,26 @@
 package org.apache.gravitino.lance.service.rest;
 
 import static 
org.apache.gravitino.lance.common.ops.NamespaceWrapper.NAMESPACE_DELIMITER_DEFAULT;
-import static 
org.apache.gravitino.lance.service.ServiceConstants.LANCE_TABLE_LOCATION_HEADER;
-import static 
org.apache.gravitino.lance.service.ServiceConstants.LANCE_TABLE_PROPERTIES_PREFIX_HEADER;
+import static 
org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_LOCATION;
+import static 
org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_LOCATION_HEADER;
+import static 
org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_PROPERTIES_PREFIX_HEADER;
 
 import com.codahale.metrics.annotation.ResponseMetered;
 import com.codahale.metrics.annotation.Timed;
-import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.collect.Maps;
+import com.lancedb.lance.namespace.model.CreateEmptyTableRequest;
+import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
+import com.lancedb.lance.namespace.model.CreateTableRequest;
 import com.lancedb.lance.namespace.model.CreateTableResponse;
 import com.lancedb.lance.namespace.model.DeregisterTableRequest;
 import com.lancedb.lance.namespace.model.DeregisterTableResponse;
+import com.lancedb.lance.namespace.model.DescribeTableRequest;
 import com.lancedb.lance.namespace.model.DescribeTableResponse;
 import com.lancedb.lance.namespace.model.RegisterTableRequest;
+import com.lancedb.lance.namespace.model.RegisterTableRequest.ModeEnum;
 import com.lancedb.lance.namespace.model.RegisterTableResponse;
-import com.lancedb.lance.namespace.util.JsonUtil;
 import java.util.Map;
+import java.util.Optional;
 import javax.inject.Inject;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
@@ -47,8 +52,8 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.MultivaluedMap;
 import javax.ws.rs.core.Response;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
+import org.apache.gravitino.lance.common.utils.SerializationUtils;
 import org.apache.gravitino.lance.service.LanceExceptionMapper;
 import org.apache.gravitino.metrics.MetricNames;
 
@@ -70,10 +75,14 @@ public class LanceTableOperations {
   @ResponseMetered(name = "describe-table", absolute = true)
   public Response describeTable(
       @PathParam("id") String tableId,
-      @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter") 
String delimiter) {
+      @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter") 
String delimiter,
+      DescribeTableRequest request) {
     try {
+      validateDescribeTableRequest(request);
       DescribeTableResponse response =
-          lanceNamespace.asTableOps().describeTable(tableId, delimiter);
+          lanceNamespace
+              .asTableOps()
+              .describeTable(tableId, delimiter, 
Optional.ofNullable(request.getVersion()));
       return Response.ok(response).build();
     } catch (Exception e) {
       return LanceExceptionMapper.toRESTResponse(tableId, e);
@@ -97,13 +106,12 @@ public class LanceTableOperations {
       MultivaluedMap<String, String> headersMap = headers.getRequestHeaders();
       String tableLocation = headersMap.getFirst(LANCE_TABLE_LOCATION_HEADER);
       String tableProperties = 
headersMap.getFirst(LANCE_TABLE_PROPERTIES_PREFIX_HEADER);
-
-      Map<String, String> props =
-          JsonUtil.mapper().readValue(tableProperties, new TypeReference<>() 
{});
+      CreateTableRequest.ModeEnum modeEnum = 
CreateTableRequest.ModeEnum.fromValue(mode);
+      Map<String, String> props = 
SerializationUtils.deserializeProperties(tableProperties);
       CreateTableResponse response =
           lanceNamespace
               .asTableOps()
-              .createTable(tableId, mode, delimiter, tableLocation, props, 
arrowStreamBody);
+              .createTable(tableId, modeEnum, delimiter, tableLocation, props, 
arrowStreamBody);
       return Response.ok(response).build();
     } catch (Exception e) {
       return LanceExceptionMapper.toRESTResponse(tableId, e);
@@ -112,27 +120,25 @@ public class LanceTableOperations {
 
   @POST
   @Path("/create-empty")
+  @Produces("application/json")
   @Timed(name = "create-empty-table." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
   @ResponseMetered(name = "create-empty-table", absolute = true)
   public Response createEmptyTable(
       @PathParam("id") String tableId,
-      @QueryParam("mode") @DefaultValue("create") String mode, // create, 
exist_ok, overwrite
       @QueryParam("delimiter") @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) 
String delimiter,
+      CreateEmptyTableRequest request,
       @Context HttpHeaders headers) {
     try {
-      // Extract table properties from header
-      MultivaluedMap<String, String> headersMap = headers.getRequestHeaders();
-      String tableLocation = headersMap.getFirst(LANCE_TABLE_LOCATION_HEADER);
-      String tableProperties = 
headersMap.getFirst(LANCE_TABLE_PROPERTIES_PREFIX_HEADER);
+      validateCreateEmptyTableRequest(request);
 
+      String tableLocation = request.getLocation();
       Map<String, String> props =
-          StringUtils.isBlank(tableProperties)
-              ? Map.of()
-              : JsonUtil.mapper().readValue(tableProperties, new 
TypeReference<>() {});
-      CreateTableResponse response =
-          lanceNamespace
-              .asTableOps()
-              .createTable(tableId, mode, delimiter, tableLocation, props, 
null);
+          request.getProperties() == null
+              ? Maps.newHashMap()
+              : Maps.newHashMap(request.getProperties());
+
+      CreateEmptyTableResponse response =
+          lanceNamespace.asTableOps().createEmptyTable(tableId, delimiter, 
tableLocation, props);
       return Response.ok(response).build();
     } catch (Exception e) {
       return LanceExceptionMapper.toRESTResponse(tableId, e);
@@ -145,18 +151,18 @@ public class LanceTableOperations {
   @ResponseMetered(name = "register-table", absolute = true)
   public Response registerTable(
       @PathParam("id") String tableId,
-      @QueryParam("mode") @DefaultValue("create") String mode, // overwrite or
       @QueryParam("delimiter") @DefaultValue("$") String delimiter,
       @Context HttpHeaders headers,
       RegisterTableRequest registerTableRequest) {
     try {
+      validateRegisterTableRequest(registerTableRequest);
+
       Map<String, String> props =
           registerTableRequest.getProperties() == null
               ? Maps.newHashMap()
               : Maps.newHashMap(registerTableRequest.getProperties());
-      props.put("register", "true");
-      props.put("location", registerTableRequest.getLocation());
-      props.put("format", "lance");
+      props.put(LANCE_LOCATION, registerTableRequest.getLocation());
+      ModeEnum mode = registerTableRequest.getMode();
 
       RegisterTableResponse response =
           lanceNamespace.asTableOps().registerTable(tableId, mode, delimiter, 
props);
@@ -176,6 +182,7 @@ public class LanceTableOperations {
       @Context HttpHeaders headers,
       DeregisterTableRequest deregisterTableRequest) {
     try {
+      validateDeregisterTableRequest(deregisterTableRequest);
       DeregisterTableResponse response =
           lanceNamespace.asTableOps().deregisterTable(tableId, delimiter);
       return Response.ok(response).build();
@@ -183,4 +190,26 @@ public class LanceTableOperations {
       return LanceExceptionMapper.toRESTResponse(tableId, e);
     }
   }
+
+  private void validateCreateEmptyTableRequest(
+      @SuppressWarnings("unused") CreateEmptyTableRequest request) {
+    // No specific fields to validate for now
+  }
+
+  private void validateRegisterTableRequest(
+      @SuppressWarnings("unused") RegisterTableRequest request) {
+    // No specific fields to validate for now
+  }
+
+  private void validateDeregisterTableRequest(
+      @SuppressWarnings("unused") DeregisterTableRequest request) {
+    // We will ignore the id in the request body since it's already provided in the path param.
+    // No specific fields to validate for now
+  }
+
+  private void validateDescribeTableRequest(
+      @SuppressWarnings("unused") DescribeTableRequest request) {
+    // We will ignore the id in the request body since it's already provided in the path param.
+    // No specific fields to validate for now
+  }
 }
diff --git a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
index f1fe008782..c7454442b6 100644
--- a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
+++ b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
@@ -24,38 +24,68 @@ import com.google.common.collect.Sets;
 import com.lancedb.lance.namespace.LanceNamespace;
 import com.lancedb.lance.namespace.LanceNamespaceException;
 import com.lancedb.lance.namespace.LanceNamespaces;
+import com.lancedb.lance.namespace.client.apache.ApiException;
+import com.lancedb.lance.namespace.model.CreateEmptyTableRequest;
+import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
 import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
 import com.lancedb.lance.namespace.model.CreateNamespaceResponse;
+import com.lancedb.lance.namespace.model.CreateTableRequest;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
+import com.lancedb.lance.namespace.model.DeregisterTableRequest;
+import com.lancedb.lance.namespace.model.DeregisterTableResponse;
 import com.lancedb.lance.namespace.model.DescribeNamespaceRequest;
 import com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
+import com.lancedb.lance.namespace.model.DescribeTableRequest;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
 import com.lancedb.lance.namespace.model.DropNamespaceRequest;
 import com.lancedb.lance.namespace.model.DropNamespaceResponse;
+import com.lancedb.lance.namespace.model.ErrorResponse;
+import com.lancedb.lance.namespace.model.JsonArrowField;
 import com.lancedb.lance.namespace.model.ListNamespacesRequest;
 import com.lancedb.lance.namespace.model.ListNamespacesResponse;
+import com.lancedb.lance.namespace.model.ListTablesRequest;
 import com.lancedb.lance.namespace.model.NamespaceExistsRequest;
+import com.lancedb.lance.namespace.model.RegisterTableRequest;
+import com.lancedb.lance.namespace.model.RegisterTableRequest.ModeEnum;
+import com.lancedb.lance.namespace.model.RegisterTableResponse;
 import com.lancedb.lance.namespace.rest.RestNamespaceConfig;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Schema;
 import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.integration.test.util.BaseIT;
 import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.lance.common.utils.ArrowUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.com.google.common.base.Joiner;
 
 public class LanceRESTServiceIT extends BaseIT {
+  private static final String CATALOG_NAME = 
GravitinoITUtils.genRandomName("lance_rest_catalog");
+  private static final String SCHEMA_NAME = 
GravitinoITUtils.genRandomName("lance_rest_schema");
 
   private GravitinoMetalake metalake;
+  private Catalog catalog;
   private Map<String, String> properties =
       new HashMap<>() {
         {
@@ -372,6 +402,272 @@ public class LanceRESTServiceIT extends BaseIT {
     Assertions.assertEquals(404, exception.getCode());
   }
 
+  @Test
+  void testCreateEmptyTable() throws ApiException {
+    catalog = createCatalog(CATALOG_NAME);
+    createSchema();
+
+    CreateEmptyTableRequest request = new CreateEmptyTableRequest();
+    String location = tempDir + "/" + "empty_table/";
+    request.setLocation(location);
+    request.setProperties(
+        ImmutableMap.of(
+            "key1", "v1",
+            "lance.storage.a", "value_a",
+            "lance.storage.b", "value_b"));
+    request.setId(List.of(CATALOG_NAME, SCHEMA_NAME, "empty_table"));
+
+    CreateEmptyTableResponse response = ns.createEmptyTable(request);
+    Assertions.assertNotNull(response);
+    Assertions.assertEquals(location, response.getLocation());
+    Assertions.assertEquals("v1", response.getProperties().get("key1"));
+    Assertions.assertEquals("value_a", response.getStorageOptions().get("a"));
+    Assertions.assertEquals("value_b", response.getStorageOptions().get("b"));
+
+    DescribeTableRequest describeTableRequest = new DescribeTableRequest();
+    describeTableRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME, 
"empty_table"));
+
+    DescribeTableResponse loadTable = ns.describeTable(describeTableRequest);
+    Assertions.assertNotNull(loadTable);
+    Assertions.assertEquals(location, loadTable.getLocation());
+
+    // Try to create the same table again should fail
+    LanceNamespaceException exception =
+        Assertions.assertThrows(
+            LanceNamespaceException.class,
+            () -> {
+              ns.createEmptyTable(request);
+            });
+    Assertions.assertTrue(exception.getMessage().contains("Table already exists"));
+    Assertions.assertEquals(409, exception.getCode());
+  }
+
+  @Test
+  void testCreateTable() throws IOException, ApiException {
+    catalog = createCatalog(CATALOG_NAME);
+    createSchema();
+
+    String location = tempDir + "/" + "table/";
+    List<String> ids = List.of(CATALOG_NAME, SCHEMA_NAME, "table");
+    org.apache.arrow.vector.types.pojo.Schema schema =
+        new org.apache.arrow.vector.types.pojo.Schema(
+            Arrays.asList(
+                Field.nullable("id", new ArrowType.Int(32, true)),
+                Field.nullable("value", new ArrowType.Utf8())));
+    byte[] body = ArrowUtils.generateIpcStream(schema);
+
+    CreateTableRequest request = new CreateTableRequest();
+    request.setId(ids);
+    request.setLocation(location);
+    request.setProperties(
+        ImmutableMap.of(
+            "key1", "v1",
+            "lance.storage.a", "value_a",
+            "lance.storage.b", "value_b"));
+
+    CreateTableResponse response = ns.createTable(request, body);
+    Assertions.assertNotNull(response);
+    Assertions.assertEquals(location, response.getLocation());
+    Assertions.assertEquals("v1", response.getProperties().get("key1"));
+    Assertions.assertEquals("value_a", response.getStorageOptions().get("a"));
+    Assertions.assertEquals("value_b", response.getStorageOptions().get("b"));
+
+    DescribeTableRequest describeTableRequest = new DescribeTableRequest();
+    describeTableRequest.setId(ids);
+    DescribeTableResponse loadTable = ns.describeTable(describeTableRequest);
+    Assertions.assertNotNull(loadTable);
+    Assertions.assertEquals(location, loadTable.getLocation());
+
+    List<JsonArrowField> jsonArrowFields = loadTable.getSchema().getFields();
+    for (int i = 0; i < jsonArrowFields.size(); i++) {
+      JsonArrowField jsonArrowField = jsonArrowFields.get(i);
+      Field originalField = schema.getFields().get(i);
+      Assertions.assertEquals(originalField.getName(), 
jsonArrowField.getName());
+
+      if (i == 0) {
+        Assertions.assertEquals("int32", jsonArrowField.getType().getType());
+      } else if (i == 1) {
+        Assertions.assertEquals("utf8", jsonArrowField.getType().getType());
+      }
+    }
+    // Check the location exists
+    Assertions.assertTrue(new File(location).exists());
+    Assertions.assertEquals("v1", loadTable.getProperties().get("key1"));
+    Assertions.assertEquals("value_a", loadTable.getStorageOptions().get("a"));
+    Assertions.assertEquals("value_b", loadTable.getStorageOptions().get("b"));
+
+    // Check overwrite mode
+    String newLocation = tempDir + "/" + "table_new/";
+    request.setLocation(newLocation);
+    request.setMode(CreateTableRequest.ModeEnum.OVERWRITE);
+    request.setProperties(
+        ImmutableMap.of(
+            "key1", "v2",
+            "lance.storage.a", "value_va",
+            "lance.storage.b", "value_vb"));
+
+    response = Assertions.assertDoesNotThrow(() -> ns.createTable(request, 
body));
+
+    Assertions.assertNotNull(response);
+    Assertions.assertEquals(newLocation, response.getLocation());
+    Assertions.assertTrue(response.getProperties().get("key1").equals("v2"));
+    Assertions.assertEquals("value_va", response.getStorageOptions().get("a"));
+    Assertions.assertEquals("value_vb", response.getStorageOptions().get("b"));
+    Assertions.assertTrue(new File(newLocation).exists());
+    Assertions.assertFalse(new File(location).exists());
+
+    // Check exist_ok mode
+    request.setMode(CreateTableRequest.ModeEnum.EXIST_OK);
+    response = Assertions.assertDoesNotThrow(() -> ns.createTable(request, 
body));
+
+    Assertions.assertNotNull(response);
+    Assertions.assertEquals("v2", response.getProperties().get("key1"));
+    Assertions.assertEquals("value_va", response.getStorageOptions().get("a"));
+    Assertions.assertEquals("value_vb", response.getStorageOptions().get("b"));
+    Assertions.assertEquals(newLocation, response.getLocation());
+    Assertions.assertTrue(new File(newLocation).exists());
+
+    // Create table again without overwrite or exist_ok should fail
+    request.setMode(CreateTableRequest.ModeEnum.CREATE);
+    LanceNamespaceException exception =
+        Assertions.assertThrows(LanceNamespaceException.class, () -> 
ns.createTable(request, body));
+    Assertions.assertTrue(exception.getMessage().contains("already exists"));
+    Assertions.assertEquals(409, exception.getCode());
+
+    // Create a table without location should fail
+    CreateTableRequest noLocationRequest = new CreateTableRequest();
+    noLocationRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME, 
"no_location_table"));
+    LanceNamespaceException noLocationException =
+        Assertions.assertThrows(
+            LanceNamespaceException.class, () -> 
ns.createTable(noLocationRequest, body));
+    Assertions.assertTrue(
+        noLocationException.getMessage().contains("No location specified for table"));
+
+    // Create table with invalid schema should fail
+    byte[] invalidBody = "".getBytes(Charset.defaultCharset());
+    CreateTableRequest invalidRequest = new CreateTableRequest();
+    invalidRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME, "invalid_table"));
+    invalidRequest.setLocation(tempDir + "/" + "invalid_table/");
+    LanceNamespaceException apiException =
+        Assertions.assertThrows(
+            LanceNamespaceException.class, () -> 
ns.createTable(invalidRequest, invalidBody));
+    Assertions.assertTrue(apiException.getMessage().contains("Failed to parse Arrow IPC stream"));
+    Assertions.assertEquals(400, apiException.getCode());
+
+    // Create table with wrong ids should fail
+    CreateTableRequest wrongIdRequest = new CreateTableRequest();
+    wrongIdRequest.setId(List.of(CATALOG_NAME, "wrong_schema")); // This is a schema NOT a table.
+    wrongIdRequest.setLocation(tempDir + "/" + "wrong_id_table/");
+    LanceNamespaceException wrongIdException =
+        Assertions.assertThrows(
+            LanceNamespaceException.class, () -> 
ns.createTable(wrongIdRequest, body));
+    Assertions.assertTrue(wrongIdException.getMessage().contains("Expected at 3-level namespace"));
+    Assertions.assertEquals(400, wrongIdException.getCode());
+
+    // Now test list tables
+    ListTablesRequest listRequest = new ListTablesRequest();
+    listRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME));
+    var listResponse = ns.listTables(listRequest);
+    Set<String> stringSet = listResponse.getTables();
+    Assertions.assertEquals(1, stringSet.size());
+    Assertions.assertTrue(stringSet.contains(Joiner.on(".").join(ids)));
+  }
+
+  @Test
+  void testRegisterTable() {
+    catalog = createCatalog(CATALOG_NAME);
+    createSchema();
+
+    String location = tempDir + "/" + "register/";
+    List<String> ids = List.of(CATALOG_NAME, SCHEMA_NAME, "table_register");
+    RegisterTableRequest registerTableRequest = new RegisterTableRequest();
+    registerTableRequest.setLocation(location);
+    registerTableRequest.setMode(ModeEnum.CREATE);
+    registerTableRequest.setId(ids);
+    registerTableRequest.setProperties(ImmutableMap.of("key1", "value1"));
+
+    RegisterTableResponse response = ns.registerTable(registerTableRequest);
+    Assertions.assertNotNull(response);
+
+    DescribeTableRequest describeTableRequest = new DescribeTableRequest();
+    describeTableRequest.setId(ids);
+    DescribeTableResponse loadTable = ns.describeTable(describeTableRequest);
+    Assertions.assertNotNull(loadTable);
+    Assertions.assertEquals(location, loadTable.getLocation());
+    Assertions.assertTrue(loadTable.getProperties().containsKey("key1"));
+
+    // Test register again with OVERWRITE mode
+    String newLocation = tempDir + "/" + "register_new/";
+    registerTableRequest.setMode(ModeEnum.OVERWRITE);
+    registerTableRequest.setLocation(newLocation);
+    response = Assertions.assertDoesNotThrow(() -> 
ns.registerTable(registerTableRequest));
+    Assertions.assertNotNull(response);
+    Assertions.assertEquals(newLocation, response.getLocation());
+
+    // Test deregister table
+    DeregisterTableRequest deregisterTableRequest = new 
DeregisterTableRequest();
+    deregisterTableRequest.setId(ids);
+    DeregisterTableResponse deregisterTableResponse = 
ns.deregisterTable(deregisterTableRequest);
+    Assertions.assertNotNull(deregisterTableResponse);
+    Assertions.assertEquals(newLocation, 
deregisterTableResponse.getLocation());
+  }
+
+  @Test
+  void testDeregisterNonExistingTable() {
+    catalog = createCatalog(CATALOG_NAME);
+    createSchema();
+
+    List<String> ids = List.of(CATALOG_NAME, SCHEMA_NAME, 
"non_existing_table");
+    DeregisterTableRequest deregisterTableRequest = new 
DeregisterTableRequest();
+    deregisterTableRequest.setId(ids);
+
+    LanceNamespaceException exception =
+        Assertions.assertThrows(
+            LanceNamespaceException.class, () -> 
ns.deregisterTable(deregisterTableRequest));
+    Assertions.assertEquals(404, exception.getCode());
+    Assertions.assertTrue(exception.getMessage().contains("does not exist"));
+    Optional<ErrorResponse> responseOptional = exception.getErrorResponse();
+    Assertions.assertTrue(responseOptional.isPresent());
+    Assertions.assertEquals(
+        NoSuchTableException.class.getSimpleName(), 
responseOptional.get().getType());
+
+    // Try to create a table and then deregister table
+    CreateEmptyTableRequest createEmptyTableRequest = new 
CreateEmptyTableRequest();
+    String location = tempDir + "/" + "to_be_deregistered_table/";
+    ids = List.of(CATALOG_NAME, SCHEMA_NAME, "to_be_deregistered_table");
+    createEmptyTableRequest.setLocation(location);
+    createEmptyTableRequest.setProperties(ImmutableMap.of());
+    createEmptyTableRequest.setId(ids);
+    CreateEmptyTableResponse response =
+        Assertions.assertDoesNotThrow(() -> 
ns.createEmptyTable(createEmptyTableRequest));
+    Assertions.assertNotNull(response);
+    Assertions.assertEquals(location, response.getLocation());
+
+    // Now try to deregister
+    deregisterTableRequest.setId(ids);
+    DeregisterTableResponse deregisterTableResponse =
+        Assertions.assertDoesNotThrow(() -> 
ns.deregisterTable(deregisterTableRequest));
+    Assertions.assertNotNull(deregisterTableResponse);
+    Assertions.assertEquals(location, deregisterTableResponse.getLocation());
+    Assertions.assertTrue(Objects.equals(ids, 
deregisterTableResponse.getId()));
+    Assertions.assertTrue(
+        new File(location).exists(), "Data should still exist after deregistering the table.");
+
+    // Now try to describe the table, should fail
+    DescribeTableRequest describeTableRequest = new DescribeTableRequest();
+    describeTableRequest.setId(ids);
+    LanceNamespaceException lanceNamespaceException =
+        Assertions.assertThrows(
+            LanceNamespaceException.class, () -> 
ns.describeTable(describeTableRequest));
+    Assertions.assertEquals(404, lanceNamespaceException.getCode());
+
+    describeTableRequest.setVersion(1L);
+    lanceNamespaceException =
+        Assertions.assertThrows(
+            LanceNamespaceException.class, () -> 
ns.describeTable(describeTableRequest));
+    Assertions.assertEquals(406, lanceNamespaceException.getCode());
+  }
+
   private GravitinoMetalake createMetalake(String metalakeName) {
     return client.createMetalake(metalakeName, "metalake for lance rest service tests", null);
   }
@@ -385,6 +681,13 @@ public class LanceRESTServiceIT extends BaseIT {
         properties);
   }
 
+  private void createSchema() {
+    Map<String, String> schemaProperties = Maps.newHashMap();
+    String comment = "comment";
+    catalog.asSchemas().createSchema(SCHEMA_NAME, comment, schemaProperties);
+    catalog.asSchemas().loadSchema(SCHEMA_NAME);
+  }
+
   private String getLanceRestServiceUrl() {
     return String.format("http://%s:%d/lance", "localhost", getLanceRESTServerPort());
   }
diff --git a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java
index 0ba8bf79b7..efe1f90436 100644
--- a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java
+++ b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java
@@ -27,13 +27,23 @@ import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
+import com.lancedb.lance.namespace.LanceNamespaceException;
+import com.lancedb.lance.namespace.model.CreateEmptyTableRequest;
+import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
 import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
 import com.lancedb.lance.namespace.model.CreateNamespaceResponse;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
+import com.lancedb.lance.namespace.model.DeregisterTableRequest;
+import com.lancedb.lance.namespace.model.DeregisterTableResponse;
 import com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
+import com.lancedb.lance.namespace.model.DescribeTableRequest;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
 import com.lancedb.lance.namespace.model.DropNamespaceRequest;
 import com.lancedb.lance.namespace.model.DropNamespaceResponse;
 import com.lancedb.lance.namespace.model.ErrorResponse;
 import com.lancedb.lance.namespace.model.ListNamespacesResponse;
+import com.lancedb.lance.namespace.model.RegisterTableRequest;
+import com.lancedb.lance.namespace.model.RegisterTableResponse;
 import java.io.IOException;
 import java.util.regex.Pattern;
 import javax.servlet.http.HttpServletRequest;
@@ -42,6 +52,7 @@ import javax.ws.rs.core.Application;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.lance.common.ops.LanceTableOperations;
 import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
 import org.apache.gravitino.rest.RESTUtils;
 import org.glassfish.jersey.internal.inject.AbstractBinder;
@@ -66,6 +77,7 @@ public class TestLanceNamespaceOperations extends JerseyTest {
   private static NamespaceWrapper namespaceWrapper = 
mock(NamespaceWrapper.class);
   private static 
org.apache.gravitino.lance.common.ops.LanceNamespaceOperations namespaceOps =
       
mock(org.apache.gravitino.lance.common.ops.LanceNamespaceOperations.class);
+  private static LanceTableOperations tableOps = 
mock(LanceTableOperations.class);
 
   @Override
   protected Application configure() {
@@ -78,6 +90,7 @@ public class TestLanceNamespaceOperations extends JerseyTest {
 
     ResourceConfig resourceConfig = new ResourceConfig();
     resourceConfig.register(LanceNamespaceOperations.class);
+    
resourceConfig.register(org.apache.gravitino.lance.service.rest.LanceTableOperations.class);
     resourceConfig.register(
         new AbstractBinder() {
           @Override
@@ -93,6 +106,7 @@ public class TestLanceNamespaceOperations extends JerseyTest {
   @BeforeAll
   public static void setup() {
     when(namespaceWrapper.asNamespaceOps()).thenReturn(namespaceOps);
+    when(namespaceWrapper.asTableOps()).thenReturn(tableOps);
   }
 
   @Test
@@ -323,4 +337,291 @@ public class TestLanceNamespaceOperations extends JerseyTest {
     Assertions.assertEquals("Test exception", errorResp.getError());
     Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp.getType());
   }
+
+  @Test
+  void testCreateTable() {
+    String tableIds = "catalog.scheme.create_table";
+    String delimiter = ".";
+
+    // Test normal
+    CreateTableResponse createTableResponse = new CreateTableResponse();
+    when(tableOps.createTable(any(), any(), any(), any(), any(), any()))
+        .thenReturn(createTableResponse);
+
+    byte[] bytes = new byte[] {0x01, 0x02, 0x03};
+    Response resp =
+        target(String.format("/v1/table/%s/create", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(bytes, "application/vnd.apache.arrow.stream"));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp.getMediaType());
+
+    // Test illegal argument
+    when(tableOps.createTable(any(), any(), any(), any(), any(), any()))
+        .thenThrow(new IllegalArgumentException("Illegal argument"));
+
+    resp =
+        target(String.format("/v1/table/%s/create", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(bytes, "application/vnd.apache.arrow.stream"));
+    Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp.getMediaType());
+
+    // Test runtime exception
+    Mockito.reset(tableOps);
+    when(tableOps.createTable(any(), any(), any(), any(), any(), any()))
+        .thenThrow(new RuntimeException("Runtime exception"));
+    resp =
+        target(String.format("/v1/table/%s/create", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(bytes, "application/vnd.apache.arrow.stream"));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp.getMediaType());
+    ErrorResponse errorResp = resp.readEntity(ErrorResponse.class);
+    Assertions.assertEquals("Runtime exception", errorResp.getError());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp.getType());
+  }
+
+  @Test
+  void testCreateEmptyTable() {
+    String tableIds = "catalog.scheme.create_empty_table";
+    String delimiter = ".";
+
+    // Test normal
+    CreateEmptyTableResponse createTableResponse = new 
CreateEmptyTableResponse();
+    createTableResponse.setLocation("/path/to/table");
+    createTableResponse.setProperties(ImmutableMap.of("key", "value"));
+    when(tableOps.createEmptyTable(any(), any(), any(), 
any())).thenReturn(createTableResponse);
+
+    CreateEmptyTableRequest tableRequest = new CreateEmptyTableRequest();
+    tableRequest.setLocation("/path/to/table");
+
+    Response resp =
+        target(String.format("/v1/table/%s/create-empty", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, 
MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp.getMediaType());
+    CreateEmptyTableResponse response = 
resp.readEntity(CreateEmptyTableResponse.class);
+    Assertions.assertEquals(createTableResponse.getLocation(), 
response.getLocation());
+    Assertions.assertEquals(createTableResponse.getProperties(), 
response.getProperties());
+
+    Mockito.reset(tableOps);
+    // Test illegal argument
+    when(tableOps.createEmptyTable(any(), any(), any(), any()))
+        .thenThrow(new IllegalArgumentException("Illegal argument"));
+
+    resp =
+        target(String.format("/v1/table/%s/create-empty", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, 
MediaType.APPLICATION_JSON_TYPE));
+    Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp.getMediaType());
+
+    // Test runtime exception
+    Mockito.reset(tableOps);
+    when(tableOps.createEmptyTable(any(), any(), any(), any()))
+        .thenThrow(new RuntimeException("Runtime exception"));
+    resp =
+        target(String.format("/v1/table/%s/create-empty", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, 
MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp.getMediaType());
+    ErrorResponse errorResp = resp.readEntity(ErrorResponse.class);
+    Assertions.assertEquals("Runtime exception", errorResp.getError());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp.getType());
+  }
+
+  @Test
+  void testRegisterTable() {
+    String tableIds = "catalog.scheme.register_table";
+    String delimiter = ".";
+
+    // Test normal
+    RegisterTableResponse registerTableResponse = new RegisterTableResponse();
+    registerTableResponse.setLocation("/path/to/registered_table");
+    registerTableResponse.setProperties(ImmutableMap.of("key", "value"));
+    when(tableOps.registerTable(any(), any(), any(), 
any())).thenReturn(registerTableResponse);
+
+    RegisterTableRequest tableRequest = new RegisterTableRequest();
+    tableRequest.setLocation("/path/to/registered_table");
+    tableRequest.setMode(RegisterTableRequest.ModeEnum.CREATE);
+
+    Response resp =
+        target(String.format("/v1/table/%s/register", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, 
MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp.getMediaType());
+    RegisterTableResponse response = 
resp.readEntity(RegisterTableResponse.class);
+    Assertions.assertEquals(registerTableResponse.getLocation(), 
response.getLocation());
+    Assertions.assertEquals(registerTableResponse.getProperties(), 
response.getProperties());
+
+    // Test illegal argument
+    Mockito.reset(tableOps);
+    when(tableOps.registerTable(any(), any(), any(), any()))
+        .thenThrow(new IllegalArgumentException("Illegal argument"));
+    resp =
+        target(String.format("/v1/table/%s/register", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, 
MediaType.APPLICATION_JSON_TYPE));
+    Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp.getMediaType());
+
+    // Test runtime exception
+    Mockito.reset(tableOps);
+    when(tableOps.registerTable(any(), any(), any(), any()))
+        .thenThrow(new RuntimeException("Runtime exception"));
+    resp =
+        target(String.format("/v1/table/%s/register", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, 
MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), 
resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, 
resp.getMediaType());
+    ErrorResponse errorResp = resp.readEntity(ErrorResponse.class);
+    Assertions.assertEquals("Runtime exception", errorResp.getError());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), 
errorResp.getType());
+  }
+
+  /**
+   * Verifies the {@code POST /v1/table/{id}/deregister} endpoint against the stubbed {@code
+   * tableOps}: a successful call, then an {@link IllegalArgumentException}, a not-found {@link
+   * LanceNamespaceException} and a plain {@link RuntimeException} thrown by the backend.
+   */
+  @Test
+  void testDeregisterTable() {
+    String tableId = "catalog.scheme.deregister_table";
+    String idDelimiter = ".";
+    String path = String.format("/v1/table/%s/deregister", tableId);
+    DeregisterTableRequest payload = new DeregisterTableRequest();
+
+    // Success: the stubbed response is expected back as a 200 JSON body with the same fields.
+    DeregisterTableResponse expected = new DeregisterTableResponse();
+    expected.setLocation("/path/to/deregistered_table");
+    expected.setProperties(ImmutableMap.of("key", "value"));
+    when(tableOps.deregisterTable(any(), any())).thenReturn(expected);
+
+    Response httpResp =
+        target(path)
+            .queryParam("delimiter", idDelimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, httpResp.getMediaType());
+    DeregisterTableResponse actual = httpResp.readEntity(DeregisterTableResponse.class);
+    Assertions.assertEquals(expected.getLocation(), actual.getLocation());
+    Assertions.assertEquals(expected.getProperties(), actual.getProperties());
+
+    // IllegalArgumentException from the backend is expected to surface as 400 with a JSON body.
+    Mockito.reset(tableOps);
+    when(tableOps.deregisterTable(any(), any()))
+        .thenThrow(new IllegalArgumentException("Illegal argument"));
+    httpResp =
+        target(path)
+            .queryParam("delimiter", idDelimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE));
+    Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), httpResp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, httpResp.getMediaType());
+
+    // A not-found LanceNamespaceException is expected to surface as 404.
+    Mockito.reset(tableOps);
+    when(tableOps.deregisterTable(any(), any()))
+        .thenThrow(
+            LanceNamespaceException.notFound(
+                "Table not found", "NoSuchTableException", tableId, ""));
+    httpResp =
+        target(path)
+            .queryParam("delimiter", idDelimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE));
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), httpResp.getStatus());
+
+    // Any other RuntimeException is expected as 500 with an ErrorResponse describing it.
+    Mockito.reset(tableOps);
+    when(tableOps.deregisterTable(any(), any()))
+        .thenThrow(new RuntimeException("Runtime exception"));
+    httpResp =
+        target(path)
+            .queryParam("delimiter", idDelimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(payload, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), httpResp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, httpResp.getMediaType());
+    ErrorResponse failure = httpResp.readEntity(ErrorResponse.class);
+    Assertions.assertEquals("Runtime exception", failure.getError());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), failure.getType());
+  }
+
+  /**
+   * Verifies the {@code POST /v1/table/{id}/describe} endpoint against the stubbed {@code
+   * tableOps}: a successful call, then a not-found {@link LanceNamespaceException} and a plain
+   * {@link RuntimeException} thrown by the backend.
+   */
+  @Test
+  void testDescribeTable() {
+    String tableIds = "catalog.scheme.describe_table";
+    String delimiter = ".";
+
+    // Test normal: the stubbed describe result is expected back as a 200 JSON body.
+    DescribeTableResponse describeTableResponse = new DescribeTableResponse();
+    describeTableResponse.setLocation("/path/to/describe_table");
+    describeTableResponse.setProperties(ImmutableMap.of("key", "value"));
+    when(tableOps.describeTable(any(), any(), any())).thenReturn(describeTableResponse);
+
+    DescribeTableRequest tableRequest = new DescribeTableRequest();
+    Response resp =
+        target(String.format("/v1/table/%s/describe", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+    DescribeTableResponse response = resp.readEntity(DescribeTableResponse.class);
+    Assertions.assertEquals(describeTableResponse.getLocation(), response.getLocation());
+    Assertions.assertEquals(describeTableResponse.getProperties(), response.getProperties());
+
+    // Test not found exception: expected to surface as 404.
+    Mockito.reset(tableOps);
+    when(tableOps.describeTable(any(), any(), any()))
+        .thenThrow(
+            LanceNamespaceException.notFound(
+                "Table not found", "NoSuchTableException", tableIds, ""));
+    resp =
+        target(String.format("/v1/table/%s/describe", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), resp.getStatus());
+
+    // Test runtime exception: expected as 500 with an ErrorResponse describing the exception.
+    Mockito.reset(tableOps);
+    when(tableOps.describeTable(any(), any(), any()))
+        .thenThrow(new RuntimeException("Runtime exception"));
+    resp =
+        target(String.format("/v1/table/%s/describe", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(tableRequest, MediaType.APPLICATION_JSON_TYPE));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+    ErrorResponse errorResp = resp.readEntity(ErrorResponse.class);
+    Assertions.assertEquals("Runtime exception", errorResp.getError());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), errorResp.getType());
+  }
 }
diff --git a/lance/lance-rest-server/src/test/resources/log4j2.properties b/lance/lance-rest-server/src/test/resources/log4j2.properties
new file mode 100644
index 0000000000..b5db8a1ffc
--- /dev/null
+++ b/lance/lance-rest-server/src/test/resources/log4j2.properties
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set to debug or trace if log4j initialization is failing
+status = info
+
+# Name of the configuration
+name = ConsoleLogConfig
+
+# Console appender configuration
+appender.console.type = Console
+appender.console.name = consoleLogger
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n
+
+# Log file location (override with the gravitino.log.path system property)
+property.logPath = ${sys:gravitino.log.path:-build/lance-rest-integration-test.log}
+
+# File appender configuration
+appender.file.type = File
+appender.file.name = fileLogger
+appender.file.fileName = ${logPath}
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n
+
+# Root logger level
+rootLogger.level = info
+
+# Root logger referring to console and file appenders
+rootLogger.appenderRef.stdout.ref = consoleLogger
+rootLogger.appenderRef.file.ref = fileLogger
+
+# File appender configuration for testcontainers
+appender.testcontainersFile.type = File
+appender.testcontainersFile.name = testcontainersLogger
+appender.testcontainersFile.fileName = build/testcontainers.log
+appender.testcontainersFile.layout.type = PatternLayout
+appender.testcontainersFile.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n
+
+# Loggers for testcontainers (non-additive, written only to testcontainersLogger)
+logger.testcontainers.name = org.testcontainers
+logger.testcontainers.level = debug
+logger.testcontainers.additivity = false
+logger.testcontainers.appenderRef.file.ref = testcontainersLogger
+
+logger.tc.name = tc
+logger.tc.level = debug
+logger.tc.additivity = false
+logger.tc.appenderRef.file.ref = testcontainersLogger
+
+logger.docker.name = com.github.dockerjava
+logger.docker.level = warn
+logger.docker.additivity = false
+logger.docker.appenderRef.file.ref = testcontainersLogger
+
+logger.http.name = com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.wire
+logger.http.level = off

Reply via email to