Copilot commented on code in PR #9097:
URL: https://github.com/apache/gravitino/pull/9097#discussion_r2563716728
##########
catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java:
##########
@@ -240,43 +259,135 @@ private org.apache.arrow.vector.types.pojo.Schema
convertColumnsToArrowSchema(Co
return new org.apache.arrow.vector.types.pojo.Schema(fields);
}
- private void addLanceIndex(Table table, List<Index> addedIndexes) {
- String location = table.properties().get(Table.PROPERTY_LOCATION);
+ private TableChange[] modifyAddIndex(TableChange[] tableChanges, List<Index>
addIndex) {
+ int indexCount = 0;
+ for (int i = 0; i < tableChanges.length; i++) {
+ TableChange change = tableChanges[i];
+ if (change instanceof TableChange.AddIndex) {
+ Index index = addIndex.get(indexCount++);
+ tableChanges[i] =
+ new TableChange.AddIndex(
+ index.type(), index.name(), index.fieldNames(),
index.properties());
+ }
+ }
+
+ return Arrays.stream(tableChanges)
+ .map(
+ change -> {
+ if (change instanceof TableChange.AddIndex) {
+ TableChange.AddIndex addIndexChange = (TableChange.AddIndex)
change;
+ // Here we can modify the index properties if needed.
+ return new TableChange.AddIndex(
+ addIndexChange.getType(),
+ addIndexChange.getName(),
+ addIndexChange.getFieldNames(),
+ addIndexChange.getProperties());
+ } else {
+ return change;
+ }
+ })
+ .toArray(TableChange[]::new);
+ }
+
+ private List<Index> addLanceIndex(String location, List<Index> addedIndexes)
{
+ List<Index> newIndexes = Lists.newArrayList();
try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
- // For Lance, we only support adding indexes, so in fact, we can't
handle drop index here.
for (Index index : addedIndexes) {
- IndexType indexType = IndexType.valueOf(index.type().name());
- IndexParams indexParams = getIndexParamsByIndexType(indexType);
-
+ IndexType indexType = getIndexType(index);
+ IndexParams indexParams = generateIndexParams(index);
dataset.createIndex(
Arrays.stream(index.fieldNames())
- .map(field -> String.join(".", field))
+ .map(fieldPath -> String.join(".", fieldPath))
.collect(Collectors.toList()),
indexType,
- Optional.of(index.name()),
+ Optional.ofNullable(index.name()),
indexParams,
- true);
+ false);
+
+ // Currently lance only supports single-field indexes, so we can use
the first field name.
+ // Another point is that we need to ensure the index name is not null
in Gravitino, so we
+ // generate a name if it's null as Lance will generate a name
automatically.
+ String lanceIndexName =
+ index.name() == null ? index.fieldNames()[0][0] + "_idx" :
index.name();
+ newIndexes.add(
+ Indexes.of(index.type(), lanceIndexName, index.fieldNames(),
index.properties()));
}
- } catch (Exception e) {
- throw new RuntimeException(
- "Failed to add indexes to Lance dataset at location " + location, e);
+
+ return newIndexes;
}
}
- private IndexParams getIndexParamsByIndexType(IndexType indexType) {
+ private IndexType getIndexType(Index index) {
+ IndexType indexType = IndexType.valueOf(index.type().name());
+ return switch (indexType) {
+ // API only supports these index types for now, but there are more
index types in Lance.
+ case SCALAR, BTREE, INVERTED, BITMAP -> indexType;
+ // According to real test, we need to map IVF_SQ/IVF_PQ/IVF_HNSW_SQ to
VECTOR type in Lance,
+ // or it will throw exception. For more, please refer to
+ // https://github.com/lancedb/lance/issues/5182#issuecomment-3524372490
+ case IVF_FLAT, IVF_PQ, IVF_HNSW_SQ -> IndexType.VECTOR;
+ default -> throw new IllegalArgumentException("Unsupported index type: "
+ indexType);
+ };
+ }
+
+ private IndexParams generateIndexParams(Index index) {
+ IndexType indexType = IndexType.valueOf(index.type().name());
+
+ String configJson = index.properties().get(LANCE_INDEX_CONFIG_KEY);
+ CreateTableIndexRequest request;
+ try {
+ request = JsonUtil.mapper().readValue(configJson,
CreateTableIndexRequest.class);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Lance index config is invalid", e);
+ }
+
+ IndexParams.Builder builder = IndexParams.builder();
switch (indexType) {
- case SCALAR:
- return IndexParams.builder().build();
- case VECTOR:
- // TODO make these parameters configurable
- int numberOfDimensions = 3; // this value should be determined
dynamically based on the data
- // Add properties to Index to set this value.
- return IndexParams.builder()
- .setVectorIndexParams(
- VectorIndexParams.ivfPq(2, 8, numberOfDimensions,
DistanceType.L2, 2))
- .build();
- default:
- throw new IllegalArgumentException("Unsupported index type: " +
indexType);
+ case SCALAR, BTREE, INVERTED, BITMAP -> builder.setScalarIndexParams(
+ ScalarIndexParams.create(indexType.name()));
+
+ case IVF_FLAT -> builder.setVectorIndexParams(
+ new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
+ .setDistanceType(toLanceDistanceType(request.getMetricType()))
+ .build());
+ case IVF_PQ -> builder.setVectorIndexParams(
+ new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
+ .setDistanceType(toLanceDistanceType(request.getMetricType()))
+ .setPqParams(
+ new PQBuildParams.Builder()
+ .setNumSubVectors(1) // others use default value.
+ .build())
+ .build());
+
+ case IVF_SQ -> builder.setVectorIndexParams(
+ new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
+ .setDistanceType(toLanceDistanceType(request.getMetricType()))
+ .setSqParams(new SQBuildParams.Builder().build())
+ .build());
+
+ case IVF_HNSW_SQ -> builder.setVectorIndexParams(
+ new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
+ .setDistanceType(toLanceDistanceType(request.getMetricType()))
+ .setHnswParams(new HnswBuildParams.Builder().build())
+ .build());
+ default -> throw new IllegalArgumentException("Unsupported index type: "
+ indexType);
}
Review Comment:
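Missing handling for the `IVF_HNSW_PQ` index type. The enum constant is
defined in the API but is not handled in either the `getIndexType()` method
(line 328) or the `generateIndexParams()` method (line 345). Creating an
`IVF_HNSW_PQ` index therefore falls through to the `default` branch and throws
an `IllegalArgumentException`.

The suggestion below only covers `generateIndexParams()`; `getIndexType()` must
also map `IVF_HNSW_PQ` to `IndexType.VECTOR`.

`getIndexType()` also omits `IVF_SQ`. Its case list is
`IVF_FLAT, IVF_PQ, IVF_HNSW_SQ`, even though the comment above it mentions
`IVF_SQ`. As a result, `IVF_SQ` indexes currently fail the same way, even though
`generateIndexParams()` handles them.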
```suggestion
case IVF_HNSW_PQ -> builder.setVectorIndexParams(
new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
.setDistanceType(toLanceDistanceType(request.getMetricType()))
.setHnswParams(new HnswBuildParams.Builder().build())
.setPqParams(
new PQBuildParams.Builder()
.setNumSubVectors(1) // others use default value.
.build())
.build());
default -> throw new IllegalArgumentException("Unsupported index type:
" + indexType);
```
##########
lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java:
##########
@@ -213,4 +282,22 @@ private void validateDescribeTableRequest(
// We will ignore the id in the request body since it's already provided
in the path param
// No specific fields to validate for now
}
+
+ private void validateCreateTableIndexRequest(
+ @SuppressWarnings("unused") CreateTableIndexRequest request) {
+ // We will ignore the id in the request body since it's already provided
in the path param
+ // No specific fields to validate for now
Review Comment:
The `validateCreateTableIndexRequest` method doesn't validate required fields
such as `indexType` and `column`. For a create-index request, these fields should
be checked for null before processing. API consumers then get a clear error
message naming the missing field instead of a less specific failure later in
processing.
```suggestion
if (request == null) {
throw new IllegalArgumentException("CreateTableIndexRequest must not
be null");
}
if (request.getIndexType() == null) {
throw new IllegalArgumentException("Field 'indexType' is required and
must not be null");
}
if (request.getColumn() == null) {
throw new IllegalArgumentException("Field 'column' is required and
must not be null");
}
```
##########
catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java:
##########
@@ -240,43 +259,135 @@ private org.apache.arrow.vector.types.pojo.Schema
convertColumnsToArrowSchema(Co
return new org.apache.arrow.vector.types.pojo.Schema(fields);
}
- private void addLanceIndex(Table table, List<Index> addedIndexes) {
- String location = table.properties().get(Table.PROPERTY_LOCATION);
+ private TableChange[] modifyAddIndex(TableChange[] tableChanges, List<Index>
addIndex) {
+ int indexCount = 0;
+ for (int i = 0; i < tableChanges.length; i++) {
+ TableChange change = tableChanges[i];
+ if (change instanceof TableChange.AddIndex) {
+ Index index = addIndex.get(indexCount++);
+ tableChanges[i] =
+ new TableChange.AddIndex(
+ index.type(), index.name(), index.fieldNames(),
index.properties());
+ }
+ }
+
+ return Arrays.stream(tableChanges)
+ .map(
+ change -> {
+ if (change instanceof TableChange.AddIndex) {
+ TableChange.AddIndex addIndexChange = (TableChange.AddIndex)
change;
+ // Here we can modify the index properties if needed.
+ return new TableChange.AddIndex(
+ addIndexChange.getType(),
+ addIndexChange.getName(),
+ addIndexChange.getFieldNames(),
+ addIndexChange.getProperties());
+ } else {
+ return change;
+ }
+ })
+ .toArray(TableChange[]::new);
+ }
+
+ private List<Index> addLanceIndex(String location, List<Index> addedIndexes)
{
+ List<Index> newIndexes = Lists.newArrayList();
try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
- // For Lance, we only support adding indexes, so in fact, we can't
handle drop index here.
for (Index index : addedIndexes) {
- IndexType indexType = IndexType.valueOf(index.type().name());
- IndexParams indexParams = getIndexParamsByIndexType(indexType);
-
+ IndexType indexType = getIndexType(index);
+ IndexParams indexParams = generateIndexParams(index);
dataset.createIndex(
Arrays.stream(index.fieldNames())
- .map(field -> String.join(".", field))
+ .map(fieldPath -> String.join(".", fieldPath))
.collect(Collectors.toList()),
indexType,
- Optional.of(index.name()),
+ Optional.ofNullable(index.name()),
indexParams,
- true);
+ false);
+
+ // Currently lance only supports single-field indexes, so we can use
the first field name.
+ // Another point is that we need to ensure the index name is not null
in Gravitino, so we
+ // generate a name if it's null as Lance will generate a name
automatically.
+ String lanceIndexName =
+ index.name() == null ? index.fieldNames()[0][0] + "_idx" :
index.name();
+ newIndexes.add(
+ Indexes.of(index.type(), lanceIndexName, index.fieldNames(),
index.properties()));
}
- } catch (Exception e) {
- throw new RuntimeException(
- "Failed to add indexes to Lance dataset at location " + location, e);
+
+ return newIndexes;
}
}
- private IndexParams getIndexParamsByIndexType(IndexType indexType) {
+ private IndexType getIndexType(Index index) {
+ IndexType indexType = IndexType.valueOf(index.type().name());
+ return switch (indexType) {
+ // API only supports these index types for now, but there are more
index types in Lance.
+ case SCALAR, BTREE, INVERTED, BITMAP -> indexType;
+ // According to real test, we need to map IVF_SQ/IVF_PQ/IVF_HNSW_SQ to
VECTOR type in Lance,
+ // or it will throw exception. For more, please refer to
+ // https://github.com/lancedb/lance/issues/5182#issuecomment-3524372490
+ case IVF_FLAT, IVF_PQ, IVF_HNSW_SQ -> IndexType.VECTOR;
+ default -> throw new IllegalArgumentException("Unsupported index type: "
+ indexType);
+ };
+ }
+
+ private IndexParams generateIndexParams(Index index) {
+ IndexType indexType = IndexType.valueOf(index.type().name());
+
+ String configJson = index.properties().get(LANCE_INDEX_CONFIG_KEY);
+ CreateTableIndexRequest request;
+ try {
+ request = JsonUtil.mapper().readValue(configJson,
CreateTableIndexRequest.class);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Lance index config is invalid", e);
+ }
+
+ IndexParams.Builder builder = IndexParams.builder();
switch (indexType) {
- case SCALAR:
- return IndexParams.builder().build();
- case VECTOR:
- // TODO make these parameters configurable
- int numberOfDimensions = 3; // this value should be determined
dynamically based on the data
- // Add properties to Index to set this value.
- return IndexParams.builder()
- .setVectorIndexParams(
- VectorIndexParams.ivfPq(2, 8, numberOfDimensions,
DistanceType.L2, 2))
- .build();
- default:
- throw new IllegalArgumentException("Unsupported index type: " +
indexType);
+ case SCALAR, BTREE, INVERTED, BITMAP -> builder.setScalarIndexParams(
+ ScalarIndexParams.create(indexType.name()));
+
+ case IVF_FLAT -> builder.setVectorIndexParams(
+ new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
+ .setDistanceType(toLanceDistanceType(request.getMetricType()))
+ .build());
+ case IVF_PQ -> builder.setVectorIndexParams(
+ new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
+ .setDistanceType(toLanceDistanceType(request.getMetricType()))
+ .setPqParams(
+ new PQBuildParams.Builder()
+ .setNumSubVectors(1) // others use default value.
+ .build())
+ .build());
+
+ case IVF_SQ -> builder.setVectorIndexParams(
+ new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
+ .setDistanceType(toLanceDistanceType(request.getMetricType()))
+ .setSqParams(new SQBuildParams.Builder().build())
+ .build());
+
+ case IVF_HNSW_SQ -> builder.setVectorIndexParams(
+ new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
+ .setDistanceType(toLanceDistanceType(request.getMetricType()))
+ .setHnswParams(new HnswBuildParams.Builder().build())
+ .build());
+ default -> throw new IllegalArgumentException("Unsupported index type: "
+ indexType);
Review Comment:
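Missing handling for the `LABEL_LIST` index type. The enum constant is defined
in the API (line 101) but is not handled in the `getIndexType()` or
`generateIndexParams()` methods. Creating a `LABEL_LIST` index therefore falls
through to the `default` branch and fails with
`IllegalArgumentException("Unsupported index type: ...")`.

If `LABEL_LIST` is a scalar index in Lance, it can presumably be added to the
`SCALAR, BTREE, INVERTED, BITMAP` cases in both methods. Verify this against
Lance's `IndexType` before merging.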
##########
catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java:
##########
@@ -240,43 +259,135 @@ private org.apache.arrow.vector.types.pojo.Schema
convertColumnsToArrowSchema(Co
return new org.apache.arrow.vector.types.pojo.Schema(fields);
}
- private void addLanceIndex(Table table, List<Index> addedIndexes) {
- String location = table.properties().get(Table.PROPERTY_LOCATION);
+ private TableChange[] modifyAddIndex(TableChange[] tableChanges, List<Index>
addIndex) {
+ int indexCount = 0;
+ for (int i = 0; i < tableChanges.length; i++) {
+ TableChange change = tableChanges[i];
+ if (change instanceof TableChange.AddIndex) {
+ Index index = addIndex.get(indexCount++);
+ tableChanges[i] =
+ new TableChange.AddIndex(
+ index.type(), index.name(), index.fieldNames(),
index.properties());
+ }
+ }
+
+ return Arrays.stream(tableChanges)
+ .map(
+ change -> {
+ if (change instanceof TableChange.AddIndex) {
+ TableChange.AddIndex addIndexChange = (TableChange.AddIndex)
change;
+ // Here we can modify the index properties if needed.
+ return new TableChange.AddIndex(
+ addIndexChange.getType(),
+ addIndexChange.getName(),
+ addIndexChange.getFieldNames(),
+ addIndexChange.getProperties());
+ } else {
+ return change;
+ }
+ })
+ .toArray(TableChange[]::new);
+ }
+
+ private List<Index> addLanceIndex(String location, List<Index> addedIndexes)
{
+ List<Index> newIndexes = Lists.newArrayList();
try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
- // For Lance, we only support adding indexes, so in fact, we can't
handle drop index here.
for (Index index : addedIndexes) {
- IndexType indexType = IndexType.valueOf(index.type().name());
- IndexParams indexParams = getIndexParamsByIndexType(indexType);
-
+ IndexType indexType = getIndexType(index);
+ IndexParams indexParams = generateIndexParams(index);
dataset.createIndex(
Arrays.stream(index.fieldNames())
- .map(field -> String.join(".", field))
+ .map(fieldPath -> String.join(".", fieldPath))
.collect(Collectors.toList()),
indexType,
- Optional.of(index.name()),
+ Optional.ofNullable(index.name()),
indexParams,
- true);
+ false);
+
+ // Currently lance only supports single-field indexes, so we can use
the first field name.
+ // Another point is that we need to ensure the index name is not null
in Gravitino, so we
+ // generate a name if it's null as Lance will generate a name
automatically.
+ String lanceIndexName =
+ index.name() == null ? index.fieldNames()[0][0] + "_idx" :
index.name();
+ newIndexes.add(
+ Indexes.of(index.type(), lanceIndexName, index.fieldNames(),
index.properties()));
}
- } catch (Exception e) {
- throw new RuntimeException(
- "Failed to add indexes to Lance dataset at location " + location, e);
+
+ return newIndexes;
}
}
Review Comment:
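Resource leak: a new `RootAllocator` is created on every call but never closed.
The try-with-resources statement only closes the `Dataset`, but the
`RootAllocator` passed to `Dataset.open()` must also be closed, or its memory is
leaked.

Declare both as resources, e.g.
`try (RootAllocator allocator = new RootAllocator(); Dataset dataset = Dataset.open(location, allocator)) { ... }`.
Resources are closed in reverse declaration order, so the dataset is closed
before the allocator it was opened with.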
##########
catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java:
##########
@@ -240,43 +259,135 @@ private org.apache.arrow.vector.types.pojo.Schema
convertColumnsToArrowSchema(Co
return new org.apache.arrow.vector.types.pojo.Schema(fields);
}
- private void addLanceIndex(Table table, List<Index> addedIndexes) {
- String location = table.properties().get(Table.PROPERTY_LOCATION);
+ private TableChange[] modifyAddIndex(TableChange[] tableChanges, List<Index>
addIndex) {
+ int indexCount = 0;
+ for (int i = 0; i < tableChanges.length; i++) {
+ TableChange change = tableChanges[i];
+ if (change instanceof TableChange.AddIndex) {
+ Index index = addIndex.get(indexCount++);
+ tableChanges[i] =
+ new TableChange.AddIndex(
+ index.type(), index.name(), index.fieldNames(),
index.properties());
+ }
+ }
+
+ return Arrays.stream(tableChanges)
+ .map(
+ change -> {
+ if (change instanceof TableChange.AddIndex) {
+ TableChange.AddIndex addIndexChange = (TableChange.AddIndex)
change;
+ // Here we can modify the index properties if needed.
+ return new TableChange.AddIndex(
+ addIndexChange.getType(),
+ addIndexChange.getName(),
+ addIndexChange.getFieldNames(),
+ addIndexChange.getProperties());
+ } else {
+ return change;
+ }
+ })
+ .toArray(TableChange[]::new);
+ }
+
+ private List<Index> addLanceIndex(String location, List<Index> addedIndexes)
{
+ List<Index> newIndexes = Lists.newArrayList();
try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
- // For Lance, we only support adding indexes, so in fact, we can't
handle drop index here.
for (Index index : addedIndexes) {
- IndexType indexType = IndexType.valueOf(index.type().name());
- IndexParams indexParams = getIndexParamsByIndexType(indexType);
-
+ IndexType indexType = getIndexType(index);
+ IndexParams indexParams = generateIndexParams(index);
dataset.createIndex(
Arrays.stream(index.fieldNames())
- .map(field -> String.join(".", field))
+ .map(fieldPath -> String.join(".", fieldPath))
.collect(Collectors.toList()),
indexType,
- Optional.of(index.name()),
+ Optional.ofNullable(index.name()),
indexParams,
- true);
+ false);
+
+ // Currently lance only supports single-field indexes, so we can use
the first field name.
+ // Another point is that we need to ensure the index name is not null
in Gravitino, so we
+ // generate a name if it's null as Lance will generate a name
automatically.
+ String lanceIndexName =
+ index.name() == null ? index.fieldNames()[0][0] + "_idx" :
index.name();
+ newIndexes.add(
+ Indexes.of(index.type(), lanceIndexName, index.fieldNames(),
index.properties()));
}
- } catch (Exception e) {
- throw new RuntimeException(
- "Failed to add indexes to Lance dataset at location " + location, e);
+
+ return newIndexes;
}
}
- private IndexParams getIndexParamsByIndexType(IndexType indexType) {
+ private IndexType getIndexType(Index index) {
+ IndexType indexType = IndexType.valueOf(index.type().name());
+ return switch (indexType) {
+ // API only supports these index types for now, but there are more
index types in Lance.
+ case SCALAR, BTREE, INVERTED, BITMAP -> indexType;
+ // According to real test, we need to map IVF_SQ/IVF_PQ/IVF_HNSW_SQ to
VECTOR type in Lance,
+ // or it will throw exception. For more, please refer to
+ // https://github.com/lancedb/lance/issues/5182#issuecomment-3524372490
+ case IVF_FLAT, IVF_PQ, IVF_HNSW_SQ -> IndexType.VECTOR;
+ default -> throw new IllegalArgumentException("Unsupported index type: "
+ indexType);
+ };
+ }
+
+ private IndexParams generateIndexParams(Index index) {
+ IndexType indexType = IndexType.valueOf(index.type().name());
+
+ String configJson = index.properties().get(LANCE_INDEX_CONFIG_KEY);
+ CreateTableIndexRequest request;
+ try {
+ request = JsonUtil.mapper().readValue(configJson,
CreateTableIndexRequest.class);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Lance index config is invalid", e);
+ }
Review Comment:
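Missing validation for the `LANCE_INDEX_CONFIG_KEY` property. There are two
failure cases:

- If `index.properties()` is null, `index.properties().get(...)` throws a
  `NullPointerException` before the `try` block is reached.
- If the map does not contain the key, `configJson` is null and deserialization
  fails inside `readValue`. The broad `catch (Exception e)` then rethrows it as
  the generic "Lance index config is invalid" error, which hides the real cause.

Add an explicit null check on the properties map and on `configJson` before
deserializing. The error message should name the missing
`LANCE_INDEX_CONFIG_KEY`.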
##########
lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceTableOperations.java:
##########
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.lance.service.rest;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import com.lancedb.lance.namespace.LanceNamespaceException;
+import com.lancedb.lance.namespace.model.CreateEmptyTableRequest;
+import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
+import com.lancedb.lance.namespace.model.CreateTableIndexRequest;
+import com.lancedb.lance.namespace.model.CreateTableIndexResponse;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
+import com.lancedb.lance.namespace.model.DeregisterTableRequest;
+import com.lancedb.lance.namespace.model.DeregisterTableResponse;
+import com.lancedb.lance.namespace.model.DescribeTableRequest;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
+import com.lancedb.lance.namespace.model.ErrorResponse;
+import com.lancedb.lance.namespace.model.IndexContent;
+import com.lancedb.lance.namespace.model.ListTableIndicesRequest;
+import com.lancedb.lance.namespace.model.ListTableIndicesResponse;
+import com.lancedb.lance.namespace.model.RegisterTableRequest;
+import com.lancedb.lance.namespace.model.RegisterTableResponse;
+import java.io.IOException;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.core.Application;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.gravitino.lance.common.ops.LanceTableOperations;
+import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
+import org.apache.gravitino.rest.RESTUtils;
+import org.glassfish.jersey.internal.inject.AbstractBinder;
+import org.glassfish.jersey.server.ResourceConfig;
+import org.glassfish.jersey.test.JerseyTest;
+import org.glassfish.jersey.test.TestProperties;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestLanceTableOperations extends JerseyTest {
+
+ private static NamespaceWrapper namespaceWrapper =
mock(NamespaceWrapper.class);
+ private static org.apache.gravitino.lance.common.ops.LanceTableOperations
tableOps =
+ mock(LanceTableOperations.class);
+
+ @Override
+ protected Application configure() {
+ try {
+ forceSet(
+ TestProperties.CONTAINER_PORT,
String.valueOf(RESTUtils.findAvailablePort(2000, 3000)));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ ResourceConfig resourceConfig = new ResourceConfig();
+
resourceConfig.register(org.apache.gravitino.lance.service.rest.LanceTableOperations.class);
+ resourceConfig.register(
+ new AbstractBinder() {
+ @Override
+ protected void configure() {
+ bind(namespaceWrapper).to(NamespaceWrapper.class).ranked(2);
+
bindFactory(MockServletRequestFactory.class).to(HttpServletRequest.class);
+ }
+ });
+
+ return resourceConfig;
+ }
+
+ @BeforeAll
+ public static void setup() {
+ when(namespaceWrapper.asTableOps()).thenReturn(tableOps);
+ }
+
+ @Test
+ void testCreateTable() {
+ String tableIds = "catalog.scheme.create_table";
+ String delimiter = ".";
+
+ // Test normal
+ CreateTableResponse createTableResponse = new CreateTableResponse();
+ when(tableOps.createTable(any(), any(), any(), any(), any(), any()))
+ .thenReturn(createTableResponse);
+
+ byte[] bytes = new byte[] {0x01, 0x02, 0x03};
+ Response resp =
+ target(String.format("/v1/table/%s/create", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(bytes, "application/vnd.apache.arrow.stream"));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+
+ // Test illegal argument
+ when(tableOps.createTable(any(), any(), any(), any(), any(), any()))
+ .thenThrow(new IllegalArgumentException("Illegal argument"));
+
+ resp =
+ target(String.format("/v1/table/%s/create", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(bytes, "application/vnd.apache.arrow.stream"));
+ Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+
+ // Test runtime exception
+ Mockito.reset(tableOps);
+ when(tableOps.createTable(any(), any(), any(), any(), any(), any()))
+ .thenThrow(new RuntimeException("Runtime exception"));
+ resp =
+ target(String.format("/v1/table/%s/create", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(bytes, "application/vnd.apache.arrow.stream"));
+
+ Assertions.assertEquals(
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+ ErrorResponse errorResp = resp.readEntity(ErrorResponse.class);
+ Assertions.assertEquals("Runtime exception", errorResp.getError());
+ Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp.getType());
+ }
+
+ @Test
+ void testCreateEmptyTable() {
+ String tableIds = "catalog.scheme.create_empty_table";
+ String delimiter = ".";
+
+ // Test normal
+ CreateEmptyTableResponse createTableResponse = new
CreateEmptyTableResponse();
+ createTableResponse.setLocation("/path/to/table");
+ createTableResponse.setProperties(ImmutableMap.of("key", "value"));
+ when(tableOps.createEmptyTable(any(), any(), any(),
any())).thenReturn(createTableResponse);
+
+ CreateEmptyTableRequest tableRequest = new CreateEmptyTableRequest();
+ tableRequest.setLocation("/path/to/table");
+
+ Response resp =
+ target(String.format("/v1/table/%s/create-empty", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(tableRequest,
MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+ CreateEmptyTableResponse response =
resp.readEntity(CreateEmptyTableResponse.class);
+ Assertions.assertEquals(createTableResponse.getLocation(),
response.getLocation());
+ Assertions.assertEquals(createTableResponse.getProperties(),
response.getProperties());
+
+ Mockito.reset(tableOps);
+ // Test illegal argument
+ when(tableOps.createEmptyTable(any(), any(), any(), any()))
+ .thenThrow(new IllegalArgumentException("Illegal argument"));
+
+ resp =
+ target(String.format("/v1/table/%s/create-empty", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(tableRequest,
MediaType.APPLICATION_JSON_TYPE));
+ Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+
+ // Test runtime exception
+ Mockito.reset(tableOps);
+ when(tableOps.createEmptyTable(any(), any(), any(), any()))
+ .thenThrow(new RuntimeException("Runtime exception"));
+ resp =
+ target(String.format("/v1/table/%s/create-empty", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(tableRequest,
MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+ ErrorResponse errorResp = resp.readEntity(ErrorResponse.class);
+ Assertions.assertEquals("Runtime exception", errorResp.getError());
+ Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp.getType());
+ }
+
+ @Test
+ void testRegisterTable() {
+ String tableIds = "catalog.scheme.register_table";
+ String delimiter = ".";
+
+ // Test normal
+ RegisterTableResponse registerTableResponse = new RegisterTableResponse();
+ registerTableResponse.setLocation("/path/to/registered_table");
+ registerTableResponse.setProperties(ImmutableMap.of("key", "value"));
+ when(tableOps.registerTable(any(), any(), any(),
any())).thenReturn(registerTableResponse);
+
+ RegisterTableRequest tableRequest = new RegisterTableRequest();
+ tableRequest.setLocation("/path/to/registered_table");
+ tableRequest.setMode(RegisterTableRequest.ModeEnum.CREATE);
+
+ Response resp =
+ target(String.format("/v1/table/%s/register", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(tableRequest,
MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+ RegisterTableResponse response =
resp.readEntity(RegisterTableResponse.class);
+ Assertions.assertEquals(registerTableResponse.getLocation(),
response.getLocation());
+ Assertions.assertEquals(registerTableResponse.getProperties(),
response.getProperties());
+
+ // Test illegal argument
+ Mockito.reset(tableOps);
+ when(tableOps.registerTable(any(), any(), any(), any()))
+ .thenThrow(new IllegalArgumentException("Illegal argument"));
+ resp =
+ target(String.format("/v1/table/%s/register", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(tableRequest,
MediaType.APPLICATION_JSON_TYPE));
+ Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+
+ // Test runtime exception
+ Mockito.reset(tableOps);
+ when(tableOps.registerTable(any(), any(), any(), any()))
+ .thenThrow(new RuntimeException("Runtime exception"));
+ resp =
+ target(String.format("/v1/table/%s/register", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(tableRequest,
MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+ ErrorResponse errorResp = resp.readEntity(ErrorResponse.class);
+ Assertions.assertEquals("Runtime exception", errorResp.getError());
+ Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp.getType());
+ }
+
+ @Test
+ void testDeregisterTable() {
+ String tableIds = "catalog.scheme.deregister_table";
+ String delimiter = ".";
+
+ DeregisterTableRequest tableRequest = new DeregisterTableRequest();
+
+ DeregisterTableResponse deregisterTableResponse = new
DeregisterTableResponse();
+ deregisterTableResponse.setLocation("/path/to/deregistered_table");
+ deregisterTableResponse.setProperties(ImmutableMap.of("key", "value"));
+ // Test normal
+ when(tableOps.deregisterTable(any(),
any())).thenReturn(deregisterTableResponse);
+
+ Response resp =
+ target(String.format("/v1/table/%s/deregister", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(tableRequest,
MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+ DeregisterTableResponse response =
resp.readEntity(DeregisterTableResponse.class);
+ Assertions.assertEquals(deregisterTableResponse.getLocation(),
response.getLocation());
+ Assertions.assertEquals(deregisterTableResponse.getProperties(),
response.getProperties());
+
+ // Test illegal argument
+ Mockito.reset(tableOps);
+ when(tableOps.deregisterTable(any(), any()))
+ .thenThrow(new IllegalArgumentException("Illegal argument"));
+ resp =
+ target(String.format("/v1/table/%s/deregister", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(tableRequest,
MediaType.APPLICATION_JSON_TYPE));
+ Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+
+ // Test not found exception
+ Mockito.reset(tableOps);
+ when(tableOps.deregisterTable(any(), any()))
+ .thenThrow(
+ LanceNamespaceException.notFound(
+ "Table not found", "NoSuchTableException", tableIds, ""));
+ resp =
+ target(String.format("/v1/table/%s/deregister", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(tableRequest,
MediaType.APPLICATION_JSON_TYPE));
+ Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
resp.getStatus());
+
+ // Test runtime exception
+ Mockito.reset(tableOps);
+ when(tableOps.deregisterTable(any(), any()))
+ .thenThrow(new RuntimeException("Runtime exception"));
+ resp =
+ target(String.format("/v1/table/%s/deregister", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(tableRequest,
MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+ ErrorResponse errorResp = resp.readEntity(ErrorResponse.class);
+ Assertions.assertEquals("Runtime exception", errorResp.getError());
+ Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp.getType());
+ }
+
+ @Test
+ void testDescribeTable() {
+ String tableIds = "catalog.scheme.describe_table";
+ String delimiter = ".";
+
+ // Test normal
+ DescribeTableResponse createTableResponse = new DescribeTableResponse();
+ createTableResponse.setLocation("/path/to/describe_table");
+ createTableResponse.setProperties(ImmutableMap.of("key", "value"));
+ when(tableOps.describeTable(any(), any(),
any())).thenReturn(createTableResponse);
+
+ DescribeTableRequest tableRequest = new DescribeTableRequest();
+ Response resp =
+ target(String.format("/v1/table/%s/describe", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(tableRequest,
MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+ DescribeTableResponse response =
resp.readEntity(DescribeTableResponse.class);
+ Assertions.assertEquals(createTableResponse.getLocation(),
response.getLocation());
+ Assertions.assertEquals(createTableResponse.getProperties(),
response.getProperties());
+
+ // Test not found exception
+ Mockito.reset(tableOps);
+ when(tableOps.describeTable(any(), any(), any()))
+ .thenThrow(
+ LanceNamespaceException.notFound(
+ "Table not found", "NoSuchTableException", tableIds, ""));
+ resp =
+ target(String.format("/v1/table/%s/describe", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(tableRequest,
MediaType.APPLICATION_JSON_TYPE));
+ Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(),
resp.getStatus());
+
+ // Test runtime exception
+ Mockito.reset(tableOps);
+ when(tableOps.describeTable(any(), any(), any()))
+ .thenThrow(new RuntimeException("Runtime exception"));
+ resp =
+ target(String.format("/v1/table/%s/describe", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(tableRequest,
MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(
+ Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+ ErrorResponse errorResp = resp.readEntity(ErrorResponse.class);
+ Assertions.assertEquals("Runtime exception", errorResp.getError());
+ Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp.getType());
+ }
+
+ @Test
+ void testCreateTableIndex() {
+ String tableIds = "catalog.scheme.to_create_index_table";
+ String delimiter = ".";
+
+ // Test normal
+ CreateTableIndexRequest tableRequest = new CreateTableIndexRequest();
+
+ CreateTableIndexResponse response = new CreateTableIndexResponse();
+ response.setProperties(ImmutableMap.of("key", "value"));
+ when(tableOps.createTableIndex(any(), any(), any())).thenReturn(response);
+
+ Response resp =
+ target(String.format("/v1/table/%s/create_index", tableIds))
+ .queryParam("delimiter", delimiter)
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(tableRequest,
MediaType.APPLICATION_JSON_TYPE));
+
+ Assertions.assertEquals(Response.Status.OK.getStatusCode(),
resp.getStatus());
+ Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE,
resp.getMediaType());
+ response = resp.readEntity(CreateTableIndexResponse.class);
+ Assertions.assertEquals(response.getProperties(),
response.getProperties());
Review Comment:
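Meaningless assertion on line 404: `response.getProperties()` is compared
with itself. Because `response` is reassigned to the deserialized entity
(`resp.readEntity(CreateTableIndexResponse.class)`) just before the assertion,
both arguments refer to the same object and the check can never fail. This
should likely be `Assertions.assertEquals(ImmutableMap.of("key", "value"),
response.getProperties())` to verify the expected properties were returned.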
```suggestion
Assertions.assertEquals(ImmutableMap.of("key", "value"),
response.getProperties());
```
##########
catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java:
##########
@@ -240,43 +259,135 @@ private org.apache.arrow.vector.types.pojo.Schema
convertColumnsToArrowSchema(Co
return new org.apache.arrow.vector.types.pojo.Schema(fields);
}
- private void addLanceIndex(Table table, List<Index> addedIndexes) {
- String location = table.properties().get(Table.PROPERTY_LOCATION);
+ private TableChange[] modifyAddIndex(TableChange[] tableChanges, List<Index>
addIndex) {
+ int indexCount = 0;
+ for (int i = 0; i < tableChanges.length; i++) {
+ TableChange change = tableChanges[i];
+ if (change instanceof TableChange.AddIndex) {
+ Index index = addIndex.get(indexCount++);
+ tableChanges[i] =
+ new TableChange.AddIndex(
+ index.type(), index.name(), index.fieldNames(),
index.properties());
+ }
+ }
+
+ return Arrays.stream(tableChanges)
+ .map(
+ change -> {
+ if (change instanceof TableChange.AddIndex) {
+ TableChange.AddIndex addIndexChange = (TableChange.AddIndex)
change;
+ // Here we can modify the index properties if needed.
+ return new TableChange.AddIndex(
+ addIndexChange.getType(),
+ addIndexChange.getName(),
+ addIndexChange.getFieldNames(),
+ addIndexChange.getProperties());
+ } else {
+ return change;
+ }
+ })
+ .toArray(TableChange[]::new);
+ }
+
+ private List<Index> addLanceIndex(String location, List<Index> addedIndexes)
{
+ List<Index> newIndexes = Lists.newArrayList();
try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
- // For Lance, we only support adding indexes, so in fact, we can't
handle drop index here.
for (Index index : addedIndexes) {
- IndexType indexType = IndexType.valueOf(index.type().name());
- IndexParams indexParams = getIndexParamsByIndexType(indexType);
-
+ IndexType indexType = getIndexType(index);
+ IndexParams indexParams = generateIndexParams(index);
dataset.createIndex(
Arrays.stream(index.fieldNames())
- .map(field -> String.join(".", field))
+ .map(fieldPath -> String.join(".", fieldPath))
.collect(Collectors.toList()),
indexType,
- Optional.of(index.name()),
+ Optional.ofNullable(index.name()),
indexParams,
- true);
+ false);
+
+ // Currently lance only supports single-field indexes, so we can use
the first field name.
+ // Another point is that we need to ensure the index name is not null
in Gravitino, so we
+ // generate a name if it's null as Lance will generate a name
automatically.
+ String lanceIndexName =
+ index.name() == null ? index.fieldNames()[0][0] + "_idx" :
index.name();
+ newIndexes.add(
+ Indexes.of(index.type(), lanceIndexName, index.fieldNames(),
index.properties()));
}
- } catch (Exception e) {
- throw new RuntimeException(
- "Failed to add indexes to Lance dataset at location " + location, e);
+
+ return newIndexes;
}
}
- private IndexParams getIndexParamsByIndexType(IndexType indexType) {
+ private IndexType getIndexType(Index index) {
+ IndexType indexType = IndexType.valueOf(index.type().name());
+ return switch (indexType) {
+ // API only supports these index types for now, but there are more
index types in Lance.
+ case SCALAR, BTREE, INVERTED, BITMAP -> indexType;
+ // According to real test, we need to map IVF_SQ/IVF_PQ/IVF_HNSW_SQ to
VECTOR type in Lance,
+ // or it will throw exception. For more, please refer to
+ // https://github.com/lancedb/lance/issues/5182#issuecomment-3524372490
+ case IVF_FLAT, IVF_PQ, IVF_HNSW_SQ -> IndexType.VECTOR;
+ default -> throw new IllegalArgumentException("Unsupported index type: "
+ indexType);
+ };
+ }
+
+ private IndexParams generateIndexParams(Index index) {
+ IndexType indexType = IndexType.valueOf(index.type().name());
+
+ String configJson = index.properties().get(LANCE_INDEX_CONFIG_KEY);
+ CreateTableIndexRequest request;
+ try {
+ request = JsonUtil.mapper().readValue(configJson,
CreateTableIndexRequest.class);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Lance index config is invalid", e);
+ }
+
+ IndexParams.Builder builder = IndexParams.builder();
switch (indexType) {
- case SCALAR:
- return IndexParams.builder().build();
- case VECTOR:
- // TODO make these parameters configurable
- int numberOfDimensions = 3; // this value should be determined
dynamically based on the data
- // Add properties to Index to set this value.
- return IndexParams.builder()
- .setVectorIndexParams(
- VectorIndexParams.ivfPq(2, 8, numberOfDimensions,
DistanceType.L2, 2))
- .build();
- default:
- throw new IllegalArgumentException("Unsupported index type: " +
indexType);
+ case SCALAR, BTREE, INVERTED, BITMAP -> builder.setScalarIndexParams(
+ ScalarIndexParams.create(indexType.name()));
+
+ case IVF_FLAT -> builder.setVectorIndexParams(
+ new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
+ .setDistanceType(toLanceDistanceType(request.getMetricType()))
+ .build());
+ case IVF_PQ -> builder.setVectorIndexParams(
+ new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
+ .setDistanceType(toLanceDistanceType(request.getMetricType()))
+ .setPqParams(
+ new PQBuildParams.Builder()
+ .setNumSubVectors(1) // others use default value.
+ .build())
+ .build());
+
+ case IVF_SQ -> builder.setVectorIndexParams(
+ new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
+ .setDistanceType(toLanceDistanceType(request.getMetricType()))
+ .setSqParams(new SQBuildParams.Builder().build())
+ .build());
Review Comment:
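Inconsistency in handling the `IVF_SQ` index type. In the `getIndexType()`
method (line 328), `IVF_SQ` is not mapped to `IndexType.VECTOR`. The case list
is `IVF_FLAT, IVF_PQ, IVF_HNSW_SQ`, even though the comment above it mentions
`IVF_SQ`. The `generateIndexParams()` method (line 362), however, has a case
for `IVF_SQ`. Because `addLanceIndex()` calls `getIndexType()` before
`generateIndexParams()`, an `IVF_SQ` index falls through to the `default`
branch of `getIndexType()` and throws `IllegalArgumentException` first. As a
result, the `IVF_SQ` case at line 362 is never reached. Adding `IVF_SQ` to the
`VECTOR` mapping in `getIndexType()` would make the two methods consistent.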
##########
catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java:
##########
@@ -240,43 +259,135 @@ private org.apache.arrow.vector.types.pojo.Schema
convertColumnsToArrowSchema(Co
return new org.apache.arrow.vector.types.pojo.Schema(fields);
}
- private void addLanceIndex(Table table, List<Index> addedIndexes) {
- String location = table.properties().get(Table.PROPERTY_LOCATION);
+ private TableChange[] modifyAddIndex(TableChange[] tableChanges, List<Index>
addIndex) {
+ int indexCount = 0;
+ for (int i = 0; i < tableChanges.length; i++) {
+ TableChange change = tableChanges[i];
+ if (change instanceof TableChange.AddIndex) {
+ Index index = addIndex.get(indexCount++);
+ tableChanges[i] =
+ new TableChange.AddIndex(
+ index.type(), index.name(), index.fieldNames(),
index.properties());
+ }
+ }
+
+ return Arrays.stream(tableChanges)
+ .map(
+ change -> {
+ if (change instanceof TableChange.AddIndex) {
+ TableChange.AddIndex addIndexChange = (TableChange.AddIndex)
change;
+ // Here we can modify the index properties if needed.
+ return new TableChange.AddIndex(
+ addIndexChange.getType(),
+ addIndexChange.getName(),
+ addIndexChange.getFieldNames(),
+ addIndexChange.getProperties());
+ } else {
+ return change;
+ }
+ })
+ .toArray(TableChange[]::new);
+ }
+
+ private List<Index> addLanceIndex(String location, List<Index> addedIndexes)
{
+ List<Index> newIndexes = Lists.newArrayList();
try (Dataset dataset = Dataset.open(location, new RootAllocator())) {
- // For Lance, we only support adding indexes, so in fact, we can't
handle drop index here.
for (Index index : addedIndexes) {
- IndexType indexType = IndexType.valueOf(index.type().name());
- IndexParams indexParams = getIndexParamsByIndexType(indexType);
-
+ IndexType indexType = getIndexType(index);
+ IndexParams indexParams = generateIndexParams(index);
dataset.createIndex(
Arrays.stream(index.fieldNames())
- .map(field -> String.join(".", field))
+ .map(fieldPath -> String.join(".", fieldPath))
.collect(Collectors.toList()),
indexType,
- Optional.of(index.name()),
+ Optional.ofNullable(index.name()),
indexParams,
- true);
+ false);
+
+ // Currently lance only supports single-field indexes, so we can use
the first field name.
+ // Another point is that we need to ensure the index name is not null
in Gravitino, so we
+ // generate a name if it's null as Lance will generate a name
automatically.
+ String lanceIndexName =
+ index.name() == null ? index.fieldNames()[0][0] + "_idx" :
index.name();
+ newIndexes.add(
+ Indexes.of(index.type(), lanceIndexName, index.fieldNames(),
index.properties()));
}
- } catch (Exception e) {
- throw new RuntimeException(
- "Failed to add indexes to Lance dataset at location " + location, e);
+
+ return newIndexes;
}
}
- private IndexParams getIndexParamsByIndexType(IndexType indexType) {
+ private IndexType getIndexType(Index index) {
+ IndexType indexType = IndexType.valueOf(index.type().name());
+ return switch (indexType) {
+ // API only supports these index types for now, but there are more
index types in Lance.
+ case SCALAR, BTREE, INVERTED, BITMAP -> indexType;
+ // According to real test, we need to map IVF_SQ/IVF_PQ/IVF_HNSW_SQ to
VECTOR type in Lance,
+ // or it will throw exception. For more, please refer to
+ // https://github.com/lancedb/lance/issues/5182#issuecomment-3524372490
+ case IVF_FLAT, IVF_PQ, IVF_HNSW_SQ -> IndexType.VECTOR;
+ default -> throw new IllegalArgumentException("Unsupported index type: "
+ indexType);
+ };
+ }
+
+ private IndexParams generateIndexParams(Index index) {
+ IndexType indexType = IndexType.valueOf(index.type().name());
+
+ String configJson = index.properties().get(LANCE_INDEX_CONFIG_KEY);
+ CreateTableIndexRequest request;
+ try {
+ request = JsonUtil.mapper().readValue(configJson,
CreateTableIndexRequest.class);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Lance index config is invalid", e);
+ }
+
+ IndexParams.Builder builder = IndexParams.builder();
switch (indexType) {
- case SCALAR:
- return IndexParams.builder().build();
- case VECTOR:
- // TODO make these parameters configurable
- int numberOfDimensions = 3; // this value should be determined
dynamically based on the data
- // Add properties to Index to set this value.
- return IndexParams.builder()
- .setVectorIndexParams(
- VectorIndexParams.ivfPq(2, 8, numberOfDimensions,
DistanceType.L2, 2))
- .build();
- default:
- throw new IllegalArgumentException("Unsupported index type: " +
indexType);
+ case SCALAR, BTREE, INVERTED, BITMAP -> builder.setScalarIndexParams(
+ ScalarIndexParams.create(indexType.name()));
+
+ case IVF_FLAT -> builder.setVectorIndexParams(
+ new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
+ .setDistanceType(toLanceDistanceType(request.getMetricType()))
+ .build());
+ case IVF_PQ -> builder.setVectorIndexParams(
+ new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
+ .setDistanceType(toLanceDistanceType(request.getMetricType()))
+ .setPqParams(
+ new PQBuildParams.Builder()
+ .setNumSubVectors(1) // others use default value.
+ .build())
+ .build());
+
+ case IVF_SQ -> builder.setVectorIndexParams(
+ new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
+ .setDistanceType(toLanceDistanceType(request.getMetricType()))
+ .setSqParams(new SQBuildParams.Builder().build())
+ .build());
+
+ case IVF_HNSW_SQ -> builder.setVectorIndexParams(
+ new VectorIndexParams.Builder(new IvfBuildParams.Builder().build())
+ .setDistanceType(toLanceDistanceType(request.getMetricType()))
+ .setHnswParams(new HnswBuildParams.Builder().build())
+ .build());
+ default -> throw new IllegalArgumentException("Unsupported index type: "
+ indexType);
Review Comment:
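Invalid switch statement syntax. The switch cases use arrow syntax (`->`)
but are missing the arrow for the first case (lines 346-347). The correct syntax
should be either traditional `case SCALAR: case BTREE: case INVERTED: case
BITMAP:` with breaks, or `case SCALAR, BTREE, INVERTED, BITMAP ->` with an
arrow. The current mixed syntax will cause compilation errors.
Note: in the hunk as quoted, the colon-style `case SCALAR:` / `case VECTOR:`
labels appear only on removed (`-`) lines. The added first case reads
`case SCALAR, BTREE, INVERTED, BITMAP -> builder.setScalarIndexParams(`, which
already uses the arrow form. Confirm against the actual file before acting on
this comment; it may be a false positive.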
##########
lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java:
##########
@@ -266,6 +279,78 @@ public DeregisterTableResponse deregisterTable(String
tableId, String delimiter)
return response;
}
+ // Note: Create indices is an asynchronous operation in Lance Lakehouse.
+ @Override
+ public CreateTableIndexResponse createTableIndex(
+ String tableId, String delimiter, CreateTableIndexRequest request) {
+ ObjectIdentifier nsId = ObjectIdentifier.of(tableId,
Pattern.quote(delimiter));
+ Preconditions.checkArgument(
+ nsId.levels() == 3, "Expected at 3-level namespace but got: %s",
nsId.levels());
+
+ String catalogName = nsId.levelAtListPos(0);
+ Catalog catalog =
namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName);
+
+ NameIdentifier tableIdentifier =
+ NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
+
+ // There seem to be missing index name in the request, using
Optional.empty() for now.
+ TableChange tableChange = buildAddIndex(Optional.empty(), request);
Review Comment:
The comment on line 296 states "There seem to be missing index name in the
request", but it's unclear what the actual issue is or what action should be
taken. If the index name is truly missing, consider either adding validation to
throw an error, or documenting why an empty Optional is acceptable. The current
implementation is ambiguous.
```suggestion
// Validate that the index name is present in the request.
String indexName = request.getName();
Preconditions.checkArgument(
indexName != null && !indexName.trim().isEmpty(),
"Index name must be provided in CreateTableIndexRequest");
TableChange tableChange = buildAddIndex(Optional.of(indexName), request);
```
##########
lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java:
##########
@@ -192,13 +198,76 @@ public Response deregisterTable(
}
}
+ @POST
+ @Path("/create_index")
+ @Timed(name = "create-table-index." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
+ @ResponseMetered(name = "create-table-index", absolute = true)
+ public Response createTableIndex(
+ @PathParam("id") String tableId,
+ @QueryParam("delimiter") @DefaultValue(NAMESPACE_DELIMITER_DEFAULT)
String delimiter,
+ @Context HttpHeaders headers,
+ CreateTableIndexRequest createTableIndexRequest) {
+ try {
+ validateCreateTableIndexRequest(createTableIndexRequest);
+ CreateTableIndexResponse response =
+ lanceNamespace.asTableOps().createTableIndex(tableId, delimiter,
createTableIndexRequest);
+ return Response.ok(response).build();
+ } catch (Exception e) {
+ return LanceExceptionMapper.toRESTResponse(tableId, e);
+ }
+ }
+
+ @POST
+ @Path("/index/list")
+ @Timed(name = "list-table-indices." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
+ @ResponseMetered(name = "list-table-indices", absolute = true)
+ public Response listTableIndices(
+ @PathParam("id") String tableId,
+ @QueryParam("delimiter") @DefaultValue(NAMESPACE_DELIMITER_DEFAULT)
String delimiter,
+ @Context HttpHeaders headers,
+ ListTableIndicesRequest listTableIndicesRequest) {
+ try {
+ validateListTableIndicesRequest(listTableIndicesRequest);
+ ListTableIndicesResponse response =
+ lanceNamespace.asTableOps().listTableIndices(tableId, delimiter,
listTableIndicesRequest);
+ return Response.ok(response).build();
+ } catch (Exception e) {
+ return LanceExceptionMapper.toRESTResponse(tableId, e);
+ }
+ }
+
+ @POST
+ @Path("/index/{index_name}/stats")
+ @Timed(name = "describe-table-index." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
+ @ResponseMetered(name = "describe-table-index", absolute = true)
+ public Response describeTableIndex(
+ @PathParam("id") String tableId,
+ @PathParam("index_name") String indexName,
+ @QueryParam("delimiter") @DefaultValue(NAMESPACE_DELIMITER_DEFAULT)
String delimiter,
+ @Context HttpHeaders headers,
+ DescribeTableIndexStatsRequest describeTableIndexStatsRequest) {
+ try {
+ validateDescribeTableIndexRequest(describeTableIndexStatsRequest);
+ DescribeTableIndexStatsResponse response =
+ lanceNamespace
+ .asTableOps()
+ .describeTableIndexStats(
+ tableId, delimiter, indexName,
describeTableIndexStatsRequest);
+ return Response.ok(response).build();
+ } catch (Exception e) {
+ return LanceExceptionMapper.toRESTResponse(tableId, e);
+ }
+ }
Review Comment:
Missing test coverage for the `describeTableIndex` endpoint. The new
endpoint at `/index/{index_name}/stats` (lines 239-260) has been added but
there are no corresponding unit tests in `TestLanceTableOperations.java` to
verify its behavior, error handling, and exception cases.
##########
catalogs/catalog-lakehouse-generic/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceTableOperations.java:
##########
@@ -240,43 +259,135 @@ private org.apache.arrow.vector.types.pojo.Schema
convertColumnsToArrowSchema(Co
return new org.apache.arrow.vector.types.pojo.Schema(fields);
}
- private void addLanceIndex(Table table, List<Index> addedIndexes) {
- String location = table.properties().get(Table.PROPERTY_LOCATION);
+ private TableChange[] modifyAddIndex(TableChange[] tableChanges, List<Index>
addIndex) {
+ int indexCount = 0;
+ for (int i = 0; i < tableChanges.length; i++) {
+ TableChange change = tableChanges[i];
+ if (change instanceof TableChange.AddIndex) {
+ Index index = addIndex.get(indexCount++);
+ tableChanges[i] =
+ new TableChange.AddIndex(
+ index.type(), index.name(), index.fieldNames(),
index.properties());
+ }
+ }
+
+ return Arrays.stream(tableChanges)
+ .map(
+ change -> {
+ if (change instanceof TableChange.AddIndex) {
+ TableChange.AddIndex addIndexChange = (TableChange.AddIndex)
change;
+ // Here we can modify the index properties if needed.
+ return new TableChange.AddIndex(
+ addIndexChange.getType(),
+ addIndexChange.getName(),
+ addIndexChange.getFieldNames(),
+ addIndexChange.getProperties());
+ } else {
+ return change;
+ }
+ })
+ .toArray(TableChange[]::new);
+ }
Review Comment:
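The `modifyAddIndex` method performs redundant operations. Lines 263-272
modify the `tableChanges` array in place, replacing each `AddIndex` with one
built from the corresponding entry of `addIndex`. Lines 274-289 then create a
new array by streaming through `tableChanges` and rebuilding each `AddIndex`
from its own getters, which is effectively an identity copy. This results in
duplicated logic and unnecessary processing. The in-place loop also overwrites
the caller's array as a side effect. Consider keeping the substitution done by
the initial loop (263-272) and dropping or simplifying the stream pass, for
example by building the result array in a single loop without mutating the
input.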
##########
lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java:
##########
@@ -266,6 +279,78 @@ public DeregisterTableResponse deregisterTable(String
tableId, String delimiter)
return response;
}
+ // Note: Create indices is an asynchronous operation in Lance Lakehouse.
+ @Override
+ public CreateTableIndexResponse createTableIndex(
+ String tableId, String delimiter, CreateTableIndexRequest request) {
+ ObjectIdentifier nsId = ObjectIdentifier.of(tableId,
Pattern.quote(delimiter));
+ Preconditions.checkArgument(
+ nsId.levels() == 3, "Expected at 3-level namespace but got: %s",
nsId.levels());
+
+ String catalogName = nsId.levelAtListPos(0);
+ Catalog catalog =
namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName);
+
+ NameIdentifier tableIdentifier =
+ NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
+
+ // There seem to be missing index name in the request, using
Optional.empty() for now.
+ TableChange tableChange = buildAddIndex(Optional.empty(), request);
+
+ Table table = catalog.asTableCatalog().alterTable(tableIdentifier,
tableChange);
+ CreateTableIndexResponse response = new CreateTableIndexResponse();
+ response.setId(nsId.listStyleId());
+ response.setLocation(table.properties().get(LANCE_LOCATION));
+ response.setProperties(table.properties());
+ return response;
+ }
+
+ @Override
+ public ListTableIndicesResponse listTableIndices(
+ String tableId, String delimiter, ListTableIndicesRequest request) {
+ ObjectIdentifier nsId = ObjectIdentifier.of(tableId,
Pattern.quote(delimiter));
+ Preconditions.checkArgument(
+ nsId.levels() == 3, "Expected at 3-level namespace but got: %s",
nsId.levels());
+
+ String catalogName = nsId.levelAtListPos(0);
+ Catalog catalog =
namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName);
+ NameIdentifier tableIdentifier =
+ NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
+
+ Table table = catalog.asTableCatalog().loadTable(tableIdentifier);
+ ListTableIndicesResponse response = new ListTableIndicesResponse();
+ List<IndexContent> contents =
+ Arrays.stream(table.index())
+ .map(
+ index -> {
+ IndexContent content = new IndexContent();
+ List<String> columnNames = new ArrayList<>();
+ for (int i = 0; i < index.fieldNames().length; i++) {
+ columnNames.add(index.fieldNames()[i][0]);
+ }
+ content.setColumns(columnNames);
+ content.setIndexName(index.name());
+
+ // Currently there is no API to get index status, setting
all indexes to READY for
+ // simplicity. So please note that this status may not
reflect the actual index
+ // status.
+ content.setIndexUuid(index.name());
Review Comment:
Setting `indexUuid` to the index name is semantically incorrect. A UUID
should be a unique identifier, not simply a copy of the name. If a proper UUID
is not available, consider using a different field name or generating an actual
UUID for this field.
##########
lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java:
##########
@@ -686,6 +715,239 @@ void testDeregisterNonExistingTable() {
Assertions.assertEquals(406, lanceNamespaceException.getCode());
}
+ @Test
+ void testCreateTableIndex() throws IOException {
+ catalog = createCatalog(CATALOG_NAME);
+ createSchema();
+ List<String> ids = List.of(CATALOG_NAME, SCHEMA_NAME,
"non_existing_table");
+
+ // We need to create a table first;
+ org.apache.arrow.vector.types.pojo.Schema schema =
+ new org.apache.arrow.vector.types.pojo.Schema(
+ Arrays.asList(
+ Field.nullable("id", new ArrowType.Int(32, true)),
+ Field.nullable("value", new ArrowType.Utf8()),
+ new Field(
+ "vector",
+ FieldType.nullable(new ArrowType.FixedSizeList(4)),
+ ImmutableList.of(
+ Field.nullable(
+ "fake", new
ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE))))));
+ byte[] body = ArrowUtils.generateIpcStream(schema);
+
+ CreateTableRequest request = new CreateTableRequest();
+ request.setId(ids);
+ request.setLocation(tempDir + "/" + "table_for_index/");
+ request.setProperties(
+ ImmutableMap.of(
+ "key1", "v1",
+ "lance.storage.a", "value_a",
+ "lance.storage.c", "value_c"));
+
+ CreateTableResponse response = ns.createTable(request, body);
+ Assertions.assertEquals(request.getLocation(), response.getLocation());
+
+ writeDataToLance(request.getLocation());
+
+ // Now try to create Btree index on an existing table
+ CreateTableIndexRequest createTableIndexRequest = new
CreateTableIndexRequest();
+ createTableIndexRequest.setId(ids);
+ createTableIndexRequest.setIndexType(IndexTypeEnum.BTREE);
+ createTableIndexRequest.setColumn("id");
+ createTableIndexRequest.setMetricType(MetricTypeEnum.L2);
+ CreateTableIndexResponse createTableIndexResponse =
+ Assertions.assertDoesNotThrow(() ->
ns.createTableIndex(createTableIndexRequest));
+ Assertions.assertNotNull(createTableIndexResponse);
+
+ // Now try to create bitmap index on an existing table
+ createTableIndexRequest.setIndexType(IndexTypeEnum.BITMAP);
+ createTableIndexRequest.setColumn("value");
+ createTableIndexResponse =
+ Assertions.assertDoesNotThrow(() ->
ns.createTableIndex(createTableIndexRequest));
+ Assertions.assertNotNull(createTableIndexResponse);
+ List<String> indices = listIndices(request.getLocation());
+ Assertions.assertEquals(2, indices.size());
+ // Now try to create vector index on an existing table
+ createTableIndexRequest.setIndexType(IndexTypeEnum.IVF_FLAT);
+ createTableIndexRequest.setColumn("vector");
+ createTableIndexResponse =
+ Assertions.assertDoesNotThrow(() ->
ns.createTableIndex(createTableIndexRequest));
+ Assertions.assertNotNull(createTableIndexResponse);
+
+ ListTableIndicesRequest listTableIndicesRequest = new
ListTableIndicesRequest();
+ listTableIndicesRequest.setId(ids);
+ ListTableIndicesResponse listTableIndicesResponse =
+ ns.listTableIndices(listTableIndicesRequest);
+ Assertions.assertEquals(3, listTableIndicesResponse.getIndexes().size());
+ List<String> expectedIndexName = listIndices(request.getLocation());
+ for (IndexContent indexContent : listTableIndicesResponse.getIndexes()) {
+ Assertions.assertTrue(
+ expectedIndexName.contains(indexContent.getIndexName()),
+ "Index name should be in the expected index names.");
+ if (indexContent.getIndexName().equals("id_idx")) {
+ Assertions.assertEquals("id", indexContent.getColumns().get(0));
+ } else if (indexContent.getIndexName().equals("value_idx")) {
+ Assertions.assertEquals("value", indexContent.getColumns().get(0));
+ } else if (indexContent.getIndexName().equals("vector_idx")) {
+ Assertions.assertEquals("vector", indexContent.getColumns().get(0));
+ }
+ }
+
+ // create another table to test other index types
+ ids = List.of(CATALOG_NAME, SCHEMA_NAME, "table_for_other_indexes");
+ request.setId(ids);
+ request.setLocation(tempDir + "/" + "table_for_other_indexes/");
+ response = ns.createTable(request, body);
+ Assertions.assertEquals(request.getLocation(), response.getLocation());
+ writeDataToLance(request.getLocation());
+
+ // Now try to create FTS index on an existing table
+ createTableIndexRequest.setId(ids);
+ createTableIndexRequest.setIndexType(IndexTypeEnum.FTS);
+ createTableIndexRequest.setColumn("value");
+
+ LanceNamespaceException exception =
+ Assertions.assertThrows(
+ LanceNamespaceException.class, () ->
ns.createTableIndex(createTableIndexRequest));
+ // com.lancedb.lance.index.IndexType does not have FTS yet, so it should
throw exception
+ Assertions.assertTrue(
+ exception.getMessage().contains("No enum constant
com.lancedb.lance.index.IndexType.FTS"));
+ }
Review Comment:
The test expects FTS index to fail with a specific error message about "No
enum constant com.lancedb.lance.index.IndexType.FTS". However, the `FTS` enum
has been added to `Index.IndexType` in the API changes. This test should be
updated to either handle FTS properly if Lance now supports it, or the FTS enum
should be removed from the API if it's not yet supported.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]