This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-lance-namepspace-dev
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-lance-namepspace-dev by 
this push:
     new 96a1caa72c [#8893] feat(lance-rest-server): Support create/load table 
operations for lance (#8911)
96a1caa72c is described below

commit 96a1caa72c4e596eddf12509a8cec7b343f23708
Author: Mini Yu <[email protected]>
AuthorDate: Sat Oct 25 09:14:14 2025 +0800

    [#8893] feat(lance-rest-server): Support create/load table operations for 
lance (#8911)
    
    ### What changes were proposed in this pull request?
    
    1. Support alter lance table
    2. Support API for table operations in Lance REST server.
    
    ### Why are the changes needed?
    
    It's a feature.
    
    Fix: #8893
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A.
    
    ### How was this patch tested?
    
    Test locally with Spark and curl
    
    ---------
    
    Co-authored-by: mchades <[email protected]>
---
 .../org/apache/gravitino/rel/indexes/Index.java    |  45 +++
 .../GenericLakehouseCatalogOperations.java         |  20 +-
 .../lakehouse/lance/LanceCatalogOperations.java    | 120 ++++++-
 .../lakehouse/lance/LanceDataTypeConverter.java    |  69 +++++
 .../gravitino/client/TestRelationalCatalog.java    |  10 -
 .../org/apache/gravitino/dto/rel/TableDTO.java     |   2 -
 .../gravitino/dto/requests/TableCreateRequest.java |   3 -
 .../gravitino/dto/responses/TableResponse.java     |   3 -
 .../java/org/apache/gravitino/json/JsonUtils.java  |  22 +-
 .../org/apache/gravitino/dto/rel/TestTableDTO.java |  13 +-
 .../catalog/TableOperationDispatcher.java          |  51 +++
 .../lance/common/ops/LanceNamespaceOperations.java |   3 +
 .../lance/common/ops/LanceTableOperations.java     |  15 +-
 .../common/ops/arrow/ArrowRecordBatchList.java     |  40 +++
 .../gravitino/GravitinoLanceNamespaceWrapper.java  | 344 ++++++++++++++++++++-
 .../gravitino/lance/common/TestArrowIPC.java       |  83 +++++
 lance/lance-rest-server/build.gradle.kts           |   2 +
 .../service/rest/LanceNamespaceOperations.java     |  17 +
 .../lance/service/rest/LanceTableOperations.java   |  89 ++++--
 .../catalog/jdbc/mysql/MySQLMetadataAdapter.java   |   2 +
 20 files changed, 882 insertions(+), 71 deletions(-)

diff --git a/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java 
b/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java
index 4b92977fd5..fd3beb0e67 100644
--- a/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java
+++ b/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java
@@ -67,5 +67,50 @@ public interface Index {
      * UNIQUE KEY helps in avoiding redundancy and ensuring data accuracy in 
the database.
      */
     UNIQUE_KEY,
+
+    // The following index types are specific to Lance, for more, please see: 
IndexType in LanceDB
+    /**
+     * SCALAR index is used to optimize searches on scalar data types such as 
integers, floats,
+     * strings, etc. Currently, this type is only applicable to Lance.
+     */
+    SCALAR,
+    /**
+     * BTREE index is a balanced tree data structure that maintains sorted 
data and allows for
+     * logarithmic time complexity for search, insert, and delete operations. 
Currently, this type
+     * is only applicable to Lance.
+     */
+    BTREE,
+    /**
+     * Bitmap index is a type of database index that uses bit arrays (bitmaps) 
to represent the
+     * presence or absence of values in a column, enabling efficient querying 
and data retrieval.
+     * Currently, this type is only applicable to Lance.
+     */
+    BITMAP,
+    /**
+     * LABEL_LIST index is used to optimize searches on columns containing 
lists of labels or tags.
+     * Currently, this type is only applicable to Lance.
+     */
+    LABEL_LIST,
+    /**
+     * INVERTED index is a data structure used to optimize full-text searches 
by mapping terms to
+     * their locations within a dataset, allowing for quick retrieval of 
documents containing
+     * specific words or phrases. Currently, this type is only applicable to 
Lance.
+     */
+    INVERTED,
+    /**
+     * VECTOR index is used to optimize similarity searches in 
high-dimensional vector spaces.
+     * Currently, this type is only applicable to Lance.
+     */
+    VECTOR,
+    /** IVF_FLAT (Inverted File with Flat quantization) is an indexing method 
used for efficient */
+    IVF_FLAT,
+    /** IVF_SQ (Inverted File with Scalar Quantization) is an indexing method 
used for efficient */
+    IVF_SQ,
+    /** IVF_PQ (Inverted File with Product Quantization) is an indexing method 
used for efficient */
+    IVF_PQ,
+    /** IVF_HNSW_FLAT */
+    IVF_HNSW_SQ,
+    /** IVF_HNSW_PQ */
+    IVF_HNSW_PQ;
   }
 }
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
index acac35528e..0f85532e8c 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
@@ -258,7 +258,25 @@ public class GenericLakehouseCatalogOperations
   @Override
   public Table alterTable(NameIdentifier ident, TableChange... changes)
       throws NoSuchTableException, IllegalArgumentException {
-    throw new UnsupportedOperationException("Not implemented yet.");
+    EntityStore store = GravitinoEnv.getInstance().entityStore();
+    Namespace namespace = ident.namespace();
+    try {
+      GenericTableEntity tableEntity =
+          store.get(ident, Entity.EntityType.TABLE, GenericTableEntity.class);
+      Map<String, String> tableProperties = tableEntity.getProperties();
+      String format = tableProperties.getOrDefault("format", "lance");
+      LakehouseCatalogOperations lakehouseCatalogOperations =
+          SUPPORTED_FORMATS.compute(
+              format,
+              (k, v) ->
+                  v == null
+                      ? createLakehouseCatalogOperations(
+                          format, tableProperties, catalogInfo, 
propertiesMetadata)
+                      : v);
+      return lakehouseCatalogOperations.alterTable(ident, changes);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to list tables under schema " + 
namespace, e);
+    }
   }
 
   @Override
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
index 3e1146b7ad..342826a882 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
@@ -20,34 +20,49 @@
 package org.apache.gravitino.catalog.lakehouse.lance;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 import com.lancedb.lance.Dataset;
 import com.lancedb.lance.WriteParams;
+import com.lancedb.lance.index.DistanceType;
+import com.lancedb.lance.index.IndexParams;
+import com.lancedb.lance.index.IndexType;
+import com.lancedb.lance.index.vector.VectorIndexParams;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.catalog.lakehouse.LakehouseCatalogOperations;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.GenericLakehouseTable;
 import org.apache.gravitino.connector.HasPropertyMetadata;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NoSuchTableException;
 import org.apache.gravitino.exceptions.TableAlreadyExistsException;
 import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GenericTableEntity;
 import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.GenericTable;
 import org.apache.gravitino.rel.Table;
 import org.apache.gravitino.rel.TableChange;
 import org.apache.gravitino.rel.expressions.distributions.Distribution;
 import org.apache.gravitino.rel.expressions.sorts.SortOrder;
 import org.apache.gravitino.rel.expressions.transforms.Transform;
 import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes.IndexImpl;
 import org.apache.gravitino.utils.PrincipalUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -133,20 +148,31 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
             .map(
                 col -> {
                   boolean nullable = col.nullable();
+                  ArrowType parentType = 
converter.fromGravitino(col.dataType());
+                  List<ArrowType> childTypes = 
converter.getChildTypes(col.dataType());
+                  List<Field> childFields =
+                      childTypes.stream()
+                          .map(
+                              childType ->
+                                  new org.apache.arrow.vector.types.pojo.Field(
+                                      "",
+                                      
org.apache.arrow.vector.types.pojo.FieldType.nullable(
+                                          childType),
+                                      null))
+                          .collect(Collectors.toList());
+
                   if (nullable) {
                     return new org.apache.arrow.vector.types.pojo.Field(
                         col.name(),
-                        org.apache.arrow.vector.types.pojo.FieldType.nullable(
-                            converter.fromGravitino(col.dataType())),
-                        null);
+                        
org.apache.arrow.vector.types.pojo.FieldType.nullable(parentType),
+                        childFields);
                   }
 
                   // not nullable
                   return new org.apache.arrow.vector.types.pojo.Field(
                       col.name(),
-                      org.apache.arrow.vector.types.pojo.FieldType.notNullable(
-                          converter.fromGravitino(col.dataType())),
-                      null);
+                      
org.apache.arrow.vector.types.pojo.FieldType.notNullable(parentType),
+                      childFields);
                 })
             .collect(Collectors.toList());
     return new org.apache.arrow.vector.types.pojo.Schema(fields);
@@ -155,8 +181,86 @@ public class LanceCatalogOperations implements 
LakehouseCatalogOperations {
   @Override
   public Table alterTable(NameIdentifier ident, TableChange... changes)
       throws NoSuchTableException, IllegalArgumentException {
-    // Use another PRs to implement alter table for Lance tables
-    return null;
+    // Lance only supports adding indexes for now.
+    List<Index> addedIndexes = Lists.newArrayList();
+
+    for (TableChange change : changes) {
+      if (change instanceof TableChange.AddIndex addIndexChange) {
+        Index index =
+            IndexImpl.builder()
+                .withIndexType(addIndexChange.getType())
+                .withName(addIndexChange.getName())
+                .withFieldNames(addIndexChange.getFieldNames())
+                .build();
+        addedIndexes.add(index);
+      }
+    }
+
+    EntityStore entityStore = GravitinoEnv.getInstance().entityStore();
+    GenericTableEntity entity;
+    try {
+      entity = entityStore.get(ident, Entity.EntityType.TABLE, 
GenericTableEntity.class);
+    } catch (NoSuchEntityException e) {
+      throw new NoSuchTableException("No such table: %s", ident);
+    } catch (IOException ioe) {
+      throw new RuntimeException("Failed to load table entity for: " + ident, 
ioe);
+    }
+
+    String location = entity.getProperties().get("location");
+    try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
+      // For Lance, we only support adding indexes, so in fact, we can't 
handle drop index here.
+      for (Index index : addedIndexes) {
+        IndexType indexType = IndexType.valueOf(index.type().name());
+        IndexParams indexParams = getIndexParamsByIndexType(indexType);
+
+        dataset.createIndex(
+            Arrays.stream(index.fieldNames())
+                .map(fieldPath -> String.join(".", fieldPath))
+                .collect(Collectors.toList()),
+            indexType,
+            Optional.of(index.name()),
+            indexParams,
+            true);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to alter Lance table: " + ident, e);
+    }
+
+    GenericTable oldTable = entity.toGenericTable();
+    Index[] newIndexes = oldTable.index();
+    for (Index index : addedIndexes) {
+      newIndexes = ArrayUtils.add(newIndexes, index);
+    }
+
+    return GenericLakehouseTable.builder()
+        .withFormat(oldTable.format())
+        .withProperties(oldTable.properties())
+        .withAuditInfo((AuditInfo) oldTable.auditInfo())
+        .withSortOrders(oldTable.sortOrder())
+        .withPartitioning(oldTable.partitioning())
+        .withDistribution(oldTable.distribution())
+        .withColumns(oldTable.columns())
+        .withIndexes(newIndexes)
+        .withName(oldTable.name())
+        .withComment(oldTable.comment())
+        .build();
+  }
+
+  private IndexParams getIndexParamsByIndexType(IndexType indexType) {
+    switch (indexType) {
+      case SCALAR:
+        return new IndexParams.Builder().build();
+      case VECTOR:
+        // TODO make these parameters configurable
+        int numberOfDimensions = 3; // this value should be determined 
dynamically based on the data
+        // Add properties to Index to set this value.
+        return new IndexParams.Builder()
+            .setVectorIndexParams(
+                VectorIndexParams.ivfPq(2, 8, numberOfDimensions, 
DistanceType.L2, 2))
+            .build();
+      default:
+        throw new IllegalArgumentException("Unsupported index type: " + 
indexType);
+    }
   }
 
   @Override
diff --git 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
index 117863659e..d7966edd5e 100644
--- 
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
+++ 
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
@@ -19,6 +19,8 @@
 
 package org.apache.gravitino.catalog.lakehouse.lance;
 
+import com.google.common.collect.Lists;
+import java.util.List;
 import org.apache.arrow.vector.types.DateUnit;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
 import org.apache.arrow.vector.types.TimeUnit;
@@ -27,9 +29,11 @@ import org.apache.arrow.vector.types.pojo.ArrowType.Bool;
 import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint;
 import org.apache.arrow.vector.types.pojo.ArrowType.Int;
 import org.apache.gravitino.connector.DataTypeConverter;
+import org.apache.gravitino.json.JsonUtils;
 import org.apache.gravitino.rel.types.Type;
 import org.apache.gravitino.rel.types.Types;
 import org.apache.gravitino.rel.types.Types.FixedType;
+import org.apache.gravitino.rel.types.Types.UnparsedType;
 
 public class LanceDataTypeConverter implements DataTypeConverter<ArrowType, 
ArrowType> {
 
@@ -67,6 +71,30 @@ public class LanceDataTypeConverter implements 
DataTypeConverter<ArrowType, Arro
         return new ArrowType.FixedSizeBinary(fixedType.length());
       case BINARY:
         return new ArrowType.Binary();
+      case UNPARSED:
+        String typeStr = ((UnparsedType) type).unparsedType().toString();
+        try {
+          Type t = JsonUtils.anyFieldMapper().readValue(typeStr, Type.class);
+          if (t instanceof Types.ListType) {
+            return ArrowType.List.INSTANCE;
+          } else if (t instanceof Types.MapType) {
+            return new ArrowType.Map(false);
+          } else if (t instanceof Types.StructType) {
+            return ArrowType.Struct.INSTANCE;
+          } else {
+            throw new UnsupportedOperationException(
+                "Unsupported UnparsedType conversion: " + t.simpleString());
+          }
+        } catch (Exception e) {
+          // FixedSizeListArray(integer, 3)
+          if (typeStr.startsWith("FixedSizeListArray")) {
+            int size =
+                Integer.parseInt(
+                    typeStr.substring(typeStr.indexOf(',') + 1, 
typeStr.indexOf(')')).trim());
+            return new ArrowType.FixedSizeList(size);
+          }
+          throw new UnsupportedOperationException("Failed to parse 
UnparsedType: " + typeStr, e);
+        }
       default:
         throw new UnsupportedOperationException("Unsupported Gravitino type: " 
+ type.name());
     }
@@ -116,8 +144,49 @@ public class LanceDataTypeConverter implements 
DataTypeConverter<ArrowType, Arro
       return Types.StringType.get();
     } else if (arrowType instanceof ArrowType.Binary) {
       return Types.BinaryType.get();
+      // TODO handle complex types like List, Map, Struct
     } else {
       throw new UnsupportedOperationException("Unsupported Arrow type: " + 
arrowType);
     }
   }
+
+  public List<ArrowType> getChildTypes(Type parentType) {
+    if (parentType.name() != Type.Name.UNPARSED) {
+      return List.of();
+    }
+
+    List<ArrowType> arrowTypes = Lists.newArrayList();
+    String typeStr = ((UnparsedType) parentType).unparsedType().toString();
+    try {
+      Type t = JsonUtils.anyFieldMapper().readValue(typeStr, Type.class);
+      if (t instanceof Types.ListType listType) {
+        arrowTypes.add(fromGravitino(listType.elementType()));
+      } else if (t instanceof Types.MapType mapType) {
+        arrowTypes.add(fromGravitino(mapType.keyType()));
+        arrowTypes.add(fromGravitino(mapType.valueType()));
+      } else {
+        // TODO support struct type.
+        throw new UnsupportedOperationException(
+            "Unsupported UnparsedType conversion: " + t.simpleString());
+      }
+
+      return arrowTypes;
+    } catch (Exception e) {
+      // FixedSizeListArray(integer, 3)
+
+      try {
+        if (typeStr.startsWith("FixedSizeListArray")) {
+          String type = typeStr.substring(typeStr.indexOf('(') + 1, 
typeStr.indexOf(',')).trim();
+          Type childType = JsonUtils.anyFieldMapper().readValue("\"" + type + 
"\"", Type.class);
+          arrowTypes.add(fromGravitino(childType));
+
+          return arrowTypes;
+        }
+      } catch (Exception e1) {
+        throw new UnsupportedOperationException("Failed to parse UnparsedType: 
" + typeStr, e1);
+      }
+
+      throw new UnsupportedOperationException("Failed to parse UnparsedType: " 
+ typeStr, e);
+    }
+  }
 }
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
index c5e36247bb..ff0a29c032 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
@@ -571,16 +571,6 @@ public class TestRelationalCatalog extends TestBase {
                     tableId, fromDTOs(columns), "comment", emptyMap, 
errorPartitioning));
     Assertions.assertTrue(ex2.getMessage().contains("not found in table"));
 
-    // Test empty columns
-    Throwable ex3 =
-        Assertions.assertThrows(
-            IllegalArgumentException.class,
-            () ->
-                tableCatalog.createTable(
-                    tableId, new Column[0], "comment", emptyMap, 
errorPartitioning));
-    Assertions.assertTrue(
-        ex3.getMessage().contains("\"columns\" field is required and cannot be 
empty"));
-
     // Test partitioning with assignments
     Partitioning[] partitioningWithAssignments = {
       RangePartitioningDTO.of(
diff --git a/common/src/main/java/org/apache/gravitino/dto/rel/TableDTO.java 
b/common/src/main/java/org/apache/gravitino/dto/rel/TableDTO.java
index c1fb160a38..a23a1cd37c 100644
--- a/common/src/main/java/org/apache/gravitino/dto/rel/TableDTO.java
+++ b/common/src/main/java/org/apache/gravitino/dto/rel/TableDTO.java
@@ -314,8 +314,6 @@ public class TableDTO implements Table {
     public TableDTO build() {
       Preconditions.checkArgument(name != null && !name.isEmpty(), "name 
cannot be null or empty");
       Preconditions.checkArgument(audit != null, "audit cannot be null");
-      Preconditions.checkArgument(
-          columns != null && columns.length > 0, "columns cannot be null or 
empty");
 
       return new TableDTO(
           name,
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/requests/TableCreateRequest.java
 
b/common/src/main/java/org/apache/gravitino/dto/requests/TableCreateRequest.java
index d6d22ddb9f..8a97c9e2dd 100644
--- 
a/common/src/main/java/org/apache/gravitino/dto/requests/TableCreateRequest.java
+++ 
b/common/src/main/java/org/apache/gravitino/dto/requests/TableCreateRequest.java
@@ -122,9 +122,6 @@ public class TableCreateRequest implements RESTRequest {
   public void validate() throws IllegalArgumentException {
     Preconditions.checkArgument(
         StringUtils.isNotBlank(name), "\"name\" field is required and cannot 
be empty");
-    Preconditions.checkArgument(
-        columns != null && columns.length != 0,
-        "\"columns\" field is required and cannot be empty");
 
     if (sortOrders != null) {
       Arrays.stream(sortOrders).forEach(sortOrder -> 
sortOrder.validate(columns));
diff --git 
a/common/src/main/java/org/apache/gravitino/dto/responses/TableResponse.java 
b/common/src/main/java/org/apache/gravitino/dto/responses/TableResponse.java
index d3dfa21599..a1cc326295 100644
--- a/common/src/main/java/org/apache/gravitino/dto/responses/TableResponse.java
+++ b/common/src/main/java/org/apache/gravitino/dto/responses/TableResponse.java
@@ -63,9 +63,6 @@ public class TableResponse extends BaseResponse {
     Preconditions.checkArgument(table != null, "table must not be null");
     Preconditions.checkArgument(
         StringUtils.isNotBlank(table.name()), "table 'name' must not be null 
and empty");
-    Preconditions.checkArgument(
-        table.columns() != null && table.columns().length > 0,
-        "table 'columns' must not be null and empty");
     Preconditions.checkArgument(table.auditInfo() != null, "table 'audit' must 
not be null");
     Preconditions.checkArgument(
         table.partitioning() != null, "table 'partitions' must not be null");
diff --git a/common/src/main/java/org/apache/gravitino/json/JsonUtils.java 
b/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
index 5fd2ab3f8f..fb9a33f268 100644
--- a/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
+++ b/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
@@ -715,7 +715,7 @@ public class JsonUtils {
       String text = node.asText().toLowerCase();
       return text.equals(Types.NullType.get().simpleString())
           ? Types.NullType.get()
-          : fromPrimitiveTypeString(text);
+          : fromPrimitiveTypeString(text, node.asText());
     }
 
     if (node.isObject() && node.has(TYPE)) {
@@ -834,49 +834,49 @@ public class JsonUtils {
     gen.writeEndObject();
   }
 
-  private static Type fromPrimitiveTypeString(String typeString) {
-    Type.PrimitiveType primitiveType = TYPES.get(typeString);
+  private static Type fromPrimitiveTypeString(String lowerTypeString, String 
orignalTypeString) {
+    Type.PrimitiveType primitiveType = TYPES.get(lowerTypeString);
     if (primitiveType != null) {
       return primitiveType;
     }
 
-    Matcher fixed = FIXED.matcher(typeString);
+    Matcher fixed = FIXED.matcher(lowerTypeString);
     if (fixed.matches()) {
       return Types.FixedType.of(Integer.parseInt(fixed.group(1)));
     }
 
-    Matcher fixedChar = FIXEDCHAR.matcher(typeString);
+    Matcher fixedChar = FIXEDCHAR.matcher(lowerTypeString);
     if (fixedChar.matches()) {
       return Types.FixedCharType.of(Integer.parseInt(fixedChar.group(1)));
     }
 
-    Matcher varchar = VARCHAR.matcher(typeString);
+    Matcher varchar = VARCHAR.matcher(lowerTypeString);
     if (varchar.matches()) {
       return Types.VarCharType.of(Integer.parseInt(varchar.group(1)));
     }
 
-    Matcher decimal = DECIMAL.matcher(typeString);
+    Matcher decimal = DECIMAL.matcher(lowerTypeString);
     if (decimal.matches()) {
       return Types.DecimalType.of(
           Integer.parseInt(decimal.group(1)), 
Integer.parseInt(decimal.group(2)));
     }
 
-    Matcher time = TIME.matcher(typeString);
+    Matcher time = TIME.matcher(lowerTypeString);
     if (time.matches()) {
       return Types.TimeType.of(Integer.parseInt(time.group(1)));
     }
 
-    Matcher timestampTz = TIMESTAMP_TZ.matcher(typeString);
+    Matcher timestampTz = TIMESTAMP_TZ.matcher(lowerTypeString);
     if (timestampTz.matches()) {
       return 
Types.TimestampType.withTimeZone(Integer.parseInt(timestampTz.group(1)));
     }
 
-    Matcher timestamp = TIMESTAMP.matcher(typeString);
+    Matcher timestamp = TIMESTAMP.matcher(lowerTypeString);
     if (timestamp.matches()) {
       return 
Types.TimestampType.withoutTimeZone(Integer.parseInt(timestamp.group(1)));
     }
 
-    return Types.UnparsedType.of(typeString);
+    return Types.UnparsedType.of(orignalTypeString);
   }
 
   private static Types.StructType readStructType(JsonNode node) {
diff --git 
a/common/src/test/java/org/apache/gravitino/dto/rel/TestTableDTO.java 
b/common/src/test/java/org/apache/gravitino/dto/rel/TestTableDTO.java
index 5f0733c5ea..44de2e03d4 100644
--- a/common/src/test/java/org/apache/gravitino/dto/rel/TestTableDTO.java
+++ b/common/src/test/java/org/apache/gravitino/dto/rel/TestTableDTO.java
@@ -19,11 +19,11 @@
 package org.apache.gravitino.dto.rel;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.time.Instant;
 import org.apache.gravitino.dto.AuditDTO;
 import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 public class TestTableDTO {
@@ -32,7 +32,10 @@ public class TestTableDTO {
     AuditDTO audit =
         
AuditDTO.builder().withCreator("creator").withCreateTime(Instant.now()).build();
     TableDTO.Builder<?> builder = 
TableDTO.builder().withName("t1").withAudit(audit);
-    assertThrows(IllegalArgumentException.class, builder::build);
+    Assertions.assertDoesNotThrow(
+        () -> {
+          builder.build();
+        });
   }
 
   @Test
@@ -41,7 +44,11 @@ public class TestTableDTO {
         
AuditDTO.builder().withCreator("creator").withCreateTime(Instant.now()).build();
     TableDTO.Builder<?> builder =
         TableDTO.builder().withName("t1").withAudit(audit).withColumns(new 
ColumnDTO[0]);
-    assertThrows(IllegalArgumentException.class, builder::build);
+
+    Assertions.assertDoesNotThrow(
+        () -> {
+          builder.build();
+        });
   }
 
   @Test
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 1cbab0d6ed..27119f0c99 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -262,6 +262,57 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
             tableId = te.id();
           }
 
+          if (isGenericLakehouseCatalog(catalogIdent)) {
+            // For generic lakehouse catalog, we only update the table entity 
with basic info.
+            GenericTableEntity genericTableEntity =
+                operateOnEntity(
+                    ident, id -> store.get(id, TABLE, 
GenericTableEntity.class), "GET", tableId);
+            if (genericTableEntity == null) {
+              throw new NoSuchTableException("No such table: %s", ident);
+            }
+
+            GenericTable genericTable = (GenericTable) alteredTable;
+            GenericTableEntity updatedGenericTableEntity =
+                operateOnEntity(
+                    ident,
+                    id ->
+                        store.update(
+                            id,
+                            GenericTableEntity.class,
+                            TABLE,
+                            tableEntity ->
+                                GenericTableEntity.getBuilder()
+                                    .withId(tableEntity.id())
+                                    .withName(alteredTable.name())
+                                    .withNamespace(getNewNamespace(ident, 
changes))
+                                    .withFormat(genericTable.format())
+                                    .withAuditInfo(
+                                        AuditInfo.builder()
+                                            
.withCreator(tableEntity.auditInfo().creator())
+                                            
.withCreateTime(tableEntity.auditInfo().createTime())
+                                            .withLastModifier(
+                                                
PrincipalUtils.getCurrentPrincipal().getName())
+                                            
.withLastModifiedTime(Instant.now())
+                                            .build())
+                                    .withColumns(tableEntity.columns())
+                                    .withIndexes(genericTable.index())
+                                    
.withDistribution(genericTable.distribution())
+                                    
.withPartitions(genericTable.partitioning())
+                                    .withSortOrder(genericTable.sortOrder())
+                                    .withProperties(genericTable.properties())
+                                    .withComment(genericTable.comment())
+                                    .build()),
+                    "UPDATE",
+                    tableId);
+
+            return EntityCombinedTable.of(alteredTable, 
updatedGenericTableEntity)
+                .withHiddenProperties(
+                    getHiddenPropertyNames(
+                        getCatalogIdentifier(ident),
+                        HasPropertyMetadata::tablePropertiesMetadata,
+                        alteredTable.properties()));
+          }
+
           TableEntity updatedTableEntity =
               operateOnEntity(
                   ident,
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java
index 226de4dbd7..49141665a5 100644
--- 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java
@@ -25,6 +25,7 @@ import 
com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
 import com.lancedb.lance.namespace.model.DropNamespaceRequest;
 import com.lancedb.lance.namespace.model.DropNamespaceResponse;
 import com.lancedb.lance.namespace.model.ListNamespacesResponse;
+import com.lancedb.lance.namespace.model.ListTablesResponse;
 import java.util.Map;
 
 public interface LanceNamespaceOperations {
@@ -47,4 +48,6 @@ public interface LanceNamespaceOperations {
       DropNamespaceRequest.BehaviorEnum behavior);
 
   void namespaceExists(String namespaceId, String delimiter) throws 
LanceNamespaceException;
+
+  ListTablesResponse listTables(String id, String delimiter, String pageToken, 
Integer limit);
 }
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
index 057dce8fb3..b8a967cd30 100644
--- 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
@@ -18,11 +18,20 @@
  */
 package org.apache.gravitino.lance.common.ops;
 
-import com.lancedb.lance.namespace.model.ListTablesResponse;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
+import java.util.Map;
 
 public interface LanceTableOperations {
 
-  ListTablesResponse listTables(String id, String delimiter, String pageToken, 
Integer limit);
+  DescribeTableResponse describeTable(String tableId, String delimiter);
 
-  // todo: add more table operation methods
+  CreateTableResponse createTable(
+      String tableId,
+      String mode,
+      String delimiter,
+      String tableLocation,
+      Map<String, String> tableProperties,
+      String rootCatalog,
+      byte[] arrowStreamBody);
 }
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/arrow/ArrowRecordBatchList.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/arrow/ArrowRecordBatchList.java
new file mode 100644
index 0000000000..b0c6a089d5
--- /dev/null
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/arrow/ArrowRecordBatchList.java
@@ -0,0 +1,40 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lance.common.ops.arrow;
+
+import java.util.List;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+public class ArrowRecordBatchList {
+  private final Schema schema;
+
+  @SuppressWarnings("unused")
+  private final List<VectorSchemaRoot> recordBatches;
+
+  public Schema getSchema() {
+    return schema;
+  }
+
+  public ArrowRecordBatchList(Schema schema, List<VectorSchemaRoot> 
recordBatches) {
+    this.schema = schema;
+    this.recordBatches = recordBatches;
+  }
+}
diff --git 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
index cb1b85752a..1f9e41a2cb 100644
--- 
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
+++ 
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
@@ -20,7 +20,9 @@ package org.apache.gravitino.lance.common.ops.gravitino;
 
 import static 
org.apache.gravitino.lance.common.config.LanceConfig.METALAKE_NAME;
 import static 
org.apache.gravitino.lance.common.config.LanceConfig.NAMESPACE_URI;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -28,14 +30,22 @@ import com.google.common.collect.Sets;
 import com.lancedb.lance.namespace.LanceNamespaceException;
 import com.lancedb.lance.namespace.ObjectIdentifier;
 import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
+import com.lancedb.lance.namespace.model.CreateNamespaceRequest.ModeEnum;
 import com.lancedb.lance.namespace.model.CreateNamespaceResponse;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
 import com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
 import com.lancedb.lance.namespace.model.DropNamespaceRequest;
 import com.lancedb.lance.namespace.model.DropNamespaceResponse;
+import com.lancedb.lance.namespace.model.JsonArrowDataType;
+import com.lancedb.lance.namespace.model.JsonArrowField;
+import com.lancedb.lance.namespace.model.JsonArrowSchema;
 import com.lancedb.lance.namespace.model.ListNamespacesResponse;
 import com.lancedb.lance.namespace.model.ListTablesResponse;
 import com.lancedb.lance.namespace.util.CommonUtil;
 import com.lancedb.lance.namespace.util.PageUtil;
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -44,11 +54,27 @@ import java.util.Optional;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.function.IntFunction;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Bool;
+import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.CatalogChange;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
 import org.apache.gravitino.SchemaChange;
 import org.apache.gravitino.client.GravitinoClient;
@@ -58,10 +84,18 @@ import 
org.apache.gravitino.exceptions.NoSuchSchemaException;
 import org.apache.gravitino.exceptions.NonEmptyCatalogException;
 import org.apache.gravitino.exceptions.NonEmptySchemaException;
 import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.json.JsonUtils;
 import org.apache.gravitino.lance.common.config.LanceConfig;
 import org.apache.gravitino.lance.common.ops.LanceNamespaceOperations;
 import org.apache.gravitino.lance.common.ops.LanceTableOperations;
 import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
+import org.apache.gravitino.lance.common.ops.arrow.ArrowRecordBatchList;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.rel.types.Types.FixedType;
+import org.apache.gravitino.rel.types.Types.UnparsedType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -245,12 +279,6 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
     }
   }
 
-  @Override
-  public ListTablesResponse listTables(
-      String id, String delimiter, String pageToken, Integer limit) {
-    throw new UnsupportedOperationException("Not implemented yet");
-  }
-
   private boolean isLakehouseCatalog(Catalog catalog) {
     return catalog.type().equals(Catalog.Type.RELATIONAL)
         && "generic-lakehouse".equals(catalog.provider());
@@ -467,4 +495,308 @@ public class GravitinoLanceNamespaceWrapper extends 
NamespaceWrapper
 
     return Stream.concat(setPropertiesStream, 
removePropertiesStream).toArray(arrayCreator);
   }
+
+  @Override
+  public ListTablesResponse listTables(
+      String id, String delimiter, String pageToken, Integer limit) {
+    ObjectIdentifier nsId = ObjectIdentifier.of(id, Pattern.quote(delimiter));
+    Preconditions.checkArgument(
+        nsId.levels() <= 2, "Expected at most 2-level namespace but got: %s", 
nsId.levels());
+    String catalogName = nsId.levelAtListPos(0);
+    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
+    String schemaName = nsId.levelAtListPos(1);
+    List<String> tables =
+        
Arrays.stream(catalog.asTableCatalog().listTables(Namespace.of(schemaName)))
+            .map(ident -> Joiner.on(delimiter).join(catalogName, schemaName, 
ident.name()))
+            .collect(Collectors.toList());
+
+    Collections.sort(tables);
+    PageUtil.Page page = PageUtil.splitPage(tables, pageToken, 
PageUtil.normalizePageSize(limit));
+    ListNamespacesResponse response = new ListNamespacesResponse();
+    response.setNamespaces(Sets.newHashSet(page.items()));
+    response.setPageToken(page.nextPageToken());
+
+    return new ListTablesResponse()
+        .tables(response.getNamespaces())
+        .pageToken(response.getPageToken());
+  }
+
+  @Override
+  public DescribeTableResponse describeTable(String tableId, String delimiter) 
{
+    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
+    Preconditions.checkArgument(
+        nsId.levels() <= 3, "Expected at most 3-level namespace but got: %s", 
nsId.levels());
+
+    String catalogName = nsId.levelAtListPos(0);
+    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
+
+    Table table = catalog.asTableCatalog().loadTable(tableIdentifier);
+    DescribeTableResponse response = new DescribeTableResponse();
+    response.setProperties(table.properties());
+    response.setLocation(table.properties().get("location"));
+    response.setSchema(toJsonArrowSchema(table.columns()));
+    return response;
+  }
+
+  private JsonArrowSchema toJsonArrowSchema(Column[] columns) {
+    List<JsonArrowField> fields = new ArrayList<>();
+    for (Column column : columns) {
+      ArrowType arrowType = fromGravitinoType(column.dataType());
+      FieldType fieldType = new FieldType(column.nullable(), arrowType, null, 
null);
+      Field field = new Field(column.name(), fieldType, null);
+
+      JsonArrowDataType jsonArrowDataType = new JsonArrowDataType();
+      // other filed needs to be set accordingly such as list, map, struct
+      jsonArrowDataType.setType(arrowType.toString());
+
+      JsonArrowField arrowField = new JsonArrowField();
+      arrowField.setName(field.getName());
+      arrowField.setType(jsonArrowDataType);
+
+      fields.add(arrowField);
+    }
+
+    JsonArrowSchema jsonArrowSchema = new JsonArrowSchema();
+    jsonArrowSchema.setFields(fields);
+    return jsonArrowSchema;
+  }
+
+  @Override
+  public CreateTableResponse createTable(
+      String tableId,
+      String mode,
+      String delimiter,
+      String tableLocation,
+      Map<String, String> tableProperties,
+      String rootCatalog,
+      byte[] arrowStreamBody) {
+    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, 
Pattern.quote(delimiter));
+    Preconditions.checkArgument(
+        nsId.levels() <= 3, "Expected at most 3-level namespace but got: %s", 
nsId.levels());
+    if (rootCatalog != null) {
+      List<String> levels = nsId.listStyleId();
+      List<String> newLevels = Lists.newArrayList(rootCatalog);
+      newLevels.addAll(levels);
+      nsId = ObjectIdentifier.of(newLevels);
+    }
+
+    // Parser column information.
+    List<Column> columns = Lists.newArrayList();
+    if (arrowStreamBody != null) {
+      ArrowRecordBatchList recordBatchList = 
parseArrowIpcStream(arrowStreamBody);
+      columns = extractColumns(recordBatchList);
+    }
+
+    String catalogName = nsId.levelAtListPos(0);
+    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
+
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
+
+    Map<String, String> createTableProperties = 
Maps.newHashMap(tableProperties);
+    createTableProperties.put("location", tableLocation);
+    createTableProperties.put("mode", mode);
+    // TODO considering the mode (create, exist_ok, overwrite)
+
+    ModeEnum createMode = ModeEnum.fromValue(mode.toLowerCase());
+    switch (createMode) {
+      case EXIST_OK:
+        if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
+          CreateTableResponse response = new CreateTableResponse();
+          Table existingTable = 
catalog.asTableCatalog().loadTable(tableIdentifier);
+          response.setProperties(existingTable.properties());
+          response.setLocation(existingTable.properties().get("location"));
+          response.setVersion(0L);
+          return response;
+        }
+        break;
+      case CREATE:
+        if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
+          throw LanceNamespaceException.conflict(
+              "Table already exists: " + tableId,
+              SchemaAlreadyExistsException.class.getSimpleName(),
+              tableId,
+              CommonUtil.formatCurrentStackTrace());
+        }
+        break;
+      case OVERWRITE:
+        if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
+          catalog.asTableCatalog().dropTable(tableIdentifier);
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown mode: " + mode);
+    }
+
+    Table t =
+        catalog
+            .asTableCatalog()
+            .createTable(
+                tableIdentifier,
+                columns.toArray(new Column[0]),
+                tableLocation,
+                createTableProperties);
+
+    CreateTableResponse response = new CreateTableResponse();
+    response.setProperties(t.properties());
+    response.setLocation(tableLocation);
+    response.setVersion(0L);
+    return response;
+  }
+
+  private ArrowRecordBatchList parseArrowIpcStream(byte[] stream) {
+    try (BufferAllocator allocator = new RootAllocator();
+        ByteArrayInputStream bais = new ByteArrayInputStream(stream);
+        ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) {
+
+      org.apache.arrow.vector.types.pojo.Schema schema = 
reader.getVectorSchemaRoot().getSchema();
+      List<VectorSchemaRoot> batches = new ArrayList<>();
+
+      while (reader.loadNextBatch()) {
+        VectorSchemaRoot root = reader.getVectorSchemaRoot();
+        if (root.getRowCount() > 0) {
+          batches.add(root);
+        }
+      }
+      return new ArrowRecordBatchList(schema, batches);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to parse Arrow IPC stream", e);
+    }
+  }
+
+  private List<Column> extractColumns(ArrowRecordBatchList recordBatchList) {
+    List<Column> columns = new ArrayList<>();
+    org.apache.arrow.vector.types.pojo.Schema arrowSchema = 
recordBatchList.getSchema();
+
+    for (org.apache.arrow.vector.types.pojo.Field field : 
arrowSchema.getFields()) {
+      columns.add(toGravitinoColumn(field));
+    }
+    return columns;
+  }
+
+  private Column toGravitinoColumn(Field field) {
+    return Column.of(
+        field.getName(),
+        toGravitinoType(field),
+        field.getMetadata().get("comment"),
+        field.isNullable(),
+        false,
+        DEFAULT_VALUE_NOT_SET);
+  }
+
+  private ArrowType fromGravitinoType(Type type) {
+    switch (type.name()) {
+      case BOOLEAN:
+        return Bool.INSTANCE;
+      case BYTE:
+        return new Int(8, true);
+      case SHORT:
+        return new Int(16, true);
+      case INTEGER:
+        return new Int(32, true);
+      case LONG:
+        return new Int(64, true);
+      case FLOAT:
+        return new FloatingPoint(FloatingPointPrecision.SINGLE);
+      case DOUBLE:
+        return new FloatingPoint(FloatingPointPrecision.DOUBLE);
+      case DECIMAL:
+        // Lance uses FIXED_SIZE_BINARY for decimal types
+        return new ArrowType.FixedSizeBinary(16); // assuming 16 bytes for 
decimal
+      case DATE:
+        return new ArrowType.Date(DateUnit.DAY);
+      case TIME:
+        return new ArrowType.Time(TimeUnit.MILLISECOND, 32);
+      case TIMESTAMP:
+        return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
+      case VARCHAR:
+      case STRING:
+        return new ArrowType.Utf8();
+      case FIXED:
+        FixedType fixedType = (FixedType) type;
+        return new ArrowType.FixedSizeBinary(fixedType.length());
+      case BINARY:
+        return new ArrowType.Binary();
+      case UNPARSED:
+        String typeStr = ((UnparsedType) type).unparsedType().toString();
+        try {
+          Type t = JsonUtils.anyFieldMapper().readValue(typeStr, Type.class);
+          if (t instanceof Types.ListType) {
+            return ArrowType.List.INSTANCE;
+          } else if (t instanceof Types.MapType) {
+            return new ArrowType.Map(false);
+          } else if (t instanceof Types.StructType) {
+            return ArrowType.Struct.INSTANCE;
+          } else {
+            throw new UnsupportedOperationException(
+                "Unsupported UnparsedType conversion: " + t.simpleString());
+          }
+        } catch (Exception e) {
+          // FixedSizeListArray(integer, 3)
+          if (typeStr.startsWith("FixedSizeListArray")) {
+            int size =
+                Integer.parseInt(
+                    typeStr.substring(typeStr.indexOf(',') + 1, 
typeStr.indexOf(')')).trim());
+            return new ArrowType.FixedSizeList(size);
+          }
+          throw new UnsupportedOperationException("Failed to parse 
UnparsedType: " + typeStr, e);
+        }
+      default:
+        throw new UnsupportedOperationException("Unsupported Gravitino type: " 
+ type.name());
+    }
+  }
+
+  private Type toGravitinoType(Field field) {
+    FieldType parentType = field.getFieldType();
+    ArrowType arrowType = parentType.getType();
+    if (arrowType instanceof Bool) {
+      return Types.BooleanType.get();
+    } else if (arrowType instanceof Int) {
+      Int intType = (Int) arrowType;
+      switch (intType.getBitWidth()) {
+        case 8 -> {
+          return Types.ByteType.get();
+        }
+        case 16 -> {
+          return Types.ShortType.get();
+        }
+        case 32 -> {
+          return Types.IntegerType.get();
+        }
+        case 64 -> {
+          return Types.LongType.get();
+        }
+        default -> throw new UnsupportedOperationException(
+            "Unsupported Int bit width: " + intType.getBitWidth());
+      }
+    } else if (arrowType instanceof FloatingPoint) {
+      FloatingPoint floatingPoint = (FloatingPoint) arrowType;
+      switch (floatingPoint.getPrecision()) {
+        case SINGLE:
+          return Types.FloatType.get();
+        case DOUBLE:
+          return Types.DoubleType.get();
+        default:
+          throw new UnsupportedOperationException(
+              "Unsupported FloatingPoint precision: " + 
floatingPoint.getPrecision());
+      }
+    } else if (arrowType instanceof ArrowType.FixedSizeBinary) {
+      ArrowType.FixedSizeBinary fixedSizeBinary = (ArrowType.FixedSizeBinary) 
arrowType;
+      return Types.FixedType.of(fixedSizeBinary.getByteWidth());
+    } else if (arrowType instanceof ArrowType.Date) {
+      return Types.DateType.get();
+    } else if (arrowType instanceof ArrowType.Time) {
+      return Types.TimeType.get();
+    } else if (arrowType instanceof ArrowType.Timestamp) {
+      return Types.TimestampType.withoutTimeZone();
+    } else if (arrowType instanceof ArrowType.Utf8) {
+      return Types.StringType.get();
+    } else if (arrowType instanceof ArrowType.Binary) {
+      return Types.BinaryType.get();
+    } else {
+      return Types.UnparsedType.of(arrowType.toString());
+    }
+  }
 }
diff --git 
a/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/TestArrowIPC.java
 
b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/TestArrowIPC.java
new file mode 100644
index 0000000000..71f1bfc587
--- /dev/null
+++ 
b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/TestArrowIPC.java
@@ -0,0 +1,83 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.gravitino.lance.common;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.Arrays;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Text;
+import org.junit.jupiter.api.Test;
+
+public class TestArrowIPC {
+
+  private static final String FILENAME = "/tmp/initial_data.arrow";
+  private static final int RECORD_COUNT = 3;
+
+  @Test
+  void testIPC() throws Exception {
+    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+
+      Schema schema =
+          new Schema(
+              Arrays.asList(
+                  Field.nullable("id", new ArrowType.Int(32, true)),
+                  Field.nullable("value", new ArrowType.Utf8())));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) 
{
+        IntVector idVector = (IntVector) root.getVector("id");
+        VarCharVector valueVector = (VarCharVector) root.getVector("value");
+
+        idVector.allocateNew();
+        valueVector.allocateNew();
+
+        for (int i = 0; i < RECORD_COUNT; i++) {
+          idVector.setSafe(i, i + 1);
+          valueVector.setSafe(i, new Text("Row_" + (i + 1)));
+        }
+
+        idVector.setValueCount(RECORD_COUNT);
+        valueVector.setValueCount(RECORD_COUNT);
+        root.setRowCount(RECORD_COUNT);
+
+        File outFile = new File(FILENAME);
+        try (FileOutputStream fos = new FileOutputStream(outFile);
+            ArrowStreamWriter writer = new ArrowStreamWriter(root, null, fos)) 
{
+
+          writer.start();
+          writer.writeBatch();
+          writer.end();
+        }
+
+        System.out.println(
+            "✅ Successfully generated Arrow IPC Stream file: " + 
outFile.getAbsolutePath());
+        System.out.println("--- Ready for cURL test ---");
+      }
+    }
+  }
+}
diff --git a/lance/lance-rest-server/build.gradle.kts 
b/lance/lance-rest-server/build.gradle.kts
index 4e4ca7db3c..0d7a2c9852 100644
--- a/lance/lance-rest-server/build.gradle.kts
+++ b/lance/lance-rest-server/build.gradle.kts
@@ -38,6 +38,8 @@ dependencies {
   }
 
   implementation(project(":lance:lance-common"))
+  implementation(libs.lance)
+  implementation(libs.commons.lang3)
 
   implementation(libs.bundles.jetty)
   implementation(libs.bundles.jersey)
diff --git 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
 
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
index dd548541ad..493d0acace 100644
--- 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
+++ 
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
@@ -28,6 +28,7 @@ import 
com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
 import com.lancedb.lance.namespace.model.DropNamespaceRequest;
 import com.lancedb.lance.namespace.model.DropNamespaceResponse;
 import com.lancedb.lance.namespace.model.ListNamespacesResponse;
+import com.lancedb.lance.namespace.model.ListTablesResponse;
 import java.util.regex.Pattern;
 import javax.inject.Inject;
 import javax.ws.rs.Consumes;
@@ -159,4 +160,20 @@ public class LanceNamespaceOperations {
       return LanceExceptionMapper.toRESTResponse(namespaceId, e);
     }
   }
+
+  @GET
+  @Path("{id}/table/list")
+  public Response listTables(
+      @PathParam("id") String namespaceId,
+      @DefaultValue("$") @QueryParam("delimiter") String delimiter,
+      @QueryParam("page_token") String pageToken,
+      @QueryParam("limit") Integer limit) {
+    try {
+      ListTablesResponse response =
+          lanceNamespace.asNamespaceOps().listTables(namespaceId, delimiter, 
pageToken, limit);
+      return Response.ok(response).build();
+    } catch (Exception e) {
+      return LanceExceptionMapper.toRESTResponse(namespaceId, e);
+    }
+  }
 }
diff --git 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
 
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
index 1f30d1b326..359fc94c42 100644
--- 
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
+++ 
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
@@ -18,27 +18,27 @@
  */
 package org.apache.gravitino.lance.service.rest;
 
-import static 
org.apache.gravitino.lance.common.ops.NamespaceWrapper.NAMESPACE_DELIMITER_DEFAULT;
-
-import com.codahale.metrics.annotation.ResponseMetered;
-import com.codahale.metrics.annotation.Timed;
-import com.lancedb.lance.namespace.model.ListTablesResponse;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
+import com.lancedb.lance.namespace.util.JsonUtil;
+import java.util.Map;
 import javax.inject.Inject;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DefaultValue;
-import javax.ws.rs.Encoded;
-import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
 import org.apache.gravitino.lance.service.LanceExceptionMapper;
-import org.apache.gravitino.metrics.MetricNames;
 
-@Path("/v1/namespace/{id}/table")
+@Path("/v1/table/{id}")
 @Consumes(MediaType.APPLICATION_JSON)
 @Produces(MediaType.APPLICATION_JSON)
 public class LanceTableOperations {
@@ -50,21 +50,68 @@ public class LanceTableOperations {
     this.lanceNamespace = lanceNamespace;
   }
 
-  @GET
-  @Path("/list")
-  @Timed(name = "list-tables." + MetricNames.HTTP_PROCESS_DURATION, absolute = 
true)
-  @ResponseMetered(name = "list-tables", absolute = true)
-  public Response listTables(
-      @Encoded @PathParam("id") String namespaceId,
-      @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter") 
String delimiter,
-      @QueryParam("page_token") String pageToken,
-      @QueryParam("limit") Integer limit) {
+  @POST
+  @Path("/describe")
+  public Response describeTable(
+      @PathParam("id") String tableId,
+      @DefaultValue("$") @QueryParam("delimiter") String delimiter) {
+    try {
+      DescribeTableResponse response =
+          lanceNamespace.asTableOps().describeTable(tableId, delimiter);
+      return Response.ok(response).build();
+    } catch (Exception e) {
+      return LanceExceptionMapper.toRESTResponse(tableId, e);
+    }
+  }
+
+  @POST
+  @Path("/create")
+  @Consumes("application/vnd.apache.arrow.stream")
+  @Produces("application/json")
+  public Response createTable(
+      @PathParam("id") String tableId,
+      @QueryParam("mode") @DefaultValue("create") String mode, // create, 
exist_ok, overwrite
+      @QueryParam("delimiter") @DefaultValue("$") String delimiter,
+      @HeaderParam("x-lance-table-location") String tableLocation,
+      @HeaderParam("x-lance-table-properties") String tableProperties,
+      @HeaderParam("x-lance-root-catalog") String rootCatalog,
+      byte[] arrowStreamBody) {
+    try {
+      Map<String, String> props =
+          JsonUtil.mapper().readValue(tableProperties, new 
TypeReference<Map<String, String>>() {});
+      CreateTableResponse response =
+          lanceNamespace
+              .asTableOps()
+              .createTable(
+                  tableId, mode, delimiter, tableLocation, props, rootCatalog, 
arrowStreamBody);
+      return Response.ok(response).build();
+    } catch (Exception e) {
+      return LanceExceptionMapper.toRESTResponse(tableId, e);
+    }
+  }
+
+  @POST
+  @Path("/create-empty")
+  public Response createEmptyTable(
+      @PathParam("id") String tableId,
+      @QueryParam("mode") @DefaultValue("create") String mode, // create, 
exist_ok, overwrite
+      @QueryParam("delimiter") @DefaultValue("$") String delimiter,
+      @HeaderParam("x-lance-table-location") String tableLocation,
+      @HeaderParam("x-lance-root-catalog") String rootCatalog,
+      @HeaderParam("x-lance-table-properties") String tableProperties) {
     try {
-      ListTablesResponse response =
-          lanceNamespace.asTableOps().listTables(namespaceId, delimiter, 
pageToken, limit);
+      Map<String, String> props =
+          StringUtils.isBlank(tableProperties)
+              ? Map.of()
+              : JsonUtil.mapper()
+                  .readValue(tableProperties, new TypeReference<Map<String, 
String>>() {});
+      CreateTableResponse response =
+          lanceNamespace
+              .asTableOps()
+              .createTable(tableId, mode, delimiter, tableLocation, props, 
rootCatalog, null);
       return Response.ok(response).build();
     } catch (Exception e) {
-      return LanceExceptionMapper.toRESTResponse(namespaceId, e);
+      return LanceExceptionMapper.toRESTResponse(tableId, e);
     }
   }
 }
diff --git 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java
 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java
index 1c8d051cd5..d77fd62016 100644
--- 
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java
+++ 
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java
@@ -283,6 +283,8 @@ public class MySQLMetadataAdapter extends 
CatalogConnectorMetadataAdapter {
                     .collect(Collectors.toUnmodifiableList());
             uniqueKeys.add(String.format("%s:%s", index.name(), 
Strings.join(columns, ',')));
             break;
+          default:
+            throw new UnsupportedOperationException("Unsupported index type: " 
+ index.type());
         }
       }
       if (!primaryKeys.isEmpty()) {

Reply via email to