This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch branch-lance-namepspace-dev
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-lance-namepspace-dev by
this push:
new 4a92762501 [#8919] improve(lance-table): Supports object store
configurations for Lance table (#8920)
4a92762501 is described below
commit 4a92762501a1c1b8e680746414d69c81cd1cda05
Author: mchades <[email protected]>
AuthorDate: Tue Oct 28 11:33:39 2025 +0800
[#8919] improve(lance-table): Supports object store configurations for
Lance table (#8920)
### What changes were proposed in this pull request?
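- Properties with the prefix `lance.storage.`, set on the catalog, schema, or
table, are passed to the Lance table as storage options (with the prefix
removed). Table values override schema values, which override catalog values.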
### Why are the changes needed?
Fix: #8919
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Tested manually.
---
.../GenericLakehouseCatalogOperations.java | 67 ++++++++++++++++------
.../GenericLakehouseCatalogPropertiesMetadata.java | 10 +++-
.../GenericLakehouseSchemaPropertiesMetadata.java | 10 +++-
.../GenericLakehouseTablePropertiesMetadata.java | 30 ++++++++--
.../lakehouse/lance/LanceCatalogOperations.java | 14 ++++-
5 files changed, 104 insertions(+), 27 deletions(-)
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
index 0f85532e8c..8e823a18a9 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogOperations.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.catalog.lakehouse;
import static org.apache.gravitino.Entity.EntityType.TABLE;
+import static
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LOCATION;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
@@ -26,6 +27,8 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity;
@@ -67,15 +70,14 @@ public class GenericLakehouseCatalogOperations
private static final String SLASH = "/";
private final ManagedSchemaOperations managedSchemaOps;
-
- @SuppressWarnings("unused") // todo: remove this after implementing table
operations
- private Optional<Path> catalogLakehouseDir;
-
private static final Map<String, LakehouseCatalogOperations>
SUPPORTED_FORMATS =
Maps.newHashMap();
+ private Optional<Path> catalogLakehouseDir;
+ private Map<String, String> catalogConfig;
private CatalogInfo catalogInfo;
private HasPropertyMetadata propertiesMetadata;
+
/**
* Initializes the generic lakehouse catalog operations with the provided
configuration.
*
@@ -97,6 +99,9 @@ public class GenericLakehouseCatalogOperations
StringUtils.isNotBlank(catalogDir)
?
Optional.of(catalogDir).map(this::ensureTrailingSlash).map(Path::new)
: Optional.empty();
+ this.catalogConfig = conf;
+ this.catalogInfo = info;
+ this.propertiesMetadata = propertiesMetadata;
}
public GenericLakehouseCatalogOperations() {
@@ -193,11 +198,15 @@ public class GenericLakehouseCatalogOperations
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
- String format = properties.getOrDefault("format", "lance");
- String tableLocation = calculateTableLocation(ident, properties);
+ Schema schema = loadSchema(NameIdentifier.of(ident.namespace().levels()));
+ String tableLocation = calculateTableLocation(schema, ident, properties);
+ Map<String, String> tableStorageProps = calculateTableStorageProps(schema,
properties);
+
Map<String, String> newProperties = Maps.newHashMap(properties);
- newProperties.put("location", tableLocation);
+ newProperties.put(LOCATION, tableLocation);
+ newProperties.putAll(tableStorageProps);
+ String format = properties.getOrDefault("format", "lance");
LakehouseCatalogOperations lakehouseCatalogOperations =
SUPPORTED_FORMATS.compute(
format,
@@ -212,22 +221,13 @@ public class GenericLakehouseCatalogOperations
}
private String calculateTableLocation(
- NameIdentifier tableIdent, Map<String, String> tableProperties) {
- String tableLocation = tableProperties.get("location");
+ Schema schema, NameIdentifier tableIdent, Map<String, String>
tableProperties) {
+ String tableLocation = tableProperties.get(LOCATION);
if (StringUtils.isNotBlank(tableLocation)) {
return ensureTrailingSlash(tableLocation);
}
- String schemaLocation;
- try {
- Schema schema =
loadSchema(NameIdentifier.of(tableIdent.namespace().levels()));
- schemaLocation = schema.properties().get("location");
- } catch (NoSuchSchemaException e) {
- throw new RuntimeException(
- String.format(
- "Failed to load schema for table %s to determine default
location.", tableIdent),
- e);
- }
+ String schemaLocation = schema.properties() == null ? null :
schema.properties().get(LOCATION);
// If we do not set location in table properties, and schema location is
set, use schema
// location
@@ -323,4 +323,33 @@ public class GenericLakehouseCatalogOperations
operations.initialize(properties, catalogInfo, propertiesMetadata);
return operations;
}
+
+  /**
+   * Calculate the table storage properties by merging catalog config, schema properties and table
+   * properties. The precedence is: table properties > schema properties > catalog config.
+   *
+   * @param schema The schema of the table.
+   * @param tableProps The table properties.
+   * @return The merged table storage properties.
+   */
+  private Map<String, String> calculateTableStorageProps(
+      Schema schema, Map<String, String> tableProps) {
+    // Lowest-precedence layer first; each later putAll overrides earlier keys. This relies on
+    // getLanceTableStorageOptions returning a mutable map.
+    Map<String, String> storageProps = getLanceTableStorageOptions(catalogConfig);
+    storageProps.putAll(getLanceTableStorageOptions(schema.properties()));
+    storageProps.putAll(getLanceTableStorageOptions(tableProps));
+    return storageProps;
+  }
+
+  /**
+   * Extract the entries whose key starts with {@link
+   * GenericLakehouseTablePropertiesMetadata#LANCE_TABLE_STORAGE_OPTION_PREFIX}. Keys keep their
+   * prefix.
+   *
+   * @param properties The properties to filter; may be {@code null} or empty.
+   * @return A new, mutable map of the matching entries (empty when there are none).
+   */
+  private Map<String, String> getLanceTableStorageOptions(Map<String, String> properties) {
+    if (MapUtils.isEmpty(properties)) {
+      return Maps.newHashMap();
+    }
+    // The 2-arg Collectors.toMap makes no guarantee about the mutability of its result, but the
+    // caller merges into it; ask for a HashMap explicitly.
+    return properties.entrySet().stream()
+        .filter(
+            e ->
+                e.getKey()
+                    .startsWith(
+                        GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX))
+        .collect(
+            Collectors.toMap(
+                Map.Entry::getKey,
+                Map.Entry::getValue,
+                // Never invoked: keys of the source map are already unique.
+                (first, second) -> second,
+                Maps::newHashMap));
+  }
}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
index 01dfc1da17..e381558c32 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseCatalogPropertiesMetadata.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.catalog.lakehouse;
+import static
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
import static
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
import com.google.common.collect.ImmutableList;
@@ -42,7 +43,14 @@ public class GenericLakehouseCatalogPropertiesMetadata
extends BaseCatalogProper
"The root directory of the lakehouse catalog.",
false /* immutable */,
null, /* defaultValue */
- false /* hidden */));
+ false /* hidden */),
+ PropertyEntry.stringOptionalPropertyPrefixEntry(
+ LANCE_TABLE_STORAGE_OPTION_PREFIX,
+ "The storage options passed to Lance table.",
+ false /* immutable */,
+ null /* default value*/,
+ false /* hidden */,
+ false /* reserved */));
PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries,
PropertyEntry::getName);
}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
index 52a65e7698..a6da0ac2de 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseSchemaPropertiesMetadata.java
@@ -18,6 +18,7 @@
*/
package org.apache.gravitino.catalog.lakehouse;
+import static
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
import static
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
import com.google.common.collect.ImmutableList;
@@ -41,7 +42,14 @@ public class GenericLakehouseSchemaPropertiesMetadata
extends BasePropertiesMeta
"The root directory of the lakehouse schema.",
false /* immutable */,
null, /* defaultValue */
- false /* hidden */));
+ false /* hidden */),
+ PropertyEntry.stringOptionalPropertyPrefixEntry(
+ LANCE_TABLE_STORAGE_OPTION_PREFIX,
+ "The storage options passed to Lance table.",
+ false /* immutable */,
+ null /* default value*/,
+ false /* hidden */,
+ false /* reserved */));
PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries,
PropertyEntry::getName);
}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
index 362b10dbe4..e9a61a6b0f 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/GenericLakehouseTablePropertiesMetadata.java
@@ -18,21 +18,43 @@
*/
package org.apache.gravitino.catalog.lakehouse;
-import com.google.common.collect.ImmutableMap;
+import static
org.apache.gravitino.connector.PropertyEntry.stringOptionalPropertyEntry;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import java.util.List;
import java.util.Map;
import org.apache.gravitino.connector.BasePropertiesMetadata;
import org.apache.gravitino.connector.PropertyEntry;
+/**
+ * Table property metadata for the generic lakehouse catalog: an optional, immutable {@code
+ * location} property and the {@code lance.storage.} prefix for Lance storage options.
+ */
public class GenericLakehouseTablePropertiesMetadata extends
BasePropertiesMetadata {
+ /** Property key for the root directory of the lakehouse table. */
+ public static final String LOCATION = "location";
+ /**
+ * Prefix of properties forwarded to the Lance table as storage options. LanceCatalogOperations
+ * strips this prefix before passing the remaining key/value pairs to Lance's WriteParams.
+ */
+ public static final String LANCE_TABLE_STORAGE_OPTION_PREFIX =
"lance.storage.";
- private static final Map<String, PropertyEntry<?>> propertiesMetadata;
+ private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;
static {
- propertiesMetadata = ImmutableMap.of();
+ // Both entries are optional and not hidden; only the location is immutable once set.
+ List<PropertyEntry<?>> propertyEntries =
+ ImmutableList.of(
+ stringOptionalPropertyEntry(
+ LOCATION,
+ "The root directory of the lakehouse table.",
+ true /* immutable */,
+ null, /* defaultValue */
+ false /* hidden */),
+ PropertyEntry.stringOptionalPropertyPrefixEntry(
+ LANCE_TABLE_STORAGE_OPTION_PREFIX,
+ "The storage options passed to Lance table.",
+ false /* immutable */,
+ null /* default value*/,
+ false /* hidden */,
+ false /* reserved */));
+
+ PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries,
PropertyEntry::getName);
}
@Override
protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
- return propertiesMetadata;
+ return PROPERTIES_METADATA;
}
}
diff --git
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
index 342826a882..dcfe6bd489 100644
---
a/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
+++
b/catalogs/catalog-generic-lakehouse/src/main/java/org/apache/gravitino/catalog/lakehouse/lance/LanceCatalogOperations.java
@@ -19,6 +19,9 @@
package org.apache.gravitino.catalog.lakehouse.lance;
+import static
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LANCE_TABLE_STORAGE_OPTION_PREFIX;
+import static
org.apache.gravitino.catalog.lakehouse.GenericLakehouseTablePropertiesMetadata.LOCATION;
+
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.lancedb.lance.Dataset;
@@ -114,13 +117,20 @@ public class LanceCatalogOperations implements
LakehouseCatalogOperations {
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
// Ignore partitions, distributions, sortOrders, and indexes for Lance
tables;
- String location = properties.get("location");
+ String location = properties.get(LOCATION);
+ Map<String, String> storageProps =
+ properties.entrySet().stream()
+ .filter(e ->
e.getKey().startsWith(LANCE_TABLE_STORAGE_OPTION_PREFIX))
+ .collect(
+ Collectors.toMap(
+ e ->
e.getKey().substring(LANCE_TABLE_STORAGE_OPTION_PREFIX.length()),
+ Map.Entry::getValue));
try (Dataset dataset =
Dataset.create(
new RootAllocator(),
location,
convertColumnsToSchema(columns),
- new WriteParams.Builder().build())) {
+ new
WriteParams.Builder().withStorageOptions(storageProps).build())) {
GenericLakehouseTable.Builder builder = GenericLakehouseTable.builder();
return builder
.withName(ident.name())