This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-lance-namepspace-dev
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-lance-namepspace-dev by
this push:
new 96a1caa72c [#8893] feat(lance-rest-server): Support create/load table
operations for lance (#8911)
96a1caa72c is described below
commit 96a1caa72c4e596eddf12509a8cec7b343f23708
Author: Mini Yu <[email protected]>
AuthorDate: Sat Oct 25 09:14:14 2025 +0800
[#8893] feat(lance-rest-server): Support create/load table operations for
lance (#8911)
### What changes were proposed in this pull request?
1. Support alter lance table
2. Support API for table operations in Lance REST server.
### Why are the changes needed?
It's a feature.
Fix: #8893
### Does this PR introduce _any_ user-facing change?
N/A.
### How was this patch tested?
Test locally with Spark and curl
---------
Co-authored-by: mchades <[email protected]>
---
.../org/apache/gravitino/rel/indexes/Index.java | 45 +++
.../GenericLakehouseCatalogOperations.java | 20 +-
.../lakehouse/lance/LanceCatalogOperations.java | 120 ++++++-
.../lakehouse/lance/LanceDataTypeConverter.java | 69 +++++
.../gravitino/client/TestRelationalCatalog.java | 10 -
.../org/apache/gravitino/dto/rel/TableDTO.java | 2 -
.../gravitino/dto/requests/TableCreateRequest.java | 3 -
.../gravitino/dto/responses/TableResponse.java | 3 -
.../java/org/apache/gravitino/json/JsonUtils.java | 22 +-
.../org/apache/gravitino/dto/rel/TestTableDTO.java | 13 +-
.../catalog/TableOperationDispatcher.java | 51 +++
.../lance/common/ops/LanceNamespaceOperations.java | 3 +
.../lance/common/ops/LanceTableOperations.java | 15 +-
.../common/ops/arrow/ArrowRecordBatchList.java | 40 +++
.../gravitino/GravitinoLanceNamespaceWrapper.java | 344 ++++++++++++++++++++-
.../gravitino/lance/common/TestArrowIPC.java | 83 +++++
lance/lance-rest-server/build.gradle.kts | 2 +
.../service/rest/LanceNamespaceOperations.java | 17 +
.../lance/service/rest/LanceTableOperations.java | 89 ++++--
.../catalog/jdbc/mysql/MySQLMetadataAdapter.java | 2 +
20 files changed, 882 insertions(+), 71 deletions(-)
diff --git a/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java
b/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java
index 4b92977fd5..fd3beb0e67 100644
--- a/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java
+++ b/api/src/main/java/org/apache/gravitino/rel/indexes/Index.java
@@ -67,5 +67,50 @@ public interface Index {
* UNIQUE KEY helps in avoiding redundancy and ensuring data accuracy in
the database.
*/
UNIQUE_KEY,
+
+ // The following index types are specific to Lance; for details, see IndexType in LanceDB.
+ /**
+ * SCALAR index is used to optimize searches on scalar data types such as
integers, floats,
+ * strings, etc. Currently, this type is only applicable to Lance.
+ */
+ SCALAR,
+ /**
+ * BTREE index is a balanced tree data structure that maintains sorted
data and allows for
+ * logarithmic time complexity for search, insert, and delete operations.
Currently, this type
+ * is only applicable to Lance.
+ */
+ BTREE,
+ /**
+ * Bitmap index is a type of database index that uses bit arrays (bitmaps)
to represent the
+ * presence or absence of values in a column, enabling efficient querying
and data retrieval.
+ * Currently, this type is only applicable to Lance.
+ */
+ BITMAP,
+ /**
+ * LABEL_LIST index is used to optimize searches on columns containing
lists of labels or tags.
+ * Currently, this type is only applicable to Lance.
+ */
+ LABEL_LIST,
+ /**
+ * INVERTED index is a data structure used to optimize full-text searches
by mapping terms to
+ * their locations within a dataset, allowing for quick retrieval of
documents containing
+ * specific words or phrases. Currently, this type is only applicable to
Lance.
+ */
+ INVERTED,
+ /**
+ * VECTOR index is used to optimize similarity searches in
high-dimensional vector spaces.
+ * Currently, this type is only applicable to Lance.
+ */
+ VECTOR,
+ /**
+ * IVF_FLAT (Inverted File with flat, i.e. unquantized, vectors) index is used for efficient
+ * approximate nearest-neighbor search. Currently, this type is only applicable to Lance.
+ */
+ IVF_FLAT,
+ /**
+ * IVF_SQ (Inverted File with Scalar Quantization) index is used for efficient approximate
+ * nearest-neighbor search. Currently, this type is only applicable to Lance.
+ */
+ IVF_SQ,
+ /**
+ * IVF_PQ (Inverted File with Product Quantization) index is used for efficient approximate
+ * nearest-neighbor search. Currently, this type is only applicable to Lance.
+ */
+ IVF_PQ,
+ /** IVF_HNSW_SQ */
+ IVF_HNSW_SQ,
+ /** IVF_HNSW_PQ */
+ IVF_HNSW_PQ;
}
}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
index acac35528e..0f85532e8c 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
@@ -258,7 +258,25 @@ public class GenericLakehouseCatalogOperations
@Override
public Table alterTable(NameIdentifier ident, TableChange... changes)
throws NoSuchTableException, IllegalArgumentException {
- throw new UnsupportedOperationException("Not implemented yet.");
+ EntityStore store = GravitinoEnv.getInstance().entityStore();
+ Namespace namespace = ident.namespace();
+ try {
+ GenericTableEntity tableEntity =
+ store.get(ident, Entity.EntityType.TABLE, GenericTableEntity.class);
+ Map<String, String> tableProperties = tableEntity.getProperties();
+ String format = tableProperties.getOrDefault("format", "lance");
+ LakehouseCatalogOperations lakehouseCatalogOperations =
+ SUPPORTED_FORMATS.compute(
+ format,
+ (k, v) ->
+ v == null
+ ? createLakehouseCatalogOperations(
+ format, tableProperties, catalogInfo,
propertiesMetadata)
+ : v);
+ return lakehouseCatalogOperations.alterTable(ident, changes);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to list tables under schema " +
namespace, e);
+ }
}
@Override
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
index 3e1146b7ad..342826a882 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
@@ -20,34 +20,49 @@
package org.apache.gravitino.catalog.lakehouse.lance;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.lancedb.lance.Dataset;
import com.lancedb.lance.WriteParams;
+import com.lancedb.lance.index.DistanceType;
+import com.lancedb.lance.index.IndexParams;
+import com.lancedb.lance.index.IndexType;
+import com.lancedb.lance.index.vector.VectorIndexParams;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.catalog.lakehouse.LakehouseCatalogOperations;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.GenericLakehouseTable;
import org.apache.gravitino.connector.HasPropertyMetadata;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.GenericTableEntity;
import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.GenericTable;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableChange;
import org.apache.gravitino.rel.expressions.distributions.Distribution;
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
import org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.indexes.Index;
+import org.apache.gravitino.rel.indexes.Indexes.IndexImpl;
import org.apache.gravitino.utils.PrincipalUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -133,20 +148,31 @@ public class LanceCatalogOperations implements
LakehouseCatalogOperations {
.map(
col -> {
boolean nullable = col.nullable();
+ ArrowType parentType =
converter.fromGravitino(col.dataType());
+ List<ArrowType> childTypes =
converter.getChildTypes(col.dataType());
+ List<Field> childFields =
+ childTypes.stream()
+ .map(
+ childType ->
+ new org.apache.arrow.vector.types.pojo.Field(
+ "",
+
org.apache.arrow.vector.types.pojo.FieldType.nullable(
+ childType),
+ null))
+ .collect(Collectors.toList());
+
if (nullable) {
return new org.apache.arrow.vector.types.pojo.Field(
col.name(),
- org.apache.arrow.vector.types.pojo.FieldType.nullable(
- converter.fromGravitino(col.dataType())),
- null);
+
org.apache.arrow.vector.types.pojo.FieldType.nullable(parentType),
+ childFields);
}
// not nullable
return new org.apache.arrow.vector.types.pojo.Field(
col.name(),
- org.apache.arrow.vector.types.pojo.FieldType.notNullable(
- converter.fromGravitino(col.dataType())),
- null);
+
org.apache.arrow.vector.types.pojo.FieldType.notNullable(parentType),
+ childFields);
})
.collect(Collectors.toList());
return new org.apache.arrow.vector.types.pojo.Schema(fields);
@@ -155,8 +181,86 @@ public class LanceCatalogOperations implements
LakehouseCatalogOperations {
@Override
public Table alterTable(NameIdentifier ident, TableChange... changes)
throws NoSuchTableException, IllegalArgumentException {
- // Use another PRs to implement alter table for Lance tables
- return null;
+ // Lance only supports adding indexes for now.
+ List<Index> addedIndexes = Lists.newArrayList();
+
+ for (TableChange change : changes) {
+ if (change instanceof TableChange.AddIndex addIndexChange) {
+ Index index =
+ IndexImpl.builder()
+ .withIndexType(addIndexChange.getType())
+ .withName(addIndexChange.getName())
+ .withFieldNames(addIndexChange.getFieldNames())
+ .build();
+ addedIndexes.add(index);
+ }
+ }
+
+ EntityStore entityStore = GravitinoEnv.getInstance().entityStore();
+ GenericTableEntity entity;
+ try {
+ entity = entityStore.get(ident, Entity.EntityType.TABLE,
GenericTableEntity.class);
+ } catch (NoSuchEntityException e) {
+ throw new NoSuchTableException("No such table: %s", ident);
+ } catch (IOException ioe) {
+ throw new RuntimeException("Failed to load table entity for: " + ident,
ioe);
+ }
+
+ String location = entity.getProperties().get("location");
+ try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
+ // For Lance, we only support adding indexes, so in fact, we can't
handle drop index here.
+ for (Index index : addedIndexes) {
+ IndexType indexType = IndexType.valueOf(index.type().name());
+ IndexParams indexParams = getIndexParamsByIndexType(indexType);
+
+ dataset.createIndex(
+ Arrays.stream(index.fieldNames())
+ .map(fieldPath -> String.join(".", fieldPath))
+ .collect(Collectors.toList()),
+ indexType,
+ Optional.of(index.name()),
+ indexParams,
+ true);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to alter Lance table: " + ident, e);
+ }
+
+ GenericTable oldTable = entity.toGenericTable();
+ Index[] newIndexes = oldTable.index();
+ for (Index index : addedIndexes) {
+ newIndexes = ArrayUtils.add(newIndexes, index);
+ }
+
+ return GenericLakehouseTable.builder()
+ .withFormat(oldTable.format())
+ .withProperties(oldTable.properties())
+ .withAuditInfo((AuditInfo) oldTable.auditInfo())
+ .withSortOrders(oldTable.sortOrder())
+ .withPartitioning(oldTable.partitioning())
+ .withDistribution(oldTable.distribution())
+ .withColumns(oldTable.columns())
+ .withIndexes(newIndexes)
+ .withName(oldTable.name())
+ .withComment(oldTable.comment())
+ .build();
+ }
+
+  /**
+   * Builds the Lance index parameters used when creating an index of the given type.
+   *
+   * @param indexType Lance index type; only {@code SCALAR} and {@code VECTOR} are handled
+   * @return default parameters for {@code SCALAR}, fixed IVF_PQ parameters for {@code VECTOR}
+   * @throws IllegalArgumentException for every other index type
+   */
+  private IndexParams getIndexParamsByIndexType(IndexType indexType) {
+    return switch (indexType) {
+      case SCALAR -> new IndexParams.Builder().build();
+      case VECTOR -> {
+        // TODO make these parameters configurable
+        // This value should be determined dynamically based on the data; add properties to
+        // Index to set it.
+        // NOTE(review): it is passed as the third argument of VectorIndexParams.ivfPq - confirm
+        // against the Lance API whether that slot is the vector dimension or the number of
+        // PQ sub-vectors.
+        int dimensionCount = 3;
+        yield new IndexParams.Builder()
+            .setVectorIndexParams(
+                VectorIndexParams.ivfPq(2, 8, dimensionCount, DistanceType.L2, 2))
+            .build();
+      }
+      default -> throw new IllegalArgumentException("Unsupported index type: " + indexType);
+    };
}
@Override
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
index 117863659e..d7966edd5e 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceDataTypeConverter.java
@@ -19,6 +19,8 @@
package org.apache.gravitino.catalog.lakehouse.lance;
+import com.google.common.collect.Lists;
+import java.util.List;
import org.apache.arrow.vector.types.DateUnit;
import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
@@ -27,9 +29,11 @@ import org.apache.arrow.vector.types.pojo.ArrowType.Bool;
import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint;
import org.apache.arrow.vector.types.pojo.ArrowType.Int;
import org.apache.gravitino.connector.DataTypeConverter;
+import org.apache.gravitino.json.JsonUtils;
import org.apache.gravitino.rel.types.Type;
import org.apache.gravitino.rel.types.Types;
import org.apache.gravitino.rel.types.Types.FixedType;
+import org.apache.gravitino.rel.types.Types.UnparsedType;
public class LanceDataTypeConverter implements DataTypeConverter<ArrowType,
ArrowType> {
@@ -67,6 +71,30 @@ public class LanceDataTypeConverter implements
DataTypeConverter<ArrowType, Arro
return new ArrowType.FixedSizeBinary(fixedType.length());
case BINARY:
return new ArrowType.Binary();
+ case UNPARSED:
+ String typeStr = ((UnparsedType) type).unparsedType().toString();
+ try {
+ Type t = JsonUtils.anyFieldMapper().readValue(typeStr, Type.class);
+ if (t instanceof Types.ListType) {
+ return ArrowType.List.INSTANCE;
+ } else if (t instanceof Types.MapType) {
+ return new ArrowType.Map(false);
+ } else if (t instanceof Types.StructType) {
+ return ArrowType.Struct.INSTANCE;
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported UnparsedType conversion: " + t.simpleString());
+ }
+ } catch (Exception e) {
+ // FixedSizeListArray(integer, 3)
+ if (typeStr.startsWith("FixedSizeListArray")) {
+ int size =
+ Integer.parseInt(
+ typeStr.substring(typeStr.indexOf(',') + 1,
typeStr.indexOf(')')).trim());
+ return new ArrowType.FixedSizeList(size);
+ }
+ throw new UnsupportedOperationException("Failed to parse
UnparsedType: " + typeStr, e);
+ }
default:
throw new UnsupportedOperationException("Unsupported Gravitino type: "
+ type.name());
}
@@ -116,8 +144,49 @@ public class LanceDataTypeConverter implements
DataTypeConverter<ArrowType, Arro
return Types.StringType.get();
} else if (arrowType instanceof ArrowType.Binary) {
return Types.BinaryType.get();
+ // TODO handle complex types like List, Map, Struct
} else {
throw new UnsupportedOperationException("Unsupported Arrow type: " +
arrowType);
}
}
+
+ /**
+ * Returns the Arrow types of the children of the given Gravitino type.
+ *
+ * <p>Only types whose name is {@code UNPARSED} are inspected; any other type yields an empty
+ * list. The unparsed string is first read as a JSON-encoded Gravitino type: a list type
+ * contributes its element type, a map type contributes its key type followed by its value
+ * type. If that fails, a string starting with {@code FixedSizeListArray}, for example
+ * {@code FixedSizeListArray(integer, 3)}, contributes the type named between "(" and ",".
+ *
+ * @param parentType the Gravitino type whose child types are requested
+ * @return the child Arrow types, empty when {@code parentType} is not {@code UNPARSED}
+ * @throws UnsupportedOperationException if the unparsed string cannot be handled by either path
+ */
+ public List<ArrowType> getChildTypes(Type parentType) {
+ if (parentType.name() != Type.Name.UNPARSED) {
+ return List.of();
+ }
+
+ List<ArrowType> arrowTypes = Lists.newArrayList();
+ String typeStr = ((UnparsedType) parentType).unparsedType().toString();
+ try {
+ Type t = JsonUtils.anyFieldMapper().readValue(typeStr, Type.class);
+ if (t instanceof Types.ListType listType) {
+ arrowTypes.add(fromGravitino(listType.elementType()));
+ } else if (t instanceof Types.MapType mapType) {
+ arrowTypes.add(fromGravitino(mapType.keyType()));
+ arrowTypes.add(fromGravitino(mapType.valueType()));
+ } else {
+ // TODO support struct type.
+ // Thrown inside the try block, so the catch below receives it and it reaches the caller
+ // wrapped in a "Failed to parse UnparsedType" exception.
+ throw new UnsupportedOperationException(
+ "Unsupported UnparsedType conversion: " + t.simpleString());
+ }
+
+ return arrowTypes;
+ } catch (Exception e) {
+ // Fallback for the non-JSON form, e.g. FixedSizeListArray(integer, 3)
+
+ try {
+ if (typeStr.startsWith("FixedSizeListArray")) {
+ // The element type name is the text between '(' and the first ','; it is quoted so that
+ // it can be read as a JSON string.
+ String type = typeStr.substring(typeStr.indexOf('(') + 1,
typeStr.indexOf(',')).trim();
+ Type childType = JsonUtils.anyFieldMapper().readValue("\"" + type +
"\"", Type.class);
+ arrowTypes.add(fromGravitino(childType));
+
+ return arrowTypes;
+ }
+ } catch (Exception e1) {
+ // A failure of the fallback itself is reported with the fallback's exception as cause.
+ throw new UnsupportedOperationException("Failed to parse UnparsedType:
" + typeStr, e1);
+ }
+
+ // Not a FixedSizeListArray string: report the failure of the first attempt.
+ throw new UnsupportedOperationException("Failed to parse UnparsedType: "
+ typeStr, e);
+ }
+ }
}
diff --git
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
index c5e36247bb..ff0a29c032 100644
---
a/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
+++
b/clients/client-java/src/test/java/org/apache/gravitino/client/TestRelationalCatalog.java
@@ -571,16 +571,6 @@ public class TestRelationalCatalog extends TestBase {
tableId, fromDTOs(columns), "comment", emptyMap,
errorPartitioning));
Assertions.assertTrue(ex2.getMessage().contains("not found in table"));
- // Test empty columns
- Throwable ex3 =
- Assertions.assertThrows(
- IllegalArgumentException.class,
- () ->
- tableCatalog.createTable(
- tableId, new Column[0], "comment", emptyMap,
errorPartitioning));
- Assertions.assertTrue(
- ex3.getMessage().contains("\"columns\" field is required and cannot be
empty"));
-
// Test partitioning with assignments
Partitioning[] partitioningWithAssignments = {
RangePartitioningDTO.of(
diff --git a/common/src/main/java/org/apache/gravitino/dto/rel/TableDTO.java
b/common/src/main/java/org/apache/gravitino/dto/rel/TableDTO.java
index c1fb160a38..a23a1cd37c 100644
--- a/common/src/main/java/org/apache/gravitino/dto/rel/TableDTO.java
+++ b/common/src/main/java/org/apache/gravitino/dto/rel/TableDTO.java
@@ -314,8 +314,6 @@ public class TableDTO implements Table {
public TableDTO build() {
Preconditions.checkArgument(name != null && !name.isEmpty(), "name
cannot be null or empty");
Preconditions.checkArgument(audit != null, "audit cannot be null");
- Preconditions.checkArgument(
- columns != null && columns.length > 0, "columns cannot be null or
empty");
return new TableDTO(
name,
diff --git
a/common/src/main/java/org/apache/gravitino/dto/requests/TableCreateRequest.java
b/common/src/main/java/org/apache/gravitino/dto/requests/TableCreateRequest.java
index d6d22ddb9f..8a97c9e2dd 100644
---
a/common/src/main/java/org/apache/gravitino/dto/requests/TableCreateRequest.java
+++
b/common/src/main/java/org/apache/gravitino/dto/requests/TableCreateRequest.java
@@ -122,9 +122,6 @@ public class TableCreateRequest implements RESTRequest {
public void validate() throws IllegalArgumentException {
Preconditions.checkArgument(
StringUtils.isNotBlank(name), "\"name\" field is required and cannot
be empty");
- Preconditions.checkArgument(
- columns != null && columns.length != 0,
- "\"columns\" field is required and cannot be empty");
if (sortOrders != null) {
Arrays.stream(sortOrders).forEach(sortOrder ->
sortOrder.validate(columns));
diff --git
a/common/src/main/java/org/apache/gravitino/dto/responses/TableResponse.java
b/common/src/main/java/org/apache/gravitino/dto/responses/TableResponse.java
index d3dfa21599..a1cc326295 100644
--- a/common/src/main/java/org/apache/gravitino/dto/responses/TableResponse.java
+++ b/common/src/main/java/org/apache/gravitino/dto/responses/TableResponse.java
@@ -63,9 +63,6 @@ public class TableResponse extends BaseResponse {
Preconditions.checkArgument(table != null, "table must not be null");
Preconditions.checkArgument(
StringUtils.isNotBlank(table.name()), "table 'name' must not be null
and empty");
- Preconditions.checkArgument(
- table.columns() != null && table.columns().length > 0,
- "table 'columns' must not be null and empty");
Preconditions.checkArgument(table.auditInfo() != null, "table 'audit' must
not be null");
Preconditions.checkArgument(
table.partitioning() != null, "table 'partitions' must not be null");
diff --git a/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
b/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
index 5fd2ab3f8f..fb9a33f268 100644
--- a/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
+++ b/common/src/main/java/org/apache/gravitino/json/JsonUtils.java
@@ -715,7 +715,7 @@ public class JsonUtils {
String text = node.asText().toLowerCase();
return text.equals(Types.NullType.get().simpleString())
? Types.NullType.get()
- : fromPrimitiveTypeString(text);
+ : fromPrimitiveTypeString(text, node.asText());
}
if (node.isObject() && node.has(TYPE)) {
@@ -834,49 +834,49 @@ public class JsonUtils {
gen.writeEndObject();
}
- private static Type fromPrimitiveTypeString(String typeString) {
- Type.PrimitiveType primitiveType = TYPES.get(typeString);
+ /**
+ * Resolves a textual type to a Gravitino type.
+ *
+ * <p>The lower-cased text is looked up in {@code TYPES} and then matched against the FIXED,
+ * FIXEDCHAR, VARCHAR, DECIMAL, TIME, TIMESTAMP_TZ and TIMESTAMP patterns, in that order. When
+ * nothing matches, the result is an unparsed type that carries the original text, so its
+ * letter case is preserved.
+ *
+ * @param lowerTypeString the type text converted to lower case, used for all matching
+ * @param originalTypeString the type text as received, used only for the unparsed fallback
+ * @return the matching type, or an unparsed type wrapping {@code originalTypeString}
+ */
+ private static Type fromPrimitiveTypeString(String lowerTypeString, String originalTypeString) {
+ Type.PrimitiveType primitiveType = TYPES.get(lowerTypeString);
if (primitiveType != null) {
return primitiveType;
}
- Matcher fixed = FIXED.matcher(typeString);
+ Matcher fixed = FIXED.matcher(lowerTypeString);
if (fixed.matches()) {
return Types.FixedType.of(Integer.parseInt(fixed.group(1)));
}
- Matcher fixedChar = FIXEDCHAR.matcher(typeString);
+ Matcher fixedChar = FIXEDCHAR.matcher(lowerTypeString);
if (fixedChar.matches()) {
return Types.FixedCharType.of(Integer.parseInt(fixedChar.group(1)));
}
- Matcher varchar = VARCHAR.matcher(typeString);
+ Matcher varchar = VARCHAR.matcher(lowerTypeString);
if (varchar.matches()) {
return Types.VarCharType.of(Integer.parseInt(varchar.group(1)));
}
- Matcher decimal = DECIMAL.matcher(typeString);
+ Matcher decimal = DECIMAL.matcher(lowerTypeString);
if (decimal.matches()) {
return Types.DecimalType.of(
Integer.parseInt(decimal.group(1)),
Integer.parseInt(decimal.group(2)));
}
- Matcher time = TIME.matcher(typeString);
+ Matcher time = TIME.matcher(lowerTypeString);
if (time.matches()) {
return Types.TimeType.of(Integer.parseInt(time.group(1)));
}
- Matcher timestampTz = TIMESTAMP_TZ.matcher(typeString);
+ Matcher timestampTz = TIMESTAMP_TZ.matcher(lowerTypeString);
if (timestampTz.matches()) {
return
Types.TimestampType.withTimeZone(Integer.parseInt(timestampTz.group(1)));
}
- Matcher timestamp = TIMESTAMP.matcher(typeString);
+ Matcher timestamp = TIMESTAMP.matcher(lowerTypeString);
if (timestamp.matches()) {
return
Types.TimestampType.withoutTimeZone(Integer.parseInt(timestamp.group(1)));
}
- return Types.UnparsedType.of(typeString);
+ // Keep the caller's original spelling: the unparsed text may be case-sensitive.
+ return Types.UnparsedType.of(originalTypeString);
}
private static Types.StructType readStructType(JsonNode node) {
diff --git
a/common/src/test/java/org/apache/gravitino/dto/rel/TestTableDTO.java
b/common/src/test/java/org/apache/gravitino/dto/rel/TestTableDTO.java
index 5f0733c5ea..44de2e03d4 100644
--- a/common/src/test/java/org/apache/gravitino/dto/rel/TestTableDTO.java
+++ b/common/src/test/java/org/apache/gravitino/dto/rel/TestTableDTO.java
@@ -19,11 +19,11 @@
package org.apache.gravitino.dto.rel;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import java.time.Instant;
import org.apache.gravitino.dto.AuditDTO;
import org.apache.gravitino.rel.types.Types;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
public class TestTableDTO {
@@ -32,7 +32,10 @@ public class TestTableDTO {
AuditDTO audit =
AuditDTO.builder().withCreator("creator").withCreateTime(Instant.now()).build();
TableDTO.Builder<?> builder =
TableDTO.builder().withName("t1").withAudit(audit);
- assertThrows(IllegalArgumentException.class, builder::build);
+ Assertions.assertDoesNotThrow(
+ () -> {
+ builder.build();
+ });
}
@Test
@@ -41,7 +44,11 @@ public class TestTableDTO {
AuditDTO.builder().withCreator("creator").withCreateTime(Instant.now()).build();
TableDTO.Builder<?> builder =
TableDTO.builder().withName("t1").withAudit(audit).withColumns(new
ColumnDTO[0]);
- assertThrows(IllegalArgumentException.class, builder::build);
+
+ Assertions.assertDoesNotThrow(
+ () -> {
+ builder.build();
+ });
}
@Test
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 1cbab0d6ed..27119f0c99 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -262,6 +262,57 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
tableId = te.id();
}
+ if (isGenericLakehouseCatalog(catalogIdent)) {
+ // For generic lakehouse catalog, we only update the table entity
with basic info.
+ GenericTableEntity genericTableEntity =
+ operateOnEntity(
+ ident, id -> store.get(id, TABLE,
GenericTableEntity.class), "GET", tableId);
+ if (genericTableEntity == null) {
+ throw new NoSuchTableException("No such table: %s", ident);
+ }
+
+ GenericTable genericTable = (GenericTable) alteredTable;
+ GenericTableEntity updatedGenericTableEntity =
+ operateOnEntity(
+ ident,
+ id ->
+ store.update(
+ id,
+ GenericTableEntity.class,
+ TABLE,
+ tableEntity ->
+ GenericTableEntity.getBuilder()
+ .withId(tableEntity.id())
+ .withName(alteredTable.name())
+ .withNamespace(getNewNamespace(ident,
changes))
+ .withFormat(genericTable.format())
+ .withAuditInfo(
+ AuditInfo.builder()
+
.withCreator(tableEntity.auditInfo().creator())
+
.withCreateTime(tableEntity.auditInfo().createTime())
+ .withLastModifier(
+
PrincipalUtils.getCurrentPrincipal().getName())
+
.withLastModifiedTime(Instant.now())
+ .build())
+ .withColumns(tableEntity.columns())
+ .withIndexes(genericTable.index())
+
.withDistribution(genericTable.distribution())
+
.withPartitions(genericTable.partitioning())
+ .withSortOrder(genericTable.sortOrder())
+ .withProperties(genericTable.properties())
+ .withComment(genericTable.comment())
+ .build()),
+ "UPDATE",
+ tableId);
+
+ return EntityCombinedTable.of(alteredTable,
updatedGenericTableEntity)
+ .withHiddenProperties(
+ getHiddenPropertyNames(
+ getCatalogIdentifier(ident),
+ HasPropertyMetadata::tablePropertiesMetadata,
+ alteredTable.properties()));
+ }
+
TableEntity updatedTableEntity =
operateOnEntity(
ident,
diff --git
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java
index 226de4dbd7..49141665a5 100644
---
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java
+++
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceNamespaceOperations.java
@@ -25,6 +25,7 @@ import
com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
import com.lancedb.lance.namespace.model.DropNamespaceRequest;
import com.lancedb.lance.namespace.model.DropNamespaceResponse;
import com.lancedb.lance.namespace.model.ListNamespacesResponse;
+import com.lancedb.lance.namespace.model.ListTablesResponse;
import java.util.Map;
public interface LanceNamespaceOperations {
@@ -47,4 +48,6 @@ public interface LanceNamespaceOperations {
DropNamespaceRequest.BehaviorEnum behavior);
void namespaceExists(String namespaceId, String delimiter) throws
LanceNamespaceException;
+
+ ListTablesResponse listTables(String id, String delimiter, String pageToken,
Integer limit);
}
diff --git
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
index 057dce8fb3..b8a967cd30 100644
---
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
+++
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
@@ -18,11 +18,20 @@
*/
package org.apache.gravitino.lance.common.ops;
-import com.lancedb.lance.namespace.model.ListTablesResponse;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
+import java.util.Map;
+/** Table-level operations offered by a Lance namespace backend. */
public interface LanceTableOperations {
- ListTablesResponse listTables(String id, String delimiter, String pageToken,
Integer limit);
+ /**
+ * Describes the table identified by {@code tableId}.
+ *
+ * @param tableId identifier of the table
+ * @param delimiter presumably the separator between the levels of {@code tableId} - TODO confirm
+ * @return the describe-table response
+ */
+ DescribeTableResponse describeTable(String tableId, String delimiter);
- // todo: add more table operation methods
+ /**
+ * Creates a table.
+ *
+ * @param tableId identifier of the table to create
+ * @param mode creation mode - NOTE(review): accepted values are not visible here, confirm
+ * @param delimiter presumably the separator between the levels of {@code tableId} - TODO confirm
+ * @param tableLocation location of the table
+ * @param tableProperties properties of the table
+ * @param rootCatalog root catalog name - NOTE(review): semantics not visible here, confirm
+ * @param arrowStreamBody request body bytes; presumably an Arrow IPC stream - TODO confirm
+ * @return the create-table response
+ */
+ CreateTableResponse createTable(
+ String tableId,
+ String mode,
+ String delimiter,
+ String tableLocation,
+ Map<String, String> tableProperties,
+ String rootCatalog,
+ byte[] arrowStreamBody);
}
diff --git
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/arrow/ArrowRecordBatchList.java
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/arrow/ArrowRecordBatchList.java
new file mode 100644
index 0000000000..b0c6a089d5
--- /dev/null
+++
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/arrow/ArrowRecordBatchList.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lance.common.ops.arrow;
+
+import java.util.List;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+/** Holds an Arrow {@link Schema} together with the record batches supplied alongside it. */
+public class ArrowRecordBatchList {
+  private final Schema schema;
+
+  // Stored at construction time but not read anywhere in this class, hence the suppression.
+  @SuppressWarnings("unused")
+  private final List<VectorSchemaRoot> recordBatches;
+
+  /**
+   * Creates a holder for the given schema and record batches.
+   *
+   * @param schema the Arrow schema
+   * @param recordBatches the record batches; the list reference is stored as given
+   */
+  public ArrowRecordBatchList(Schema schema, List<VectorSchemaRoot> recordBatches) {
+    this.recordBatches = recordBatches;
+    this.schema = schema;
+  }
+
+  /**
+   * Returns the schema passed to the constructor.
+   *
+   * @return the Arrow schema
+   */
+  public Schema getSchema() {
+    return this.schema;
+  }
+}
diff --git
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
index cb1b85752a..1f9e41a2cb 100644
---
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
+++
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
@@ -20,7 +20,9 @@ package org.apache.gravitino.lance.common.ops.gravitino;
import static
org.apache.gravitino.lance.common.config.LanceConfig.METALAKE_NAME;
import static
org.apache.gravitino.lance.common.config.LanceConfig.NAMESPACE_URI;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -28,14 +30,22 @@ import com.google.common.collect.Sets;
import com.lancedb.lance.namespace.LanceNamespaceException;
import com.lancedb.lance.namespace.ObjectIdentifier;
import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
+import com.lancedb.lance.namespace.model.CreateNamespaceRequest.ModeEnum;
import com.lancedb.lance.namespace.model.CreateNamespaceResponse;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
import com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
import com.lancedb.lance.namespace.model.DropNamespaceRequest;
import com.lancedb.lance.namespace.model.DropNamespaceResponse;
+import com.lancedb.lance.namespace.model.JsonArrowDataType;
+import com.lancedb.lance.namespace.model.JsonArrowField;
+import com.lancedb.lance.namespace.model.JsonArrowSchema;
import com.lancedb.lance.namespace.model.ListNamespacesResponse;
import com.lancedb.lance.namespace.model.ListTablesResponse;
import com.lancedb.lance.namespace.util.CommonUtil;
import com.lancedb.lance.namespace.util.PageUtil;
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -44,11 +54,27 @@ import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.IntFunction;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
+import org.apache.arrow.vector.types.DateUnit;
+import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.ArrowType.Bool;
+import org.apache.arrow.vector.types.pojo.ArrowType.FloatingPoint;
+import org.apache.arrow.vector.types.pojo.ArrowType.Int;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.CatalogChange;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.client.GravitinoClient;
@@ -58,10 +84,18 @@ import
org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NonEmptyCatalogException;
import org.apache.gravitino.exceptions.NonEmptySchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
+import org.apache.gravitino.json.JsonUtils;
import org.apache.gravitino.lance.common.config.LanceConfig;
import org.apache.gravitino.lance.common.ops.LanceNamespaceOperations;
import org.apache.gravitino.lance.common.ops.LanceTableOperations;
import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
+import org.apache.gravitino.lance.common.ops.arrow.ArrowRecordBatchList;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.apache.gravitino.rel.types.Type;
+import org.apache.gravitino.rel.types.Types;
+import org.apache.gravitino.rel.types.Types.FixedType;
+import org.apache.gravitino.rel.types.Types.UnparsedType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -245,12 +279,6 @@ public class GravitinoLanceNamespaceWrapper extends
NamespaceWrapper
}
}
- @Override
- public ListTablesResponse listTables(
- String id, String delimiter, String pageToken, Integer limit) {
- throw new UnsupportedOperationException("Not implemented yet");
- }
-
private boolean isLakehouseCatalog(Catalog catalog) {
return catalog.type().equals(Catalog.Type.RELATIONAL)
&& "generic-lakehouse".equals(catalog.provider());
@@ -467,4 +495,308 @@ public class GravitinoLanceNamespaceWrapper extends
NamespaceWrapper
return Stream.concat(setPropertiesStream,
removePropertiesStream).toArray(arrayCreator);
}
+
+  /**
+   * Lists the tables of a {@code catalog<delimiter>schema} namespace, one page at a time.
+   *
+   * @param id namespace identifier, split on {@code delimiter} into catalog and schema
+   * @param delimiter literal separator used to split {@code id} and to join the returned ids
+   * @param pageToken page token handed to {@link PageUtil#splitPage}
+   * @param limit requested page size, normalized by {@link PageUtil#normalizePageSize}
+   * @return ids of the form {@code catalog<delimiter>schema<delimiter>table} for the requested
+   *     page, together with the next page token
+   * @throws IllegalArgumentException if {@code id} does not consist of exactly two levels
+   */
+  @Override
+  public ListTablesResponse listTables(
+      String id, String delimiter, String pageToken, Integer limit) {
+    ObjectIdentifier nsId = ObjectIdentifier.of(id, Pattern.quote(delimiter));
+    // Levels 0 and 1 are both dereferenced below, so a shorter identifier must be rejected here
+    // instead of failing later with an index error.
+    Preconditions.checkArgument(
+        nsId.levels() == 2,
+        "Expected a 2-level namespace (catalog and schema) but got: %s",
+        nsId.levels());
+
+    String catalogName = nsId.levelAtListPos(0);
+    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
+    String schemaName = nsId.levelAtListPos(1);
+
+    List<String> tables =
+        Arrays.stream(catalog.asTableCatalog().listTables(Namespace.of(schemaName)))
+            .map(ident -> Joiner.on(delimiter).join(catalogName, schemaName, ident.name()))
+            .collect(Collectors.toList());
+
+    // Sort first so that paging is stable across calls.
+    Collections.sort(tables);
+    PageUtil.Page page =
+        PageUtil.splitPage(tables, pageToken, PageUtil.normalizePageSize(limit));
+
+    return new ListTablesResponse()
+        .tables(Sets.newHashSet(page.items()))
+        .pageToken(page.nextPageToken());
+  }
+
+  /**
+   * Loads a table from Gravitino and describes it in Lance namespace terms.
+   *
+   * @param tableId table identifier, split on {@code delimiter} into catalog, schema and table
+   * @param delimiter literal separator used to split {@code tableId}
+   * @return the table's properties, its {@code location} property and its columns rendered as a
+   *     JSON Arrow schema
+   * @throws IllegalArgumentException if {@code tableId} does not consist of exactly three levels
+   */
+  @Override
+  public DescribeTableResponse describeTable(String tableId, String delimiter) {
+    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter));
+    // Levels 0..2 are all dereferenced below; fewer than three cannot address a table.
+    Preconditions.checkArgument(
+        nsId.levels() == 3,
+        "Expected a 3-level table identifier (catalog, schema, table) but got: %s",
+        nsId.levels());
+
+    String catalogName = nsId.levelAtListPos(0);
+    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
+
+    Table table = catalog.asTableCatalog().loadTable(tableIdentifier);
+    DescribeTableResponse response = new DescribeTableResponse();
+    response.setProperties(table.properties());
+    response.setLocation(table.properties().get("location"));
+    response.setSchema(toJsonArrowSchema(table.columns()));
+    return response;
+  }
+
+  /**
+   * Converts Gravitino columns into the JSON Arrow schema model used by Lance responses.
+   *
+   * <p>Each column contributes its name, nullability and the {@code toString()} form of its
+   * top-level Arrow type. Child fields of nested types (list, map, struct) are not populated.
+   *
+   * @param columns Gravitino columns to convert
+   * @return a schema with one field per column, in the same order
+   * @throws UnsupportedOperationException if a column type has no Arrow mapping
+   */
+  private JsonArrowSchema toJsonArrowSchema(Column[] columns) {
+    List<JsonArrowField> fields = new ArrayList<>(columns.length);
+    for (Column column : columns) {
+      ArrowType arrowType = fromGravitinoType(column.dataType());
+
+      JsonArrowDataType jsonArrowDataType = new JsonArrowDataType();
+      // Other fields need to be set accordingly for nested types such as list, map and struct.
+      jsonArrowDataType.setType(arrowType.toString());
+
+      JsonArrowField arrowField = new JsonArrowField();
+      arrowField.setName(column.name());
+      // Previously the nullability only went into a temporary Arrow Field and was lost.
+      arrowField.setNullable(column.nullable());
+      arrowField.setType(jsonArrowDataType);
+
+      fields.add(arrowField);
+    }
+
+    JsonArrowSchema jsonArrowSchema = new JsonArrowSchema();
+    jsonArrowSchema.setFields(fields);
+    return jsonArrowSchema;
+  }
+
+  /**
+   * Creates a Lance table in the Gravitino lakehouse catalog addressed by {@code tableId}.
+   *
+   * @param tableId table identifier, split on {@code delimiter}; once {@code rootCatalog} (if any)
+   *     has been prepended it must consist of exactly catalog, schema and table
+   * @param mode one of {@code create}, {@code exist_ok} or {@code overwrite} (case-insensitive)
+   * @param delimiter literal separator used to split {@code tableId}
+   * @param tableLocation table location, stored under the {@code location} property
+   * @param tableProperties additional table properties; {@code null} is treated as empty
+   * @param rootCatalog optional catalog name prepended to the levels of {@code tableId}
+   * @param arrowStreamBody optional Arrow IPC stream whose schema defines the table columns
+   * @return properties and location of the created table, or of the existing table when mode is
+   *     {@code exist_ok} and the table is already present
+   * @throws IllegalArgumentException if the identifier depth or the mode is invalid
+   * @throws LanceNamespaceException built via {@code conflict(...)} when mode is {@code create}
+   *     and the table already exists
+   */
+  @Override
+  public CreateTableResponse createTable(
+      String tableId,
+      String mode,
+      String delimiter,
+      String tableLocation,
+      Map<String, String> tableProperties,
+      String rootCatalog,
+      byte[] arrowStreamBody) {
+    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter));
+    if (rootCatalog != null) {
+      List<String> newLevels = Lists.newArrayList(rootCatalog);
+      newLevels.addAll(nsId.listStyleId());
+      nsId = ObjectIdentifier.of(newLevels);
+    }
+    // Validate only after the root catalog has been prepended: levels 0..2 are all dereferenced
+    // below, and any extra level would otherwise be dropped silently.
+    Preconditions.checkArgument(
+        nsId.levels() == 3,
+        "Expected a 3-level table identifier (catalog, schema, table) but got: %s",
+        nsId.levels());
+    Preconditions.checkArgument(mode != null, "Create mode must not be null");
+
+    // Parse column information from the Arrow schema, if a body was supplied.
+    List<Column> columns = Lists.newArrayList();
+    if (arrowStreamBody != null) {
+      ArrowRecordBatchList recordBatchList = parseArrowIpcStream(arrowStreamBody);
+      columns = extractColumns(recordBatchList);
+    }
+
+    String catalogName = nsId.levelAtListPos(0);
+    Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
+
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
+
+    Map<String, String> createTableProperties =
+        tableProperties == null ? Maps.newHashMap() : Maps.newHashMap(tableProperties);
+    createTableProperties.put("location", tableLocation);
+    createTableProperties.put("mode", mode);
+
+    // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
+    ModeEnum createMode = ModeEnum.fromValue(mode.toLowerCase(java.util.Locale.ROOT));
+    switch (createMode) {
+      case EXIST_OK:
+        if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
+          CreateTableResponse response = new CreateTableResponse();
+          Table existingTable = catalog.asTableCatalog().loadTable(tableIdentifier);
+          response.setProperties(existingTable.properties());
+          response.setLocation(existingTable.properties().get("location"));
+          response.setVersion(0L);
+          return response;
+        }
+        break;
+      case CREATE:
+        if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
+          throw LanceNamespaceException.conflict(
+              "Table already exists: " + tableId,
+              org.apache.gravitino.exceptions.TableAlreadyExistsException.class.getSimpleName(),
+              tableId,
+              CommonUtil.formatCurrentStackTrace());
+        }
+        break;
+      case OVERWRITE:
+        if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
+          catalog.asTableCatalog().dropTable(tableIdentifier);
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("Unknown mode: " + mode);
+    }
+
+    // NOTE(review): the third argument of createTable looks like the table comment slot, yet the
+    // location is passed here (it is also stored in the properties) - confirm this is intended.
+    Table t =
+        catalog
+            .asTableCatalog()
+            .createTable(
+                tableIdentifier,
+                columns.toArray(new Column[0]),
+                tableLocation,
+                createTableProperties);
+
+    CreateTableResponse response = new CreateTableResponse();
+    response.setProperties(t.properties());
+    response.setLocation(tableLocation);
+    response.setVersion(0L);
+    return response;
+  }
+
+  /**
+   * Reads an Arrow IPC stream and returns its schema.
+   *
+   * <p>The stream is read to the end so that a malformed batch is still reported, but no batch is
+   * retained: the reader hands out one {@link VectorSchemaRoot} that it refills on every
+   * {@code loadNextBatch()} and releases on close, so keeping references to it would only expose
+   * closed buffers once this method returns.
+   *
+   * @param stream bytes of an Arrow IPC stream
+   * @return a holder carrying the stream schema and an empty batch list
+   * @throws RuntimeException if the stream cannot be read
+   */
+  private ArrowRecordBatchList parseArrowIpcStream(byte[] stream) {
+    try (BufferAllocator allocator = new RootAllocator();
+        ByteArrayInputStream bais = new ByteArrayInputStream(stream);
+        ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) {
+
+      org.apache.arrow.vector.types.pojo.Schema schema = reader.getVectorSchemaRoot().getSchema();
+
+      while (reader.loadNextBatch()) {
+        // Intentionally empty: drain the stream purely to validate it.
+      }
+      return new ArrowRecordBatchList(schema, Collections.emptyList());
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to parse Arrow IPC stream", e);
+    }
+  }
+
+  /**
+   * Builds one Gravitino column per field of the Arrow schema held by {@code recordBatchList}.
+   *
+   * @param recordBatchList holder whose schema is converted
+   * @return the converted columns, in schema field order
+   */
+  private List<Column> extractColumns(ArrowRecordBatchList recordBatchList) {
+    return recordBatchList.getSchema().getFields().stream()
+        .map(this::toGravitinoColumn)
+        .collect(Collectors.toCollection(ArrayList::new));
+  }
+
+  /**
+   * Converts a single Arrow field into a Gravitino column.
+   *
+   * <p>The column comment is read from the field metadata entry {@code comment}; the column is
+   * never auto-increment and carries no default value.
+   *
+   * @param field Arrow field to convert
+   * @return the equivalent Gravitino column
+   */
+  private Column toGravitinoColumn(Field field) {
+    String columnName = field.getName();
+    Type columnType = toGravitinoType(field);
+    String columnComment = field.getMetadata().get("comment");
+    boolean nullable = field.isNullable();
+    boolean autoIncrement = false;
+    return Column.of(
+        columnName, columnType, columnComment, nullable, autoIncrement, DEFAULT_VALUE_NOT_SET);
+  }
+
+  /**
+   * Maps a Gravitino type to the top-level Arrow type used when describing a table.
+   *
+   * <p>Child types of nested values are not produced; list, map and struct come back as the bare
+   * Arrow container type.
+   *
+   * @param type Gravitino type to convert
+   * @return the corresponding Arrow type
+   * @throws UnsupportedOperationException if the type has no mapping
+   */
+  private ArrowType fromGravitinoType(Type type) {
+    switch (type.name()) {
+      case BOOLEAN:
+        return Bool.INSTANCE;
+      case BYTE:
+        return new Int(8, true);
+      case SHORT:
+        return new Int(16, true);
+      case INTEGER:
+        return new Int(32, true);
+      case LONG:
+        return new Int(64, true);
+      case FLOAT:
+        return new FloatingPoint(FloatingPointPrecision.SINGLE);
+      case DOUBLE:
+        return new FloatingPoint(FloatingPointPrecision.DOUBLE);
+      case DECIMAL:
+        // Lance uses FIXED_SIZE_BINARY for decimal types
+        return new ArrowType.FixedSizeBinary(16); // assuming 16 bytes for decimal
+      case DATE:
+        return new ArrowType.Date(DateUnit.DAY);
+      case TIME:
+        return new ArrowType.Time(TimeUnit.MILLISECOND, 32);
+      case TIMESTAMP:
+        return new ArrowType.Timestamp(TimeUnit.MILLISECOND, null);
+      case VARCHAR:
+      case STRING:
+        return new ArrowType.Utf8();
+      case FIXED:
+        FixedType fixedType = (FixedType) type;
+        return new ArrowType.FixedSizeBinary(fixedType.length());
+      case BINARY:
+        return new ArrowType.Binary();
+      case UNPARSED:
+        return fromUnparsedType(((UnparsedType) type).unparsedType());
+      default:
+        throw new UnsupportedOperationException("Unsupported Gravitino type: " + type.name());
+    }
+  }
+
+  /**
+   * Resolves the text of an unparsed Gravitino type to an Arrow type.
+   *
+   * <p>The text is first read as a JSON-serialized Gravitino type; if that fails it is matched
+   * against the {@code FixedSizeListArray(<element>, <size>)} form.
+   */
+  private ArrowType fromUnparsedType(String typeStr) {
+    Type parsed;
+    try {
+      parsed = JsonUtils.anyFieldMapper().readValue(typeStr, Type.class);
+    } catch (Exception e) {
+      // Not JSON: the only other accepted form is e.g. FixedSizeListArray(integer, 3).
+      if (typeStr.startsWith("FixedSizeListArray")) {
+        // Use the last ',' and ')' so element types that contain either still parse.
+        int comma = typeStr.lastIndexOf(',');
+        int close = typeStr.lastIndexOf(')');
+        if (comma >= 0 && close > comma) {
+          try {
+            int size = Integer.parseInt(typeStr.substring(comma + 1, close).trim());
+            return new ArrowType.FixedSizeList(size);
+          } catch (NumberFormatException ignored) {
+            // Size is not numeric; reported through the generic failure below.
+          }
+        }
+      }
+      throw new UnsupportedOperationException("Failed to parse UnparsedType: " + typeStr, e);
+    }
+
+    // Kept outside the try so this "unsupported" error is not re-reported as a parse failure.
+    if (parsed instanceof Types.ListType) {
+      return ArrowType.List.INSTANCE;
+    } else if (parsed instanceof Types.MapType) {
+      return new ArrowType.Map(false);
+    } else if (parsed instanceof Types.StructType) {
+      return ArrowType.Struct.INSTANCE;
+    }
+    throw new UnsupportedOperationException(
+        "Unsupported UnparsedType conversion: "
+            + (parsed == null ? typeStr : parsed.simpleString()));
+  }
+
+  /**
+   * Maps the Arrow type of {@code field} to a Gravitino type.
+   *
+   * <p>Only the top-level Arrow type is inspected. Integer signedness and the timestamp time zone
+   * are not consulted, and any Arrow type without an explicit mapping is wrapped in a
+   * {@link Types.UnparsedType} carrying its {@code toString()} form.
+   *
+   * @param field Arrow field whose type is converted
+   * @return the corresponding Gravitino type
+   * @throws UnsupportedOperationException for an integer bit width other than 8, 16, 32 or 64, or
+   *     a floating-point precision other than SINGLE or DOUBLE
+   */
+  private Type toGravitinoType(Field field) {
+    ArrowType arrowType = field.getFieldType().getType();
+
+    if (arrowType instanceof Bool) {
+      return Types.BooleanType.get();
+    }
+    if (arrowType instanceof Int) {
+      int bitWidth = ((Int) arrowType).getBitWidth();
+      switch (bitWidth) {
+        case 8:
+          return Types.ByteType.get();
+        case 16:
+          return Types.ShortType.get();
+        case 32:
+          return Types.IntegerType.get();
+        case 64:
+          return Types.LongType.get();
+        default:
+          throw new UnsupportedOperationException("Unsupported Int bit width: " + bitWidth);
+      }
+    }
+    if (arrowType instanceof FloatingPoint) {
+      FloatingPointPrecision precision = ((FloatingPoint) arrowType).getPrecision();
+      if (precision == FloatingPointPrecision.SINGLE) {
+        return Types.FloatType.get();
+      }
+      if (precision == FloatingPointPrecision.DOUBLE) {
+        return Types.DoubleType.get();
+      }
+      throw new UnsupportedOperationException("Unsupported FloatingPoint precision: " + precision);
+    }
+    if (arrowType instanceof ArrowType.FixedSizeBinary) {
+      int byteWidth = ((ArrowType.FixedSizeBinary) arrowType).getByteWidth();
+      return Types.FixedType.of(byteWidth);
+    }
+    if (arrowType instanceof ArrowType.Date) {
+      return Types.DateType.get();
+    }
+    if (arrowType instanceof ArrowType.Time) {
+      return Types.TimeType.get();
+    }
+    if (arrowType instanceof ArrowType.Timestamp) {
+      return Types.TimestampType.withoutTimeZone();
+    }
+    if (arrowType instanceof ArrowType.Utf8) {
+      return Types.StringType.get();
+    }
+    if (arrowType instanceof ArrowType.Binary) {
+      return Types.BinaryType.get();
+    }
+    // No dedicated mapping: keep the Arrow description so the information is not lost.
+    return Types.UnparsedType.of(arrowType.toString());
+  }
}
diff --git
a/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/TestArrowIPC.java
b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/TestArrowIPC.java
new file mode 100644
index 0000000000..71f1bfc587
--- /dev/null
+++
b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/TestArrowIPC.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lance.common;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.Arrays;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.arrow.vector.util.Text;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Round-trip test for the Arrow IPC stream format: writes a small two-column batch to a temporary
+ * file, reads it back and checks schema and contents.
+ */
+public class TestArrowIPC {
+
+  private static final int RECORD_COUNT = 3;
+
+  @Test
+  void testIPC() throws Exception {
+    // A fresh temp file instead of a fixed /tmp path keeps the test portable and repeatable.
+    File outFile = File.createTempFile("initial_data", ".arrow");
+    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
+
+      Schema schema =
+          new Schema(
+              Arrays.asList(
+                  Field.nullable("id", new ArrowType.Int(32, true)),
+                  Field.nullable("value", new ArrowType.Utf8())));
+
+      try (VectorSchemaRoot root = VectorSchemaRoot.create(schema, allocator)) {
+        IntVector idVector = (IntVector) root.getVector("id");
+        VarCharVector valueVector = (VarCharVector) root.getVector("value");
+
+        idVector.allocateNew();
+        valueVector.allocateNew();
+
+        for (int i = 0; i < RECORD_COUNT; i++) {
+          idVector.setSafe(i, i + 1);
+          valueVector.setSafe(i, new Text("Row_" + (i + 1)));
+        }
+
+        idVector.setValueCount(RECORD_COUNT);
+        valueVector.setValueCount(RECORD_COUNT);
+        root.setRowCount(RECORD_COUNT);
+
+        try (FileOutputStream fos = new FileOutputStream(outFile);
+            ArrowStreamWriter writer = new ArrowStreamWriter(root, null, fos)) {
+          writer.start();
+          writer.writeBatch();
+          writer.end();
+        }
+      }
+
+      // Read the stream back so the test actually verifies what was written.
+      try (java.io.FileInputStream fis = new java.io.FileInputStream(outFile);
+          org.apache.arrow.vector.ipc.ArrowStreamReader reader =
+              new org.apache.arrow.vector.ipc.ArrowStreamReader(fis, allocator)) {
+        VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
+        java.util.List<Field> readFields = readRoot.getSchema().getFields();
+        org.junit.jupiter.api.Assertions.assertEquals(2, readFields.size());
+        org.junit.jupiter.api.Assertions.assertEquals("id", readFields.get(0).getName());
+        org.junit.jupiter.api.Assertions.assertEquals("value", readFields.get(1).getName());
+
+        org.junit.jupiter.api.Assertions.assertTrue(reader.loadNextBatch());
+        org.junit.jupiter.api.Assertions.assertEquals(RECORD_COUNT, readRoot.getRowCount());
+        for (int i = 0; i < RECORD_COUNT; i++) {
+          org.junit.jupiter.api.Assertions.assertEquals(
+              i + 1, ((IntVector) readRoot.getVector("id")).get(i));
+          org.junit.jupiter.api.Assertions.assertEquals(
+              "Row_" + (i + 1), readRoot.getVector("value").getObject(i).toString());
+        }
+        org.junit.jupiter.api.Assertions.assertFalse(reader.loadNextBatch());
+      }
+    } finally {
+      java.nio.file.Files.deleteIfExists(outFile.toPath());
+    }
+  }
+}
diff --git a/lance/lance-rest-server/build.gradle.kts
b/lance/lance-rest-server/build.gradle.kts
index 4e4ca7db3c..0d7a2c9852 100644
--- a/lance/lance-rest-server/build.gradle.kts
+++ b/lance/lance-rest-server/build.gradle.kts
@@ -38,6 +38,8 @@ dependencies {
}
implementation(project(":lance:lance-common"))
+ implementation(libs.lance)
+ implementation(libs.commons.lang3)
implementation(libs.bundles.jetty)
implementation(libs.bundles.jersey)
diff --git
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
index dd548541ad..493d0acace 100644
---
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
+++
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceNamespaceOperations.java
@@ -28,6 +28,7 @@ import
com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
import com.lancedb.lance.namespace.model.DropNamespaceRequest;
import com.lancedb.lance.namespace.model.DropNamespaceResponse;
import com.lancedb.lance.namespace.model.ListNamespacesResponse;
+import com.lancedb.lance.namespace.model.ListTablesResponse;
import java.util.regex.Pattern;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
@@ -159,4 +160,20 @@ public class LanceNamespaceOperations {
return LanceExceptionMapper.toRESTResponse(namespaceId, e);
}
}
+
+  /**
+   * REST endpoint that lists the tables of a namespace.
+   *
+   * <p>Delegates to the namespace operations' {@code listTables}; any exception is turned into a
+   * REST response by {@link LanceExceptionMapper#toRESTResponse}.
+   *
+   * @param namespaceId namespace identifier taken from the request path
+   * @param delimiter level separator of {@code namespaceId}; defaults to {@code $}
+   * @param pageToken optional paging token
+   * @param limit optional page size
+   * @return a 200 response wrapping the table listing, or the mapped error response
+   */
+  @GET
+  @Path("{id}/table/list")
+  public Response listTables(
+      @PathParam("id") String namespaceId,
+      @DefaultValue("$") @QueryParam("delimiter") String delimiter,
+      @QueryParam("page_token") String pageToken,
+      @QueryParam("limit") Integer limit) {
+    try {
+      return Response.ok(
+              lanceNamespace
+                  .asNamespaceOps()
+                  .listTables(namespaceId, delimiter, pageToken, limit))
+          .build();
+    } catch (Exception failure) {
+      return LanceExceptionMapper.toRESTResponse(namespaceId, failure);
+    }
+  }
}
diff --git
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
index 1f30d1b326..359fc94c42 100644
---
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
+++
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
@@ -18,27 +18,27 @@
*/
package org.apache.gravitino.lance.service.rest;
-import static
org.apache.gravitino.lance.common.ops.NamespaceWrapper.NAMESPACE_DELIMITER_DEFAULT;
-
-import com.codahale.metrics.annotation.ResponseMetered;
-import com.codahale.metrics.annotation.Timed;
-import com.lancedb.lance.namespace.model.ListTablesResponse;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
+import com.lancedb.lance.namespace.util.JsonUtil;
+import java.util.Map;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
-import javax.ws.rs.Encoded;
-import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
import org.apache.gravitino.lance.service.LanceExceptionMapper;
-import org.apache.gravitino.metrics.MetricNames;
-@Path("/v1/namespace/{id}/table")
+@Path("/v1/table/{id}")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
public class LanceTableOperations {
@@ -50,21 +50,68 @@ public class LanceTableOperations {
this.lanceNamespace = lanceNamespace;
}
- @GET
- @Path("/list")
- @Timed(name = "list-tables." + MetricNames.HTTP_PROCESS_DURATION, absolute =
true)
- @ResponseMetered(name = "list-tables", absolute = true)
- public Response listTables(
- @Encoded @PathParam("id") String namespaceId,
- @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter")
String delimiter,
- @QueryParam("page_token") String pageToken,
- @QueryParam("limit") Integer limit) {
+  /**
+   * REST endpoint that describes a table.
+   *
+   * <p>Delegates to the table operations' {@code describeTable}; any exception is turned into a
+   * REST response by {@link LanceExceptionMapper#toRESTResponse}.
+   *
+   * @param tableId table identifier taken from the request path
+   * @param delimiter level separator of {@code tableId}; defaults to {@code $}
+   * @return a 200 response wrapping the table description, or the mapped error response
+   */
+  @POST
+  @Path("/describe")
+  public Response describeTable(
+      @PathParam("id") String tableId,
+      @DefaultValue("$") @QueryParam("delimiter") String delimiter) {
+    try {
+      return Response.ok(lanceNamespace.asTableOps().describeTable(tableId, delimiter)).build();
+    } catch (Exception failure) {
+      return LanceExceptionMapper.toRESTResponse(tableId, failure);
+    }
+  }
+
+  /**
+   * REST endpoint that creates a table from an Arrow IPC stream request body.
+   *
+   * <p>The {@code x-lance-table-properties} header, when present, is parsed as a JSON object of
+   * string values; a missing or blank header yields an empty property map. Any exception is
+   * turned into a REST response by {@link LanceExceptionMapper#toRESTResponse}.
+   *
+   * @param tableId table identifier taken from the request path
+   * @param mode creation mode: {@code create}, {@code exist_ok} or {@code overwrite}
+   * @param delimiter level separator of {@code tableId}; defaults to {@code $}
+   * @param tableLocation value of the {@code x-lance-table-location} header
+   * @param tableProperties value of the {@code x-lance-table-properties} header (JSON), optional
+   * @param rootCatalog value of the {@code x-lance-root-catalog} header, optional
+   * @param arrowStreamBody request body containing the Arrow IPC stream
+   * @return a 200 response wrapping the creation result, or the mapped error response
+   */
+  @POST
+  @Path("/create")
+  @Consumes("application/vnd.apache.arrow.stream")
+  @Produces("application/json")
+  public Response createTable(
+      @PathParam("id") String tableId,
+      @QueryParam("mode") @DefaultValue("create") String mode, // create, exist_ok, overwrite
+      @QueryParam("delimiter") @DefaultValue("$") String delimiter,
+      @HeaderParam("x-lance-table-location") String tableLocation,
+      @HeaderParam("x-lance-table-properties") String tableProperties,
+      @HeaderParam("x-lance-root-catalog") String rootCatalog,
+      byte[] arrowStreamBody) {
+    try {
+      // The header is optional, and Jackson cannot map a null or blank document.
+      Map<String, String> props =
+          StringUtils.isBlank(tableProperties)
+              ? Map.of()
+              : JsonUtil.mapper()
+                  .readValue(tableProperties, new TypeReference<Map<String, String>>() {});
+      CreateTableResponse response =
+          lanceNamespace
+              .asTableOps()
+              .createTable(
+                  tableId, mode, delimiter, tableLocation, props, rootCatalog, arrowStreamBody);
+      return Response.ok(response).build();
+    } catch (Exception e) {
+      return LanceExceptionMapper.toRESTResponse(tableId, e);
+    }
+  }
+
+ /**
+  * REST endpoint that creates a table without an Arrow request body.
+  *
+  * <p>The {@code x-lance-table-properties} header is parsed as a JSON object of string values,
+  * or replaced by an empty map when blank. The call is then delegated to the table operations'
+  * {@code createTable} with a {@code null} Arrow stream. Any exception is turned into a REST
+  * response by {@code LanceExceptionMapper.toRESTResponse}.
+  */
+ @POST
+ @Path("/create-empty")
+ public Response createEmptyTable(
+ @PathParam("id") String tableId,
+ @QueryParam("mode") @DefaultValue("create") String mode, // create,
exist_ok, overwrite
+ @QueryParam("delimiter") @DefaultValue("$") String delimiter,
+ @HeaderParam("x-lance-table-location") String tableLocation,
+ @HeaderParam("x-lance-root-catalog") String rootCatalog,
+ @HeaderParam("x-lance-table-properties") String tableProperties) {
 try {
- ListTablesResponse response =
- lanceNamespace.asTableOps().listTables(namespaceId, delimiter,
pageToken, limit);
+ Map<String, String> props =
+ StringUtils.isBlank(tableProperties)
+ ? Map.of()
+ : JsonUtil.mapper()
+ .readValue(tableProperties, new TypeReference<Map<String,
String>>() {});
+ CreateTableResponse response =
+ lanceNamespace
+ .asTableOps()
+ .createTable(tableId, mode, delimiter, tableLocation, props,
rootCatalog, null);
 return Response.ok(response).build();
 } catch (Exception e) {
- return LanceExceptionMapper.toRESTResponse(namespaceId, e);
+ return LanceExceptionMapper.toRESTResponse(tableId, e);
 }
 }
}
diff --git
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java
index 1c8d051cd5..d77fd62016 100644
---
a/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java
+++
b/trino-connector/trino-connector/src/main/java/org/apache/gravitino/trino/connector/catalog/jdbc/mysql/MySQLMetadataAdapter.java
@@ -283,6 +283,8 @@ public class MySQLMetadataAdapter extends
CatalogConnectorMetadataAdapter {
.collect(Collectors.toUnmodifiableList());
uniqueKeys.add(String.format("%s:%s", index.name(),
Strings.join(columns, ',')));
break;
+ default:
+ throw new UnsupportedOperationException("Unsupported index type: "
+ index.type());
}
}
if (!primaryKeys.isEmpty()) {