This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 60b822b39d [#8892][#8893] feat(Lance-REST-Server): Lance namespace and 
table operations (#9182)
60b822b39d is described below

commit 60b822b39da5c463f6be49bc34601b96c0b15e14
Author: Junda Yang <[email protected]>
AuthorDate: Wed Nov 19 18:37:29 2025 -0800

    [#8892][#8893] feat(Lance-REST-Server): Lance namespace and table 
operations (#9182)
    
    ### What changes were proposed in this pull request?
    
    This PR implements table and namespace operations for Lance REST Server:
    
    - Namespace Operations: Full CRUD APIs (create, list, load, alter, drop)
    with GravitinoLanceNamespaceWrapper and REST endpoints
    - Table Operations: Create, load, and alter table support with
    LanceDataTypeConverter for type mapping and Arrow IPC for schema
    exchange
    - Configuration: Refined Lance config keys, added
    gravitino-lance-rest-server.conf.template, and optimized build scripts
    
    ### Why are the changes needed?
    
    The Lance REST Server lacked essential namespace and table operations
    required for production use. These changes enable:
    
    - Full CRUD operations on Lance namespaces and tables through REST APIs
    - Integration with external systems (Spark, Flink, etc.)
    - Lance as a first-class table format in Gravitino's generic lakehouse
    catalog
    
    Fix: #8892
    Fix: #8893
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, new REST APIs added:
    
    - Namespace: POST/GET/PUT/DELETE /v1/namespaces/{namespace}
    - Table: POST/GET/PUT /v1/namespaces/{namespace}/tables/{table}
    - Configuration: Added conf/gravitino-lance-rest-server.conf.template
    and updated Lance property keys.
    
    ### How was this patch tested?
    
    Unit tests: TestLanceConfig, TestArrowIPC, updated TestRelationalCatalog
    and TestTableDTO
    
    ---------
    
    Co-authored-by: mchades <[email protected]>
    Co-authored-by: Mini Yu <[email protected]>
    Co-authored-by: Jerry Shao <[email protected]>
---
 .../org/apache/gravitino/rel/indexes/Index.java    |  45 ++
 .../GenericLakehouseCatalogOperations.java         |  23 +-
 .../lakehouse/lance/LanceCatalogOperations.java    | 120 +++-
 .../lakehouse/lance/LanceDataTypeConverter.java    |  69 +++
 .../gravitino/client/TestRelationalCatalog.java    |  10 -
 .../org/apache/gravitino/dto/rel/TableDTO.java     |   2 -
 .../gravitino/dto/requests/TableCreateRequest.java |   3 -
 .../gravitino/dto/responses/TableResponse.java     |   3 -
 .../java/org/apache/gravitino/json/JsonUtils.java  |  22 +-
 .../org/apache/gravitino/dto/rel/TestTableDTO.java |  13 +-
 conf/gravitino-lance-rest-server.conf.template     |  49 ++
 conf/gravitino.conf.template                       |  17 +-
 .../catalog/TableOperationDispatcher.java          |  51 ++
 .../gravitino/lance/common/config/LanceConfig.java |  35 +-
 .../lance/common/ops/LanceNamespaceOperations.java |  11 +-
 .../lance/common/ops/LanceTableOperations.java     |  15 +-
 .../common/ops/arrow/ArrowRecordBatchList.java     |  40 ++
 .../gravitino/GravitinoLanceNamespaceWrapper.java  | 666 ++++++++++++++++++++-
 .../gravitino/lance/common/TestArrowIPC.java       |  83 +++
 .../lance/common/config/TestLanceConfig.java       |  33 +-
 lance/lance-rest-server/build.gradle.kts           |   3 +-
 .../apache/gravitino/lance/LanceRESTService.java   |   6 +-
 .../service/rest/LanceNamespaceOperations.java     | 111 +++-
 .../lance/service/rest/LanceTableOperations.java   |  89 ++-
 .../apache/gravitino/server/TestServerConfig.java  |   3 +-
 .../catalog/jdbc/mysql/MySQLMetadataAdapter.java   |   2 +
 26 files changed, 1378 insertions(+), 146 deletions(-)

diff --git a/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java 
b/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java
index 4b92977fd5..fd3beb0e67 100644
--- a/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java
+++ b/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java
@@ -67,5 +67,50 @@ public interface Index {
      * UNIQUE KEY helps in avoiding redundancy and ensuring data accuracy in 
the database.
      */
     UNIQUE_KEY,
+
+    // The following index types are specific to Lance, for more, please see: 
IndexType in LanceDB
+    /**
+     * SCALAR index is used to optimize searches on scalar data types such as 
integers, floats,
+     * strings, etc. Currently, this type is only applicable to Lance.
+     */
+    SCALAR,
+    /**
+     * BTREE index is a balanced tree data structure that maintains sorted 
data and allows for
+     * logarithmic time complexity for search, insert, and delete operations. 
Currently, this type
+     * is only applicable to Lance.
+     */
+    BTREE,
+    /**
+     * Bitmap index is a type of database index that uses bit arrays (bitmaps) 
to represent the
+     * presence or absence of values in a column, enabling efficient querying 
and data retrieval.
+     * Currently, this type is only applicable to Lance.
+     */
+    BITMAP,
+    /**
+     * LABEL_LIST index is used to optimize searches on columns containing 
lists of labels or tags.
+     * Currently, this type is only applicable to Lance.
+     */
+    LABEL_LIST,
+    /**
+     * INVERTED index is a data structure used to optimize full-text searches 
by mapping terms to
+     * their locations within a dataset, allowing for quick retrieval of 
documents containing
+     * specific words or phrases. Currently, this type is only applicable to 
Lance.
+     */
+    INVERTED,
+    /**
+     * VECTOR index is used to optimize similarity searches in 
high-dimensional vector spaces.
+     * Currently, this type is only applicable to Lance.
+     */
+    VECTOR,
+    /** IVF_FLAT (Inverted File with Flat quantization) is an indexing method 
used for efficient */
+    IVF_FLAT,
+    /** IVF_SQ (Inverted File with Scalar Quantization) is an indexing method 
used for efficient */
+    IVF_SQ,
+    /** IVF_PQ (Inverted File with Product Quantization) is an indexing method 
used for efficient */
+    IVF_PQ,
+    /** IVF_HNSW_FLAT */
+    IVF_HNSW_SQ,
+    /** IVF_HNSW_PQ */
+    IVF_HNSW_PQ;
   }
 }
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
index 63a6935544..46b9687a21 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
@@ -272,10 +272,25 @@ public class GenericLakehouseCatalogOperations
   @Override
   public Table alterTable(NameIdentifier ident, TableChange... changes)
       throws NoSuchTableException, IllegalArgumentException {
-    // TODO(#8838): Implement table alteration
-    throw new UnsupportedOperationException(
-        "Table operations are not yet implemented. "
-            + "This feature is planned for a future release.");
+    EntityStore store = GravitinoEnv.getInstance().entityStore();
+    Namespace namespace = ident.namespace();
+    try {
+      GenericTableEntity tableEntity =
+          store.get(ident, Entity.EntityType.TABLE, GenericTableEntity.class);
+      Map<String, String> tableProperties = tableEntity.getProperties();
+      String format = tableProperties.getOrDefault("format", "lance");
+      LakehouseCatalogOperations lakehouseCatalogOperations =
+          SUPPORTED_FORMATS.compute(
+              format,
+              (k, v) ->
+                  v == null
+                      ? createLakehouseCatalogOperations(
+                          format, tableProperties, catalogInfo, 
propertiesMetadata)
+                      : v);
+      return lakehouseCatalogOperations.alterTable(ident, changes);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to list tables under schema " + 
namespace, e);
+    }
   }
 
   @Override
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
index 3e1146b7ad..442a6f6c2c 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
@@ -20,34 +20,49 @@
 package org.apache.gravitino.catalog.lakehouse.lance;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.lancedb.lance.Dataset;
 import com.lancedb.lance.WriteParams;
+import com.lancedb.lance.index.DistanceType;
+import com.lancedb.lance.index.IndexParams;
+import com.lancedb.lance.index.IndexType;
+import com.lancedb.lance.index.vector.VectorIndexParams;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.catalog.lakehouse.LakehouseCatalogOperations;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.GenericLakehouseTable;
 import org.apache.gravitino.connector.HasPropertyMetadata;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
 import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GenericTableEntity;
 import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.GenericTable;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes.IndexImpl;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -133,20 +148,31 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
             .map(
                 col -> {
                   boolean nullable = col.nullable();
+                  ArrowType parentType = 
converter.fromGravitino(col.dataType());
+                  List<ArrowType> childTypes = 
converter.getChildTypes(col.dataType());
+                  List<Field> childFields =
+                      childTypes.stream()
+                          .map(
+                              childType ->
+                                  new org.apache.arrow.vector.types.pojo.Field(
+                                      "",
+                                      
org.apache.arrow.vector.types.pojo.FieldType.nullable(
+                                          childType),
+                                      null))
+                          .collect(Collectors.toList());
+
                   if (nullable) {
                     return new org.apache.arrow.vector.types.pojo.Field(
                         col.name(),
-                        org.apache.arrow.vector.types.pojo.FieldType.nullable(
-                            converter.fromGravitino(col.dataType())),
-                        null);
+                        
org.apache.arrow.vector.types.pojo.FieldType.nullable(parentType),
+                        childFields);
                   }
 
                   // not nullable
                   return new org.apache.arrow.vector.types.pojo.Field(
                       col.name(),
-                      org.apache.arrow.vector.types.pojo.FieldType.notNullable(
-                          converter.fromGravitino(col.dataType())),
-                      null);
+                      
org.apache.arrow.vector.types.pojo.FieldType.notNullable(parentType),
+                      childFields);
                 })
             .collect(Collectors.toList());
     return new org.apache.arrow.vector.types.pojo.Schema(fields);
@@ -155,8 +181,86 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
   @Override
   public Table alterTable(NameIdentifier ident, TableChange... changes)
       throws NoSuchTableException, IllegalArgumentException {
-    // Use another PRs to implement alter table for Lance tables
-    return null;
+    // Lance only supports adding indexes for now.
+    List<Index> addedIndexes = Lists.newArrayList();
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.AddIndex addIndexChange) {
+        Index index =
+            IndexImpl.builder()
+                .withIndexType(addIndexChange.getType())
+                .withName(addIndexChange.getName())
+                .withFieldNames(addIndexChange.getFieldNames())
+                .build();
+        addedIndexes.add(index);
+      }
+    }
+
+    EntityStore entityStore = GravitinoEnv.getInstance().entityStore();
+    GenericTableEntity entity;
+    try {
+      entity = entityStore.get(ident, Entity.EntityType.TABLE, 
GenericTableEntity.class);
+    } catch (NoSuchEntityException e) {
+      throw new NoSuchTableException("No such table: %s", ident);
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to load table entity for: " + ident, 
ioe);
+    }
+
+    String location = entity.getProperties().get("location");
+    try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
+      // For Lance, we only support adding indexes, so in fact, we can't 
handle drop index here.
+      for (Index index : addedIndexes) {
+        IndexType indexType = IndexType.valueOf(index.type().name());
+        IndexParams indexParams = getIndexParamsByIndexType(indexType);
+
+        dataset.createIndex(
+            Arrays.stream(index.fieldNames())
+                .map(fieldPath -> String.join(".", fieldPath))
+                .collect(Collectors.toList()),
+            indexType,
+            Optional.of(index.name()),
+            indexParams,
+            true);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to alter Lance table: " + ident, e);
+    }
+
+    GenericTable oldTable = entity.toGenericTable();
+    Index[] newIndexes = oldTable.index();
+    for (Index index : addedIndexes) {
+      newIndexes = ArrayUtils.add(newIndexes, index);
+    }
+
+    return GenericLakehouseTable.builder()
+        .withFormat(oldTable.format())
+        .withProperties(oldTable.properties())
+        .withAuditInfo((AuditInfo) oldTable.auditInfo())
+        .withSortOrders(oldTable.sortOrder())
+        .withPartitioning(oldTable.partitioning())
+        .withDistribution(oldTable.distribution())
+        .withColumns(oldTable.columns())
+        .withIndexes(newIndexes)
+        .withName(oldTable.name())
+        .withComment(oldTable.comment())
+        .build();
+  }
+
+  private IndexParams getIndexParamsByIndexType(IndexType indexType) {
+    switch (indexType) {
+      case SCALAR:
+        return IndexParams.builder().build();
+      case VECTOR:
+        // TODO make these parameters configurable
+        int numberOfDimensions = 3; // this value should be determined 
dynamically based on the data
+        // Add properties to Index to set this value.
+        return IndexParams.builder()
+            .setVectorIndexParams(
+                VectorIndexParams.ivfPq(2, 8, numberOfDimensions, 
DistanceType.L2, 2))
+            .build();
+      default:
+        throw new IllegalArgumentException("Unsupported index type: " + 
indexType);
+    }
   }
 
   @Override
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
index 117863659e..d7966edd5e 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
@@ -19,6 +19,8 @@
 
 package org.apache.gravitino.catalog.lakehouse.lance;
 
+import com.google.common.collect.Lists;
+import java.util.List;
 import org.apache.arrow.vector.types.DateUnit;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
 import org.apache.arrow.vector.types.TimeUnit;
@@ -27,9 +29,11 @@ import org.apache.arrow.vector.types.pojo.ArrowType.Bool;
 import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint;
 import org.apache.arrow.vector.types.pojo.ArrowType.Int;
 import org.apache.gravitino.connector.DataTypeConverter;
+import org.apache.gravitino.json.JsonUtils;
 import org.apache.gravitino.rel.types.Type;
 import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.rel.types.Types.FixedType;
+import org.apache.gravitino.rel.types.Types.UnparsedType;
 
 public class LanceDataTypeConverter implements DataTypeConverter<ArrowType, 
ArrowType> {
 
@@ -67,6 +71,30 @@ public class LanceDataTypeConverter implements 
DataTypeConverter<ArrowType, Arro
         return new ArrowType.FixedSizeBinary(fixedType.length());
       case BINARY:
         return new ArrowType.Binary();
+      case UNPARSED:
+        String typeStr = ((UnparsedType) type).unparsedType().toString();
+        try {
+          Type t = JsonUtils.anyFieldMapper().readValue(typeStr, Type.class);
+          if (t instanceof Types.ListType) {
+            return ArrowType.List.INSTANCE;
+          } else if (t instanceof Types.MapType) {
+            return new ArrowType.Map(false);
+          } else if (t instanceof Types.StructType) {
+            return ArrowType.Struct.INSTANCE;
+          } else {
+            throw new UnsupportedOperationException(
+                "Unsupported UnparsedType conversion: " + t.simpleString());
+          }
+        } catch (Exception e) {
+          // FixedSizeListArray(integer, 3)
+          if (typeStr.startsWith("FixedSizeListArray")) {
+            int size =
+                Integer.parseInt(
+                    typeStr.substring(typeStr.indexOf(',') + 1, 
typeStr.indexOf(')')).trim());
+            return new ArrowType.FixedSizeList(size);
+          }
+          throw new UnsupportedOperationException("Failed to parse 
UnparsedType: " + typeStr, e);
+        }
       default:
         throw new UnsupportedOperationException("Unsupported Gravitino type: " 
+ type.name());
     }
@@ -116,8 +144,49 @@ public class LanceDataTypeConverter implements 
DataTypeConverter<ArrowType, Arro
       return Types.StringType.get();
     } else if (arrowType instanceof ArrowType.Binary) {
       return Types.BinaryType.get();
+      // TODO handle complex types like List, Map, Struct
     } else {
       throw new UnsupportedOperationException("Unsupported Arrow type: " + 
arrowType);
     }
   }
+
+  public List<ArrowType> getChildTypes(Type parentType) {
+    if (parentType.name() != Type.Name.UNPARSED) {
+      return List.of();
+    }
+
+    List<ArrowType> arrowTypes = Lists.newArrayList();
+    String typeStr = ((UnparsedType) parentType).unparsedType().toString();
+    try {
+      Type t = JsonUtils.anyFieldMapper().readValue(typeStr, Type.class);
+      if (t instanceof Types.ListType listType) {
+        arrowTypes.add(fromGravitino(listType.elementType()));
+      } else if (t instanceof Types.MapType mapType) {
+        arrowTypes.add(fromGravitino(mapType.keyType()));
+        arrowTypes.add(fromGravitino(mapType.valueType()));
+      } else {
+        // TODO support struct type.
+        throw new UnsupportedOperationException(
+            "Unsupported UnparsedType conversion: " + t.simpleString());
+      }
+
+      return arrowTypes;
+    } catch (Exception e) {
+      // FixedSizeListArray(integer, 3)
+
+      try {
+        if (typeStr.startsWith("FixedSizeListArray")) {
+          String type = typeStr.substring(typeStr.indexOf('(') + 1, 
typeStr.indexOf(',')).trim();
+          Type childType = JsonUtils.anyFieldMapper().readValue("\"" + type + 
"\"", Type.class);
+          arrowTypes.add(fromGravitino(childType));
+
+          return arrowTypes;
+        }
+      } catch (Exception e1) {
+        throw new UnsupportedOperationException("Failed to parse UnparsedType: 
" + typeStr, e1);
+      }
+
+      throw new UnsupportedOperationException("Failed to parse UnparsedType: " 
+ typeStr, e);
+    }
+  }
 }
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
index c5e36247bb..ff0a29c032 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
@@ -571,16 +571,6 @@ public class TestRelationalCatalog extends TestBase {
                     tableId, fromDTOs(columns), "comment", emptyMap, 
errorPartitioning));
     Assertions.assertTrue(ex2.getMessage().contains("not found in table"));
 
-    // Test empty columns
-    Throwable ex3 =
-        Assertions.assertThrows(
-            IllegalArgumentException.class,
-            () ->
-                tableCatalog.createTable(
-                    tableId, new Column[0], "comment", emptyMap, 
errorPartitioning));
-    Assertions.assertTrue(
-        ex3.getMessage().contains("\"columns\" field is required and cannot be 
empty"));
-
     // Test partitioning with assignments
     Partitioning[] partitioningWithAssignments = {
       RangePartitioningDTO.of(
diff --git a/common/src/main/java/org/apache/gravitino/dto/rel/TableDTO.java 
b/common/src/main/java/org/apache/gravitino/dto/rel/TableDTO.java
index c1fb160a38..a23a1cd37c 100644
--- a/common/src/main/java/org/apache/gravitino/dto/rel/TableDTO.java
+++ b/common/src/main/java/org/apache/gravitino/dto/rel/TableDTO.java
@@ -314,8 +314,6 @@ public class TableDTO implements Table {
     public TableDTO build() {
       Preconditions.checkArgument(name != null && !name.isEmpty(), "name 
cannot be null or empty");
       Preconditions.checkArgument(audit != null, "audit cannot be null");
-      Preconditions.checkArgument(
-          columns != null && columns.length > 0, "columns cannot be null or 
empty");
 
       return new TableDTO(
           name,
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/requests/TableCreateRequest.java
 
b/common/src/main/java/org/apache/gravitino/dto/requests/TableCreateRequest.java
index 965de27c17..e9a33e6d77 100644
--- 
a/common/src/main/java/org/apache/gravitino/dto/requests/TableCreateRequest.java
+++ 
b/common/src/main/java/org/apache/gravitino/dto/requests/TableCreateRequest.java
@@ -122,9 +122,6 @@ public class TableCreateRequest implements RESTRequest {
   public void validate() throws IllegalArgumentException {
     Preconditions.checkArgument(
         StringUtils.isNotBlank(name), "\"name\" field is required and cannot 
be empty");
-    Preconditions.checkArgument(
-        columns != null && columns.length != 0,
-        "\"columns\" field is required and cannot be empty");
 
     if (sortOrders != null) {
       Arrays.stream(sortOrders).forEach(sortOrder -> 
sortOrder.validate(columns));
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/responses/TableResponse.java 
b/common/src/main/java/org/apache/gravitino/dto/responses/TableResponse.java
index d3dfa21599..a1cc326295 100644
--- a/common/src/main/java/org/apache/gravitino/dto/responses/TableResponse.java
+++ b/common/src/main/java/org/apache/gravitino/dto/responses/TableResponse.java
@@ -63,9 +63,6 @@ public class TableResponse extends BaseResponse {
     Preconditions.checkArgument(table != null, "table must not be null");
     Preconditions.checkArgument(
         StringUtils.isNotBlank(table.name()), "table 'name' must not be null 
and empty");
-    Preconditions.checkArgument(
-        table.columns() != null && table.columns().length > 0,
-        "table 'columns' must not be null and empty");
     Preconditions.checkArgument(table.auditInfo() != null, "table 'audit' must 
not be null");
     Preconditions.checkArgument(
         table.partitioning() != null, "table 'partitions' must not be null");
diff --git a/common/src/main/java/org/apache/gravitino/json/JsonUtils.java 
b/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
index 5fd2ab3f8f..fb9a33f268 100644
--- a/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
+++ b/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
@@ -715,7 +715,7 @@ public class JsonUtils {
       String text = node.asText().toLowerCase();
       return text.equals(Types.NullType.get().simpleString())
           ? Types.NullType.get()
-          : fromPrimitiveTypeString(text);
+          : fromPrimitiveTypeString(text, node.asText());
     }
 
     if (node.isObject() && node.has(TYPE)) {
@@ -834,49 +834,49 @@ public class JsonUtils {
     gen.writeEndObject();
   }
 
-  private static Type fromPrimitiveTypeString(String typeString) {
-    Type.PrimitiveType primitiveType = TYPES.get(typeString);
+  private static Type fromPrimitiveTypeString(String lowerTypeString, String 
orignalTypeString) {
+    Type.PrimitiveType primitiveType = TYPES.get(lowerTypeString);
     if (primitiveType != null) {
       return primitiveType;
     }
 
-    Matcher fixed = FIXED.matcher(typeString);
+    Matcher fixed = FIXED.matcher(lowerTypeString);
     if (fixed.matches()) {
       return Types.FixedType.of(Integer.parseInt(fixed.group(1)));
     }
 
-    Matcher fixedChar = FIXEDCHAR.matcher(typeString);
+    Matcher fixedChar = FIXEDCHAR.matcher(lowerTypeString);
     if (fixedChar.matches()) {
       return Types.FixedCharType.of(Integer.parseInt(fixedChar.group(1)));
     }
 
-    Matcher varchar = VARCHAR.matcher(typeString);
+    Matcher varchar = VARCHAR.matcher(lowerTypeString);
     if (varchar.matches()) {
       return Types.VarCharType.of(Integer.parseInt(varchar.group(1)));
     }
 
-    Matcher decimal = DECIMAL.matcher(typeString);
+    Matcher decimal = DECIMAL.matcher(lowerTypeString);
     if (decimal.matches()) {
       return Types.DecimalType.of(
           Integer.parseInt(decimal.group(1)), 
Integer.parseInt(decimal.group(2)));
     }
 
-    Matcher time = TIME.matcher(typeString);
+    Matcher time = TIME.matcher(lowerTypeString);
     if (time.matches()) {
       return Types.TimeType.of(Integer.parseInt(time.group(1)));
     }
 
-    Matcher timestampTz = TIMESTAMP_TZ.matcher(typeString);
+    Matcher timestampTz = TIMESTAMP_TZ.matcher(lowerTypeString);
     if (timestampTz.matches()) {
       return 
Types.TimestampType.withTimeZone(Integer.parseInt(timestampTz.group(1)));
     }
 
-    Matcher timestamp = TIMESTAMP.matcher(typeString);
+    Matcher timestamp = TIMESTAMP.matcher(lowerTypeString);
     if (timestamp.matches()) {
       return 
Types.TimestampType.withoutTimeZone(Integer.parseInt(timestamp.group(1)));
     }
 
-    return Types.UnparsedType.of(typeString);
+    return Types.UnparsedType.of(orignalTypeString);
   }
 
   private static Types.StructType readStructType(JsonNode node) {
diff --git 
a/common/src/test/java/org/apache/gravitino/dto/rel/TestTableDTO.java 
b/common/src/test/java/org/apache/gravitino/dto/rel/TestTableDTO.java
index 5f0733c5ea..44de2e03d4 100644
--- a/common/src/test/java/org/apache/gravitino/dto/rel/TestTableDTO.java
+++ b/common/src/test/java/org/apache/gravitino/dto/rel/TestTableDTO.java
@@ -19,11 +19,11 @@
 package org.apache.gravitino.dto.rel;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.time.Instant;
 import org.apache.gravitino.dto.AuditDTO;
 import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 public class TestTableDTO {
@@ -32,7 +32,10 @@ public class TestTableDTO {
     AuditDTO audit =
         
AuditDTO.builder().withCreator("creator").withCreateTime(Instant.now()).build();
     TableDTO.Builder<?> builder = 
TableDTO.builder().withName("t1").withAudit(audit);
-    assertThrows(IllegalArgumentException.class, builder::build);
+    Assertions.assertDoesNotThrow(
+        () -> {
+          builder.build();
+        });
   }
 
   @Test
@@ -41,7 +44,11 @@ public class TestTableDTO {
         
AuditDTO.builder().withCreator("creator").withCreateTime(Instant.now()).build();
     TableDTO.Builder<?> builder =
         TableDTO.builder().withName("t1").withAudit(audit).withColumns(new 
ColumnDTO[0]);
-    assertThrows(IllegalArgumentException.class, builder::build);
+
+    Assertions.assertDoesNotThrow(
+        () -> {
+          builder.build();
+        });
   }
 
   @Test
diff --git a/conf/gravitino-lance-rest-server.conf.template 
b/conf/gravitino-lance-rest-server.conf.template
new file mode 100644
index 0000000000..d74fec3eb7
--- /dev/null
+++ b/conf/gravitino-lance-rest-server.conf.template
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# THE CONFIGURATION FOR Lance REST SERVER
+gravitino.lance-rest.shutdown.timeout = 3000
+
+# THE CONFIGURATION FOR Lance REST WEB SERVER
+# The host name of the built-in web server
+gravitino.lance-rest.host = 0.0.0.0
+# The http port number of the built-in web server
+gravitino.lance-rest.httpPort = 9101
+# The min thread size of the built-in web server
+gravitino.lance-rest.minThreads = 24
+# The max thread size of the built-in web server
+gravitino.lance-rest.maxThreads = 200
+# The stop timeout of the built-in web server
+gravitino.lance-rest.stopTimeout = 30000
+# The timeout of idle connections
+gravitino.lance-rest.idleTimeout = 30000
+# The executor thread pool work queue size of the built-in web server
+gravitino.lance-rest.threadPoolWorkQueueSize = 100
+# The request header size of the built-in web server
+gravitino.lance-rest.requestHeaderSize = 131072
+# The response header size of the built-in web server
+gravitino.lance-rest.responseHeaderSize = 131072
+
+# THE CONFIGURATION FOR Lance namespace backend
+# The backend Lance namespace for Lance REST service, it's recommended to use 
Gravitino
+gravitino.lance-rest.namespace-backend = gravitino
+# The uri of the Lance REST service gravitino namespace backend
+gravitino.lance-rest.gravitino.uri = http://localhost:8090
+# The metalake name used for Lance REST service gravitino namespace backend, 
please create the metalake before using it, and configure the metalake name 
here.
+# gravitino.lance-rest.gravitino.metalake-name = metalake
diff --git a/conf/gravitino.conf.template b/conf/gravitino.conf.template
index 418d14f14c..44a9e50f82 100644
--- a/conf/gravitino.conf.template
+++ b/conf/gravitino.conf.template
@@ -81,7 +81,7 @@ gravitino.authorization.enable = false
 gravitino.authorization.serviceAdmins = anonymous
 
 # THE CONFIGURATION FOR AUXILIARY SERVICE
-# Auxiliary service names, separate by ','
+# Auxiliary service names, separate by ',', currently support iceberg-rest and 
lance-rest
 gravitino.auxService.names = iceberg-rest
 # Iceberg REST service classpath
 gravitino.iceberg-rest.classpath = iceberg-rest-server/libs, 
iceberg-rest-server/conf
@@ -93,3 +93,18 @@ gravitino.iceberg-rest.httpPort = 9001
 gravitino.iceberg-rest.catalog-backend = memory
 # The warehouse directory of Iceberg catalog for Iceberg REST service
 gravitino.iceberg-rest.warehouse = /tmp/
+
+# Lance REST service classpath
+gravitino.lance-rest.classpath = lance-rest-server/libs
+# Lance REST service host
+gravitino.lance-rest.host = 0.0.0.0
+# Lance REST service http port
+gravitino.lance-rest.httpPort = 9101
+
+# THE CONFIGURATION FOR Lance namespace backend
+# The backend Lance namespace for Lance REST service, it's recommended to use 
Gravitino
+gravitino.lance-rest.namespace-backend = gravitino
+# The uri of the Lance REST service gravitino namespace backend
+gravitino.lance-rest.gravitino.uri = http://localhost:8090
+# The metalake name used for Lance REST service gravitino namespace backend, 
please create the metalake first before using it, and configure the metalake 
name here.
+# gravitino.lance-rest.gravitino.metalake-name = metalake
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 1cbab0d6ed..27119f0c99 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -262,6 +262,57 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
             tableId = te.id();
           }
 
+          if (isGenericLakehouseCatalog(catalogIdent)) {
+            // For generic lakehouse catalog, we only update the table entity 
with basic info.
+            GenericTableEntity genericTableEntity =
+                operateOnEntity(
+                    ident, id -> store.get(id, TABLE, 
GenericTableEntity.class), "GET", tableId);
+            if (genericTableEntity == null) {
+              throw new NoSuchTableException("No such table: %s", ident);
+            }
+
+            GenericTable genericTable = (GenericTable) alteredTable;
+            GenericTableEntity updatedGenericTableEntity =
+                operateOnEntity(
+                    ident,
+                    id ->
+                        store.update(
+                            id,
+                            GenericTableEntity.class,
+                            TABLE,
+                            tableEntity ->
+                                GenericTableEntity.getBuilder()
+                                    .withId(tableEntity.id())
+                                    .withName(alteredTable.name())
+                                    .withNamespace(getNewNamespace(ident, 
changes))
+                                    .withFormat(genericTable.format())
+                                    .withAuditInfo(
+                                        AuditInfo.builder()
+                                            
.withCreator(tableEntity.auditInfo().creator())
+                                            
.withCreateTime(tableEntity.auditInfo().createTime())
+                                            .withLastModifier(
+                                                
PrincipalUtils.getCurrentPrincipal().getName())
+                                            
.withLastModifiedTime(Instant.now())
+                                            .build())
+                                    .withColumns(tableEntity.columns())
+                                    .withIndexes(genericTable.index())
+                                    
.withDistribution(genericTable.distribution())
+                                    
.withPartitions(genericTable.partitioning())
+                                    .withSortOrder(genericTable.sortOrder())
+                                    .withProperties(genericTable.properties())
+                                    .withComment(genericTable.comment())
+                                    .build()),
+                    "UPDATE",
+                    tableId);
+
+            return EntityCombinedTable.of(alteredTable, 
updatedGenericTableEntity)
+                .withHiddenProperties(
+                    getHiddenPropertyNames(
+                        getCatalogIdentifier(ident),
+                        HasPropertyMetadata::tablePropertiesMetadata,
+                        alteredTable.properties()));
+          }
+
           TableEntity updatedTableEntity =
               operateOnEntity(
                   ident,
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/config/LanceConfig.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/config/LanceConfig.java
index b6614c87ee..f517d1a349 100644
--- 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/config/LanceConfig.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/config/LanceConfig.java
@@ -36,36 +36,29 @@ public class LanceConfig extends Config implements 
OverwriteDefaultConfig {
 
   public static final int DEFAULT_LANCE_REST_SERVICE_HTTP_PORT = 9101;
   public static final int DEFAULT_LANCE_REST_SERVICE_HTTPS_PORT = 9533;
-  public static final String DEFAULT_NAMESPACE_BACKEND = "gravitino";
-  public static final String DEFAULT_URI = "http://localhost:8090";;
-
-  public static final ConfigEntry<String> CATALOG_NAME =
-      new ConfigBuilder(LANCE_CONFIG_PREFIX + "catalog-name")
-          .doc("Logical Lance catalog served by the REST endpoint")
-          .version(ConfigConstants.VERSION_0_1_0)
-          .stringConf()
-          .createWithDefault("default");
+  public static final String GRAVITINO_NAMESPACE_BACKEND = "gravitino";
+  public static final String GRAVITINO_URI = "http://localhost:8090";;
 
   public static final ConfigEntry<String> NAMESPACE_BACKEND =
-      new ConfigBuilder(LANCE_CONFIG_PREFIX + CONFIG_NAMESPACE_BACKEND)
+      new ConfigBuilder(CONFIG_NAMESPACE_BACKEND)
           .doc("The backend implementation for namespace operations")
           .version(ConfigConstants.VERSION_0_1_0)
           .stringConf()
-          .createWithDefault(DEFAULT_NAMESPACE_BACKEND);
+          .createWithDefault(GRAVITINO_NAMESPACE_BACKEND);
 
   public static final ConfigEntry<String> METALAKE_NAME =
-      new ConfigBuilder(LANCE_CONFIG_PREFIX + CONFIG_METALAKE)
-          .doc("The Metalake name for Gravitino namespace backend")
+      new ConfigBuilder(LANCE_CONFIG_PREFIX + GRAVITINO_NAMESPACE_BACKEND + 
"." + CONFIG_METALAKE)
+          .doc("The Metalake name for Lance Gravitino namespace backend")
           .version(ConfigConstants.VERSION_0_1_0)
           .stringConf()
           .create();
 
-  public static final ConfigEntry<String> NAMESPACE_URI =
-      new ConfigBuilder(LANCE_CONFIG_PREFIX + CONFIG_URI)
-          .doc("The URI for the namespace backend, e.g., Gravitino server URI")
+  public static final ConfigEntry<String> NAMESPACE_BACKEND_URI =
+      new ConfigBuilder(LANCE_CONFIG_PREFIX + GRAVITINO_NAMESPACE_BACKEND + 
"." + CONFIG_URI)
+          .doc("The URI of the namespace backend, e.g., Gravitino server URI")
           .version(ConfigConstants.VERSION_0_1_0)
           .stringConf()
-          .createWithDefault(DEFAULT_URI);
+          .createWithDefault(GRAVITINO_URI);
 
   public LanceConfig(Map<String, String> properties) {
     super(false);
@@ -76,12 +69,12 @@ public class LanceConfig extends Config implements 
OverwriteDefaultConfig {
     super(false);
   }
 
-  public String getCatalogName() {
-    return get(CATALOG_NAME);
+  public String getNamespaceBackend() {
+    return get(NAMESPACE_BACKEND);
   }
 
-  public String getNamespaceUri() {
-    return get(NAMESPACE_URI);
+  public String getNamespaceBackendUri() {
+    return get(NAMESPACE_BACKEND_URI);
   }
 
   public String getGravitinoMetalake() {
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java
index 1b5da98ec0..49141665a5 100644
--- 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java
@@ -25,6 +25,7 @@ import 
com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
 import com.lancedb.lance.namespace.model.DropNamespaceRequest;
 import com.lancedb.lance.namespace.model.DropNamespaceResponse;
 import com.lancedb.lance.namespace.model.ListNamespacesResponse;
+import com.lancedb.lance.namespace.model.ListTablesResponse;
 import java.util.Map;
 
 public interface LanceNamespaceOperations {
@@ -32,19 +33,21 @@ public interface LanceNamespaceOperations {
   ListNamespacesResponse listNamespaces(
       String namespaceId, String delimiter, String pageToken, Integer limit);
 
-  DescribeNamespaceResponse describeNamespace(String id, String delimiter);
+  DescribeNamespaceResponse describeNamespace(String namespaceId, String 
delimiter);
 
   CreateNamespaceResponse createNamespace(
-      String id,
+      String namespaceId,
       String delimiter,
       CreateNamespaceRequest.ModeEnum mode,
       Map<String, String> properties);
 
   DropNamespaceResponse dropNamespace(
-      String id,
+      String namespaceId,
       String delimiter,
       DropNamespaceRequest.ModeEnum mode,
       DropNamespaceRequest.BehaviorEnum behavior);
 
-  void namespaceExists(String id, String delimiter) throws 
LanceNamespaceException;
+  void namespaceExists(String namespaceId, String delimiter) throws 
LanceNamespaceException;
+
+  ListTablesResponse listTables(String id, String delimiter, String pageToken, 
Integer limit);
 }
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
index 057dce8fb3..b8a967cd30 100644
--- 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
@@ -18,11 +18,20 @@
  */
 package org.apache.gravitino.lance.common.ops;
 
-import com.lancedb.lance.namespace.model.ListTablesResponse;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
+import java.util.Map;
 
 public interface LanceTableOperations {
 
-  ListTablesResponse listTables(String id, String delimiter, String pageToken, 
Integer limit);
+  DescribeTableResponse describeTable(String tableId, String delimiter);
 
-  // todo: add more table operation methods
+  CreateTableResponse createTable(
+      String tableId,
+      String mode,
+      String delimiter,
+      String tableLocation,
+      Map<String, String> tableProperties,
+      String rootCatalog,
+      byte[] arrowStreamBody);
 }
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/arrow/ArrowRecordBatchList.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/arrow/ArrowRecordBatchList.java
new file mode 100644
index 0000000000..b0c6a089d5
--- /dev/null
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/arrow/ArrowRecordBatchList.java
@@ -0,0 +1,40 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lance.common.ops.arrow;
+
+import java.util.List;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+public class ArrowRecordBatchList {
+  private final Schema schema;
+
+  @SuppressWarnings("unused")
+  private final List<VectorSchemaRoot> recordBatches;
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public ArrowRecordBatchList(Schema schema, List<VectorSchemaRoot> 
recordBatches) {
+    this.schema = schema;
+    this.recordBatches = recordBatches;
+  }
+}
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
index 59f637b5a1..fe6404a424 100644
--- 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
@@ -19,34 +19,83 @@
 package org.apache.gravitino.lance.common.ops.gravitino;
 
 import static 
org.apache.gravitino.lance.common.config.LanceConfig.METALAKE_NAME;
-import static 
org.apache.gravitino.lance.common.config.LanceConfig.NAMESPACE_URI;
+import static 
org.apache.gravitino.lance.common.config.LanceConfig.NAMESPACE_BACKEND_URI;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.lancedb.lance.namespace.LanceNamespaceException;
 import com.lancedb.lance.namespace.ObjectIdentifier;
 import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
+import com.lancedb.lance.namespace.model.CreateNamespaceRequest.ModeEnum;
 import com.lancedb.lance.namespace.model.CreateNamespaceResponse;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
 import com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
 import com.lancedb.lance.namespace.model.DropNamespaceRequest;
 import com.lancedb.lance.namespace.model.DropNamespaceResponse;
+import com.lancedb.lance.namespace.model.JsonArrowDataType;
+import com.lancedb.lance.namespace.model.JsonArrowField;
+import com.lancedb.lance.namespace.model.JsonArrowSchema;
 import com.lancedb.lance.namespace.model.ListNamespacesResponse;
 import com.lancedb.lance.namespace.model.ListTablesResponse;
+import com.lancedb.lance.namespace.util.CommonUtil;
 import com.lancedb.lance.namespace.util.PageUtil;
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Bool;
+import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.CatalogChange;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.Schema;
+import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.client.GravitinoClient;
+import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.exceptions.NonEmptyCatalogException;
+import org.apache.gravitino.exceptions.NonEmptySchemaException;
+import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.json.JsonUtils;
 import org.apache.gravitino.lance.common.config.LanceConfig;
 import org.apache.gravitino.lance.common.ops.LanceNamespaceOperations;
 import org.apache.gravitino.lance.common.ops.LanceTableOperations;
 import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
+import org.apache.gravitino.lance.common.ops.arrow.ArrowRecordBatchList;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.rel.types.Types.FixedType;
+import org.apache.gravitino.rel.types.Types.UnparsedType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,11 +111,11 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
 
   @Override
   protected void initialize() {
-    String uri = config().get(NAMESPACE_URI);
+    String uri = config().get(NAMESPACE_BACKEND_URI);
     String metalakeName = config().get(METALAKE_NAME);
     Preconditions.checkArgument(
         StringUtils.isNotBlank(metalakeName),
-        "Metalake name must be provided for Gravitino namespace backend");
+        "Metalake name must be provided for Lance Gravitino namespace 
backend");
 
     this.client = 
GravitinoClient.builder(uri).withMetalake(metalakeName).build();
   }
@@ -102,7 +151,6 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
     List<String> namespaces;
     switch (nsId.levels()) {
       case 0:
-        // List catalogs of type relational and provider generic-lakehouse
         namespaces =
             Arrays.stream(client.listCatalogsInfo())
                 .filter(this::isLakehouseCatalog)
@@ -111,16 +159,14 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
         break;
 
       case 1:
-        // List schemas under the catalog
-        String catalogName = nsId.levelAtListPos(0);
-        Catalog catalog = client.loadCatalog(catalogName);
-        if (!isLakehouseCatalog(catalog)) {
-          throw new NoSuchCatalogException("Catalog not found: %s", 
catalogName);
-        }
-
+        Catalog catalog = 
loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
         namespaces = Lists.newArrayList(catalog.asSchemas().listSchemas());
         break;
 
+      case 2:
+        namespaces = Lists.newArrayList();
+        break;
+
       default:
         throw new IllegalArgumentException(
             "Expected at most 2-level namespace but got: " + namespaceId);
@@ -136,39 +182,621 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
   }
 
   @Override
-  public DescribeNamespaceResponse describeNamespace(String id, String 
delimiter) {
-    throw new UnsupportedOperationException("Not implemented yet");
+  public DescribeNamespaceResponse describeNamespace(String namespaceId, 
String delimiter) {
+    ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter);
+    Preconditions.checkArgument(
+        nsId.levels() <= 2 && nsId.levels() > 0,
+        "Expected at most 2-level and at least 1-level namespace but got: %s",
+        namespaceId);
+
+    Catalog catalog = loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
+    Map<String, String> properties = Maps.newHashMap();
+
+    switch (nsId.levels()) {
+      case 1:
+        
Optional.ofNullable(catalog.properties()).ifPresent(properties::putAll);
+        break;
+      case 2:
+        String schemaName = nsId.levelAtListPos(1);
+        Schema schema = catalog.asSchemas().loadSchema(schemaName);
+        Optional.ofNullable(schema.properties()).ifPresent(properties::putAll);
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Expected at most 2-level and at least 1-level namespace but got: 
" + namespaceId);
+    }
+
+    DescribeNamespaceResponse response = new DescribeNamespaceResponse();
+    response.setProperties(properties);
+    return response;
   }
 
   @Override
   public CreateNamespaceResponse createNamespace(
-      String id,
+      String namespaceId,
       String delimiter,
       CreateNamespaceRequest.ModeEnum mode,
       Map<String, String> properties) {
-    throw new UnsupportedOperationException("Not implemented yet");
+    ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter);
+    Preconditions.checkArgument(
+        nsId.levels() <= 2 && nsId.levels() > 0,
+        "Expected at most 2-level and at least 1-level namespace but got: %s",
+        namespaceId);
+
+    switch (nsId.levels()) {
+      case 1:
+        return createOrUpdateCatalog(nsId.levelAtListPos(0), mode, properties);
+      case 2:
+        return createOrUpdateSchema(
+            nsId.levelAtListPos(0), nsId.levelAtListPos(1), mode, properties);
+      default:
+        throw new IllegalArgumentException(
+            "Expected at most 2-level and at least 1-level namespace but got: 
" + namespaceId);
+    }
   }
 
   @Override
   public DropNamespaceResponse dropNamespace(
-      String id,
+      String namespaceId,
       String delimiter,
       DropNamespaceRequest.ModeEnum mode,
       DropNamespaceRequest.BehaviorEnum behavior) {
-    throw new UnsupportedOperationException("Not implemented yet");
+    ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter);
+    Preconditions.checkArgument(
+        nsId.levels() <= 2 && nsId.levels() > 0,
+        "Expected at most 2-level and at least 1-level namespace but got: %s",
+        namespaceId);
+
+    switch (nsId.levels()) {
+      case 1:
+        return dropCatalog(nsId.levelAtListPos(0), mode, behavior);
+      case 2:
+        return dropSchema(nsId.levelAtListPos(0), nsId.levelAtListPos(1), 
mode, behavior);
+      default:
+        throw new IllegalArgumentException(
+            "Expected at most 2-level and at least 1-level namespace but got: 
" + namespaceId);
+    }
   }
 
   @Override
-  public void namespaceExists(String id, String delimiter) throws 
LanceNamespaceException {}
+  public void namespaceExists(String namespaceId, String delimiter) throws 
LanceNamespaceException {
+    ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter);
+    Preconditions.checkArgument(
+        nsId.levels() <= 2 && nsId.levels() > 0,
+        "Expected at most 2-level and at least 1-level namespace but got: %s",
+        namespaceId);
+
+    Catalog catalog = loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
+    if (nsId.levels() == 2) {
+      String schemaName = nsId.levelAtListPos(1);
+      if (!catalog.asSchemas().schemaExists(schemaName)) {
+        throw LanceNamespaceException.notFound(
+            "Schema not found: " + schemaName,
+            NoSuchSchemaException.class.getSimpleName(),
+            schemaName,
+            CommonUtil.formatCurrentStackTrace());
+      }
+    }
+  }
 
   private boolean isLakehouseCatalog(Catalog catalog) {
     return catalog.type().equals(Catalog.Type.RELATIONAL)
         && "generic-lakehouse".equals(catalog.provider());
   }
 
+  private Catalog loadAndValidateLakehouseCatalog(String catalogName) {
+    Catalog catalog;
+    try {
+      catalog = client.loadCatalog(catalogName);
+    } catch (NoSuchCatalogException e) {
+      throw LanceNamespaceException.notFound(
+          "Catalog not found: " + catalogName,
+          NoSuchCatalogException.class.getSimpleName(),
+          catalogName,
+          CommonUtil.formatCurrentStackTrace());
+    }
+    if (!isLakehouseCatalog(catalog)) {
+      throw LanceNamespaceException.notFound(
+          "Catalog is not a lakehouse catalog: " + catalogName,
+          NoSuchCatalogException.class.getSimpleName(),
+          catalogName,
+          CommonUtil.formatCurrentStackTrace());
+    }
+    return catalog;
+  }
+
+  private CreateNamespaceResponse createOrUpdateCatalog(
+      String catalogName, CreateNamespaceRequest.ModeEnum mode, Map<String, 
String> properties) {
+    CreateNamespaceResponse response = new CreateNamespaceResponse();
+
+    Catalog catalog;
+    try {
+      catalog = client.loadCatalog(catalogName);
+    } catch (NoSuchCatalogException e) {
+      // Catalog does not exist, create it
+      Catalog createdCatalog =
+          client.createCatalog(
+              catalogName,
+              Catalog.Type.RELATIONAL,
+              "generic-lakehouse",
+              "created by Lance REST server",
+              properties);
+      response.setProperties(
+          createdCatalog.properties() == null ? Maps.newHashMap() : 
createdCatalog.properties());
+      return response;
+    }
+
+    // Catalog exists, validate type
+    if (!isLakehouseCatalog(catalog)) {
+      throw LanceNamespaceException.conflict(
+          "Catalog already exists but is not a lakehouse catalog: " + 
catalogName,
+          CatalogAlreadyExistsException.class.getSimpleName(),
+          catalogName,
+          CommonUtil.formatCurrentStackTrace());
+    }
+
+    // Catalog exists, handle based on mode
+    switch (mode) {
+      case EXIST_OK:
+        response.setProperties(Maps.newHashMap());
+        return response;
+      case CREATE:
+        throw LanceNamespaceException.conflict(
+            "Catalog already exists: " + catalogName,
+            CatalogAlreadyExistsException.class.getSimpleName(),
+            catalogName,
+            CommonUtil.formatCurrentStackTrace());
+      case OVERWRITE:
+        CatalogChange[] changes =
+            buildChanges(
+                properties,
+                catalog.properties(),
+                CatalogChange::setProperty,
+                CatalogChange::removeProperty,
+                CatalogChange[]::new);
+        Catalog alteredCatalog = client.alterCatalog(catalogName, changes);
+        
Optional.ofNullable(alteredCatalog.properties()).ifPresent(response::setProperties);
+        return response;
+      default:
+        throw new IllegalArgumentException("Unknown mode: " + mode);
+    }
+  }
+
+  private CreateNamespaceResponse createOrUpdateSchema(
+      String catalogName,
+      String schemaName,
+      CreateNamespaceRequest.ModeEnum mode,
+      Map<String, String> properties) {
+    CreateNamespaceResponse response = new CreateNamespaceResponse();
+    Catalog loadedCatalog = loadAndValidateLakehouseCatalog(catalogName);
+
+    Schema schema;
+    try {
+      schema = loadedCatalog.asSchemas().loadSchema(schemaName);
+    } catch (NoSuchSchemaException e) {
+      // Schema does not exist, create it
+      Schema createdSchema = 
loadedCatalog.asSchemas().createSchema(schemaName, null, properties);
+      response.setProperties(
+          createdSchema.properties() == null ? Maps.newHashMap() : 
createdSchema.properties());
+      return response;
+    }
+
+    // Schema exists, handle based on mode
+    switch (mode) {
+      case EXIST_OK:
+        response.setProperties(Maps.newHashMap());
+        return response;
+      case CREATE:
+        throw LanceNamespaceException.conflict(
+            "Schema already exists: " + schemaName,
+            SchemaAlreadyExistsException.class.getSimpleName(),
+            schemaName,
+            CommonUtil.formatCurrentStackTrace());
+      case OVERWRITE:
+        SchemaChange[] changes =
+            buildChanges(
+                properties,
+                schema.properties(),
+                SchemaChange::setProperty,
+                SchemaChange::removeProperty,
+                SchemaChange[]::new);
+        Schema alteredSchema = 
loadedCatalog.asSchemas().alterSchema(schemaName, changes);
+        
Optional.ofNullable(alteredSchema.properties()).ifPresent(response::setProperties);
+        return response;
+      default:
+        throw new IllegalArgumentException("Unknown mode: " + mode);
+    }
+  }
+
+  private DropNamespaceResponse dropCatalog(
+      String catalogName,
+      DropNamespaceRequest.ModeEnum mode,
+      DropNamespaceRequest.BehaviorEnum behavior) {
+    try {
+      boolean dropped =
+          client.dropCatalog(catalogName, behavior == 
DropNamespaceRequest.BehaviorEnum.CASCADE);
+      if (dropped) {
+        return new DropNamespaceResponse();
+      } else {
+        // Catalog did not exist
+        if (mode == DropNamespaceRequest.ModeEnum.FAIL) {
+          throw LanceNamespaceException.notFound(
+              "Catalog not found: " + catalogName,
+              NoSuchCatalogException.class.getSimpleName(),
+              catalogName,
+              CommonUtil.formatCurrentStackTrace());
+        }
+        return new DropNamespaceResponse(); // SKIP mode
+      }
+    } catch (NonEmptyCatalogException e) {
+      throw LanceNamespaceException.badRequest(
+          String.format("Catalog %s is not empty.", catalogName),
+          NonEmptyCatalogException.class.getSimpleName(),
+          catalogName,
+          CommonUtil.formatCurrentStackTrace());
+    }
+  }
+
+  private DropNamespaceResponse dropSchema(
+      String catalogName,
+      String schemaName,
+      DropNamespaceRequest.ModeEnum mode,
+      DropNamespaceRequest.BehaviorEnum behavior) {
+    try {
+      boolean dropped =
+          client
+              .loadCatalog(catalogName)
+              .asSchemas()
+              .dropSchema(schemaName, behavior == 
DropNamespaceRequest.BehaviorEnum.CASCADE);
+      if (dropped) {
+        return new DropNamespaceResponse();
+      } else {
+        // Schema did not exist
+        if (mode == DropNamespaceRequest.ModeEnum.FAIL) {
+          throw LanceNamespaceException.notFound(
+              "Schema not found: " + schemaName,
+              NoSuchSchemaException.class.getSimpleName(),
+              schemaName,
+              CommonUtil.formatCurrentStackTrace());
+        }
+        return new DropNamespaceResponse(); // SKIP mode
+      }
+    } catch (NoSuchCatalogException e) {
+      throw LanceNamespaceException.notFound(
+          "Catalog not found: " + catalogName,
+          NoSuchCatalogException.class.getSimpleName(),
+          catalogName,
+          CommonUtil.formatCurrentStackTrace());
+    } catch (NonEmptySchemaException e) {
+      throw LanceNamespaceException.badRequest(
+          String.format("Schema %s is not empty.", schemaName),
+          NonEmptySchemaException.class.getSimpleName(),
+          schemaName,
+          CommonUtil.formatCurrentStackTrace());
+    }
+  }
+
+  private <T> T[] buildChanges(
+      Map<String, String> newProps,
+      Map<String, String> oldProps,
+      BiFunction<String, String, T> setPropertyFunc,
+      Function<String, T> removePropertyFunc,
+      IntFunction<T[]> arrayCreator) {
+    Stream<T> setPropertiesStream =
+        newProps.entrySet().stream()
+            .map(entry -> setPropertyFunc.apply(entry.getKey(), 
entry.getValue()));
+
+    Stream<T> removePropertiesStream =
+        oldProps == null
+            ? Stream.empty()
+            : oldProps.keySet().stream()
+                .filter(key -> !newProps.containsKey(key))
+                .map(removePropertyFunc);
+
+    return Stream.concat(setPropertiesStream, 
removePropertiesStream).toArray(arrayCreator);
+  }
+
   @Override
   public ListTablesResponse listTables(
       String id, String delimiter, String pageToken, Integer limit) {
-    throw new UnsupportedOperationException("Not implemented yet");
+    ObjectIdentifier nsId = ObjectIdentifier.of(id, Pattern.quote(delimiter));
+    Preconditions.checkArgument(
+        nsId.levels() <= 2, "Expected at most 2-level namespace but got: %s", 
nsId.levels());
+    String catalogName = nsId.levelAtListPos(0);
+    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
+    String schemaName = nsId.levelAtListPos(1);
+    List<String> tables =
+        
Arrays.stream(catalog.asTableCatalog().listTables(Namespace.of(schemaName)))
+            .map(ident -> Joiner.on(delimiter).join(catalogName, schemaName, 
ident.name()))
+            .collect(Collectors.toList());
+
+    Collections.sort(tables);
+    PageUtil.Page page = PageUtil.splitPage(tables, pageToken, 
PageUtil.normalizePageSize(limit));
+    ListNamespacesResponse response = new ListNamespacesResponse();
+    response.setNamespaces(Sets.newHashSet(page.items()));
+    response.setPageToken(page.nextPageToken());
+
+    return new ListTablesResponse()
+        .tables(response.getNamespaces())
+        .pageToken(response.getPageToken());
+  }
+
+  @Override
+  public DescribeTableResponse describeTable(String tableId, String delimiter) 
{
+    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
+    Preconditions.checkArgument(
+        nsId.levels() <= 3, "Expected at most 3-level namespace but got: %s", 
nsId.levels());
+
+    String catalogName = nsId.levelAtListPos(0);
+    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
+
+    Table table = catalog.asTableCatalog().loadTable(tableIdentifier);
+    DescribeTableResponse response = new DescribeTableResponse();
+    response.setProperties(table.properties());
+    response.setLocation(table.properties().get("location"));
+    response.setSchema(toJsonArrowSchema(table.columns()));
+    return response;
+  }
+
+  private JsonArrowSchema toJsonArrowSchema(Column[] columns) {
+    List<JsonArrowField> fields = new ArrayList<>();
+    for (Column column : columns) {
+      ArrowType arrowType = fromGravitinoType(column.dataType());
+      FieldType fieldType = new FieldType(column.nullable(), arrowType, null, 
null);
+      Field field = new Field(column.name(), fieldType, null);
+
+      JsonArrowDataType jsonArrowDataType = new JsonArrowDataType();
+      // other filed needs to be set accordingly such as list, map, struct
+      jsonArrowDataType.setType(arrowType.toString());
+
+      JsonArrowField arrowField = new JsonArrowField();
+      arrowField.setName(field.getName());
+      arrowField.setType(jsonArrowDataType);
+
+      fields.add(arrowField);
+    }
+
+    JsonArrowSchema jsonArrowSchema = new JsonArrowSchema();
+    jsonArrowSchema.setFields(fields);
+    return jsonArrowSchema;
+  }
+
+  @Override
+  public CreateTableResponse createTable(
+      String tableId,
+      String mode,
+      String delimiter,
+      String tableLocation,
+      Map<String, String> tableProperties,
+      String rootCatalog,
+      byte[] arrowStreamBody) {
+    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
+    Preconditions.checkArgument(
+        nsId.levels() <= 3, "Expected at most 3-level namespace but got: %s", 
nsId.levels());
+    if (rootCatalog != null) {
+      List<String> levels = nsId.listStyleId();
+      List<String> newLevels = Lists.newArrayList(rootCatalog);
+      newLevels.addAll(levels);
+      nsId = ObjectIdentifier.of(newLevels);
+    }
+
+    // Parser column information.
+    List<Column> columns = Lists.newArrayList();
+    if (arrowStreamBody != null) {
+      ArrowRecordBatchList recordBatchList = 
parseArrowIpcStream(arrowStreamBody);
+      columns = extractColumns(recordBatchList);
+    }
+
+    String catalogName = nsId.levelAtListPos(0);
+    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
+
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
+
+    Map<String, String> createTableProperties = 
Maps.newHashMap(tableProperties);
+    createTableProperties.put("location", tableLocation);
+    createTableProperties.put("mode", mode);
+    // TODO considering the mode (create, exist_ok, overwrite)
+
+    ModeEnum createMode = ModeEnum.fromValue(mode.toLowerCase());
+    switch (createMode) {
+      case EXIST_OK:
+        if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
+          CreateTableResponse response = new CreateTableResponse();
+          Table existingTable = 
catalog.asTableCatalog().loadTable(tableIdentifier);
+          response.setProperties(existingTable.properties());
+          response.setLocation(existingTable.properties().get("location"));
+          response.setVersion(0L);
+          return response;
+        }
+        break;
+      case CREATE:
+        if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
+          throw LanceNamespaceException.conflict(
+              "Table already exists: " + tableId,
+              SchemaAlreadyExistsException.class.getSimpleName(),
+              tableId,
+              CommonUtil.formatCurrentStackTrace());
+        }
+        break;
+      case OVERWRITE:
+        if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
+          catalog.asTableCatalog().dropTable(tableIdentifier);
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown mode: " + mode);
+    }
+
+    Table t =
+        catalog
+            .asTableCatalog()
+            .createTable(
+                tableIdentifier,
+                columns.toArray(new Column[0]),
+                tableLocation,
+                createTableProperties);
+
+    CreateTableResponse response = new CreateTableResponse();
+    response.setProperties(t.properties());
+    response.setLocation(tableLocation);
+    response.setVersion(0L);
+    return response;
+  }
+
+  private ArrowRecordBatchList parseArrowIpcStream(byte[] stream) {
+    try (BufferAllocator allocator = new RootAllocator();
+        ByteArrayInputStream bais = new ByteArrayInputStream(stream);
+        ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) {
+
+      org.apache.arrow.vector.types.pojo.Schema schema = 
reader.getVectorSchemaRoot().getSchema();
+      List<VectorSchemaRoot> batches = new ArrayList<>();
+
+      while (reader.loadNextBatch()) {
+        VectorSchemaRoot root = reader.getVectorSchemaRoot();
+        if (root.getRowCount() > 0) {
+          batches.add(root);
+        }
+      }
+      return new ArrowRecordBatchList(schema, batches);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to parse Arrow IPC stream", e);
+    }
+  }
+
+  private List<Column> extractColumns(ArrowRecordBatchList recordBatchList) {
+    List<Column> columns = new ArrayList<>();
+    org.apache.arrow.vector.types.pojo.Schema arrowSchema = 
recordBatchList.getSchema();
+
+    for (org.apache.arrow.vector.types.pojo.Field field : 
arrowSchema.getFields()) {
+      columns.add(toGravitinoColumn(field));
+    }
+    return columns;
+  }
+
+  private Column toGravitinoColumn(Field field) {
+    return Column.of(
+        field.getName(),
+        toGravitinoType(field),
+        field.getMetadata().get("comment"),
+        field.isNullable(),
+        false,
+        DEFAULT_VALUE_NOT_SET);
+  }
+
+  private ArrowType fromGravitinoType(Type type) {
+    switch (type.name()) {
+      case BOOLEAN:
+        return Bool.INSTANCE;
+      case BYTE:
+        return new Int(8, true);
+      case SHORT:
+        return new Int(16, true);
+      case INTEGER:
+        return new Int(32, true);
+      case LONG:
+        return new Int(64, true);
+      case FLOAT:
+        return new FloatingPoint(FloatingPointPrecision.SINGLE);
+      case DOUBLE:
+        return new FloatingPoint(FloatingPointPrecision.DOUBLE);
+      case DECIMAL:
+        // Lance uses FIXED_SIZE_BINARY for decimal types
+        return new ArrowType.FixedSizeBinary(16); // assuming 16 bytes for 
decimal
+      case DATE:
+        return new ArrowType.Date(DateUnit.DAY);
+      case TIME:
+        return new ArrowType.Time(TimeUnit.MILLISECOND, 32);
+      case TIMESTAMP:
+        return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
+      case VARCHAR:
+      case STRING:
+        return new ArrowType.Utf8();
+      case FIXED:
+        FixedType fixedType = (FixedType) type;
+        return new ArrowType.FixedSizeBinary(fixedType.length());
+      case BINARY:
+        return new ArrowType.Binary();
+      case UNPARSED:
+        String typeStr = ((UnparsedType) type).unparsedType().toString();
+        try {
+          Type t = JsonUtils.anyFieldMapper().readValue(typeStr, Type.class);
+          if (t instanceof Types.ListType) {
+            return ArrowType.List.INSTANCE;
+          } else if (t instanceof Types.MapType) {
+            return new ArrowType.Map(false);
+          } else if (t instanceof Types.StructType) {
+            return ArrowType.Struct.INSTANCE;
+          } else {
+            throw new UnsupportedOperationException(
+                "Unsupported UnparsedType conversion: " + t.simpleString());
+          }
+        } catch (Exception e) {
+          // FixedSizeListArray(integer, 3)
+          if (typeStr.startsWith("FixedSizeListArray")) {
+            int size =
+                Integer.parseInt(
+                    typeStr.substring(typeStr.indexOf(',') + 1, 
typeStr.indexOf(')')).trim());
+            return new ArrowType.FixedSizeList(size);
+          }
+          throw new UnsupportedOperationException("Failed to parse 
UnparsedType: " + typeStr, e);
+        }
+      default:
+        throw new UnsupportedOperationException("Unsupported Gravitino type: " 
+ type.name());
+    }
+  }
+
+  private Type toGravitinoType(Field field) {
+    FieldType parentType = field.getFieldType();
+    ArrowType arrowType = parentType.getType();
+    if (arrowType instanceof Bool) {
+      return Types.BooleanType.get();
+    } else if (arrowType instanceof Int) {
+      Int intType = (Int) arrowType;
+      switch (intType.getBitWidth()) {
+        case 8 -> {
+          return Types.ByteType.get();
+        }
+        case 16 -> {
+          return Types.ShortType.get();
+        }
+        case 32 -> {
+          return Types.IntegerType.get();
+        }
+        case 64 -> {
+          return Types.LongType.get();
+        }
+        default -> throw new UnsupportedOperationException(
+            "Unsupported Int bit width: " + intType.getBitWidth());
+      }
+    } else if (arrowType instanceof FloatingPoint) {
+      FloatingPoint floatingPoint = (FloatingPoint) arrowType;
+      switch (floatingPoint.getPrecision()) {
+        case SINGLE:
+          return Types.FloatType.get();
+        case DOUBLE:
+          return Types.DoubleType.get();
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported FloatingPoint precision: " + 
floatingPoint.getPrecision());
+      }
+    } else if (arrowType instanceof ArrowType.FixedSizeBinary) {
+      ArrowType.FixedSizeBinary fixedSizeBinary = (ArrowType.FixedSizeBinary) 
arrowType;
+      return Types.FixedType.of(fixedSizeBinary.getByteWidth());
+    } else if (arrowType instanceof ArrowType.Date) {
+      return Types.DateType.get();
+    } else if (arrowType instanceof ArrowType.Time) {
+      return Types.TimeType.get();
+    } else if (arrowType instanceof ArrowType.Timestamp) {
+      return Types.TimestampType.withoutTimeZone();
+    } else if (arrowType instanceof ArrowType.Utf8) {
+      return Types.StringType.get();
+    } else if (arrowType instanceof ArrowType.Binary) {
+      return Types.BinaryType.get();
+    } else {
+      return Types.UnparsedType.of(arrowType.toString());
+    }
   }
 }
diff --git 
a/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/TestArrowIPC.java
 
b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/TestArrowIPC.java
new file mode 100644
index 0000000000..71f1bfc587
--- /dev/null
+++ 
b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/TestArrowIPC.java
@@ -0,0 +1,83 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lance.common;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.Arrays;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Text;
+import org.junit.jupiter.api.Test;
+
+public class TestArrowIPC {
+
+  private static final String FILENAME = "/tmp/initial_data.arrow";
+  private static final int RECORD_COUNT = 3;
+
+  @Test
+  void testIPC() throws Exception {
+    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+
+      Schema schema =
+          new Schema(
+              Arrays.asList(
+                  Field.nullable("id", new ArrowType.Int(32, true)),
+                  Field.nullable("value", new ArrowType.Utf8())));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) 
{
+        IntVector idVector = (IntVector) root.getVector("id");
+        VarCharVector valueVector = (VarCharVector) root.getVector("value");
+
+        idVector.allocateNew();
+        valueVector.allocateNew();
+
+        for (int i = 0; i < RECORD_COUNT; i++) {
+          idVector.setSafe(i, i + 1);
+          valueVector.setSafe(i, new Text("Row_" + (i + 1)));
+        }
+
+        idVector.setValueCount(RECORD_COUNT);
+        valueVector.setValueCount(RECORD_COUNT);
+        root.setRowCount(RECORD_COUNT);
+
+        File outFile = new File(FILENAME);
+        try (FileOutputStream fos = new FileOutputStream(outFile);
+            ArrowStreamWriter writer = new ArrowStreamWriter(root, null, fos)) 
{
+
+          writer.start();
+          writer.writeBatch();
+          writer.end();
+        }
+
+        System.out.println(
+            "✅ Successfully generated Arrow IPC Stream file: " + 
outFile.getAbsolutePath());
+        System.out.println("--- Ready for cURL test ---");
+      }
+    }
+  }
+}
diff --git 
a/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/config/TestLanceConfig.java
 
b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/config/TestLanceConfig.java
index 176634f309..6544ca7a3a 100644
--- 
a/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/config/TestLanceConfig.java
+++ 
b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/config/TestLanceConfig.java
@@ -26,27 +26,6 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 public class TestLanceConfig {
-  @Test
-  public void testLoadLanceConfig() {
-    Map<String, String> properties =
-        ImmutableMap.of("gravitino.lance-rest.catalog-name", "test_catalog");
-
-    LanceConfig lanceConfig = new LanceConfig();
-    lanceConfig.loadFromMap(properties, k -> 
k.startsWith("gravitino.lance-rest."));
-    Assertions.assertEquals("test_catalog", lanceConfig.getCatalogName());
-
-    LanceConfig lanceConfig2 = new LanceConfig(properties);
-    Assertions.assertEquals("test_catalog", lanceConfig2.getCatalogName());
-  }
-
-  @Test
-  public void testDefaultCatalogName() {
-    // Test default catalog name when not specified
-    Map<String, String> properties = ImmutableMap.of();
-    LanceConfig lanceConfig = new LanceConfig(properties);
-    Assertions.assertEquals("default", lanceConfig.getCatalogName());
-  }
-
   @Test
   public void testLanceHttpPort() {
     Map<String, String> properties = ImmutableMap.of();
@@ -74,18 +53,18 @@ public class TestLanceConfig {
     // Test default values
     Map<String, String> properties = ImmutableMap.of();
     LanceConfig lanceConfig = new LanceConfig(properties);
-    Assertions.assertEquals("http://localhost:8090";, 
lanceConfig.getNamespaceUri());
+    Assertions.assertEquals("http://localhost:8090";, 
lanceConfig.getNamespaceBackendUri());
     Assertions.assertNull(lanceConfig.getGravitinoMetalake()); // No default, 
must be configured
 
     // Test custom values
     properties =
         ImmutableMap.of(
-            LanceConfig.NAMESPACE_URI.getKey(),
+            LanceConfig.NAMESPACE_BACKEND_URI.getKey(),
             "http://gravitino-server:8090";,
             LanceConfig.METALAKE_NAME.getKey(),
             "production");
     lanceConfig = new LanceConfig(properties);
-    Assertions.assertEquals("http://gravitino-server:8090";, 
lanceConfig.getNamespaceUri());
+    Assertions.assertEquals("http://gravitino-server:8090";, 
lanceConfig.getNamespaceBackendUri());
     Assertions.assertEquals("production", lanceConfig.getGravitinoMetalake());
   }
 
@@ -94,8 +73,7 @@ public class TestLanceConfig {
     // Test all configurations together for auxiliary mode
     Map<String, String> properties =
         ImmutableMap.<String, String>builder()
-            .put(LanceConfig.CATALOG_NAME.getKey(), "lance_catalog")
-            .put(LanceConfig.NAMESPACE_URI.getKey(), 
"http://gravitino-prod:8090";)
+            .put(LanceConfig.NAMESPACE_BACKEND_URI.getKey(), 
"http://gravitino-prod:8090";)
             .put(LanceConfig.METALAKE_NAME.getKey(), "production")
             .put(LanceConfig.NAMESPACE_BACKEND.getKey(), "gravitino")
             .put(JettyServerConfig.WEBSERVER_HTTP_PORT.getKey(), "9101")
@@ -104,8 +82,7 @@ public class TestLanceConfig {
     LanceConfig lanceConfig = new LanceConfig(properties);
 
     // Verify all config values
-    Assertions.assertEquals("lance_catalog", lanceConfig.getCatalogName());
-    Assertions.assertEquals("http://gravitino-prod:8090";, 
lanceConfig.getNamespaceUri());
+    Assertions.assertEquals("http://gravitino-prod:8090";, 
lanceConfig.getNamespaceBackendUri());
     Assertions.assertEquals("production", lanceConfig.getGravitinoMetalake());
 
     JettyServerConfig jettyConfig = JettyServerConfig.fromConfig(lanceConfig);
diff --git a/lance/lance-rest-server/build.gradle.kts 
b/lance/lance-rest-server/build.gradle.kts
index 6709b51194..6beb4d1edd 100644
--- a/lance/lance-rest-server/build.gradle.kts
+++ b/lance/lance-rest-server/build.gradle.kts
@@ -26,7 +26,6 @@ plugins {
 
 dependencies {
   implementation(project(":api"))
-  implementation(project(":catalogs:catalog-common"))
   implementation(project(":common")) {
     exclude("*")
   }
@@ -38,6 +37,8 @@ dependencies {
   }
 
   implementation(project(":lance:lance-common"))
+  implementation(libs.lance)
+  implementation(libs.commons.lang3)
 
   implementation(libs.bundles.jetty)
   implementation(libs.bundles.jersey)
diff --git 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/LanceRESTService.java
 
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/LanceRESTService.java
index d1409c8e12..8c800e49d6 100644
--- 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/LanceRESTService.java
+++ 
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/LanceRESTService.java
@@ -48,6 +48,8 @@ public class LanceRESTService implements 
GravitinoAuxiliaryService {
   public static final String SERVICE_NAME = "lance-rest";
   public static final String LANCE_SPEC = "/lance/*";
 
+  private static final String LANCE_REST_SPEC_PACKAGE = 
"org.apache.gravitino.lance.service.rest";
+
   private JettyServer server;
   private NamespaceWrapper lanceNamespace;
 
@@ -70,7 +72,7 @@ public class LanceRESTService implements 
GravitinoAuxiliaryService {
 
     ResourceConfig resourceConfig = new ResourceConfig();
     resourceConfig.register(JacksonFeature.class);
-    resourceConfig.packages("org.apache.gravitino.lance.service.rest");
+    resourceConfig.packages(LANCE_REST_SPEC_PACKAGE);
     resourceConfig.register(
         new AbstractBinder() {
           @Override
@@ -90,7 +92,7 @@ public class LanceRESTService implements 
GravitinoAuxiliaryService {
     server.addCustomFilters(LANCE_SPEC);
     server.addSystemFilters(LANCE_SPEC);
 
-    LOG.info("Initialized Lance REST service for catalog {}", 
lanceConfig.getCatalogName());
+    LOG.info("Initialized Lance REST service for backend {}", 
lanceConfig.getNamespaceBackend());
   }
 
   @Override
diff --git 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
 
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
index 2d07357f30..493d0acace 100644
--- 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
+++ 
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
@@ -22,12 +22,20 @@ import static 
org.apache.gravitino.lance.common.ops.NamespaceWrapper.NAMESPACE_D
 
 import com.codahale.metrics.annotation.ResponseMetered;
 import com.codahale.metrics.annotation.Timed;
+import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
+import com.lancedb.lance.namespace.model.CreateNamespaceResponse;
+import com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
+import com.lancedb.lance.namespace.model.DropNamespaceRequest;
+import com.lancedb.lance.namespace.model.DropNamespaceResponse;
 import com.lancedb.lance.namespace.model.ListNamespacesResponse;
+import com.lancedb.lance.namespace.model.ListTablesResponse;
+import java.util.regex.Pattern;
 import javax.inject.Inject;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.Encoded;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
@@ -61,7 +69,108 @@ public class LanceNamespaceOperations {
       @QueryParam("limit") Integer limit) {
     try {
       ListNamespacesResponse response =
-          lanceNamespace.asNamespaceOps().listNamespaces(namespaceId, 
delimiter, pageToken, limit);
+          lanceNamespace
+              .asNamespaceOps()
+              .listNamespaces(namespaceId, Pattern.quote(delimiter), 
pageToken, limit);
+      return Response.ok(response).build();
+    } catch (Exception e) {
+      return LanceExceptionMapper.toRESTResponse(namespaceId, e);
+    }
+  }
+
+  @POST
+  @Path("/{id}/describe")
+  @Timed(name = "describe-namespaces." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
+  @ResponseMetered(name = "describe-namespaces", absolute = true)
+  public Response describeNamespace(
+      @Encoded @PathParam("id") String namespaceId,
+      @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter") 
String delimiter) {
+    try {
+      DescribeNamespaceResponse response =
+          lanceNamespace.asNamespaceOps().describeNamespace(namespaceId, 
Pattern.quote(delimiter));
+      return Response.ok(response).build();
+    } catch (Exception e) {
+      return LanceExceptionMapper.toRESTResponse(namespaceId, e);
+    }
+  }
+
+  @POST
+  @Path("/{id}/create")
+  @Timed(name = "create-namespaces." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
+  @ResponseMetered(name = "create-namespaces", absolute = true)
+  public Response createNamespace(
+      @Encoded @PathParam("id") String namespaceId,
+      @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter") 
String delimiter,
+      CreateNamespaceRequest request) {
+    try {
+      CreateNamespaceResponse response =
+          lanceNamespace
+              .asNamespaceOps()
+              .createNamespace(
+                  namespaceId,
+                  Pattern.quote(delimiter),
+                  request.getMode() == null
+                      ? CreateNamespaceRequest.ModeEnum.CREATE
+                      : request.getMode(),
+                  request.getProperties());
+      return Response.ok(response).build();
+    } catch (Exception e) {
+      return LanceExceptionMapper.toRESTResponse(namespaceId, e);
+    }
+  }
+
+  @POST
+  @Path("/{id}/drop")
+  @Timed(name = "drop-namespaces." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
+  @ResponseMetered(name = "drop-namespaces", absolute = true)
+  public Response dropNamespace(
+      @Encoded @PathParam("id") String namespaceId,
+      @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter") 
String delimiter,
+      DropNamespaceRequest request) {
+    try {
+      DropNamespaceResponse response =
+          lanceNamespace
+              .asNamespaceOps()
+              .dropNamespace(
+                  namespaceId,
+                  Pattern.quote(delimiter),
+                  request.getMode() == null
+                      ? DropNamespaceRequest.ModeEnum.FAIL
+                      : request.getMode(),
+                  request.getBehavior() == null
+                      ? DropNamespaceRequest.BehaviorEnum.RESTRICT
+                      : request.getBehavior());
+      return Response.ok(response).build();
+    } catch (Exception e) {
+      return LanceExceptionMapper.toRESTResponse(namespaceId, e);
+    }
+  }
+
+  @POST
+  @Path("/{id}/exists")
+  @Timed(name = "namespace-exists." + MetricNames.HTTP_PROCESS_DURATION, 
absolute = true)
+  @ResponseMetered(name = "namespace-exists", absolute = true)
+  public Response namespaceExists(
+      @Encoded @PathParam("id") String namespaceId,
+      @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter") 
String delimiter) {
+    try {
+      lanceNamespace.asNamespaceOps().namespaceExists(namespaceId, 
Pattern.quote(delimiter));
+      return Response.ok().build();
+    } catch (Exception e) {
+      return LanceExceptionMapper.toRESTResponse(namespaceId, e);
+    }
+  }
+
+  @GET
+  @Path("{id}/table/list")
+  public Response listTables(
+      @PathParam("id") String namespaceId,
+      @DefaultValue("$") @QueryParam("delimiter") String delimiter,
+      @QueryParam("page_token") String pageToken,
+      @QueryParam("limit") Integer limit) {
+    try {
+      ListTablesResponse response =
+          lanceNamespace.asNamespaceOps().listTables(namespaceId, delimiter, 
pageToken, limit);
       return Response.ok(response).build();
     } catch (Exception e) {
       return LanceExceptionMapper.toRESTResponse(namespaceId, e);
diff --git 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
 
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
index 1f30d1b326..359fc94c42 100644
--- 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
+++ 
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
@@ -18,27 +18,27 @@
  */
 package org.apache.gravitino.lance.service.rest;
 
-import static 
org.apache.gravitino.lance.common.ops.NamespaceWrapper.NAMESPACE_DELIMITER_DEFAULT;
-
-import com.codahale.metrics.annotation.ResponseMetered;
-import com.codahale.metrics.annotation.Timed;
-import com.lancedb.lance.namespace.model.ListTablesResponse;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
+import com.lancedb.lance.namespace.util.JsonUtil;
+import java.util.Map;
 import javax.inject.Inject;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
-import javax.ws.rs.Encoded;
-import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
 import org.apache.gravitino.lance.service.LanceExceptionMapper;
-import org.apache.gravitino.metrics.MetricNames;
 
-@Path("/v1/namespace/{id}/table")
+@Path("/v1/table/{id}")
 @Consumes(MediaType.APPLICATION_JSON)
 @Produces(MediaType.APPLICATION_JSON)
 public class LanceTableOperations {
@@ -50,21 +50,68 @@ public class LanceTableOperations {
     this.lanceNamespace = lanceNamespace;
   }
 
-  @GET
-  @Path("/list")
-  @Timed(name = "list-tables." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
-  @ResponseMetered(name = "list-tables", absolute = true)
-  public Response listTables(
-      @Encoded @PathParam("id") String namespaceId,
-      @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter") 
String delimiter,
-      @QueryParam("page_token") String pageToken,
-      @QueryParam("limit") Integer limit) {
+  @POST
+  @Path("/describe")
+  public Response describeTable(
+      @PathParam("id") String tableId,
+      @DefaultValue("$") @QueryParam("delimiter") String delimiter) {
+    try {
+      DescribeTableResponse response =
+          lanceNamespace.asTableOps().describeTable(tableId, delimiter);
+      return Response.ok(response).build();
+    } catch (Exception e) {
+      return LanceExceptionMapper.toRESTResponse(tableId, e);
+    }
+  }
+
+  @POST
+  @Path("/create")
+  @Consumes("application/vnd.apache.arrow.stream")
+  @Produces("application/json")
+  public Response createTable(
+      @PathParam("id") String tableId,
+      @QueryParam("mode") @DefaultValue("create") String mode, // create, 
exist_ok, overwrite
+      @QueryParam("delimiter") @DefaultValue("$") String delimiter,
+      @HeaderParam("x-lance-table-location") String tableLocation,
+      @HeaderParam("x-lance-table-properties") String tableProperties,
+      @HeaderParam("x-lance-root-catalog") String rootCatalog,
+      byte[] arrowStreamBody) {
+    try {
+      Map<String, String> props =
+          JsonUtil.mapper().readValue(tableProperties, new 
TypeReference<Map<String, String>>() {});
+      CreateTableResponse response =
+          lanceNamespace
+              .asTableOps()
+              .createTable(
+                  tableId, mode, delimiter, tableLocation, props, rootCatalog, 
arrowStreamBody);
+      return Response.ok(response).build();
+    } catch (Exception e) {
+      return LanceExceptionMapper.toRESTResponse(tableId, e);
+    }
+  }
+
+  @POST
+  @Path("/create-empty")
+  public Response createEmptyTable(
+      @PathParam("id") String tableId,
+      @QueryParam("mode") @DefaultValue("create") String mode, // create, 
exist_ok, overwrite
+      @QueryParam("delimiter") @DefaultValue("$") String delimiter,
+      @HeaderParam("x-lance-table-location") String tableLocation,
+      @HeaderParam("x-lance-root-catalog") String rootCatalog,
+      @HeaderParam("x-lance-table-properties") String tableProperties) {
     try {
-      ListTablesResponse response =
-          lanceNamespace.asTableOps().listTables(namespaceId, delimiter, 
pageToken, limit);
+      Map<String, String> props =
+          StringUtils.isBlank(tableProperties)
+              ? Map.of()
+              : JsonUtil.mapper()
+                  .readValue(tableProperties, new TypeReference<Map<String, 
String>>() {});
+      CreateTableResponse response =
+          lanceNamespace
+              .asTableOps()
+              .createTable(tableId, mode, delimiter, tableLocation, props, 
rootCatalog, null);
       return Response.ok(response).build();
     } catch (Exception e) {
-      return LanceExceptionMapper.toRESTResponse(namespaceId, e);
+      return LanceExceptionMapper.toRESTResponse(tableId, e);
     }
   }
 }
diff --git 
a/server-common/src/test/java/org/apache/gravitino/server/TestServerConfig.java 
b/server-common/src/test/java/org/apache/gravitino/server/TestServerConfig.java
index fc9193e012..e46e27b807 100644
--- 
a/server-common/src/test/java/org/apache/gravitino/server/TestServerConfig.java
+++ 
b/server-common/src/test/java/org/apache/gravitino/server/TestServerConfig.java
@@ -67,7 +67,8 @@ public class TestServerConfig {
     for (Map.Entry<Object, Object> entry : properties.entrySet()) {
       String propKey = (String) entry.getKey();
       if 
(propKey.startsWith(AuxiliaryServiceManager.GRAVITINO_AUX_SERVICE_PREFIX)
-          || propKey.startsWith("gravitino.iceberg-rest.")) {
+          || propKey.startsWith("gravitino.iceberg-rest.")
+          || propKey.startsWith("gravitino.lance-rest.")) {
         continue;
       }
       Assertions.assertTrue(
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java
index 1c8d051cd5..d77fd62016 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java
@@ -283,6 +283,8 @@ public class MySQLMetadataAdapter extends 
CatalogConnectorMetadataAdapter {
                     .collect(Collectors.toUnmodifiableList());
             uniqueKeys.add(String.format("%s:%s", index.name(), 
Strings.join(columns, ',')));
             break;
+          default:
+            throw new UnsupportedOperationException("Unsupported index type: " 
+ index.type());
         }
       }
       if (!primaryKeys.isEmpty()) {

Reply via email to