This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch branch-lance-namepspace-dev
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-lance-namepspace-dev by
this push:
new 32d9642e8a [#9015] improvement(lance-rest): Add UTs and ITs for Lance
rest table operations (#9016)
32d9642e8a is described below
commit 32d9642e8a656f1a8e7b0f7da71201951a28b865
Author: Mini Yu <[email protected]>
AuthorDate: Wed Nov 5 21:47:05 2025 +0800
[#9015] improvement(lance-rest): Add UTs and ITs for Lance rest table
operations (#9016)
### What changes were proposed in this pull request?
Add ITs for lance rest service and test lance table operations.
### Why are the changes needed?
To make the code more robust.
Fix: #9015
### Does this PR introduce _any_ user-facing change?
N/A.
### How was this patch tested?
This change itself consists of tests.
---
.../GenericLakehouseCatalogOperations.java | 15 +-
.../lakehouse/lance/LanceCatalogOperations.java | 29 +-
.../lance/common/ops/LanceTableOperations.java | 60 +-
...java => GravitinoLanceNameSpaceOperations.java} | 345 +----------
.../gravitino/GravitinoLanceNamespaceWrapper.java | 633 +--------------------
.../gravitino/GravitinoLanceTableOperations.java | 293 ++++++++++
.../gravitino/lance/common/utils/ArrowUtils.java} | 44 +-
.../lance/common/utils/LanceConstants.java} | 9 +-
.../lance/common/utils/LancePropertiesUtils.java} | 30 +-
.../lance/common/utils/SerializationUtils.java | 53 ++
.../lance/common/utils/TestArrowUtils.java} | 27 +-
.../lance/service/rest/LanceTableOperations.java | 83 ++-
.../lance/integration/test/LanceRESTServiceIT.java | 303 ++++++++++
.../service/rest/TestLanceNamespaceOperations.java | 301 ++++++++++
.../src/test/resources/log4j2.properties | 73 +++
15 files changed, 1273 insertions(+), 1025 deletions(-)
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
index 60c0958c14..3860b52d57 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
@@ -33,6 +33,7 @@ import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity;
+import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
@@ -304,6 +305,8 @@ public class GenericLakehouseCatalogOperations
getLakehouseCatalogOperations(newProperties);
return lanceCatalogOperations.createTable(
ident, columns, comment, newProperties, partitions, distribution,
sortOrders, indexes);
+ } catch (EntityAlreadyExistsException e) {
+ throw new TableAlreadyExistsException(e, "Table %s already exists",
ident);
} catch (IOException e) {
throw new RuntimeException("Failed to create table " + ident, e);
}
@@ -366,22 +369,22 @@ public class GenericLakehouseCatalogOperations
}
@Override
- public boolean dropTable(NameIdentifier ident) {
+ public boolean purgeTable(NameIdentifier ident) {
try {
TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE,
TableEntity.class);
LakehouseCatalogOperations lakehouseCatalogOperations =
getLakehouseCatalogOperations(tableEntity.getProperties());
- return lakehouseCatalogOperations.dropTable(ident);
+ return lakehouseCatalogOperations.purgeTable(ident);
} catch (NoSuchTableException e) {
- LOG.warn("Table {} does not exist, skip dropping it.", ident);
+ LOG.warn("Table {} does not exist, skip purging it.", ident);
return false;
} catch (IOException e) {
- throw new RuntimeException("Failed to drop table: " + ident, e);
+ throw new RuntimeException("Failed to purge table: " + ident, e);
}
}
@Override
- public boolean purgeTable(NameIdentifier ident) throws
UnsupportedOperationException {
+ public boolean dropTable(NameIdentifier ident) throws
UnsupportedOperationException {
try {
// Only delete the metadata entry here. The physical data will not be
deleted.
if (!tableExists(ident)) {
@@ -389,7 +392,7 @@ public class GenericLakehouseCatalogOperations
}
return store.delete(ident, Entity.EntityType.TABLE);
} catch (IOException e) {
- throw new RuntimeException("Failed to purge table " + ident, e);
+ throw new RuntimeException("Failed to drop table " + ident, e);
}
}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
index 0ed83457a4..16bef5565f 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
@@ -20,9 +20,7 @@
package org.apache.gravitino.catalog.lakehouse.lance;
import static org.apache.gravitino.Entity.EntityType.TABLE;
-import static
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.lancedb.lance.Dataset;
import com.lancedb.lance.WriteParams;
@@ -58,6 +56,7 @@ import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
import org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter;
+import org.apache.gravitino.lance.common.utils.LancePropertiesUtils;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.TableEntity;
import org.apache.gravitino.rel.Column;
@@ -120,13 +119,7 @@ public class LanceCatalogOperations implements
LakehouseCatalogOperations {
throws NoSuchSchemaException, TableAlreadyExistsException {
// Ignore partitions, distributions, sortOrders, and indexes for Lance
tables;
String location =
properties.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
- Map<String, String> storageProps =
- properties.entrySet().stream()
- .filter(e ->
e.getKey().startsWith(LANCE_TABLE_STORAGE_OPTION_PREFIX))
- .collect(
- Collectors.toMap(
- e ->
e.getKey().substring(LANCE_TABLE_STORAGE_OPTION_PREFIX.length()),
- Map.Entry::getValue));
+ Map<String, String> storageProps =
LancePropertiesUtils.getLanceStorageOptions(properties);
try (Dataset ignored =
Dataset.create(
@@ -280,7 +273,7 @@ public class LanceCatalogOperations implements
LakehouseCatalogOperations {
}
@Override
- public boolean dropTable(NameIdentifier ident) {
+ public boolean purgeTable(NameIdentifier ident) {
try {
TableEntity tableEntity = store.get(ident, Entity.EntityType.TABLE,
TableEntity.class);
Map<String, String> lancePropertiesMap = tableEntity.getProperties();
@@ -288,14 +281,24 @@ public class LanceCatalogOperations implements
LakehouseCatalogOperations {
lancePropertiesMap.get(GenericLakehouseTablePropertiesMetadata.LAKEHOUSE_LOCATION);
if (!store.delete(ident, Entity.EntityType.TABLE)) {
- throw new RuntimeException("Failed to drop Lance table: " +
ident.name());
+ throw new RuntimeException("Failed to purge Lance table: " +
ident.name());
}
// Drop the Lance dataset from cloud storage.
- Dataset.drop(location, ImmutableMap.of());
+ Dataset.drop(location,
LancePropertiesUtils.getLanceStorageOptions(lancePropertiesMap));
return true;
} catch (IOException e) {
- throw new RuntimeException("Failed to drop Lance table: " +
ident.name(), e);
+ throw new RuntimeException("Failed to purge Lance table: " +
ident.name(), e);
}
}
+
+ @Override
+ public boolean dropTable(NameIdentifier ident) {
+ // TODO: We will handle it in GenericLakehouseCatalogOperations. However,
+ // we first need to figure out whether it is an external table or not. We
+ // will introduce a property to indicate that. Currently, all Lance tables
+ // are external tables. `drop` will just remove the metadata in the
+ // metastore and will not delete the data in storage.
+ throw new UnsupportedOperationException(
+ "LanceCatalogOperations does not support dropTable operation.");
+ }
}
diff --git
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
index 8a356fb135..7f2f1df52f 100644
---
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
+++
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/LanceTableOperations.java
@@ -18,26 +18,80 @@
*/
package org.apache.gravitino.lance.common.ops;
+import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
+import com.lancedb.lance.namespace.model.CreateTableRequest;
import com.lancedb.lance.namespace.model.CreateTableResponse;
import com.lancedb.lance.namespace.model.DeregisterTableResponse;
import com.lancedb.lance.namespace.model.DescribeTableResponse;
+import com.lancedb.lance.namespace.model.RegisterTableRequest;
import com.lancedb.lance.namespace.model.RegisterTableResponse;
import java.util.Map;
+import java.util.Optional;
public interface LanceTableOperations {
- DescribeTableResponse describeTable(String tableId, String delimiter);
+ /**
+ * Describe the details of a table.
+ *
+ * @param tableId table ids are in the format of
"{namespace}{delimiter}{table_name}"
+ * @param delimiter the delimiter used in the namespace
+ * @param version the version of the table to describe; if empty, the latest version is described
+ * @return the table description
+ */
+ DescribeTableResponse describeTable(String tableId, String delimiter,
Optional<Long> version);
+ /**
+ * Create a new table.
+ *
+ * @param tableId table ids are in the format of
"{namespace}{delimiter}{table_name}"
+ * @param mode it can be CREATE, OVERWRITE, or EXIST_OK
+ * @param delimiter the delimiter used in the namespace
+ * @param tableLocation the location where the table data will be stored
+ * @param tableProperties the properties of the table
+ * @param arrowStreamBody the arrow stream bytes containing the schema and
data
+ * @return the response of the create table operation
+ */
CreateTableResponse createTable(
String tableId,
- String mode,
+ CreateTableRequest.ModeEnum mode,
String delimiter,
String tableLocation,
Map<String, String> tableProperties,
byte[] arrowStreamBody);
+ /**
+ * Create a new table without schema.
+ *
+ * @param tableId table ids are in the format of
"{namespace}{delimiter}{table_name}"
+ * @param delimiter the delimiter used in the namespace
+ * @param tableLocation the location where the table data will be stored
+ * @param tableProperties the properties of the table
+ * @return the response of the create table operation
+ */
+ CreateEmptyTableResponse createEmptyTable(
+ String tableId, String delimiter, String tableLocation, Map<String,
String> tableProperties);
+
+ /**
+ * Register an existing table.
+ *
+ * @param tableId table ids are in the format of
"{namespace}{delimiter}{table_name}"
+ * @param mode it can be REGISTER or OVERWRITE.
+ * @param delimiter the delimiter used in the namespace
+ * @param tableProperties the properties of the table, it should contain the
table location
+ * @return the response of the register table operation
+ */
RegisterTableResponse registerTable(
- String tableId, String mode, String delimiter, Map<String, String>
tableProperties);
+ String tableId,
+ RegisterTableRequest.ModeEnum mode,
+ String delimiter,
+ Map<String, String> tableProperties);
+ /**
+ * Deregister a table. It will not delete the underlying lance data.
+ *
+ * @param tableId table ids are in the format of
"{namespace}{delimiter}{table_name}"
+ * @param delimiter the delimiter used in the namespace
+ * @return the response of the deregister table operation
+ */
DeregisterTableResponse deregisterTable(String tableId, String delimiter);
}
diff --git
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java
similarity index 54%
copy from
lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
copy to
lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java
index dd2c4629b4..961134ec66 100644
---
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
+++
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNameSpaceOperations.java
@@ -1,29 +1,24 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package org.apache.gravitino.lance.common.ops.gravitino;
-import static
org.apache.gravitino.lance.common.config.LanceConfig.METALAKE_NAME;
-import static
org.apache.gravitino.lance.common.config.LanceConfig.NAMESPACE_BACKEND_URI;
-import static
org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter.CONVERTER;
-import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+package org.apache.gravitino.lance.common.ops.gravitino;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -32,24 +27,14 @@ import com.google.common.collect.Sets;
import com.lancedb.lance.namespace.LanceNamespaceException;
import com.lancedb.lance.namespace.ObjectIdentifier;
import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
-import com.lancedb.lance.namespace.model.CreateNamespaceRequest.ModeEnum;
import com.lancedb.lance.namespace.model.CreateNamespaceResponse;
-import com.lancedb.lance.namespace.model.CreateTableResponse;
-import com.lancedb.lance.namespace.model.DeregisterTableResponse;
import com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
-import com.lancedb.lance.namespace.model.DescribeTableResponse;
import com.lancedb.lance.namespace.model.DropNamespaceRequest;
import com.lancedb.lance.namespace.model.DropNamespaceResponse;
-import com.lancedb.lance.namespace.model.JsonArrowSchema;
import com.lancedb.lance.namespace.model.ListNamespacesResponse;
import com.lancedb.lance.namespace.model.ListTablesResponse;
-import com.lancedb.lance.namespace.model.RegisterTableRequest;
-import com.lancedb.lance.namespace.model.RegisterTableResponse;
import com.lancedb.lance.namespace.util.CommonUtil;
-import com.lancedb.lance.namespace.util.JsonArrowSchemaConverter;
import com.lancedb.lance.namespace.util.PageUtil;
-import java.io.ByteArrayInputStream;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -61,14 +46,8 @@ import java.util.function.IntFunction;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.types.pojo.Field;
-import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.CatalogChange;
-import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
@@ -80,60 +59,16 @@ import
org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NonEmptyCatalogException;
import org.apache.gravitino.exceptions.NonEmptySchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
-import org.apache.gravitino.lance.common.config.LanceConfig;
import org.apache.gravitino.lance.common.ops.LanceNamespaceOperations;
-import org.apache.gravitino.lance.common.ops.LanceTableOperations;
-import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
-import org.apache.gravitino.rel.Column;
-import org.apache.gravitino.rel.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class GravitinoLanceNamespaceWrapper extends NamespaceWrapper
- implements LanceNamespaceOperations, LanceTableOperations {
-
- private static final Logger LOG =
LoggerFactory.getLogger(GravitinoLanceNamespaceWrapper.class);
- private GravitinoClient client;
-
- @VisibleForTesting
- GravitinoLanceNamespaceWrapper() {
- super(null);
- }
-
- public GravitinoLanceNamespaceWrapper(LanceConfig config) {
- super(config);
- }
-
- @Override
- protected void initialize() {
- String uri = config().get(NAMESPACE_BACKEND_URI);
- String metalakeName = config().get(METALAKE_NAME);
- Preconditions.checkArgument(
- StringUtils.isNotBlank(metalakeName),
- "Metalake name must be provided for Lance Gravitino namespace
backend");
- this.client =
GravitinoClient.builder(uri).withMetalake(metalakeName).build();
- }
+public class GravitinoLanceNameSpaceOperations implements
LanceNamespaceOperations {
- @Override
- public LanceNamespaceOperations newNamespaceOps() {
- return this;
- }
+ private final GravitinoLanceNamespaceWrapper namespaceWrapper;
+ private final GravitinoClient client;
- @Override
- protected LanceTableOperations newTableOps() {
- return this;
- }
-
- @Override
- public void close() {
- if (client != null) {
- try {
- client.close();
- } catch (Exception e) {
- LOG.warn("Error closing Gravitino client", e);
- }
- }
+ public GravitinoLanceNameSpaceOperations(GravitinoLanceNamespaceWrapper
namespaceWrapper) {
+ this.namespaceWrapper = namespaceWrapper;
+ this.client = namespaceWrapper.getClient();
}
@Override
@@ -148,13 +83,13 @@ public class GravitinoLanceNamespaceWrapper extends
NamespaceWrapper
case 0:
namespaces =
Arrays.stream(client.listCatalogsInfo())
- .filter(this::isLakehouseCatalog)
+ .filter(namespaceWrapper::isLakehouseCatalog)
.map(Catalog::name)
.collect(Collectors.toList());
break;
case 1:
- Catalog catalog =
loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
+ Catalog catalog =
namespaceWrapper.loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
namespaces = Lists.newArrayList(catalog.asSchemas().listSchemas());
break;
@@ -184,7 +119,7 @@ public class GravitinoLanceNamespaceWrapper extends
NamespaceWrapper
"Expected at most 2-level and at least 1-level namespace but got: %s",
namespaceId);
- Catalog catalog = loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
+ Catalog catalog =
namespaceWrapper.loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
Map<String, String> properties = Maps.newHashMap();
switch (nsId.levels()) {
@@ -261,7 +196,7 @@ public class GravitinoLanceNamespaceWrapper extends
NamespaceWrapper
"Expected at most 2-level and at least 1-level namespace but got: %s",
namespaceId);
- Catalog catalog = loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
+ Catalog catalog =
namespaceWrapper.loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
if (nsId.levels() == 2) {
String schemaName = nsId.levelAtListPos(1);
if (!catalog.asSchemas().schemaExists(schemaName)) {
@@ -274,32 +209,6 @@ public class GravitinoLanceNamespaceWrapper extends
NamespaceWrapper
}
}
- private boolean isLakehouseCatalog(Catalog catalog) {
- return catalog.type().equals(Catalog.Type.RELATIONAL)
- && "generic-lakehouse".equals(catalog.provider());
- }
-
- private Catalog loadAndValidateLakehouseCatalog(String catalogName) {
- Catalog catalog;
- try {
- catalog = client.loadCatalog(catalogName);
- } catch (NoSuchCatalogException e) {
- throw LanceNamespaceException.notFound(
- "Catalog not found: " + catalogName,
- NoSuchCatalogException.class.getSimpleName(),
- catalogName,
- CommonUtil.formatCurrentStackTrace());
- }
- if (!isLakehouseCatalog(catalog)) {
- throw LanceNamespaceException.notFound(
- "Catalog is not a lakehouse catalog: " + catalogName,
- NoSuchCatalogException.class.getSimpleName(),
- catalogName,
- CommonUtil.formatCurrentStackTrace());
- }
- return catalog;
- }
-
private CreateNamespaceResponse createOrUpdateCatalog(
String catalogName, CreateNamespaceRequest.ModeEnum mode, Map<String,
String> properties) {
CreateNamespaceResponse response = new CreateNamespaceResponse();
@@ -322,7 +231,7 @@ public class GravitinoLanceNamespaceWrapper extends
NamespaceWrapper
}
// Catalog exists, validate type
- if (!isLakehouseCatalog(catalog)) {
+ if (!namespaceWrapper.isLakehouseCatalog(catalog)) {
throw LanceNamespaceException.conflict(
"Catalog already exists but is not a lakehouse catalog: " +
catalogName,
CatalogAlreadyExistsException.class.getSimpleName(),
@@ -370,7 +279,7 @@ public class GravitinoLanceNamespaceWrapper extends
NamespaceWrapper
CreateNamespaceRequest.ModeEnum mode,
Map<String, String> properties) {
CreateNamespaceResponse response = new CreateNamespaceResponse();
- Catalog loadedCatalog = loadAndValidateLakehouseCatalog(catalogName);
+ Catalog loadedCatalog =
namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName);
Schema schema;
try {
@@ -506,7 +415,7 @@ public class GravitinoLanceNamespaceWrapper extends
NamespaceWrapper
Preconditions.checkArgument(
nsId.levels() == 2, "Expected 2-level namespace but got: %s",
nsId.levels());
String catalogName = nsId.levelAtListPos(0);
- Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
+ Catalog catalog =
namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName);
String schemaName = nsId.levelAtListPos(1);
List<String> tables =
Arrays.stream(catalog.asTableCatalog().listTables(Namespace.of(schemaName)))
@@ -523,202 +432,4 @@ public class GravitinoLanceNamespaceWrapper extends
NamespaceWrapper
.tables(response.getNamespaces())
.pageToken(response.getPageToken());
}
-
- @Override
- public DescribeTableResponse describeTable(String tableId, String delimiter)
{
- ObjectIdentifier nsId = ObjectIdentifier.of(tableId,
Pattern.quote(delimiter));
- Preconditions.checkArgument(
- nsId.levels() == 3, "Expected at 3-level namespace but got: %s",
nsId.levels());
-
- String catalogName = nsId.levelAtListPos(0);
- Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
- NameIdentifier tableIdentifier =
- NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
-
- Table table = catalog.asTableCatalog().loadTable(tableIdentifier);
- DescribeTableResponse response = new DescribeTableResponse();
- response.setProperties(table.properties());
- response.setLocation(table.properties().get("location"));
- response.setSchema(toJsonArrowSchema(table.columns()));
- return response;
- }
-
- @Override
- public CreateTableResponse createTable(
- String tableId,
- String mode,
- String delimiter,
- String tableLocation,
- Map<String, String> tableProperties,
- byte[] arrowStreamBody) {
- ObjectIdentifier nsId = ObjectIdentifier.of(tableId,
Pattern.quote(delimiter));
- Preconditions.checkArgument(
- nsId.levels() == 3, "Expected at 3-level namespace but got: %s",
nsId.levels());
-
- // Parser column information.
- List<Column> columns = Lists.newArrayList();
- if (arrowStreamBody != null) {
- org.apache.arrow.vector.types.pojo.Schema schema =
parseArrowIpcStream(arrowStreamBody);
- columns = extractColumns(schema);
- }
-
- String catalogName = nsId.levelAtListPos(0);
- Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
-
- NameIdentifier tableIdentifier =
- NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
-
- Map<String, String> createTableProperties =
Maps.newHashMap(tableProperties);
- createTableProperties.put("location", tableLocation);
- createTableProperties.put("mode", mode);
- // TODO considering the mode (create, exist_ok, overwrite)
-
- ModeEnum createMode = ModeEnum.fromValue(mode.toLowerCase());
- switch (createMode) {
- case EXIST_OK:
- if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
- CreateTableResponse response = new CreateTableResponse();
- Table existingTable =
catalog.asTableCatalog().loadTable(tableIdentifier);
- response.setProperties(existingTable.properties());
- response.setLocation(existingTable.properties().get("location"));
- response.setVersion(0L);
- return response;
- }
- break;
- case CREATE:
- if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
- throw LanceNamespaceException.conflict(
- "Table already exists: " + tableId,
- SchemaAlreadyExistsException.class.getSimpleName(),
- tableId,
- CommonUtil.formatCurrentStackTrace());
- }
- break;
- case OVERWRITE:
- if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
- catalog.asTableCatalog().dropTable(tableIdentifier);
- }
- break;
- default:
- throw new IllegalArgumentException("Unknown mode: " + mode);
- }
-
- Table t =
- catalog
- .asTableCatalog()
- .createTable(
- tableIdentifier,
- columns.toArray(new Column[0]),
- tableLocation,
- createTableProperties);
-
- CreateTableResponse response = new CreateTableResponse();
- response.setProperties(t.properties());
- response.setLocation(tableLocation);
- response.setVersion(0L);
- return response;
- }
-
- @Override
- public RegisterTableResponse registerTable(
- String tableId, String mode, String delimiter, Map<String, String>
tableProperties) {
- ObjectIdentifier nsId = ObjectIdentifier.of(tableId,
Pattern.quote(delimiter));
- Preconditions.checkArgument(
- nsId.levels() == 3, "Expected at 3-level namespace but got: %s",
nsId.levels());
-
- String catalogName = nsId.levelAtListPos(0);
- Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
- NameIdentifier tableIdentifier =
- NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
-
- // TODO Support real register API
- RegisterTableRequest.ModeEnum createMode =
- RegisterTableRequest.ModeEnum.fromValue(mode.toUpperCase());
- if (createMode == RegisterTableRequest.ModeEnum.CREATE
- && catalog.asTableCatalog().tableExists(tableIdentifier)) {
- throw LanceNamespaceException.conflict(
- "Table already exists: " + tableId,
- SchemaAlreadyExistsException.class.getSimpleName(),
- tableId,
- CommonUtil.formatCurrentStackTrace());
- }
-
- if (createMode == RegisterTableRequest.ModeEnum.OVERWRITE
- && catalog.asTableCatalog().tableExists(tableIdentifier)) {
- LOG.info("Overwriting existing table: {}", tableId);
- catalog.asTableCatalog().dropTable(tableIdentifier);
- }
-
- Table t =
- catalog.asTableCatalog().createTable(tableIdentifier, new Column[] {},
"", tableProperties);
-
- RegisterTableResponse response = new RegisterTableResponse();
- response.setProperties(t.properties());
- response.setLocation(t.properties().get("location"));
- return response;
- }
-
- @Override
- public DeregisterTableResponse deregisterTable(String tableId, String
delimiter) {
-
- ObjectIdentifier nsId = ObjectIdentifier.of(tableId,
Pattern.quote(delimiter));
- Preconditions.checkArgument(
- nsId.levels() == 3, "Expected at 3-level namespace but got: %s",
nsId.levels());
-
- String catalogName = nsId.levelAtListPos(0);
- Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
-
- NameIdentifier tableIdentifier =
- NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
- Table t = catalog.asTableCatalog().loadTable(tableIdentifier);
- Map<String, String> properties = t.properties();
- // TODO Support real deregister API.
- catalog.asTableCatalog().purgeTable(tableIdentifier);
-
- DeregisterTableResponse response = new DeregisterTableResponse();
- response.setProperties(properties);
- response.setLocation(properties.get("location"));
- return response;
- }
-
- private JsonArrowSchema toJsonArrowSchema(Column[] columns) {
- List<Field> fields =
- Arrays.stream(columns)
- .map(col -> CONVERTER.toArrowField(col.name(), col.dataType(),
col.nullable()))
- .collect(Collectors.toList());
-
- return JsonArrowSchemaConverter.convertToJsonArrowSchema(
- new org.apache.arrow.vector.types.pojo.Schema(fields));
- }
-
- @VisibleForTesting
- org.apache.arrow.vector.types.pojo.Schema parseArrowIpcStream(byte[] stream)
{
- org.apache.arrow.vector.types.pojo.Schema schema;
- try (BufferAllocator allocator = new RootAllocator();
- ByteArrayInputStream bais = new ByteArrayInputStream(stream);
- ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) {
- schema = reader.getVectorSchemaRoot().getSchema();
- } catch (Exception e) {
- throw new RuntimeException("Failed to parse Arrow IPC stream", e);
- }
-
- Preconditions.checkArgument(schema != null, "No schema found in Arrow IPC
stream");
- return schema;
- }
-
- private List<Column>
extractColumns(org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
- List<Column> columns = new ArrayList<>();
-
- for (org.apache.arrow.vector.types.pojo.Field field :
arrowSchema.getFields()) {
- columns.add(
- Column.of(
- field.getName(),
- CONVERTER.toGravitino(field),
- null,
- field.isNullable(),
- false,
- DEFAULT_VALUE_NOT_SET));
- }
- return columns;
- }
}
diff --git
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
index dd2c4629b4..a1fee0a73e 100644
---
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
+++
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceNamespaceWrapper.java
@@ -20,81 +20,30 @@ package org.apache.gravitino.lance.common.ops.gravitino;
import static
org.apache.gravitino.lance.common.config.LanceConfig.METALAKE_NAME;
import static
org.apache.gravitino.lance.common.config.LanceConfig.NAMESPACE_BACKEND_URI;
-import static
org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter.CONVERTER;
-import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import com.lancedb.lance.namespace.LanceNamespaceException;
-import com.lancedb.lance.namespace.ObjectIdentifier;
-import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
-import com.lancedb.lance.namespace.model.CreateNamespaceRequest.ModeEnum;
-import com.lancedb.lance.namespace.model.CreateNamespaceResponse;
-import com.lancedb.lance.namespace.model.CreateTableResponse;
-import com.lancedb.lance.namespace.model.DeregisterTableResponse;
-import com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
-import com.lancedb.lance.namespace.model.DescribeTableResponse;
-import com.lancedb.lance.namespace.model.DropNamespaceRequest;
-import com.lancedb.lance.namespace.model.DropNamespaceResponse;
-import com.lancedb.lance.namespace.model.JsonArrowSchema;
-import com.lancedb.lance.namespace.model.ListNamespacesResponse;
-import com.lancedb.lance.namespace.model.ListTablesResponse;
-import com.lancedb.lance.namespace.model.RegisterTableRequest;
-import com.lancedb.lance.namespace.model.RegisterTableResponse;
import com.lancedb.lance.namespace.util.CommonUtil;
-import com.lancedb.lance.namespace.util.JsonArrowSchemaConverter;
-import com.lancedb.lance.namespace.util.PageUtil;
-import java.io.ByteArrayInputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-import java.util.function.IntFunction;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.arrow.memory.BufferAllocator;
-import org.apache.arrow.memory.RootAllocator;
-import org.apache.arrow.vector.ipc.ArrowStreamReader;
-import org.apache.arrow.vector.types.pojo.Field;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
-import org.apache.gravitino.CatalogChange;
-import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.Namespace;
-import org.apache.gravitino.Schema;
-import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.client.GravitinoClient;
-import org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
-import org.apache.gravitino.exceptions.CatalogInUseException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
-import org.apache.gravitino.exceptions.NoSuchSchemaException;
-import org.apache.gravitino.exceptions.NonEmptyCatalogException;
-import org.apache.gravitino.exceptions.NonEmptySchemaException;
-import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.lance.common.config.LanceConfig;
import org.apache.gravitino.lance.common.ops.LanceNamespaceOperations;
import org.apache.gravitino.lance.common.ops.LanceTableOperations;
import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
-import org.apache.gravitino.rel.Column;
-import org.apache.gravitino.rel.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class GravitinoLanceNamespaceWrapper extends NamespaceWrapper
- implements LanceNamespaceOperations, LanceTableOperations {
+public class GravitinoLanceNamespaceWrapper extends NamespaceWrapper {
private static final Logger LOG =
LoggerFactory.getLogger(GravitinoLanceNamespaceWrapper.class);
private GravitinoClient client;
+ private LanceNamespaceOperations namespaceOperations;
+ private LanceTableOperations tableOperations;
+
@VisibleForTesting
GravitinoLanceNamespaceWrapper() {
super(null);
@@ -104,6 +53,10 @@ public class GravitinoLanceNamespaceWrapper extends
NamespaceWrapper
super(config);
}
+ /** Returns the Gravitino client held by this wrapper; the field is assigned in initialize(). */
+ public GravitinoClient getClient() {
+ return client;
+ }
+
@Override
protected void initialize() {
String uri = config().get(NAMESPACE_BACKEND_URI);
@@ -113,16 +66,18 @@ public class GravitinoLanceNamespaceWrapper extends
NamespaceWrapper
"Metalake name must be provided for Lance Gravitino namespace
backend");
this.client =
GravitinoClient.builder(uri).withMetalake(metalakeName).build();
+ this.namespaceOperations = new GravitinoLanceNameSpaceOperations(this);
+ this.tableOperations = new GravitinoLanceTableOperations(this);
}
@Override
public LanceNamespaceOperations newNamespaceOps() {
- return this;
+ return namespaceOperations;
}
@Override
protected LanceTableOperations newTableOps() {
- return this;
+ return tableOperations;
}
@Override
@@ -136,150 +91,12 @@ public class GravitinoLanceNamespaceWrapper extends
NamespaceWrapper
}
}
- @Override
- public ListNamespacesResponse listNamespaces(
- String namespaceId, String delimiter, String pageToken, Integer limit) {
- ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter);
- Preconditions.checkArgument(
- nsId.levels() <= 2, "Expected at most 2-level namespace but got: %s",
namespaceId);
-
- List<String> namespaces;
- switch (nsId.levels()) {
- case 0:
- namespaces =
- Arrays.stream(client.listCatalogsInfo())
- .filter(this::isLakehouseCatalog)
- .map(Catalog::name)
- .collect(Collectors.toList());
- break;
-
- case 1:
- Catalog catalog =
loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
- namespaces = Lists.newArrayList(catalog.asSchemas().listSchemas());
- break;
-
- case 2:
- namespaces = Lists.newArrayList();
- break;
-
- default:
- throw new IllegalArgumentException(
- "Expected at most 2-level namespace but got: " + namespaceId);
- }
-
- Collections.sort(namespaces);
- PageUtil.Page page =
- PageUtil.splitPage(namespaces, pageToken,
PageUtil.normalizePageSize(limit));
- ListNamespacesResponse response = new ListNamespacesResponse();
- response.setNamespaces(Sets.newHashSet(page.items()));
- response.setPageToken(page.nextPageToken());
- return response;
- }
-
- @Override
- public DescribeNamespaceResponse describeNamespace(String namespaceId,
String delimiter) {
- ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter);
- Preconditions.checkArgument(
- nsId.levels() <= 2 && nsId.levels() > 0,
- "Expected at most 2-level and at least 1-level namespace but got: %s",
- namespaceId);
-
- Catalog catalog = loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
- Map<String, String> properties = Maps.newHashMap();
-
- switch (nsId.levels()) {
- case 1:
-
Optional.ofNullable(catalog.properties()).ifPresent(properties::putAll);
- break;
- case 2:
- String schemaName = nsId.levelAtListPos(1);
- Schema schema = catalog.asSchemas().loadSchema(schemaName);
- Optional.ofNullable(schema.properties()).ifPresent(properties::putAll);
- break;
- default:
- throw new IllegalArgumentException(
- "Expected at most 2-level and at least 1-level namespace but got:
" + namespaceId);
- }
-
- DescribeNamespaceResponse response = new DescribeNamespaceResponse();
- response.setProperties(properties);
- return response;
- }
-
- @Override
- public CreateNamespaceResponse createNamespace(
- String namespaceId,
- String delimiter,
- CreateNamespaceRequest.ModeEnum mode,
- Map<String, String> properties) {
- ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter);
- Preconditions.checkArgument(
- nsId.levels() <= 2 && nsId.levels() > 0,
- "Expected at most 2-level and at least 1-level namespace but got: %s",
- namespaceId);
-
- switch (nsId.levels()) {
- case 1:
- return createOrUpdateCatalog(nsId.levelAtListPos(0), mode, properties);
- case 2:
- return createOrUpdateSchema(
- nsId.levelAtListPos(0), nsId.levelAtListPos(1), mode, properties);
- default:
- throw new IllegalArgumentException(
- "Expected at most 2-level and at least 1-level namespace but got:
" + namespaceId);
- }
- }
-
- @Override
- public DropNamespaceResponse dropNamespace(
- String namespaceId,
- String delimiter,
- DropNamespaceRequest.ModeEnum mode,
- DropNamespaceRequest.BehaviorEnum behavior) {
- ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter);
- Preconditions.checkArgument(
- nsId.levels() <= 2 && nsId.levels() > 0,
- "Expected at most 2-level and at least 1-level namespace but got: %s",
- namespaceId);
-
- switch (nsId.levels()) {
- case 1:
- return dropCatalog(nsId.levelAtListPos(0), mode, behavior);
- case 2:
- return dropSchema(nsId.levelAtListPos(0), nsId.levelAtListPos(1),
mode, behavior);
- default:
- throw new IllegalArgumentException(
- "Expected at most 2-level and at least 1-level namespace but got:
" + namespaceId);
- }
- }
-
- @Override
- public void namespaceExists(String namespaceId, String delimiter) throws
LanceNamespaceException {
- ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId, delimiter);
- Preconditions.checkArgument(
- nsId.levels() <= 2 && nsId.levels() > 0,
- "Expected at most 2-level and at least 1-level namespace but got: %s",
- namespaceId);
-
- Catalog catalog = loadAndValidateLakehouseCatalog(nsId.levelAtListPos(0));
- if (nsId.levels() == 2) {
- String schemaName = nsId.levelAtListPos(1);
- if (!catalog.asSchemas().schemaExists(schemaName)) {
- throw LanceNamespaceException.notFound(
- "Schema not found: " + schemaName,
- NoSuchSchemaException.class.getSimpleName(),
- schemaName,
- CommonUtil.formatCurrentStackTrace());
- }
- }
- }
-
- private boolean isLakehouseCatalog(Catalog catalog) {
+ public boolean isLakehouseCatalog(Catalog catalog) {
return catalog.type().equals(Catalog.Type.RELATIONAL)
&& "generic-lakehouse".equals(catalog.provider());
}
- private Catalog loadAndValidateLakehouseCatalog(String catalogName) {
+ public Catalog loadAndValidateLakehouseCatalog(String catalogName) {
Catalog catalog;
try {
catalog = client.loadCatalog(catalogName);
@@ -299,426 +116,4 @@ public class GravitinoLanceNamespaceWrapper extends
NamespaceWrapper
}
return catalog;
}
-
- private CreateNamespaceResponse createOrUpdateCatalog(
- String catalogName, CreateNamespaceRequest.ModeEnum mode, Map<String,
String> properties) {
- CreateNamespaceResponse response = new CreateNamespaceResponse();
-
- Catalog catalog;
- try {
- catalog = client.loadCatalog(catalogName);
- } catch (NoSuchCatalogException e) {
- // Catalog does not exist, create it
- Catalog createdCatalog =
- client.createCatalog(
- catalogName,
- Catalog.Type.RELATIONAL,
- "generic-lakehouse",
- "created by Lance REST server",
- properties);
- response.setProperties(
- createdCatalog.properties() == null ? Maps.newHashMap() :
createdCatalog.properties());
- return response;
- }
-
- // Catalog exists, validate type
- if (!isLakehouseCatalog(catalog)) {
- throw LanceNamespaceException.conflict(
- "Catalog already exists but is not a lakehouse catalog: " +
catalogName,
- CatalogAlreadyExistsException.class.getSimpleName(),
- catalogName,
- CommonUtil.formatCurrentStackTrace());
- }
-
- // Catalog exists, handle based on mode
- switch (mode) {
- case EXIST_OK:
- response.setProperties(
-
Optional.ofNullable(catalog.properties()).orElse(Collections.emptyMap()));
- return response;
- case CREATE:
- throw LanceNamespaceException.conflict(
- "Catalog already exists: " + catalogName,
- CatalogAlreadyExistsException.class.getSimpleName(),
- catalogName,
- CommonUtil.formatCurrentStackTrace());
- case OVERWRITE:
- CatalogChange[] changes =
- buildChanges(
- properties,
- removeInUseProperty(catalog.properties()),
- CatalogChange::setProperty,
- CatalogChange::removeProperty,
- CatalogChange[]::new);
- Catalog alteredCatalog = client.alterCatalog(catalogName, changes);
-
Optional.ofNullable(alteredCatalog.properties()).ifPresent(response::setProperties);
- return response;
- default:
- throw new IllegalArgumentException("Unknown mode: " + mode);
- }
- }
-
- private Map<String, String> removeInUseProperty(Map<String, String>
properties) {
- return properties.entrySet().stream()
- .filter(e -> !e.getKey().equalsIgnoreCase(Catalog.PROPERTY_IN_USE))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- }
-
- private CreateNamespaceResponse createOrUpdateSchema(
- String catalogName,
- String schemaName,
- CreateNamespaceRequest.ModeEnum mode,
- Map<String, String> properties) {
- CreateNamespaceResponse response = new CreateNamespaceResponse();
- Catalog loadedCatalog = loadAndValidateLakehouseCatalog(catalogName);
-
- Schema schema;
- try {
- schema = loadedCatalog.asSchemas().loadSchema(schemaName);
- } catch (NoSuchSchemaException e) {
- // Schema does not exist, create it
- Schema createdSchema =
loadedCatalog.asSchemas().createSchema(schemaName, null, properties);
- response.setProperties(
- createdSchema.properties() == null ? Maps.newHashMap() :
createdSchema.properties());
- return response;
- }
-
- // Schema exists, handle based on mode
- switch (mode) {
- case EXIST_OK:
- response.setProperties(
-
Optional.ofNullable(schema.properties()).orElse(Collections.emptyMap()));
- return response;
- case CREATE:
- throw LanceNamespaceException.conflict(
- "Schema already exists: " + schemaName,
- SchemaAlreadyExistsException.class.getSimpleName(),
- schemaName,
- CommonUtil.formatCurrentStackTrace());
- case OVERWRITE:
- SchemaChange[] changes =
- buildChanges(
- properties,
- schema.properties(),
- SchemaChange::setProperty,
- SchemaChange::removeProperty,
- SchemaChange[]::new);
- Schema alteredSchema =
loadedCatalog.asSchemas().alterSchema(schemaName, changes);
-
Optional.ofNullable(alteredSchema.properties()).ifPresent(response::setProperties);
- return response;
- default:
- throw new IllegalArgumentException("Unknown mode: " + mode);
- }
- }
-
- private DropNamespaceResponse dropCatalog(
- String catalogName,
- DropNamespaceRequest.ModeEnum mode,
- DropNamespaceRequest.BehaviorEnum behavior) {
- try {
- boolean dropped =
- client.dropCatalog(catalogName, behavior ==
DropNamespaceRequest.BehaviorEnum.CASCADE);
- if (dropped) {
- return new DropNamespaceResponse();
- } else {
- // Catalog did not exist
- if (mode == DropNamespaceRequest.ModeEnum.FAIL) {
- throw LanceNamespaceException.notFound(
- "Catalog not found: " + catalogName,
- NoSuchCatalogException.class.getSimpleName(),
- catalogName,
- CommonUtil.formatCurrentStackTrace());
- }
- return new DropNamespaceResponse(); // SKIP mode
- }
- } catch (NonEmptyCatalogException | CatalogInUseException e) {
- throw LanceNamespaceException.badRequest(
- String.format("Catalog %s is not empty or in used", catalogName),
- NonEmptyCatalogException.class.getSimpleName(),
- catalogName,
- CommonUtil.formatCurrentStackTrace());
- }
- }
-
- private DropNamespaceResponse dropSchema(
- String catalogName,
- String schemaName,
- DropNamespaceRequest.ModeEnum mode,
- DropNamespaceRequest.BehaviorEnum behavior) {
- try {
- boolean dropped =
- client
- .loadCatalog(catalogName)
- .asSchemas()
- .dropSchema(schemaName, behavior ==
DropNamespaceRequest.BehaviorEnum.CASCADE);
- if (dropped) {
- return new DropNamespaceResponse();
- } else {
- // Schema did not exist
- if (mode == DropNamespaceRequest.ModeEnum.FAIL) {
- throw LanceNamespaceException.notFound(
- "Schema not found: " + schemaName,
- NoSuchSchemaException.class.getSimpleName(),
- schemaName,
- CommonUtil.formatCurrentStackTrace());
- }
- return new DropNamespaceResponse(); // SKIP mode
- }
- } catch (NoSuchCatalogException e) {
- throw LanceNamespaceException.notFound(
- "Catalog not found: " + catalogName,
- NoSuchCatalogException.class.getSimpleName(),
- catalogName,
- CommonUtil.formatCurrentStackTrace());
- } catch (NonEmptySchemaException e) {
- throw LanceNamespaceException.badRequest(
- String.format("Schema %s is not empty.", schemaName),
- NonEmptySchemaException.class.getSimpleName(),
- schemaName,
- CommonUtil.formatCurrentStackTrace());
- }
- }
-
- private <T> T[] buildChanges(
- Map<String, String> newProps,
- Map<String, String> oldProps,
- BiFunction<String, String, T> setPropertyFunc,
- Function<String, T> removePropertyFunc,
- IntFunction<T[]> arrayCreator) {
- Stream<T> setPropertiesStream =
- newProps.entrySet().stream()
- .map(entry -> setPropertyFunc.apply(entry.getKey(),
entry.getValue()));
-
- Stream<T> removePropertiesStream =
- oldProps == null
- ? Stream.empty()
- : oldProps.keySet().stream()
- .filter(key -> !newProps.containsKey(key))
- .map(removePropertyFunc);
-
- return Stream.concat(setPropertiesStream,
removePropertiesStream).toArray(arrayCreator);
- }
-
- @Override
- public ListTablesResponse listTables(
- String namespaceId, String delimiter, String pageToken, Integer limit) {
- ObjectIdentifier nsId = ObjectIdentifier.of(namespaceId,
Pattern.quote(delimiter));
- Preconditions.checkArgument(
- nsId.levels() == 2, "Expected 2-level namespace but got: %s",
nsId.levels());
- String catalogName = nsId.levelAtListPos(0);
- Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
- String schemaName = nsId.levelAtListPos(1);
- List<String> tables =
-
Arrays.stream(catalog.asTableCatalog().listTables(Namespace.of(schemaName)))
- .map(ident -> Joiner.on(delimiter).join(catalogName, schemaName,
ident.name()))
- .sorted()
- .collect(Collectors.toList());
-
- PageUtil.Page page = PageUtil.splitPage(tables, pageToken,
PageUtil.normalizePageSize(limit));
- ListNamespacesResponse response = new ListNamespacesResponse();
- response.setNamespaces(Sets.newHashSet(page.items()));
- response.setPageToken(page.nextPageToken());
-
- return new ListTablesResponse()
- .tables(response.getNamespaces())
- .pageToken(response.getPageToken());
- }
-
- @Override
- public DescribeTableResponse describeTable(String tableId, String delimiter)
{
- ObjectIdentifier nsId = ObjectIdentifier.of(tableId,
Pattern.quote(delimiter));
- Preconditions.checkArgument(
- nsId.levels() == 3, "Expected at 3-level namespace but got: %s",
nsId.levels());
-
- String catalogName = nsId.levelAtListPos(0);
- Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
- NameIdentifier tableIdentifier =
- NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
-
- Table table = catalog.asTableCatalog().loadTable(tableIdentifier);
- DescribeTableResponse response = new DescribeTableResponse();
- response.setProperties(table.properties());
- response.setLocation(table.properties().get("location"));
- response.setSchema(toJsonArrowSchema(table.columns()));
- return response;
- }
-
- @Override
- public CreateTableResponse createTable(
- String tableId,
- String mode,
- String delimiter,
- String tableLocation,
- Map<String, String> tableProperties,
- byte[] arrowStreamBody) {
- ObjectIdentifier nsId = ObjectIdentifier.of(tableId,
Pattern.quote(delimiter));
- Preconditions.checkArgument(
- nsId.levels() == 3, "Expected at 3-level namespace but got: %s",
nsId.levels());
-
- // Parser column information.
- List<Column> columns = Lists.newArrayList();
- if (arrowStreamBody != null) {
- org.apache.arrow.vector.types.pojo.Schema schema =
parseArrowIpcStream(arrowStreamBody);
- columns = extractColumns(schema);
- }
-
- String catalogName = nsId.levelAtListPos(0);
- Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
-
- NameIdentifier tableIdentifier =
- NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
-
- Map<String, String> createTableProperties =
Maps.newHashMap(tableProperties);
- createTableProperties.put("location", tableLocation);
- createTableProperties.put("mode", mode);
- // TODO considering the mode (create, exist_ok, overwrite)
-
- ModeEnum createMode = ModeEnum.fromValue(mode.toLowerCase());
- switch (createMode) {
- case EXIST_OK:
- if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
- CreateTableResponse response = new CreateTableResponse();
- Table existingTable =
catalog.asTableCatalog().loadTable(tableIdentifier);
- response.setProperties(existingTable.properties());
- response.setLocation(existingTable.properties().get("location"));
- response.setVersion(0L);
- return response;
- }
- break;
- case CREATE:
- if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
- throw LanceNamespaceException.conflict(
- "Table already exists: " + tableId,
- SchemaAlreadyExistsException.class.getSimpleName(),
- tableId,
- CommonUtil.formatCurrentStackTrace());
- }
- break;
- case OVERWRITE:
- if (catalog.asTableCatalog().tableExists(tableIdentifier)) {
- catalog.asTableCatalog().dropTable(tableIdentifier);
- }
- break;
- default:
- throw new IllegalArgumentException("Unknown mode: " + mode);
- }
-
- Table t =
- catalog
- .asTableCatalog()
- .createTable(
- tableIdentifier,
- columns.toArray(new Column[0]),
- tableLocation,
- createTableProperties);
-
- CreateTableResponse response = new CreateTableResponse();
- response.setProperties(t.properties());
- response.setLocation(tableLocation);
- response.setVersion(0L);
- return response;
- }
-
- @Override
- public RegisterTableResponse registerTable(
- String tableId, String mode, String delimiter, Map<String, String>
tableProperties) {
- ObjectIdentifier nsId = ObjectIdentifier.of(tableId,
Pattern.quote(delimiter));
- Preconditions.checkArgument(
- nsId.levels() == 3, "Expected at 3-level namespace but got: %s",
nsId.levels());
-
- String catalogName = nsId.levelAtListPos(0);
- Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
- NameIdentifier tableIdentifier =
- NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
-
- // TODO Support real register API
- RegisterTableRequest.ModeEnum createMode =
- RegisterTableRequest.ModeEnum.fromValue(mode.toUpperCase());
- if (createMode == RegisterTableRequest.ModeEnum.CREATE
- && catalog.asTableCatalog().tableExists(tableIdentifier)) {
- throw LanceNamespaceException.conflict(
- "Table already exists: " + tableId,
- SchemaAlreadyExistsException.class.getSimpleName(),
- tableId,
- CommonUtil.formatCurrentStackTrace());
- }
-
- if (createMode == RegisterTableRequest.ModeEnum.OVERWRITE
- && catalog.asTableCatalog().tableExists(tableIdentifier)) {
- LOG.info("Overwriting existing table: {}", tableId);
- catalog.asTableCatalog().dropTable(tableIdentifier);
- }
-
- Table t =
- catalog.asTableCatalog().createTable(tableIdentifier, new Column[] {},
"", tableProperties);
-
- RegisterTableResponse response = new RegisterTableResponse();
- response.setProperties(t.properties());
- response.setLocation(t.properties().get("location"));
- return response;
- }
-
- @Override
- public DeregisterTableResponse deregisterTable(String tableId, String
delimiter) {
-
- ObjectIdentifier nsId = ObjectIdentifier.of(tableId,
Pattern.quote(delimiter));
- Preconditions.checkArgument(
- nsId.levels() == 3, "Expected at 3-level namespace but got: %s",
nsId.levels());
-
- String catalogName = nsId.levelAtListPos(0);
- Catalog catalog = loadAndValidateLakehouseCatalog(catalogName);
-
- NameIdentifier tableIdentifier =
- NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
- Table t = catalog.asTableCatalog().loadTable(tableIdentifier);
- Map<String, String> properties = t.properties();
- // TODO Support real deregister API.
- catalog.asTableCatalog().purgeTable(tableIdentifier);
-
- DeregisterTableResponse response = new DeregisterTableResponse();
- response.setProperties(properties);
- response.setLocation(properties.get("location"));
- return response;
- }
-
- private JsonArrowSchema toJsonArrowSchema(Column[] columns) {
- List<Field> fields =
- Arrays.stream(columns)
- .map(col -> CONVERTER.toArrowField(col.name(), col.dataType(),
col.nullable()))
- .collect(Collectors.toList());
-
- return JsonArrowSchemaConverter.convertToJsonArrowSchema(
- new org.apache.arrow.vector.types.pojo.Schema(fields));
- }
-
- @VisibleForTesting
- org.apache.arrow.vector.types.pojo.Schema parseArrowIpcStream(byte[] stream)
{
- org.apache.arrow.vector.types.pojo.Schema schema;
- try (BufferAllocator allocator = new RootAllocator();
- ByteArrayInputStream bais = new ByteArrayInputStream(stream);
- ArrowStreamReader reader = new ArrowStreamReader(bais, allocator)) {
- schema = reader.getVectorSchemaRoot().getSchema();
- } catch (Exception e) {
- throw new RuntimeException("Failed to parse Arrow IPC stream", e);
- }
-
- Preconditions.checkArgument(schema != null, "No schema found in Arrow IPC
stream");
- return schema;
- }
-
- private List<Column>
extractColumns(org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
- List<Column> columns = new ArrayList<>();
-
- for (org.apache.arrow.vector.types.pojo.Field field :
arrowSchema.getFields()) {
- columns.add(
- Column.of(
- field.getName(),
- CONVERTER.toGravitino(field),
- null,
- field.isNullable(),
- false,
- DEFAULT_VALUE_NOT_SET));
- }
- return columns;
- }
}
diff --git
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java
new file mode 100644
index 0000000000..d298dbe5e5
--- /dev/null
+++
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/ops/gravitino/GravitinoLanceTableOperations.java
@@ -0,0 +1,293 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.lance.common.ops.gravitino;
+
+import static
org.apache.gravitino.lance.common.ops.gravitino.LanceDataTypeConverter.CONVERTER;
+import static
org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_LOCATION;
+import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.lancedb.lance.namespace.LanceNamespaceException;
+import com.lancedb.lance.namespace.ObjectIdentifier;
+import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
+import com.lancedb.lance.namespace.model.CreateTableRequest;
+import com.lancedb.lance.namespace.model.CreateTableRequest.ModeEnum;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
+import com.lancedb.lance.namespace.model.DeregisterTableResponse;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
+import com.lancedb.lance.namespace.model.JsonArrowSchema;
+import com.lancedb.lance.namespace.model.RegisterTableRequest;
+import com.lancedb.lance.namespace.model.RegisterTableResponse;
+import com.lancedb.lance.namespace.util.CommonUtil;
+import com.lancedb.lance.namespace.util.JsonArrowSchemaConverter;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.exceptions.NoSuchTableException;
+import org.apache.gravitino.exceptions.TableAlreadyExistsException;
+import org.apache.gravitino.lance.common.ops.LanceTableOperations;
+import org.apache.gravitino.lance.common.utils.ArrowUtils;
+import org.apache.gravitino.lance.common.utils.LancePropertiesUtils;
+import org.apache.gravitino.rel.Column;
+import org.apache.gravitino.rel.Table;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class GravitinoLanceTableOperations implements LanceTableOperations {
+ private static final Logger LOG =
LoggerFactory.getLogger(GravitinoLanceTableOperations.class);
+
+ private final GravitinoLanceNamespaceWrapper namespaceWrapper;
+
+ /**
+ * Creates table operations that resolve catalogs through the given namespace wrapper.
+ *
+ * @param namespaceWrapper wrapper stored by this instance; the table methods call its
+ * {@code loadAndValidateLakehouseCatalog} to load the target catalog
+ */
+ public GravitinoLanceTableOperations(GravitinoLanceNamespaceWrapper
namespaceWrapper) {
+ this.namespaceWrapper = namespaceWrapper;
+ }
+
+  /**
+   * Describes the latest version of a table stored in a Gravitino lakehouse catalog.
+   *
+   * @param tableId identifier made of catalog, schema and table name joined by {@code delimiter}
+   * @param delimiter separator between the levels of {@code tableId}; it is quoted, so it is
+   *     matched literally
+   * @param version requested table version; must be empty, since describing a specific version
+   *     is not supported
+   * @return the table properties, its location property, its columns as a JSON Arrow schema and
+   *     the storage options derived from the table properties; the version is set to null
+   * @throws UnsupportedOperationException if {@code version} holds a value
+   * @throws IllegalArgumentException if {@code tableId} does not consist of exactly three levels
+   */
+  @Override
+  public DescribeTableResponse describeTable(
+      String tableId, String delimiter, Optional<Long> version) {
+    if (version.isPresent()) {
+      throw new UnsupportedOperationException(
+          "Describing specific table version is not supported. It should be null to indicate the"
+              + " latest version.");
+    }
+
+    ObjectIdentifier parsedId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter));
+    Preconditions.checkArgument(
+        parsedId.levels() == 3, "Expected at 3-level namespace but got: %s", parsedId.levels());
+
+    // Level 0 is the catalog; levels 1 and 2 are the schema and the table name.
+    Catalog lakehouseCatalog =
+        namespaceWrapper.loadAndValidateLakehouseCatalog(parsedId.levelAtListPos(0));
+    NameIdentifier identifier =
+        NameIdentifier.of(parsedId.levelAtListPos(1), parsedId.levelAtListPos(2));
+    Table loadedTable = lakehouseCatalog.asTableCatalog().loadTable(identifier);
+
+    Map<String, String> properties = loadedTable.properties();
+    DescribeTableResponse describeResponse = new DescribeTableResponse();
+    describeResponse.setProperties(properties);
+    describeResponse.setLocation(properties.get(LANCE_LOCATION));
+    describeResponse.setSchema(toJsonArrowSchema(loadedTable.columns()));
+    describeResponse.setVersion(null);
+    describeResponse.setStorageOptions(LancePropertiesUtils.getLanceStorageOptions(properties));
+    return describeResponse;
+  }
+
+  /**
+   * Creates a table in a Gravitino lakehouse catalog, honouring the requested creation mode when
+   * the table already exists.
+   *
+   * <p>The table is created first; only if Gravitino reports that it already exists is {@code
+   * mode} consulted: {@code CREATE} fails with a conflict, {@code OVERWRITE} purges the existing
+   * table and creates it again, and any other mode (i.e. {@code EXIST_OK}) returns the existing
+   * table.
+   *
+   * @param tableId identifier made of catalog, schema and table name joined by {@code delimiter}
+   * @param mode how to react when the table already exists
+   * @param delimiter separator between the levels of {@code tableId}; it is quoted, so it is
+   *     matched literally
+   * @param tableLocation location stored under the location property; skipped when null
+   * @param tableProperties caller-supplied table properties; may be null, which is treated as
+   *     empty
+   * @param arrowStreamBody Arrow IPC stream carrying the schema; when null the table is created
+   *     without columns
+   * @return properties, location and storage options of the resulting table; the location is
+   *     always read back from the table properties and the version is set to null
+   * @throws IllegalArgumentException if {@code tableId} does not consist of exactly three levels
+   * @throws LanceNamespaceException with conflict status if the table exists and {@code mode} is
+   *     {@code CREATE}
+   */
+  @Override
+  public CreateTableResponse createTable(
+      String tableId,
+      CreateTableRequest.ModeEnum mode,
+      String delimiter,
+      String tableLocation,
+      Map<String, String> tableProperties,
+      byte[] arrowStreamBody) {
+    ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter));
+    Preconditions.checkArgument(
+        nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels());
+
+    // Parse column information from the Arrow schema, if one was supplied.
+    List<Column> columns = Lists.newArrayList();
+    if (arrowStreamBody != null) {
+      columns = extractColumns(ArrowUtils.parseArrowIpcStream(arrowStreamBody));
+    }
+    Column[] columnArray = columns.toArray(new Column[0]);
+
+    String catalogName = nsId.levelAtListPos(0);
+    Catalog catalog = namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName);
+
+    NameIdentifier tableIdentifier =
+        NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
+
+    // Copy the caller's map so it is never mutated; a null map means "no properties".
+    Map<String, String> createTableProperties = Maps.newHashMap();
+    if (tableProperties != null) {
+      createTableProperties.putAll(tableProperties);
+    }
+    if (tableLocation != null) {
+      createTableProperties.put(LANCE_LOCATION, tableLocation);
+    }
+    // The format is defined in GenericLakehouseCatalog
+    createTableProperties.put("format", "lance");
+
+    Table t;
+    try {
+      t =
+          catalog
+              .asTableCatalog()
+              .createTable(tableIdentifier, columnArray, null, createTableProperties);
+    } catch (TableAlreadyExistsException exception) {
+      if (mode == CreateTableRequest.ModeEnum.CREATE) {
+        throw LanceNamespaceException.conflict(
+            "Table already exists: " + tableId,
+            TableAlreadyExistsException.class.getSimpleName(),
+            tableId,
+            CommonUtil.formatCurrentStackTrace());
+      }
+
+      if (mode == CreateTableRequest.ModeEnum.OVERWRITE) {
+        LOG.info("Overwriting existing table: {}", tableId);
+        catalog.asTableCatalog().purgeTable(tableIdentifier);
+        t =
+            catalog
+                .asTableCatalog()
+                .createTable(tableIdentifier, columnArray, null, createTableProperties);
+      } else { // EXIST_OK
+        t = catalog.asTableCatalog().loadTable(tableIdentifier);
+      }
+    }
+
+    // All storage options are stored in the table properties, and the location is read back from
+    // them as well, so created and pre-existing tables are reported the same way.
+    Map<String, String> resultProperties = t.properties();
+    CreateTableResponse response = new CreateTableResponse();
+    response.setProperties(resultProperties);
+    response.setLocation(resultProperties.get(LANCE_LOCATION));
+    response.setStorageOptions(LancePropertiesUtils.getLanceStorageOptions(resultProperties));
+    response.setVersion(null);
+    return response;
+  }
+
+ /**
+  * Creates a table without an Arrow schema payload by delegating to {@code createTable} in
+  * {@code CREATE} mode with a {@code null} Arrow stream body.
+  *
+  * @param tableId the delimiter-joined table identifier
+  * @param delimiter the delimiter used to split {@code tableId}
+  * @param tableLocation the table location, may be {@code null}
+  * @param tableProperties the table properties to store
+  * @return a response carrying the properties, location and storage options of the created table
+  */
+ @Override
+ public CreateEmptyTableResponse createEmptyTable(
+     String tableId, String delimiter, String tableLocation, Map<String, String> tableProperties) {
+   CreateTableResponse created =
+       createTable(tableId, ModeEnum.CREATE, delimiter, tableLocation, tableProperties, null);
+
+   // Only these three fields are carried over into the empty-table response.
+   CreateEmptyTableResponse result = new CreateEmptyTableResponse();
+   result.setStorageOptions(created.getStorageOptions());
+   result.setLocation(created.getLocation());
+   result.setProperties(created.getProperties());
+   return result;
+ }
+
+ /**
+  * Registers a table in the lakehouse catalog with an empty column list and the given properties.
+  *
+  * <p>{@code tableId} is split with {@code delimiter} and must have exactly three levels: the
+  * first names the catalog and the last two form the table identifier. The properties are copied
+  * and tagged with {@code format=lance} before the table is created.
+  *
+  * <p>If the table already exists, {@code OVERWRITE} drops the existing entry and creates it
+  * again. Any other mode, including {@code null} when the caller supplied no mode, is handled as
+  * {@code CREATE} and reported as a conflict, so the response is never built from a missing table.
+  *
+  * @param tableId the delimiter-joined table identifier
+  * @param mode the registration mode, may be {@code null}
+  * @param delimiter the delimiter used to split {@code tableId}
+  * @param tableProperties the properties to store with the table
+  * @return a response carrying the properties and location of the registered table
+  * @throws IllegalArgumentException if {@code tableId} does not have exactly three levels
+  * @throws LanceNamespaceException if the table already exists and {@code mode} is not OVERWRITE
+  */
+ @Override
+ public RegisterTableResponse registerTable(
+     String tableId,
+     RegisterTableRequest.ModeEnum mode,
+     String delimiter,
+     Map<String, String> tableProperties) {
+   ObjectIdentifier nsId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter));
+   Preconditions.checkArgument(
+       nsId.levels() == 3, "Expected at 3-level namespace but got: %s", nsId.levels());
+
+   String catalogName = nsId.levelAtListPos(0);
+   Catalog catalog = namespaceWrapper.loadAndValidateLakehouseCatalog(catalogName);
+   NameIdentifier tableIdentifier =
+       NameIdentifier.of(nsId.levelAtListPos(1), nsId.levelAtListPos(2));
+
+   Map<String, String> copiedTableProperties = Maps.newHashMap(tableProperties);
+   copiedTableProperties.put("format", "lance");
+
+   Table t;
+   try {
+     t =
+         catalog
+             .asTableCatalog()
+             .createTable(tableIdentifier, new Column[] {}, null, copiedTableProperties);
+   } catch (TableAlreadyExistsException exception) {
+     // Everything except an explicit OVERWRITE is a conflict. Previously a null mode fell
+     // through both branches and left the table reference unset, which failed later with a
+     // NullPointerException instead of a 409.
+     if (mode != RegisterTableRequest.ModeEnum.OVERWRITE) {
+       throw LanceNamespaceException.conflict(
+           "Table already exists: " + tableId,
+           TableAlreadyExistsException.class.getSimpleName(),
+           tableId,
+           CommonUtil.formatCurrentStackTrace());
+     }
+
+     LOG.info("Overwriting existing table: {}", tableId);
+     catalog.asTableCatalog().dropTable(tableIdentifier);
+
+     t =
+         catalog
+             .asTableCatalog()
+             .createTable(tableIdentifier, new Column[] {}, null, copiedTableProperties);
+   }
+
+   RegisterTableResponse response = new RegisterTableResponse();
+   response.setProperties(t.properties());
+   response.setLocation(t.properties().get(LANCE_LOCATION));
+   return response;
+ }
+
+ /**
+  * Removes a table from the lakehouse catalog and reports the properties it had.
+  *
+  * <p>{@code tableId} is split with {@code delimiter} and must have exactly three levels: the
+  * first names the catalog and the last two form the table identifier. The table is loaded
+  * before it is dropped so that its properties can be returned.
+  *
+  * @param tableId the delimiter-joined table identifier
+  * @param delimiter the delimiter used to split {@code tableId}
+  * @return a response carrying the id, properties and location of the removed table
+  * @throws IllegalArgumentException if {@code tableId} does not have exactly three levels
+  * @throws LanceNamespaceException if the drop call reports that nothing was dropped
+  */
+ @Override
+ public DeregisterTableResponse deregisterTable(String tableId, String delimiter) {
+   ObjectIdentifier parsedId = ObjectIdentifier.of(tableId, Pattern.quote(delimiter));
+   Preconditions.checkArgument(
+       parsedId.levels() == 3, "Expected at 3-level namespace but got: %s", parsedId.levels());
+
+   Catalog catalog =
+       namespaceWrapper.loadAndValidateLakehouseCatalog(parsedId.levelAtListPos(0));
+   NameIdentifier ident =
+       NameIdentifier.of(parsedId.levelAtListPos(1), parsedId.levelAtListPos(2));
+
+   // Capture the properties first; they are echoed back once the table is gone.
+   Map<String, String> tableProperties = catalog.asTableCatalog().loadTable(ident).properties();
+
+   // TODO Support real deregister API.
+   boolean dropped = catalog.asTableCatalog().dropTable(ident);
+   if (!dropped) {
+     throw LanceNamespaceException.notFound(
+         "Table not found: " + tableId,
+         NoSuchTableException.class.getSimpleName(),
+         tableId,
+         CommonUtil.formatCurrentStackTrace());
+   }
+
+   DeregisterTableResponse response = new DeregisterTableResponse();
+   response.setId(parsedId.listStyleId());
+   response.setLocation(tableProperties.get(LANCE_LOCATION));
+   response.setProperties(tableProperties);
+   return response;
+ }
+
+ /**
+  * Converts every field of an Arrow schema into a Gravitino column, preserving field order.
+  *
+  * <p>Each column keeps the field name and nullability and takes its type from {@code
+  * CONVERTER}. The third and fifth {@code Column.of} arguments are fixed to {@code null} and
+  * {@code false} (presumably comment and auto-increment; confirm against {@code Column.of}), and
+  * the last one is {@code DEFAULT_VALUE_NOT_SET}.
+  *
+  * @param arrowSchema the Arrow schema to convert
+  * @return a new mutable list with one column per Arrow field
+  */
+ private List<Column> extractColumns(org.apache.arrow.vector.types.pojo.Schema arrowSchema) {
+   return arrowSchema.getFields().stream()
+       .<Column>map(
+           arrowField ->
+               Column.of(
+                   arrowField.getName(),
+                   CONVERTER.toGravitino(arrowField),
+                   null,
+                   arrowField.isNullable(),
+                   false,
+                   DEFAULT_VALUE_NOT_SET))
+       .collect(Collectors.toCollection(ArrayList::new));
+ }
+
+ /**
+  * Builds the JSON form of an Arrow schema from Gravitino columns.
+  *
+  * <p>Each column is turned into an Arrow field from its name, data type and nullability via
+  * {@code CONVERTER}; the fields are wrapped in an Arrow schema and handed to {@code
+  * JsonArrowSchemaConverter}.
+  *
+  * @param columns the Gravitino columns, in the order the fields should appear
+  * @return the JSON Arrow schema describing {@code columns}
+  */
+ private JsonArrowSchema toJsonArrowSchema(Column[] columns) {
+   org.apache.arrow.vector.types.pojo.Schema arrowSchema =
+       new org.apache.arrow.vector.types.pojo.Schema(
+           Arrays.stream(columns)
+               .map(
+                   column ->
+                       CONVERTER.toArrowField(
+                           column.name(), column.dataType(), column.nullable()))
+               .collect(Collectors.toList()));
+
+   return JsonArrowSchemaConverter.convertToJsonArrowSchema(arrowSchema);
+ }
+}
diff --git
a/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceNamespaceWrapper.java
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/ArrowUtils.java
similarity index 68%
rename from
lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceNamespaceWrapper.java
rename to
lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/ArrowUtils.java
index b0ddb980ab..5d8508ee45 100644
---
a/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/ops/gravitino/TestGravitinoLanceNamespaceWrapper.java
+++
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/ArrowUtils.java
@@ -16,40 +16,22 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.gravitino.lance.common.ops.gravitino;
+package org.apache.gravitino.lance.common.utils;
+import com.google.common.base.Preconditions;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
-import java.util.Arrays;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
-import org.apache.arrow.vector.types.pojo.ArrowType;
-import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-public class TestGravitinoLanceNamespaceWrapper {
-
- @Test
- public void testParseArrowIpcStream() throws Exception {
- Schema schema =
- new Schema(
- Arrays.asList(
- Field.nullable("id", new ArrowType.Int(32, true)),
- Field.nullable("value", new ArrowType.Utf8())));
-
- GravitinoLanceNamespaceWrapper wrapper = new
GravitinoLanceNamespaceWrapper();
- byte[] ipcStream = generateIpcStream(schema);
- Schema parsedSchema = wrapper.parseArrowIpcStream(ipcStream);
-
- Assertions.assertEquals(schema, parsedSchema);
- }
-
- private byte[] generateIpcStream(Schema arrowSchema) throws IOException {
+public class ArrowUtils {
+ public static byte[] generateIpcStream(Schema arrowSchema) throws
IOException {
try (BufferAllocator allocator = new RootAllocator()) {
// Create an empty VectorSchemaRoot with the schema
@@ -73,4 +55,18 @@ public class TestGravitinoLanceNamespaceWrapper {
throw new IOException("Failed to create empty Arrow IPC stream: " +
e.getMessage(), e);
}
}
+
+ /**
+  * Reads the schema out of an Arrow IPC stream.
+  *
+  * <p>The allocator, the byte stream and the reader are all closed before this method returns.
+  *
+  * @param stream the bytes of the Arrow IPC stream
+  * @return the schema of the stream's vector schema root
+  * @throws IllegalArgumentException if reading fails for any reason (the original failure is
+  *     kept as the cause) or if the reader yields no schema
+  */
+ public static Schema parseArrowIpcStream(byte[] stream) {
+   Schema parsed;
+   try (BufferAllocator rootAllocator = new RootAllocator();
+       ByteArrayInputStream input = new ByteArrayInputStream(stream);
+       ArrowStreamReader ipcReader = new ArrowStreamReader(input, rootAllocator)) {
+     VectorSchemaRoot root = ipcReader.getVectorSchemaRoot();
+     parsed = root.getSchema();
+   } catch (Exception e) {
+     throw new IllegalArgumentException("Failed to parse Arrow IPC stream", e);
+   }
+
+   // Checked outside the try block so this message is not rewrapped by the catch above.
+   Preconditions.checkArgument(parsed != null, "No schema found in Arrow IPC stream");
+   return parsed;
+ }
}
diff --git
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LanceConstants.java
similarity index 77%
copy from
lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
copy to
lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LanceConstants.java
index f39ea2e684..c34a7be58a 100644
---
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
+++
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LanceConstants.java
@@ -17,13 +17,18 @@
* under the License.
*/
-package org.apache.gravitino.lance.service;
+package org.apache.gravitino.lance.common.utils;
-public class ServiceConstants {
+public class LanceConstants {
public static final String LANCE_HTTP_HEADER_PREFIX = "x-lance-";
public static final String LANCE_TABLE_LOCATION_HEADER =
LANCE_HTTP_HEADER_PREFIX + "table-location";
public static final String LANCE_TABLE_PROPERTIES_PREFIX_HEADER =
LANCE_HTTP_HEADER_PREFIX + "table-properties";
+ // Key for table location in table properties map
+ public static final String LANCE_LOCATION = "location";
+
+ // Prefix for storage options in LanceConfig
+ public static final String LANCE_STORAGE_OPTIONS_PREFIX = "lance.storage.";
}
diff --git
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LancePropertiesUtils.java
similarity index 50%
copy from
lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
copy to
lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LancePropertiesUtils.java
index f39ea2e684..e674a7266a 100644
---
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
+++
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/LancePropertiesUtils.java
@@ -15,15 +15,31 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
+ *
*/
+package org.apache.gravitino.lance.common.utils;
+
+import static
org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_STORAGE_OPTIONS_PREFIX;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class LancePropertiesUtils {
-package org.apache.gravitino.lance.service;
+ private LancePropertiesUtils() {
+ // Private constructor to prevent instantiation
+ }
-public class ServiceConstants {
- public static final String LANCE_HTTP_HEADER_PREFIX = "x-lance-";
+ public static Map<String, String> getLanceStorageOptions(Map<String, String>
tableProperties) {
+ if (tableProperties == null) {
+ return Map.of();
+ }
- public static final String LANCE_TABLE_LOCATION_HEADER =
- LANCE_HTTP_HEADER_PREFIX + "table-location";
- public static final String LANCE_TABLE_PROPERTIES_PREFIX_HEADER =
- LANCE_HTTP_HEADER_PREFIX + "table-properties";
+ return tableProperties.entrySet().stream()
+ .filter(e -> e.getKey().startsWith(LANCE_STORAGE_OPTIONS_PREFIX))
+ .collect(
+ Collectors.toMap(
+ e ->
e.getKey().substring(LANCE_STORAGE_OPTIONS_PREFIX.length()),
+ Map.Entry::getValue));
+ }
}
diff --git
a/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/SerializationUtils.java
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/SerializationUtils.java
new file mode 100644
index 0000000000..8e5fb2494b
--- /dev/null
+++
b/lance/lance-common/src/main/java/org/apache/gravitino/lance/common/utils/SerializationUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.gravitino.lance.common.utils;
+
+import com.google.common.collect.ImmutableMap;
+import com.lancedb.lance.namespace.util.JsonUtil;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.StringUtils;
+
+/** Static helpers for decoding serialized table properties received by the Lance REST service. */
+public class SerializationUtils {
+
+  private SerializationUtils() {
+    // Static helpers only; never instantiated.
+  }
+
+  /**
+   * Decodes a JSON object string into a flat string-to-string map.
+   *
+   * <p>Lance REST uses its own way to serialize and deserialize table properties, see
+   * https://github.com/lancedb/lance-namespace/blob/2033b2fca126e87e56ba0d5ec19c5ec010c7a98f/
+   * java/lance-namespace-core/src/main/java/com/lancedb/lance/namespace/rest/RestNamespace.java#L207-L208
+   *
+   * @param serializedProperties the JSON text, may be {@code null} or blank
+   * @return an empty immutable map for blank input, otherwise a map with one entry per JSON
+   *     field whose value is the field's {@code asText()} rendering
+   */
+  public static Map<String, String> deserializeProperties(String serializedProperties) {
+    if (StringUtils.isBlank(serializedProperties)) {
+      return ImmutableMap.of();
+    }
+
+    return JsonUtil.parse(
+        serializedProperties,
+        jsonNode -> {
+          Map<String, String> decoded = new HashMap<>();
+          jsonNode
+              .fields()
+              .forEachRemaining(field -> decoded.put(field.getKey(), field.getValue().asText()));
+          return decoded;
+        });
+  }
+}
diff --git
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/utils/TestArrowUtils.java
similarity index 50%
rename from
lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
rename to
lance/lance-common/src/test/java/org/apache/gravitino/lance/common/utils/TestArrowUtils.java
index f39ea2e684..43f0bf6ec6 100644
---
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/ServiceConstants.java
+++
b/lance/lance-common/src/test/java/org/apache/gravitino/lance/common/utils/TestArrowUtils.java
@@ -16,14 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.gravitino.lance.common.utils;
-package org.apache.gravitino.lance.service;
+import java.util.Arrays;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
-public class ServiceConstants {
- public static final String LANCE_HTTP_HEADER_PREFIX = "x-lance-";
+public class TestArrowUtils {
- public static final String LANCE_TABLE_LOCATION_HEADER =
- LANCE_HTTP_HEADER_PREFIX + "table-location";
- public static final String LANCE_TABLE_PROPERTIES_PREFIX_HEADER =
- LANCE_HTTP_HEADER_PREFIX + "table-properties";
+ @Test
+ public void testParseArrowIpcStream() throws Exception {
+ Schema schema =
+ new Schema(
+ Arrays.asList(
+ Field.nullable("id", new ArrowType.Int(32, true)),
+ Field.nullable("value", new ArrowType.Utf8())));
+ byte[] ipcStream = ArrowUtils.generateIpcStream(schema);
+ Schema parsedSchema = ArrowUtils.parseArrowIpcStream(ipcStream);
+
+ Assertions.assertEquals(schema, parsedSchema);
+ }
}
diff --git
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
index 5590eef9bd..290730f39a 100644
---
a/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
+++
b/lance/lance-rest-server/src/main/java/org/apache/gravitino/lance/service/rest/LanceTableOperations.java
@@ -19,21 +19,26 @@
package org.apache.gravitino.lance.service.rest;
import static
org.apache.gravitino.lance.common.ops.NamespaceWrapper.NAMESPACE_DELIMITER_DEFAULT;
-import static
org.apache.gravitino.lance.service.ServiceConstants.LANCE_TABLE_LOCATION_HEADER;
-import static
org.apache.gravitino.lance.service.ServiceConstants.LANCE_TABLE_PROPERTIES_PREFIX_HEADER;
+import static
org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_LOCATION;
+import static
org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_LOCATION_HEADER;
+import static
org.apache.gravitino.lance.common.utils.LanceConstants.LANCE_TABLE_PROPERTIES_PREFIX_HEADER;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
-import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.collect.Maps;
+import com.lancedb.lance.namespace.model.CreateEmptyTableRequest;
+import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
+import com.lancedb.lance.namespace.model.CreateTableRequest;
import com.lancedb.lance.namespace.model.CreateTableResponse;
import com.lancedb.lance.namespace.model.DeregisterTableRequest;
import com.lancedb.lance.namespace.model.DeregisterTableResponse;
+import com.lancedb.lance.namespace.model.DescribeTableRequest;
import com.lancedb.lance.namespace.model.DescribeTableResponse;
import com.lancedb.lance.namespace.model.RegisterTableRequest;
+import com.lancedb.lance.namespace.model.RegisterTableRequest.ModeEnum;
import com.lancedb.lance.namespace.model.RegisterTableResponse;
-import com.lancedb.lance.namespace.util.JsonUtil;
import java.util.Map;
+import java.util.Optional;
import javax.inject.Inject;
import javax.ws.rs.Consumes;
import javax.ws.rs.DefaultValue;
@@ -47,8 +52,8 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
-import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
+import org.apache.gravitino.lance.common.utils.SerializationUtils;
import org.apache.gravitino.lance.service.LanceExceptionMapper;
import org.apache.gravitino.metrics.MetricNames;
@@ -70,10 +75,14 @@ public class LanceTableOperations {
@ResponseMetered(name = "describe-table", absolute = true)
public Response describeTable(
@PathParam("id") String tableId,
- @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter")
String delimiter) {
+ @DefaultValue(NAMESPACE_DELIMITER_DEFAULT) @QueryParam("delimiter")
String delimiter,
+ DescribeTableRequest request) {
try {
+ validateDescribeTableRequest(request);
DescribeTableResponse response =
- lanceNamespace.asTableOps().describeTable(tableId, delimiter);
+ lanceNamespace
+ .asTableOps()
+ .describeTable(tableId, delimiter,
Optional.ofNullable(request.getVersion()));
return Response.ok(response).build();
} catch (Exception e) {
return LanceExceptionMapper.toRESTResponse(tableId, e);
@@ -97,13 +106,12 @@ public class LanceTableOperations {
MultivaluedMap<String, String> headersMap = headers.getRequestHeaders();
String tableLocation = headersMap.getFirst(LANCE_TABLE_LOCATION_HEADER);
String tableProperties =
headersMap.getFirst(LANCE_TABLE_PROPERTIES_PREFIX_HEADER);
-
- Map<String, String> props =
- JsonUtil.mapper().readValue(tableProperties, new TypeReference<>()
{});
+ CreateTableRequest.ModeEnum modeEnum =
CreateTableRequest.ModeEnum.fromValue(mode);
+ Map<String, String> props =
SerializationUtils.deserializeProperties(tableProperties);
CreateTableResponse response =
lanceNamespace
.asTableOps()
- .createTable(tableId, mode, delimiter, tableLocation, props,
arrowStreamBody);
+ .createTable(tableId, modeEnum, delimiter, tableLocation, props,
arrowStreamBody);
return Response.ok(response).build();
} catch (Exception e) {
return LanceExceptionMapper.toRESTResponse(tableId, e);
@@ -112,27 +120,25 @@ public class LanceTableOperations {
@POST
@Path("/create-empty")
+ @Produces("application/json")
@Timed(name = "create-empty-table." + MetricNames.HTTP_PROCESS_DURATION,
absolute = true)
@ResponseMetered(name = "create-empty-table", absolute = true)
public Response createEmptyTable(
@PathParam("id") String tableId,
- @QueryParam("mode") @DefaultValue("create") String mode, // create,
exist_ok, overwrite
@QueryParam("delimiter") @DefaultValue(NAMESPACE_DELIMITER_DEFAULT)
String delimiter,
+ CreateEmptyTableRequest request,
@Context HttpHeaders headers) {
try {
- // Extract table properties from header
- MultivaluedMap<String, String> headersMap = headers.getRequestHeaders();
- String tableLocation = headersMap.getFirst(LANCE_TABLE_LOCATION_HEADER);
- String tableProperties =
headersMap.getFirst(LANCE_TABLE_PROPERTIES_PREFIX_HEADER);
+ validateCreateEmptyTableRequest(request);
+ String tableLocation = request.getLocation();
Map<String, String> props =
- StringUtils.isBlank(tableProperties)
- ? Map.of()
- : JsonUtil.mapper().readValue(tableProperties, new
TypeReference<>() {});
- CreateTableResponse response =
- lanceNamespace
- .asTableOps()
- .createTable(tableId, mode, delimiter, tableLocation, props,
null);
+ request.getProperties() == null
+ ? Maps.newHashMap()
+ : Maps.newHashMap(request.getProperties());
+
+ CreateEmptyTableResponse response =
+ lanceNamespace.asTableOps().createEmptyTable(tableId, delimiter,
tableLocation, props);
return Response.ok(response).build();
} catch (Exception e) {
return LanceExceptionMapper.toRESTResponse(tableId, e);
@@ -145,18 +151,18 @@ public class LanceTableOperations {
@ResponseMetered(name = "register-table", absolute = true)
public Response registerTable(
@PathParam("id") String tableId,
- @QueryParam("mode") @DefaultValue("create") String mode, // overwrite or
@QueryParam("delimiter") @DefaultValue("$") String delimiter,
@Context HttpHeaders headers,
RegisterTableRequest registerTableRequest) {
try {
+ validateRegisterTableRequest(registerTableRequest);
+
Map<String, String> props =
registerTableRequest.getProperties() == null
? Maps.newHashMap()
: Maps.newHashMap(registerTableRequest.getProperties());
- props.put("register", "true");
- props.put("location", registerTableRequest.getLocation());
- props.put("format", "lance");
+ props.put(LANCE_LOCATION, registerTableRequest.getLocation());
+ ModeEnum mode = registerTableRequest.getMode();
RegisterTableResponse response =
lanceNamespace.asTableOps().registerTable(tableId, mode, delimiter,
props);
@@ -176,6 +182,7 @@ public class LanceTableOperations {
@Context HttpHeaders headers,
DeregisterTableRequest deregisterTableRequest) {
try {
+ validateDeregisterTableRequest(deregisterTableRequest);
DeregisterTableResponse response =
lanceNamespace.asTableOps().deregisterTable(tableId, delimiter);
return Response.ok(response).build();
@@ -183,4 +190,26 @@ public class LanceTableOperations {
return LanceExceptionMapper.toRESTResponse(tableId, e);
}
}
+
+ /**
+  * Validation hook called before a create-empty-table request is processed; it currently
+  * performs no checks.
+  *
+  * @param request the request body, not inspected at present
+  */
+ private void validateCreateEmptyTableRequest(
+ @SuppressWarnings("unused") CreateEmptyTableRequest request) {
+ // No specific fields to validate for now
+ }
+
+ /**
+  * Validation hook called before a register-table request is processed; it currently performs
+  * no checks.
+  *
+  * @param request the request body, not inspected at present
+  */
+ private void validateRegisterTableRequest(
+ @SuppressWarnings("unused") RegisterTableRequest request) {
+ // No specific fields to validate for now
+ }
+
+ /**
+  * Validation hook called before a deregister-table request is processed; it currently performs
+  * no checks.
+  *
+  * @param request the request body, not inspected at present
+  */
+ private void validateDeregisterTableRequest(
+ @SuppressWarnings("unused") DeregisterTableRequest request) {
+ // We will ignore the id in the request body since it's already provided in the path param.
+ // No specific fields to validate for now
+ }
+
+ /**
+  * Validation hook called before a describe-table request is processed; it currently performs
+  * no checks.
+  *
+  * @param request the request body, not inspected at present
+  */
+ private void validateDescribeTableRequest(
+ @SuppressWarnings("unused") DescribeTableRequest request) {
+ // We will ignore the id in the request body since it's already provided in the path param.
+ // No specific fields to validate for now
+ }
}
diff --git
a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
index f1fe008782..c7454442b6 100644
---
a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
+++
b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/integration/test/LanceRESTServiceIT.java
@@ -24,38 +24,68 @@ import com.google.common.collect.Sets;
import com.lancedb.lance.namespace.LanceNamespace;
import com.lancedb.lance.namespace.LanceNamespaceException;
import com.lancedb.lance.namespace.LanceNamespaces;
+import com.lancedb.lance.namespace.client.apache.ApiException;
+import com.lancedb.lance.namespace.model.CreateEmptyTableRequest;
+import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
import com.lancedb.lance.namespace.model.CreateNamespaceResponse;
+import com.lancedb.lance.namespace.model.CreateTableRequest;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
+import com.lancedb.lance.namespace.model.DeregisterTableRequest;
+import com.lancedb.lance.namespace.model.DeregisterTableResponse;
import com.lancedb.lance.namespace.model.DescribeNamespaceRequest;
import com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
+import com.lancedb.lance.namespace.model.DescribeTableRequest;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
import com.lancedb.lance.namespace.model.DropNamespaceRequest;
import com.lancedb.lance.namespace.model.DropNamespaceResponse;
+import com.lancedb.lance.namespace.model.ErrorResponse;
+import com.lancedb.lance.namespace.model.JsonArrowField;
import com.lancedb.lance.namespace.model.ListNamespacesRequest;
import com.lancedb.lance.namespace.model.ListNamespacesResponse;
+import com.lancedb.lance.namespace.model.ListTablesRequest;
import com.lancedb.lance.namespace.model.NamespaceExistsRequest;
+import com.lancedb.lance.namespace.model.RegisterTableRequest;
+import com.lancedb.lance.namespace.model.RegisterTableRequest.ModeEnum;
+import com.lancedb.lance.namespace.model.RegisterTableResponse;
import com.lancedb.lance.namespace.rest.RestNamespaceConfig;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Schema;
import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.integration.test.util.BaseIT;
import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.gravitino.lance.common.utils.ArrowUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.com.google.common.base.Joiner;
public class LanceRESTServiceIT extends BaseIT {
+ private static final String CATALOG_NAME =
GravitinoITUtils.genRandomName("lance_rest_catalog");
+ private static final String SCHEMA_NAME =
GravitinoITUtils.genRandomName("lance_rest_schema");
private GravitinoMetalake metalake;
+ private Catalog catalog;
private Map<String, String> properties =
new HashMap<>() {
{
@@ -372,6 +402,272 @@ public class LanceRESTServiceIT extends BaseIT {
Assertions.assertEquals(404, exception.getCode());
}
+  /**
+   * Creates an empty table through the Lance REST namespace, checks the location, properties and
+   * storage options echoed back, describes the table, and finally verifies that creating the same
+   * table a second time is rejected with a 409.
+   */
+  @Test
+  void testCreateEmptyTable() throws ApiException {
+    catalog = createCatalog(CATALOG_NAME);
+    createSchema();
+
+    List<String> tableId = List.of(CATALOG_NAME, SCHEMA_NAME, "empty_table");
+    String tablePath = tempDir + "/empty_table/";
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            "key1", "v1",
+            "lance.storage.a", "value_a",
+            "lance.storage.b", "value_b");
+
+    CreateEmptyTableRequest createRequest = new CreateEmptyTableRequest();
+    createRequest.setId(tableId);
+    createRequest.setLocation(tablePath);
+    createRequest.setProperties(tableProps);
+
+    CreateEmptyTableResponse created = ns.createEmptyTable(createRequest);
+    Assertions.assertNotNull(created);
+    Assertions.assertEquals(tablePath, created.getLocation());
+    Assertions.assertEquals("v1", created.getProperties().get("key1"));
+    // Keys sent as "lance.storage.<x>" are expected back as storage option "<x>".
+    Assertions.assertEquals("value_a", created.getStorageOptions().get("a"));
+    Assertions.assertEquals("value_b", created.getStorageOptions().get("b"));
+
+    // The freshly created table must be describable and report the same location.
+    DescribeTableRequest describeRequest = new DescribeTableRequest();
+    describeRequest.setId(tableId);
+    DescribeTableResponse described = ns.describeTable(describeRequest);
+    Assertions.assertNotNull(described);
+    Assertions.assertEquals(tablePath, described.getLocation());
+
+    // Creating the same table a second time must be rejected as a conflict.
+    LanceNamespaceException conflict =
+        Assertions.assertThrows(
+            LanceNamespaceException.class, () -> ns.createEmptyTable(createRequest));
+    Assertions.assertTrue(conflict.getMessage().contains("Table already exists"));
+    Assertions.assertEquals(409, conflict.getCode());
+  }
+
+ @Test
+ void testCreateTable() throws IOException, ApiException {
+ catalog = createCatalog(CATALOG_NAME);
+ createSchema();
+
+ String location = tempDir + "/" + "table/";
+ List<String> ids = List.of(CATALOG_NAME, SCHEMA_NAME, "table");
+ org.apache.arrow.vector.types.pojo.Schema schema =
+ new org.apache.arrow.vector.types.pojo.Schema(
+ Arrays.asList(
+ Field.nullable("id", new ArrowType.Int(32, true)),
+ Field.nullable("value", new ArrowType.Utf8())));
+ byte[] body = ArrowUtils.generateIpcStream(schema);
+
+ CreateTableRequest request = new CreateTableRequest();
+ request.setId(ids);
+ request.setLocation(location);
+ request.setProperties(
+ ImmutableMap.of(
+ "key1", "v1",
+ "lance.storage.a", "value_a",
+ "lance.storage.b", "value_b"));
+
+ CreateTableResponse response = ns.createTable(request, body);
+ Assertions.assertNotNull(response);
+ Assertions.assertEquals(location, response.getLocation());
+ Assertions.assertEquals("v1", response.getProperties().get("key1"));
+ Assertions.assertEquals("value_a", response.getStorageOptions().get("a"));
+ Assertions.assertEquals("value_b", response.getStorageOptions().get("b"));
+
+ DescribeTableRequest describeTableRequest = new DescribeTableRequest();
+ describeTableRequest.setId(ids);
+ DescribeTableResponse loadTable = ns.describeTable(describeTableRequest);
+ Assertions.assertNotNull(loadTable);
+ Assertions.assertEquals(location, loadTable.getLocation());
+
+ List<JsonArrowField> jsonArrowFields = loadTable.getSchema().getFields();
+ for (int i = 0; i < jsonArrowFields.size(); i++) {
+ JsonArrowField jsonArrowField = jsonArrowFields.get(i);
+ Field originalField = schema.getFields().get(i);
+ Assertions.assertEquals(originalField.getName(),
jsonArrowField.getName());
+
+ if (i == 0) {
+ Assertions.assertEquals("int32", jsonArrowField.getType().getType());
+ } else if (i == 1) {
+ Assertions.assertEquals("utf8", jsonArrowField.getType().getType());
+ }
+ }
+ // Check the location exists
+ Assertions.assertTrue(new File(location).exists());
+ Assertions.assertEquals("v1", loadTable.getProperties().get("key1"));
+ Assertions.assertEquals("value_a", loadTable.getStorageOptions().get("a"));
+ Assertions.assertEquals("value_b", loadTable.getStorageOptions().get("b"));
+
+ // Check overwrite mode
+ String newLocation = tempDir + "/" + "table_new/";
+ request.setLocation(newLocation);
+ request.setMode(CreateTableRequest.ModeEnum.OVERWRITE);
+ request.setProperties(
+ ImmutableMap.of(
+ "key1", "v2",
+ "lance.storage.a", "value_va",
+ "lance.storage.b", "value_vb"));
+
+ response = Assertions.assertDoesNotThrow(() -> ns.createTable(request,
body));
+
+ Assertions.assertNotNull(response);
+ Assertions.assertEquals(newLocation, response.getLocation());
+ Assertions.assertTrue(response.getProperties().get("key1").equals("v2"));
+ Assertions.assertEquals("value_va", response.getStorageOptions().get("a"));
+ Assertions.assertEquals("value_vb", response.getStorageOptions().get("b"));
+ Assertions.assertTrue(new File(newLocation).exists());
+ Assertions.assertFalse(new File(location).exists());
+
+ // Check exist_ok mode
+ request.setMode(CreateTableRequest.ModeEnum.EXIST_OK);
+ response = Assertions.assertDoesNotThrow(() -> ns.createTable(request,
body));
+
+ Assertions.assertNotNull(response);
+ Assertions.assertEquals("v2", response.getProperties().get("key1"));
+ Assertions.assertEquals("value_va", response.getStorageOptions().get("a"));
+ Assertions.assertEquals("value_vb", response.getStorageOptions().get("b"));
+ Assertions.assertEquals(newLocation, response.getLocation());
+ Assertions.assertTrue(new File(newLocation).exists());
+
+ // Create table again without overwrite or exist_ok should fail
+ request.setMode(CreateTableRequest.ModeEnum.CREATE);
+ LanceNamespaceException exception =
+ Assertions.assertThrows(LanceNamespaceException.class, () ->
ns.createTable(request, body));
+ Assertions.assertTrue(exception.getMessage().contains("already exists"));
+ Assertions.assertEquals(409, exception.getCode());
+
+ // Create a table without location should fail
+ CreateTableRequest noLocationRequest = new CreateTableRequest();
+ noLocationRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME,
"no_location_table"));
+ LanceNamespaceException noLocationException =
+ Assertions.assertThrows(
+ LanceNamespaceException.class, () ->
ns.createTable(noLocationRequest, body));
+ Assertions.assertTrue(
+ noLocationException.getMessage().contains("No location specified for
table"));
+
+ // Create table with invalid schema should fail
+ byte[] invalidBody = "".getBytes(Charset.defaultCharset());
+ CreateTableRequest invalidRequest = new CreateTableRequest();
+ invalidRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME, "invalid_table"));
+ invalidRequest.setLocation(tempDir + "/" + "invalid_table/");
+ LanceNamespaceException apiException =
+ Assertions.assertThrows(
+ LanceNamespaceException.class, () ->
ns.createTable(invalidRequest, invalidBody));
+ Assertions.assertTrue(apiException.getMessage().contains("Failed to parse
Arrow IPC stream"));
+ Assertions.assertEquals(400, apiException.getCode());
+
+ // Create table with wrong ids should fail
+ CreateTableRequest wrongIdRequest = new CreateTableRequest();
+ wrongIdRequest.setId(List.of(CATALOG_NAME, "wrong_schema")); // This is a
schema NOT a table.
+ wrongIdRequest.setLocation(tempDir + "/" + "wrong_id_table/");
+ LanceNamespaceException wrongIdException =
+ Assertions.assertThrows(
+ LanceNamespaceException.class, () ->
ns.createTable(wrongIdRequest, body));
+ Assertions.assertTrue(wrongIdException.getMessage().contains("Expected at
3-level namespace"));
+ Assertions.assertEquals(400, wrongIdException.getCode());
+
+ // Now test list tables
+ ListTablesRequest listRequest = new ListTablesRequest();
+ listRequest.setId(List.of(CATALOG_NAME, SCHEMA_NAME));
+ var listResponse = ns.listTables(listRequest);
+ Set<String> stringSet = listResponse.getTables();
+ Assertions.assertEquals(1, stringSet.size());
+ Assertions.assertTrue(stringSet.contains(Joiner.on(".").join(ids)));
+ }
+
+  /**
+   * Registers a table at a location, describes it, re-registers it in OVERWRITE mode at a new
+   * location, and then deregisters it, checking the location reported at each step.
+   */
+  @Test
+  void testRegisterTable() {
+    catalog = createCatalog(CATALOG_NAME);
+    createSchema();
+
+    List<String> tableId = List.of(CATALOG_NAME, SCHEMA_NAME, "table_register");
+    String initialPath = tempDir + "/register/";
+
+    RegisterTableRequest registerRequest = new RegisterTableRequest();
+    registerRequest.setId(tableId);
+    registerRequest.setMode(ModeEnum.CREATE);
+    registerRequest.setLocation(initialPath);
+    registerRequest.setProperties(ImmutableMap.of("key1", "value1"));
+
+    RegisterTableResponse registered = ns.registerTable(registerRequest);
+    Assertions.assertNotNull(registered);
+
+    // The registered table must be describable with its location and custom property.
+    DescribeTableRequest describeRequest = new DescribeTableRequest();
+    describeRequest.setId(tableId);
+    DescribeTableResponse described = ns.describeTable(describeRequest);
+    Assertions.assertNotNull(described);
+    Assertions.assertEquals(initialPath, described.getLocation());
+    Assertions.assertTrue(described.getProperties().containsKey("key1"));
+
+    // Registering again in OVERWRITE mode must succeed and move the table to the new location.
+    String replacementPath = tempDir + "/register_new/";
+    registerRequest.setMode(ModeEnum.OVERWRITE);
+    registerRequest.setLocation(replacementPath);
+    RegisterTableResponse overwritten =
+        Assertions.assertDoesNotThrow(() -> ns.registerTable(registerRequest));
+    Assertions.assertNotNull(overwritten);
+    Assertions.assertEquals(replacementPath, overwritten.getLocation());
+
+    // Deregistering reports the location the table was last registered with.
+    DeregisterTableRequest deregisterRequest = new DeregisterTableRequest();
+    deregisterRequest.setId(tableId);
+    DeregisterTableResponse deregistered = ns.deregisterTable(deregisterRequest);
+    Assertions.assertNotNull(deregistered);
+    Assertions.assertEquals(replacementPath, deregistered.getLocation());
+  }
+
+  /**
+   * Verifies deregistration behavior: deregistering an unknown table yields a 404 carrying a
+   * {@code NoSuchTableException} error type; deregistering an existing table succeeds, echoes its
+   * id and location, and leaves the data directory in place; afterwards describing the table
+   * fails with 404, and describing it with an explicit version fails with 406.
+   */
+  @Test
+  void testDeregisterNonExistingTable() {
+    catalog = createCatalog(CATALOG_NAME);
+    createSchema();
+
+    List<String> ids = List.of(CATALOG_NAME, SCHEMA_NAME, "non_existing_table");
+    DeregisterTableRequest deregisterTableRequest = new DeregisterTableRequest();
+    deregisterTableRequest.setId(ids);
+
+    LanceNamespaceException exception =
+        Assertions.assertThrows(
+            LanceNamespaceException.class, () -> ns.deregisterTable(deregisterTableRequest));
+    Assertions.assertEquals(404, exception.getCode());
+    Assertions.assertTrue(exception.getMessage().contains("does not exist"));
+    Optional<ErrorResponse> responseOptional = exception.getErrorResponse();
+    Assertions.assertTrue(responseOptional.isPresent());
+    Assertions.assertEquals(
+        NoSuchTableException.class.getSimpleName(), responseOptional.get().getType());
+
+    // Try to create a table and then deregister table
+    CreateEmptyTableRequest createEmptyTableRequest = new CreateEmptyTableRequest();
+    String location = tempDir + "/" + "to_be_deregistered_table/";
+    ids = List.of(CATALOG_NAME, SCHEMA_NAME, "to_be_deregistered_table");
+    createEmptyTableRequest.setLocation(location);
+    createEmptyTableRequest.setProperties(ImmutableMap.of());
+    createEmptyTableRequest.setId(ids);
+    CreateEmptyTableResponse response =
+        Assertions.assertDoesNotThrow(() -> ns.createEmptyTable(createEmptyTableRequest));
+    Assertions.assertNotNull(response);
+    Assertions.assertEquals(location, response.getLocation());
+
+    // Now try to deregister
+    deregisterTableRequest.setId(ids);
+    DeregisterTableResponse deregisterTableResponse =
+        Assertions.assertDoesNotThrow(() -> ns.deregisterTable(deregisterTableRequest));
+    Assertions.assertNotNull(deregisterTableResponse);
+    Assertions.assertEquals(location, deregisterTableResponse.getLocation());
+    // assertEquals prints both id lists when they differ, unlike assertTrue(Objects.equals(..)).
+    Assertions.assertEquals(ids, deregisterTableResponse.getId());
+    Assertions.assertTrue(
+        new File(location).exists(), "Data should still exist after deregistering the table.");
+
+    // Now try to describe the table, should fail
+    DescribeTableRequest describeTableRequest = new DescribeTableRequest();
+    describeTableRequest.setId(ids);
+    LanceNamespaceException lanceNamespaceException =
+        Assertions.assertThrows(
+            LanceNamespaceException.class, () -> ns.describeTable(describeTableRequest));
+    Assertions.assertEquals(404, lanceNamespaceException.getCode());
+
+    // Describing with an explicit version is expected to be rejected with 406.
+    describeTableRequest.setVersion(1L);
+    lanceNamespaceException =
+        Assertions.assertThrows(
+            LanceNamespaceException.class, () -> ns.describeTable(describeTableRequest));
+    Assertions.assertEquals(406, lanceNamespaceException.getCode());
+  }
+
// Creates a metalake with the given name via the Gravitino client, passing a fixed comment and
// null as the third argument, and returns the created metalake.
private GravitinoMetalake createMetalake(String metalakeName) {
return client.createMetalake(metalakeName, "metalake for lance rest
service tests", null);
}
@@ -385,6 +681,13 @@ public class LanceRESTServiceIT extends BaseIT {
properties);
}
+  /**
+   * Creates the schema named {@code SCHEMA_NAME} in the current {@code catalog} with a fixed
+   * comment and no properties, then loads it back.
+   */
+  private void createSchema() {
+    catalog.asSchemas().createSchema(SCHEMA_NAME, "comment", Maps.newHashMap());
+    // Load the schema right away; the returned object itself is not needed.
+    catalog.asSchemas().loadSchema(SCHEMA_NAME);
+  }
+
// Builds the base URL of the Lance REST service on localhost, using the port returned by
// getLanceRESTServerPort() and the "/lance" path.
private String getLanceRestServiceUrl() {
return String.format("http://%s:%d/lance", "localhost",
getLanceRESTServerPort());
}
diff --git
a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java
b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java
index 0ba8bf79b7..efe1f90436 100644
---
a/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java
+++
b/lance/lance-rest-server/src/test/java/org/apache/gravitino/lance/service/rest/TestLanceNamespaceOperations.java
@@ -27,13 +27,23 @@ import static org.mockito.Mockito.when;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
+import com.lancedb.lance.namespace.LanceNamespaceException;
+import com.lancedb.lance.namespace.model.CreateEmptyTableRequest;
+import com.lancedb.lance.namespace.model.CreateEmptyTableResponse;
import com.lancedb.lance.namespace.model.CreateNamespaceRequest;
import com.lancedb.lance.namespace.model.CreateNamespaceResponse;
+import com.lancedb.lance.namespace.model.CreateTableResponse;
+import com.lancedb.lance.namespace.model.DeregisterTableRequest;
+import com.lancedb.lance.namespace.model.DeregisterTableResponse;
import com.lancedb.lance.namespace.model.DescribeNamespaceResponse;
+import com.lancedb.lance.namespace.model.DescribeTableRequest;
+import com.lancedb.lance.namespace.model.DescribeTableResponse;
import com.lancedb.lance.namespace.model.DropNamespaceRequest;
import com.lancedb.lance.namespace.model.DropNamespaceResponse;
import com.lancedb.lance.namespace.model.ErrorResponse;
import com.lancedb.lance.namespace.model.ListNamespacesResponse;
+import com.lancedb.lance.namespace.model.RegisterTableRequest;
+import com.lancedb.lance.namespace.model.RegisterTableResponse;
import java.io.IOException;
import java.util.regex.Pattern;
import javax.servlet.http.HttpServletRequest;
@@ -42,6 +52,7 @@ import javax.ws.rs.core.Application;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
+import org.apache.gravitino.lance.common.ops.LanceTableOperations;
import org.apache.gravitino.lance.common.ops.NamespaceWrapper;
import org.apache.gravitino.rest.RESTUtils;
import org.glassfish.jersey.internal.inject.AbstractBinder;
@@ -66,6 +77,7 @@ public class TestLanceNamespaceOperations extends JerseyTest {
private static NamespaceWrapper namespaceWrapper =
mock(NamespaceWrapper.class);
private static
org.apache.gravitino.lance.common.ops.LanceNamespaceOperations namespaceOps =
mock(org.apache.gravitino.lance.common.ops.LanceNamespaceOperations.class);
+ private static LanceTableOperations tableOps =
mock(LanceTableOperations.class);
@Override
protected Application configure() {
@@ -78,6 +90,7 @@ public class TestLanceNamespaceOperations extends JerseyTest {
ResourceConfig resourceConfig = new ResourceConfig();
resourceConfig.register(LanceNamespaceOperations.class);
+
resourceConfig.register(org.apache.gravitino.lance.service.rest.LanceTableOperations.class);
resourceConfig.register(
new AbstractBinder() {
@Override
@@ -93,6 +106,7 @@ public class TestLanceNamespaceOperations extends JerseyTest
{
@BeforeAll
public static void setup() {
when(namespaceWrapper.asNamespaceOps()).thenReturn(namespaceOps);
+ when(namespaceWrapper.asTableOps()).thenReturn(tableOps);
}
@Test
@@ -323,4 +337,291 @@ public class TestLanceNamespaceOperations extends
JerseyTest {
Assertions.assertEquals("Test exception", errorResp.getError());
Assertions.assertEquals(RuntimeException.class.getSimpleName(),
errorResp.getType());
}
+
+  /**
+   * Exercises the create-table endpoint against the mocked table operations: a successful call
+   * returns 200, an {@link IllegalArgumentException} maps to 400, and any other
+   * {@link RuntimeException} maps to 500 with the exception message and simple class name in the
+   * error body.
+   */
+  @Test
+  void testCreateTable() {
+    String tableIds = "catalog.scheme.create_table";
+    String delimiter = ".";
+
+    // Test normal
+    CreateTableResponse createTableResponse = new CreateTableResponse();
+    when(tableOps.createTable(any(), any(), any(), any(), any(), any()))
+        .thenReturn(createTableResponse);
+
+    byte[] bytes = new byte[] {0x01, 0x02, 0x03};
+    Response resp =
+        target(String.format("/v1/table/%s/create", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(bytes, "application/vnd.apache.arrow.stream"));
+
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+
+    // Test illegal argument
+    // Reset the shared mock before re-stubbing, as every other scenario in this class does.
+    Mockito.reset(tableOps);
+    when(tableOps.createTable(any(), any(), any(), any(), any(), any()))
+        .thenThrow(new IllegalArgumentException("Illegal argument"));
+
+    resp =
+        target(String.format("/v1/table/%s/create", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(bytes, "application/vnd.apache.arrow.stream"));
+    Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+
+    // Test runtime exception
+    Mockito.reset(tableOps);
+    when(tableOps.createTable(any(), any(), any(), any(), any(), any()))
+        .thenThrow(new RuntimeException("Runtime exception"));
+    resp =
+        target(String.format("/v1/table/%s/create", tableIds))
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(Entity.entity(bytes, "application/vnd.apache.arrow.stream"));
+
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), resp.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getMediaType());
+    ErrorResponse errorResp = resp.readEntity(ErrorResponse.class);
+    Assertions.assertEquals("Runtime exception", errorResp.getError());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), errorResp.getType());
+  }
+
+  /**
+   * Drives the create-empty endpoint with the mocked table operations and checks the HTTP mapping:
+   * 200 with the stubbed location/properties on success, 400 for an
+   * {@link IllegalArgumentException}, and 500 with a populated error body for any other
+   * {@link RuntimeException}.
+   */
+  @Test
+  void testCreateEmptyTable() {
+    String tableIds = "catalog.scheme.create_empty_table";
+    String delimiter = ".";
+    String endpoint = String.format("/v1/table/%s/create-empty", tableIds);
+
+    CreateEmptyTableRequest emptyTableRequest = new CreateEmptyTableRequest();
+    emptyTableRequest.setLocation("/path/to/table");
+    Entity<CreateEmptyTableRequest> payload =
+        Entity.entity(emptyTableRequest, MediaType.APPLICATION_JSON_TYPE);
+
+    // Scenario 1: the operation succeeds.
+    CreateEmptyTableResponse stubbed = new CreateEmptyTableResponse();
+    stubbed.setLocation("/path/to/table");
+    stubbed.setProperties(ImmutableMap.of("key", "value"));
+    when(tableOps.createEmptyTable(any(), any(), any(), any())).thenReturn(stubbed);
+
+    Response httpResponse =
+        target(endpoint)
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(payload);
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResponse.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, httpResponse.getMediaType());
+    CreateEmptyTableResponse decoded = httpResponse.readEntity(CreateEmptyTableResponse.class);
+    Assertions.assertEquals(stubbed.getLocation(), decoded.getLocation());
+    Assertions.assertEquals(stubbed.getProperties(), decoded.getProperties());
+
+    // Scenario 2: the operation rejects its arguments.
+    Mockito.reset(tableOps);
+    when(tableOps.createEmptyTable(any(), any(), any(), any()))
+        .thenThrow(new IllegalArgumentException("Illegal argument"));
+    httpResponse =
+        target(endpoint)
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(payload);
+    Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), httpResponse.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, httpResponse.getMediaType());
+
+    // Scenario 3: the operation fails unexpectedly.
+    Mockito.reset(tableOps);
+    when(tableOps.createEmptyTable(any(), any(), any(), any()))
+        .thenThrow(new RuntimeException("Runtime exception"));
+    httpResponse =
+        target(endpoint)
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(payload);
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), httpResponse.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, httpResponse.getMediaType());
+    ErrorResponse failure = httpResponse.readEntity(ErrorResponse.class);
+    Assertions.assertEquals("Runtime exception", failure.getError());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), failure.getType());
+  }
+
+  /**
+   * Drives the register endpoint with the mocked table operations and checks the HTTP mapping:
+   * 200 with the stubbed location/properties on success, 400 for an
+   * {@link IllegalArgumentException}, and 500 with a populated error body for any other
+   * {@link RuntimeException}.
+   */
+  @Test
+  void testRegisterTable() {
+    String tableIds = "catalog.scheme.register_table";
+    String delimiter = ".";
+    String endpoint = String.format("/v1/table/%s/register", tableIds);
+
+    RegisterTableRequest registerRequest = new RegisterTableRequest();
+    registerRequest.setLocation("/path/to/registered_table");
+    registerRequest.setMode(RegisterTableRequest.ModeEnum.CREATE);
+    Entity<RegisterTableRequest> payload =
+        Entity.entity(registerRequest, MediaType.APPLICATION_JSON_TYPE);
+
+    // Scenario 1: the operation succeeds.
+    RegisterTableResponse stubbed = new RegisterTableResponse();
+    stubbed.setLocation("/path/to/registered_table");
+    stubbed.setProperties(ImmutableMap.of("key", "value"));
+    when(tableOps.registerTable(any(), any(), any(), any())).thenReturn(stubbed);
+
+    Response httpResponse =
+        target(endpoint)
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(payload);
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResponse.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, httpResponse.getMediaType());
+    RegisterTableResponse decoded = httpResponse.readEntity(RegisterTableResponse.class);
+    Assertions.assertEquals(stubbed.getLocation(), decoded.getLocation());
+    Assertions.assertEquals(stubbed.getProperties(), decoded.getProperties());
+
+    // Scenario 2: the operation rejects its arguments.
+    Mockito.reset(tableOps);
+    when(tableOps.registerTable(any(), any(), any(), any()))
+        .thenThrow(new IllegalArgumentException("Illegal argument"));
+    httpResponse =
+        target(endpoint)
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(payload);
+    Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), httpResponse.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, httpResponse.getMediaType());
+
+    // Scenario 3: the operation fails unexpectedly.
+    Mockito.reset(tableOps);
+    when(tableOps.registerTable(any(), any(), any(), any()))
+        .thenThrow(new RuntimeException("Runtime exception"));
+    httpResponse =
+        target(endpoint)
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(payload);
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), httpResponse.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, httpResponse.getMediaType());
+    ErrorResponse failure = httpResponse.readEntity(ErrorResponse.class);
+    Assertions.assertEquals("Runtime exception", failure.getError());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), failure.getType());
+  }
+
+  /**
+   * Drives the deregister endpoint with the mocked table operations and checks the HTTP mapping:
+   * 200 with the stubbed location/properties on success, 400 for an
+   * {@link IllegalArgumentException}, 404 for a not-found {@link LanceNamespaceException}, and 500
+   * with a populated error body for any other {@link RuntimeException}.
+   */
+  @Test
+  void testDeregisterTable() {
+    String tableIds = "catalog.scheme.deregister_table";
+    String delimiter = ".";
+    String endpoint = String.format("/v1/table/%s/deregister", tableIds);
+
+    DeregisterTableRequest deregisterRequest = new DeregisterTableRequest();
+    Entity<DeregisterTableRequest> payload =
+        Entity.entity(deregisterRequest, MediaType.APPLICATION_JSON_TYPE);
+
+    // Scenario 1: the operation succeeds.
+    DeregisterTableResponse stubbed = new DeregisterTableResponse();
+    stubbed.setLocation("/path/to/deregistered_table");
+    stubbed.setProperties(ImmutableMap.of("key", "value"));
+    when(tableOps.deregisterTable(any(), any())).thenReturn(stubbed);
+
+    Response httpResponse =
+        target(endpoint)
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(payload);
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResponse.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, httpResponse.getMediaType());
+    DeregisterTableResponse decoded = httpResponse.readEntity(DeregisterTableResponse.class);
+    Assertions.assertEquals(stubbed.getLocation(), decoded.getLocation());
+    Assertions.assertEquals(stubbed.getProperties(), decoded.getProperties());
+
+    // Scenario 2: the operation rejects its arguments.
+    Mockito.reset(tableOps);
+    when(tableOps.deregisterTable(any(), any()))
+        .thenThrow(new IllegalArgumentException("Illegal argument"));
+    httpResponse =
+        target(endpoint)
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(payload);
+    Assertions.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), httpResponse.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, httpResponse.getMediaType());
+
+    // Scenario 3: the table does not exist.
+    Mockito.reset(tableOps);
+    when(tableOps.deregisterTable(any(), any()))
+        .thenThrow(
+            LanceNamespaceException.notFound(
+                "Table not found", "NoSuchTableException", tableIds, ""));
+    httpResponse =
+        target(endpoint)
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(payload);
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), httpResponse.getStatus());
+
+    // Scenario 4: the operation fails unexpectedly.
+    Mockito.reset(tableOps);
+    when(tableOps.deregisterTable(any(), any()))
+        .thenThrow(new RuntimeException("Runtime exception"));
+    httpResponse =
+        target(endpoint)
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(payload);
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), httpResponse.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, httpResponse.getMediaType());
+    ErrorResponse failure = httpResponse.readEntity(ErrorResponse.class);
+    Assertions.assertEquals("Runtime exception", failure.getError());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), failure.getType());
+  }
+
+  /**
+   * Drives the describe endpoint with the mocked table operations and checks the HTTP mapping:
+   * 200 with the stubbed location/properties on success, 404 for a not-found
+   * {@link LanceNamespaceException}, and 500 with a populated error body for any other
+   * {@link RuntimeException}.
+   */
+  @Test
+  void testDescribeTable() {
+    String tableIds = "catalog.scheme.describe_table";
+    String delimiter = ".";
+    String endpoint = String.format("/v1/table/%s/describe", tableIds);
+
+    // Scenario 1: the operation succeeds.
+    DescribeTableResponse stubbed = new DescribeTableResponse();
+    stubbed.setLocation("/path/to/describe_table");
+    stubbed.setProperties(ImmutableMap.of("key", "value"));
+    when(tableOps.describeTable(any(), any(), any())).thenReturn(stubbed);
+
+    DescribeTableRequest describeRequest = new DescribeTableRequest();
+    Entity<DescribeTableRequest> payload =
+        Entity.entity(describeRequest, MediaType.APPLICATION_JSON_TYPE);
+
+    Response httpResponse =
+        target(endpoint)
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(payload);
+    Assertions.assertEquals(Response.Status.OK.getStatusCode(), httpResponse.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, httpResponse.getMediaType());
+    DescribeTableResponse decoded = httpResponse.readEntity(DescribeTableResponse.class);
+    Assertions.assertEquals(stubbed.getLocation(), decoded.getLocation());
+    Assertions.assertEquals(stubbed.getProperties(), decoded.getProperties());
+
+    // Scenario 2: the table does not exist.
+    Mockito.reset(tableOps);
+    when(tableOps.describeTable(any(), any(), any()))
+        .thenThrow(
+            LanceNamespaceException.notFound(
+                "Table not found", "NoSuchTableException", tableIds, ""));
+    httpResponse =
+        target(endpoint)
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(payload);
+    Assertions.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), httpResponse.getStatus());
+
+    // Scenario 3: the operation fails unexpectedly.
+    Mockito.reset(tableOps);
+    when(tableOps.describeTable(any(), any(), any()))
+        .thenThrow(new RuntimeException("Runtime exception"));
+    httpResponse =
+        target(endpoint)
+            .queryParam("delimiter", delimiter)
+            .request(MediaType.APPLICATION_JSON_TYPE)
+            .post(payload);
+    Assertions.assertEquals(
+        Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), httpResponse.getStatus());
+    Assertions.assertEquals(MediaType.APPLICATION_JSON_TYPE, httpResponse.getMediaType());
+    ErrorResponse failure = httpResponse.readEntity(ErrorResponse.class);
+    Assertions.assertEquals("Runtime exception", failure.getError());
+    Assertions.assertEquals(RuntimeException.class.getSimpleName(), failure.getType());
+  }
}
diff --git a/lance/lance-rest-server/src/test/resources/log4j2.properties
b/lance/lance-rest-server/src/test/resources/log4j2.properties
new file mode 100644
index 0000000000..b5db8a1ffc
--- /dev/null
+++ b/lance/lance-rest-server/src/test/resources/log4j2.properties
@@ -0,0 +1,73 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Set to debug or trace if log4j initialization is failing
+status = info
+
+# Name of the configuration
+name = ConsoleLogConfig
+
+# Console appender configuration
+appender.console.type = Console
+appender.console.name = consoleLogger
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p [%t] %c{1}:%L - %m%n
+
+# Log files location
+property.logPath = ${sys:gravitino.log.path:-build/lance-rest-integration-test.log}
+
+# File appender configuration
+appender.file.type = File
+appender.file.name = fileLogger
+appender.file.fileName = ${logPath}
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n
+
+# Root logger level
+rootLogger.level = info
+
+# Root logger referring to console and file appenders
+rootLogger.appenderRef.stdout.ref = consoleLogger
+rootLogger.appenderRef.file.ref = fileLogger
+
+# File appender configuration for testcontainers
+appender.testcontainersFile.type = File
+appender.testcontainersFile.name = testcontainersLogger
+appender.testcontainersFile.fileName = build/testcontainers.log
+appender.testcontainersFile.layout.type = PatternLayout
+appender.testcontainersFile.layout.pattern = %d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %-5p %c - %m%n
+
+# Logger for testcontainers
+logger.testcontainers.name = org.testcontainers
+logger.testcontainers.level = debug
+logger.testcontainers.additivity = false
+logger.testcontainers.appenderRef.file.ref = testcontainersLogger
+
+logger.tc.name = tc
+logger.tc.level = debug
+logger.tc.additivity = false
+logger.tc.appenderRef.file.ref = testcontainersLogger
+
+logger.docker.name = com.github.dockerjava
+logger.docker.level = warn
+logger.docker.additivity = false
+logger.docker.appenderRef.file.ref = testcontainersLogger
+
+logger.http.name = com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.wire
+logger.http.level = off