This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 31e7c4b088 [#11103] feat(core): Add the hierarchy conversion layer
(#11074)
31e7c4b088 is described below
commit 31e7c4b088a7d94c1ca7e050fc2706ee6ccdea65
Author: roryqi <[email protected]>
AuthorDate: Mon May 18 17:30:14 2026 +0800
[#11103] feat(core): Add the hierarchy conversion layer (#11074)
### What changes were proposed in this pull request?
Add abstract class BasePOStorageOps to consolidate all the PO logic in one class.
Add SchemaPOStorageOps, TablePOStorageOps, FunctionPOStorageOps, and ViewPOStorageOps.
Add the hierarchical conversion class (HierarchicalConversionPOStorageOps).
### Why are the changes needed?
Fix: #11103
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added tests.
---
.../fileset/TestFilesetCatalogOperations.java | 2 +-
.../gravitino/SupportsRelationOperations.java | 28 ++
.../gravitino/storage/relational/JDBCBackend.java | 23 ++
.../storage/relational/RelationalEntityStore.java | 22 +-
.../relational/service/BasePOStorageOps.java | 78 +++++
.../relational/service/FunctionMetaService.java | 127 ++------
.../relational/service/FunctionPOStorageOps.java | 98 ++++++
.../HierarchicalConversionPOStorageOps.java | 193 ++++++++++++
.../relational/service/MetadataObjectService.java | 33 ++-
.../relational/service/POStorageReadRouting.java | 118 ++++++++
.../relational/service/SchemaMetaService.java | 222 +++++---------
.../relational/service/SchemaPOStorageOps.java | 112 +++++++
.../relational/service/TableMetaService.java | 139 ++-------
.../relational/service/TablePOStorageOps.java | 128 ++++++++
.../relational/service/ViewMetaService.java | 122 ++------
.../relational/service/ViewPOStorageOps.java | 120 ++++++++
.../relational/TestJDBCBackendBatchInsert.java | 170 +++++++++++
.../TestHierarchicalConversionPOStorageOps.java | 327 +++++++++++++++++++++
.../service/TestPOStorageReadRouting.java | 152 ++++++++++
.../relational/service/TestSchemaMetaService.java | 38 +--
20 files changed, 1764 insertions(+), 488 deletions(-)
diff --git
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
index a0dbce64a2..492cdcc38b 100644
---
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
+++
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
@@ -325,7 +325,7 @@ public class TestFilesetCatalogOperations {
MockedStatic<CatalogMetaService> catalogMetaServiceMockedStatic =
Mockito.mockStatic(CatalogMetaService.class);
MockedStatic<SchemaMetaService> schemaMetaServiceMockedStatic =
- Mockito.mockStatic(SchemaMetaService.class);
+ Mockito.mockStatic(SchemaMetaService.class,
Mockito.CALLS_REAL_METHODS);
metalakeMetaServiceMockedStatic
.when(MetalakeMetaService::getInstance)
diff --git
a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
index 854760060d..25c4fc60b1 100644
--- a/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
+++ b/core/src/main/java/org/apache/gravitino/SupportsRelationOperations.java
@@ -165,6 +165,34 @@ public interface SupportsRelationOperations {
boolean override)
throws IOException;
+ /**
+ * Batch inserts the same relation from many source entities to one
destination. Parameter order
+ * matches {@link #insertRelation}. All sources share one {@code srcType}.
Semantics match
+ * repeated {@link #insertRelation} with the same {@code relType}; a
relational backend may use
+ * fewer round-trips for some relation types (e.g. {@link Type#OWNER_REL}).
+ *
+ * @param relType the relation type (e.g. {@link Type#OWNER_REL})
+ * @param srcIdentifiers identifiers of the source side for each relation row
+ * @param srcType entity type shared by every identifier in {@code
srcIdentifiers}
+ * @param dstIdentifier destination entity identifier (shared by all rows)
+ * @param dstType destination entity type
+ * @param override if true, replace existing relations of each source entity
first, per {@link
+ * #insertRelation}
+ * @throws IOException if the storage operation fails
+ */
+ default void batchInsertRelations(
+ Type relType,
+ List<NameIdentifier> srcIdentifiers,
+ Entity.EntityType srcType,
+ NameIdentifier dstIdentifier,
+ Entity.EntityType dstType,
+ boolean override)
+ throws IOException {
+ for (NameIdentifier srcIdent : srcIdentifiers) {
+ insertRelation(relType, srcIdent, srcType, dstIdentifier, dstType,
override);
+ }
+ }
+
/**
* Updates the relations for a given entity by adding a set of new relations
and removing another
* set of relations.
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
index 22c7c98330..f401cd50be 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/JDBCBackend.java
@@ -733,6 +733,29 @@ public class JDBCBackend implements RelationalBackend {
}
}
+ @Override
+ public void batchInsertRelations(
+ Type relType,
+ List<NameIdentifier> srcIdentifiers,
+ Entity.EntityType srcType,
+ NameIdentifier dstIdentifier,
+ Entity.EntityType dstType,
+ boolean override)
+ throws IOException {
+ if (srcIdentifiers == null || srcIdentifiers.isEmpty()) {
+ return;
+ }
+ switch (relType) {
+ case OWNER_REL:
+ OwnerMetaService.getInstance()
+ .batchSetOwners(srcIdentifiers, srcType, dstIdentifier, dstType);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Doesn't support batch insert for the relation type
%s", relType));
+ }
+ }
+
@Override
public <E extends Entity & HasIdentifier> List<E> updateEntityRelations(
Type relType,
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
index d89c6c678d..27381659d6 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/RelationalEntityStore.java
@@ -55,7 +55,7 @@ import org.slf4j.LoggerFactory;
/**
* Relation store to store entities. This means we can store entities in a
relational store. I.e.,
* MySQL, PostgreSQL, etc. If you want to use a different backend, you can
implement the {@link
- * RelationalBackend} interface
+ * RelationalBackend} interface. The default JDBC backend is {@link
JDBCBackend}.
*/
public class RelationalEntityStore implements EntityStore,
SupportsRelationOperations {
private static final Logger LOGGER =
LoggerFactory.getLogger(RelationalEntityStore.class);
@@ -327,6 +327,26 @@ public class RelationalEntityStore implements EntityStore,
SupportsRelationOpera
cache.invalidate(dstIdentifier, dstType, relType);
}
+ @Override
+ public void batchInsertRelations(
+ Type relType,
+ List<NameIdentifier> srcIdentifiers,
+ Entity.EntityType srcType,
+ NameIdentifier dstIdentifier,
+ Entity.EntityType dstType,
+ boolean override)
+ throws IOException {
+ if (srcIdentifiers == null || srcIdentifiers.isEmpty()) {
+ return;
+ }
+ backend.batchInsertRelations(
+ relType, srcIdentifiers, srcType, dstIdentifier, dstType, override);
+ for (NameIdentifier ident : srcIdentifiers) {
+ cache.invalidate(ident, srcType, relType);
+ }
+ cache.invalidate(dstIdentifier, dstType, relType);
+ }
+
@Override
public <E extends Entity & HasIdentifier> List<E> updateEntityRelations(
Type relType,
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/BasePOStorageOps.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/BasePOStorageOps.java
new file mode 100644
index 0000000000..645b239963
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/BasePOStorageOps.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+
+public abstract class BasePOStorageOps<PO, Mapper> {
+ public void insertPO(Mapper mapper, PO po, boolean overwrite) {
+ throw new UnsupportedOperationException(
+ "insertPO is not supported by " + getClass().getSimpleName());
+ }
+
+ public void batchInsertPOs(Mapper mapper, List<PO> pos, boolean overwrite) {
+ throw new UnsupportedOperationException(
+ "batchInsertPOs is not supported by " + getClass().getSimpleName());
+ }
+
+ public Integer updatePO(Mapper mapper, PO newPO, PO oldPO) {
+ throw new UnsupportedOperationException(
+ "updatePO is not supported by " + getClass().getSimpleName());
+ }
+
+ /**
+ * When {@code true} and the entity-id cache is enabled, callers may resolve
rows by parent entity
+ * id plus short name via {@link #getPO} and {@link #listPOs}; see {@link
POStorageReadRouting}.
+ */
+ public boolean supportsParentIdRelationalRead() {
+ return false;
+ }
+
+ public PO getPO(Mapper mapper, Long parentId, String name) {
+ throw new UnsupportedOperationException(
+ "getPO by parent id is not supported by " +
getClass().getSimpleName());
+ }
+
+ public List<PO> listPOs(Mapper mapper, Long parentId) {
+ throw new UnsupportedOperationException(
+ "listPOs by parent id is not supported by " +
getClass().getSimpleName());
+ }
+
+ public List<PO> listPOs(Mapper mapper, Namespace namespace, List<String>
names) {
+ throw new UnsupportedOperationException(
+ "listPOs by namespace and names is not supported by " +
getClass().getSimpleName());
+ }
+
+ public List<PO> listPOs(Mapper mapper, List<Long> entityIds) {
+ throw new UnsupportedOperationException(
+ "listPOs by entityIds is not supported by " +
getClass().getSimpleName());
+ }
+
+ public PO getPOByFullName(Mapper mapper, NameIdentifier identifier) {
+ throw new UnsupportedOperationException(
+ "getPOByFullName is not supported by " + getClass().getSimpleName());
+ }
+
+ public List<PO> listPOsByNSFullName(Mapper mapper, Namespace namespace) {
+ throw new UnsupportedOperationException(
+ "listPOsByNSFullName is not supported by " +
getClass().getSimpleName());
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
index 0377afed00..33532edad2 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionMetaService.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.gravitino.Entity;
-import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.HasIdentifier;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.NameIdentifier;
@@ -56,14 +55,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FunctionMetaService {
+ private static final Logger LOG =
LoggerFactory.getLogger(FunctionMetaService.class);
+ private static final FunctionMetaService INSTANCE = new
FunctionMetaService();
+ private BasePOStorageOps<FunctionPO, FunctionMetaMapper> ops;
+
public static FunctionMetaService getInstance() {
return INSTANCE;
}
- private static final Logger LOG =
LoggerFactory.getLogger(FunctionMetaService.class);
- private static final FunctionMetaService INSTANCE = new
FunctionMetaService();
-
- private FunctionMetaService() {}
+ private FunctionMetaService() {
+ this.ops = new HierarchicalConversionPOStorageOps<>(new
FunctionPOStorageOps());
+ }
@Monitored(
metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
@@ -87,18 +89,17 @@ public class FunctionMetaService {
metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
baseMetricName = "getFunctionIdBySchemaIdAndFunctionName")
public Long getFunctionIdBySchemaIdAndFunctionName(Long schemaId, String
functionName) {
- Long functionId =
+ FunctionPO functionPO =
SessionUtils.getWithoutCommit(
- FunctionMetaMapper.class,
- mapper ->
mapper.selectFunctionIdBySchemaIdAndFunctionName(schemaId, functionName));
+ FunctionMetaMapper.class, mapper -> ops.getPO(mapper, schemaId,
functionName));
- if (functionId == null) {
+ if (functionPO == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT),
functionName);
}
- return functionId;
+ return functionPO.functionId();
}
@Monitored(
@@ -115,14 +116,7 @@ public class FunctionMetaService {
SessionUtils.doMultipleWithCommit(
() ->
SessionUtils.doWithoutCommit(
- FunctionMetaMapper.class,
- mapper -> {
- if (overwrite) {
- mapper.insertFunctionMetaOnDuplicateKeyUpdate(po);
- } else {
- mapper.insertFunctionMeta(po);
- }
- }),
+ FunctionMetaMapper.class, mapper -> ops.insertPO(mapper, po,
overwrite)),
() ->
SessionUtils.doWithoutCommit(
FunctionVersionMetaMapper.class,
@@ -231,8 +225,18 @@ public class FunctionMetaService {
baseMetricName = "getFunctionPOByIdentifier")
FunctionPO getFunctionPOByIdentifier(NameIdentifier ident) {
NameIdentifierUtil.checkFunction(ident);
+ FunctionPO functionPO =
+ SessionUtils.getWithoutCommit(
+ FunctionMetaMapper.class,
+ mapper -> POStorageReadRouting.getPO(mapper, ident, ops,
Entity.EntityType.FUNCTION));
- return functionPOFetcher().apply(ident);
+ if (functionPO == null) {
+ throw new NoSuchEntityException(
+ NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+ Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT),
+ ident.name());
+ }
+ return functionPO;
}
@Monitored(
@@ -260,7 +264,7 @@ public class FunctionMetaService {
() ->
SessionUtils.doWithoutCommit(
FunctionMetaMapper.class,
- mapper -> mapper.updateFunctionMeta(newFunctionPO,
oldFunctionPO)));
+ mapper -> ops.updatePO(mapper, newFunctionPO,
oldFunctionPO)));
return newEntity;
} catch (RuntimeException re) {
@@ -270,89 +274,14 @@ public class FunctionMetaService {
}
}
- private Function<NameIdentifier, FunctionPO> functionPOFetcher() {
- return GravitinoEnv.getInstance().cacheEnabled()
- ? this::getFunctionPOBySchemaId
- : this::getFunctionPOByFullQualifiedName;
- }
-
- private FunctionPO getFunctionPOBySchemaId(NameIdentifier ident) {
- Long schemaId =
- EntityIdService.getEntityId(
- NameIdentifier.of(ident.namespace().levels()),
Entity.EntityType.SCHEMA);
-
- FunctionPO functionPO =
- SessionUtils.getWithoutCommit(
- FunctionMetaMapper.class,
- mapper -> mapper.selectFunctionMetaBySchemaIdAndName(schemaId,
ident.name()));
-
- if (functionPO == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT),
- ident.toString());
- }
- return functionPO;
- }
-
- private FunctionPO getFunctionPOByFullQualifiedName(NameIdentifier ident) {
- String[] namespaceLevels = ident.namespace().levels();
- FunctionPO functionPO =
- SessionUtils.getWithoutCommit(
- FunctionMetaMapper.class,
- mapper ->
- mapper.selectFunctionMetaByFullQualifiedName(
- namespaceLevels[0], namespaceLevels[1],
namespaceLevels[2], ident.name()));
-
- if (functionPO == null || functionPO.functionId() == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- Entity.EntityType.FUNCTION.name().toLowerCase(Locale.ROOT),
- ident.name());
- }
-
- if (functionPO.schemaId() == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- Entity.EntityType.SCHEMA.name().toLowerCase(),
- namespaceLevels[2]);
- }
- return functionPO;
+ public BasePOStorageOps<FunctionPO, FunctionMetaMapper> ops() {
+ return ops;
}
private List<FunctionPO> listFunctionPOs(Namespace namespace) {
- return functionListFetcher().apply(namespace);
- }
-
- private List<FunctionPO> listFunctionPOsBySchemaId(Namespace namespace) {
- Long schemaId =
- EntityIdService.getEntityId(
- NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
return SessionUtils.getWithoutCommit(
- FunctionMetaMapper.class, mapper ->
mapper.listFunctionPOsBySchemaId(schemaId));
- }
-
- private Function<Namespace, List<FunctionPO>> functionListFetcher() {
- return GravitinoEnv.getInstance().cacheEnabled()
- ? this::listFunctionPOsBySchemaId
- : this::listFunctionPOsByFullQualifiedName;
- }
-
- private List<FunctionPO> listFunctionPOsByFullQualifiedName(Namespace
namespace) {
- String[] namespaceLevels = namespace.levels();
- List<FunctionPO> functionPOs =
- SessionUtils.getWithoutCommit(
- FunctionMetaMapper.class,
- mapper ->
- mapper.listFunctionPOsByFullQualifiedName(
- namespaceLevels[0], namespaceLevels[1],
namespaceLevels[2]));
- if (functionPOs.isEmpty() || functionPOs.get(0).schemaId() == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- Entity.EntityType.SCHEMA.name().toLowerCase(),
- namespaceLevels[2]);
- }
- return functionPOs.stream().filter(po -> po.functionId() !=
null).collect(Collectors.toList());
+ FunctionMetaMapper.class,
+ mapper -> POStorageReadRouting.listPOs(mapper, namespace, ops,
Entity.EntityType.FUNCTION));
}
private void fillFunctionPOBuilderParentEntityId(
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionPOStorageOps.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionPOStorageOps.java
new file mode 100644
index 0000000000..45f22a6b3e
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/FunctionPOStorageOps.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.storage.relational.mapper.FunctionMetaMapper;
+import org.apache.gravitino.storage.relational.po.FunctionPO;
+
+public class FunctionPOStorageOps extends BasePOStorageOps<FunctionPO,
FunctionMetaMapper> {
+
+ public FunctionPOStorageOps() {}
+
+ @Override
+ public void insertPO(FunctionMetaMapper mapper, FunctionPO functionPO,
boolean overwrite) {
+ if (overwrite) {
+ mapper.insertFunctionMetaOnDuplicateKeyUpdate(functionPO);
+ } else {
+ mapper.insertFunctionMeta(functionPO);
+ }
+ }
+
+ @Override
+ public Integer updatePO(FunctionMetaMapper mapper, FunctionPO newPO,
FunctionPO oldPO) {
+ return mapper.updateFunctionMeta(newPO, oldPO);
+ }
+
+ @Override
+ public FunctionPO getPO(FunctionMetaMapper mapper, Long parentId, String
name) {
+ return mapper.selectFunctionMetaBySchemaIdAndName(parentId, name);
+ }
+
+ @Override
+ public FunctionPO getPOByFullName(FunctionMetaMapper mapper, NameIdentifier
identifier) {
+ Namespace namespace = identifier.namespace();
+ return mapper.selectFunctionMetaByFullQualifiedName(
+ namespace.level(0), namespace.level(1), namespace.level(2),
identifier.name());
+ }
+
+ @Override
+ public List<FunctionPO> listPOs(FunctionMetaMapper mapper, Long parentId) {
+ return mapper.listFunctionPOsBySchemaId(parentId);
+ }
+
+ @Override
+ public List<FunctionPO> listPOs(FunctionMetaMapper mapper, List<Long>
entityIds) {
+ return mapper.listFunctionPOsByFunctionIds(entityIds);
+ }
+
+ @Override
+ public List<FunctionPO> listPOsByNSFullName(FunctionMetaMapper mapper,
Namespace namespace) {
+ List<FunctionPO> pos =
+ mapper.listFunctionPOsByFullQualifiedName(
+ namespace.level(0), namespace.level(1), namespace.level(2));
+ // INNER JOIN on metalake/catalog: an empty result means the metalake or
catalog does not exist.
+ if (pos.isEmpty()) {
+ throw new NoSuchEntityException(
+ NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+ Entity.EntityType.CATALOG.name().toLowerCase(),
+ namespace.level(1));
+ }
+ // LEFT JOIN on schema_meta: rows with non-null catalogId but null
schemaId mean the
+ // catalog exists but the schema does not.
+ if (pos.get(0).schemaId() == null) {
+ throw new NoSuchEntityException(
+ NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+ Entity.EntityType.SCHEMA.name().toLowerCase(),
+ namespace.level(2));
+ }
+ // LEFT JOIN on function_meta: filter out the placeholder row for a schema
without functions.
+ return pos.stream().filter(po -> po.functionId() !=
null).collect(Collectors.toList());
+ }
+
+ @Override
+ public boolean supportsParentIdRelationalRead() {
+ return true;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/HierarchicalConversionPOStorageOps.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/HierarchicalConversionPOStorageOps.java
new file mode 100644
index 0000000000..35699c8377
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/HierarchicalConversionPOStorageOps.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.utils.HierarchicalSchemaUtil;
+
+/**
+ * Wraps a {@link BasePOStorageOps} to bridge the hierarchical schema naming
conversion.
+ * Identifiers and namespace segments in logical form (logical separator) are
translated to physical
+ * form (physical separator) before delegating. Two optional PO rewriters
allow callers to translate
+ * a PO field across the boundary: {@code physicalToLogicalRewriter} is
applied to POs returned from
+ * read methods (typically physical→logical), and {@code
logicalToPhysicalRewriter} is applied to
+ * POs passed into write methods (typically logical→physical) so the SQL still
receives storage-form
+ * values.
+ *
+ * @param <PO> persistent object type
+ * @param <Mapper> MyBatis mapper type
+ */
+public class HierarchicalConversionPOStorageOps<PO, Mapper> extends
BasePOStorageOps<PO, Mapper> {
+
+ private final BasePOStorageOps<PO, Mapper> delegate;
+ private final UnaryOperator<PO> physicalToLogicalRewriter;
+ private final UnaryOperator<PO> logicalToPhysicalRewriter;
+
+ public HierarchicalConversionPOStorageOps(BasePOStorageOps<PO, Mapper>
delegate) {
+ this(delegate, UnaryOperator.identity(), UnaryOperator.identity());
+ }
+
+ public HierarchicalConversionPOStorageOps(
+ BasePOStorageOps<PO, Mapper> delegate,
+ UnaryOperator<PO> physicalToLogicalRewriter,
+ UnaryOperator<PO> logicalToPhysicalRewriter) {
+ this.delegate = delegate;
+ this.physicalToLogicalRewriter = physicalToLogicalRewriter;
+ this.logicalToPhysicalRewriter = logicalToPhysicalRewriter;
+ }
+
+ @Override
+ public void insertPO(Mapper mapper, PO po, boolean overwrite) {
+ delegate.insertPO(mapper, logicalToPhysicalRewriter.apply(po), overwrite);
+ }
+
+ @Override
+ public void batchInsertPOs(Mapper mapper, List<PO> pos, boolean overwrite) {
+ delegate.batchInsertPOs(mapper, convertLogicalToPhysical(pos), overwrite);
+ }
+
+ @Override
+ public Integer updatePO(Mapper mapper, PO newPO, PO oldPO) {
+ return delegate.updatePO(
+ mapper, logicalToPhysicalRewriter.apply(newPO),
logicalToPhysicalRewriter.apply(oldPO));
+ }
+
+ @Override
+ public PO getPO(Mapper mapper, Long parentId, String name) {
+ return convertPhysicalToLogical(
+ delegate.getPO(mapper, parentId, toPhysicalIfHierarchical(name)));
+ }
+
+ @Override
+ public List<PO> listPOs(Mapper mapper, Long parentId) {
+ return convertPhysicalToLogical(delegate.listPOs(mapper, parentId));
+ }
+
+ @Override
+ public List<PO> listPOs(Mapper mapper, Namespace logicalNamespace,
List<String> names) {
+ Namespace physicalNamespace = logicalToPhysicalNamespace(logicalNamespace);
+ List<String> physicalNames =
+ names.stream()
+ .map(HierarchicalConversionPOStorageOps::toPhysicalIfHierarchical)
+ .collect(Collectors.toList());
+ return convertPhysicalToLogical(delegate.listPOs(mapper,
physicalNamespace, physicalNames));
+ }
+
+ @Override
+ public List<PO> listPOs(Mapper mapper, List<Long> entityIds) {
+ return convertPhysicalToLogical(delegate.listPOs(mapper, entityIds));
+ }
+
+ @Override
+ public PO getPOByFullName(Mapper mapper, NameIdentifier logical) {
+ NameIdentifier physical = logicalToPhysicalIdentifier(logical);
+ return convertPhysicalToLogical(delegate.getPOByFullName(mapper,
physical));
+ }
+
+ @Override
+ public List<PO> listPOsByNSFullName(Mapper mapper, Namespace logical) {
+ Namespace physical = logicalToPhysicalNamespace(logical);
+ return convertPhysicalToLogical(delegate.listPOsByNSFullName(mapper,
physical));
+ }
+
+ @Override
+ public boolean supportsParentIdRelationalRead() {
+ return delegate.supportsParentIdRelationalRead();
+ }
+
+ private PO convertPhysicalToLogical(PO po) {
+ return po == null ? null : physicalToLogicalRewriter.apply(po);
+ }
+
+ private List<PO> convertPhysicalToLogical(List<PO> pos) {
+ if (pos == null || pos.isEmpty()) {
+ return pos;
+ }
+ return
pos.stream().map(this::convertPhysicalToLogical).collect(Collectors.toList());
+ }
+
+ private List<PO> convertLogicalToPhysical(List<PO> pos) {
+ if (pos == null || pos.isEmpty()) {
+ return pos;
+ }
+ return
pos.stream().map(logicalToPhysicalRewriter).collect(Collectors.toList());
+ }
+
+ private static String toPhysicalIfHierarchical(String name) {
+ if (StringUtils.isBlank(name)) {
+ return name;
+ }
+ String sep = HierarchicalSchemaUtil.schemaSeparator();
+ if (!name.contains(sep)) {
+ return name;
+ }
+ return HierarchicalSchemaUtil.logicalToPhysical(name, sep);
+ }
+
+ private static NameIdentifier logicalToPhysicalIdentifier(NameIdentifier
logical) {
+ String[] levels = logical.namespace().levels();
+ if (levels.length == 2) {
+ String rawName = logical.name();
+ String physicalName =
+ StringUtils.isNotBlank(rawName)
+ ? HierarchicalSchemaUtil.logicalToPhysical(
+ rawName, HierarchicalSchemaUtil.schemaSeparator())
+ : rawName;
+ if (physicalName.equals(logical.name())) {
+ return logical;
+ }
+ return NameIdentifier.of(logical.namespace(), physicalName);
+ }
+ if (levels.length == 3) {
+ String rawSeg = levels[2];
+ String physicalSchema =
+ StringUtils.isNotBlank(rawSeg)
+ ? HierarchicalSchemaUtil.logicalToPhysical(
+ rawSeg, HierarchicalSchemaUtil.schemaSeparator())
+ : rawSeg;
+ if (physicalSchema.equals(levels[2])) {
+ return logical;
+ }
+ return NameIdentifier.of(Namespace.of(levels[0], levels[1],
physicalSchema), logical.name());
+ }
+ return logical;
+ }
+
+ private static Namespace logicalToPhysicalNamespace(Namespace logical) {
+ String[] levels = logical.levels();
+ if (levels.length != 3) {
+ return logical;
+ }
+ String rawSeg = levels[2];
+ String physicalSchema =
+ StringUtils.isNotBlank(rawSeg)
+ ? HierarchicalSchemaUtil.logicalToPhysical(
+ rawSeg, HierarchicalSchemaUtil.schemaSeparator())
+ : rawSeg;
+ if (physicalSchema.equals(levels[2])) {
+ return logical;
+ }
+ return Namespace.of(levels[0], levels[1], physicalSchema);
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
index ee8f7ff8ad..634f63be01 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/MetadataObjectService.java
@@ -96,6 +96,14 @@ public class MetadataObjectService {
MetadataObjectService::getJobTemplateObjectsFullName)
.build();
+ static final Map<MetadataObject.Type, BasePOStorageOps<?, ?>>
TYPE_TO_STORAGE_OPS_MAP =
+ ImmutableMap.<MetadataObject.Type, BasePOStorageOps<?, ?>>builder()
+ .put(MetadataObject.Type.SCHEMA,
SchemaMetaService.getInstance().ops())
+ .put(MetadataObject.Type.TABLE, TableMetaService.getInstance().ops())
+ .put(MetadataObject.Type.VIEW, ViewMetaService.getInstance().ops())
+ .put(MetadataObject.Type.FUNCTION,
FunctionMetaService.getInstance().ops())
+ .build();
+
private static Map<Long, String> getPolicyObjectsFullName(List<Long>
policyIds) {
if (policyIds == null || policyIds.isEmpty()) {
return Maps.newHashMap();
@@ -345,9 +353,13 @@ public class MetadataObjectService {
return Maps.newHashMap();
}
+ @SuppressWarnings("unchecked")
+ BasePOStorageOps<FunctionPO, FunctionMetaMapper> ops =
+ (BasePOStorageOps<FunctionPO, FunctionMetaMapper>)
+ TYPE_TO_STORAGE_OPS_MAP.get(MetadataObject.Type.FUNCTION);
List<FunctionPO> functionPOs =
SessionUtils.getWithoutCommit(
- FunctionMetaMapper.class, mapper ->
mapper.listFunctionPOsByFunctionIds(functionIds));
+ FunctionMetaMapper.class, mapper -> ops.listPOs(mapper,
functionIds));
if (functionPOs == null || functionPOs.isEmpty()) {
return new HashMap<>();
@@ -395,9 +407,13 @@ public class MetadataObjectService {
return Maps.newHashMap();
}
+ @SuppressWarnings("unchecked")
+ BasePOStorageOps<TablePO, TableMetaMapper> ops =
+ (BasePOStorageOps<TablePO, TableMetaMapper>)
+ TYPE_TO_STORAGE_OPS_MAP.get(MetadataObject.Type.TABLE);
List<TablePO> tablePOs =
SessionUtils.getWithoutCommit(
- TableMetaMapper.class, mapper ->
mapper.listTablePOsByTableIds(tableIds));
+ TableMetaMapper.class, mapper -> ops.listPOs(mapper, tableIds));
if (tablePOs == null || tablePOs.isEmpty()) {
return Maps.newHashMap();
@@ -534,9 +550,12 @@ public class MetadataObjectService {
metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
baseMetricName = "getViewObjectsFullName")
public static Map<Long, String> getViewObjectsFullName(List<Long> viewIds) {
+ @SuppressWarnings("unchecked")
+ BasePOStorageOps<ViewPO, ViewMetaMapper> ops =
+ (BasePOStorageOps<ViewPO, ViewMetaMapper>)
+ TYPE_TO_STORAGE_OPS_MAP.get(MetadataObject.Type.VIEW);
List<ViewPO> viewPOs =
- SessionUtils.getWithoutCommit(
- ViewMetaMapper.class, mapper ->
mapper.listViewPOsByViewIds(viewIds));
+ SessionUtils.getWithoutCommit(ViewMetaMapper.class, mapper ->
ops.listPOs(mapper, viewIds));
if (viewPOs == null || viewPOs.isEmpty()) {
return new HashMap<>();
}
@@ -601,9 +620,13 @@ public class MetadataObjectService {
metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
baseMetricName = "getSchemaObjectsFullName")
public static Map<Long, String> getSchemaObjectsFullName(List<Long>
schemaIds) {
+ @SuppressWarnings("unchecked")
+ BasePOStorageOps<SchemaPO, SchemaMetaMapper> ops =
+ (BasePOStorageOps<SchemaPO, SchemaMetaMapper>)
+ TYPE_TO_STORAGE_OPS_MAP.get(MetadataObject.Type.SCHEMA);
List<SchemaPO> schemaPOs =
SessionUtils.getWithoutCommit(
- SchemaMetaMapper.class, mapper ->
mapper.listSchemaPOsBySchemaIds(schemaIds));
+ SchemaMetaMapper.class, mapper -> ops.listPOs(mapper, schemaIds));
if (schemaPOs == null || schemaPOs.isEmpty()) {
return new HashMap<>();
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/POStorageReadRouting.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/POStorageReadRouting.java
new file mode 100644
index 0000000000..3b41ecb59c
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/POStorageReadRouting.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+
+/**
+ * Routes relational PO reads between parent-id based SQL (used when the entity-id cache is
+ * enabled) and fully qualified name SQL. Callers keep the cache policy in one place instead of
+ * embedding it in {@link BasePOStorageOps}.
+ */
+public final class POStorageReadRouting {
+
+  private POStorageReadRouting() {}
+
+  /**
+   * Loads a PO using parent-id based SQL when the entity-id cache is enabled and {@code ops}
+   * supports that path; otherwise uses fully qualified name SQL.
+   *
+   * @param mapper MyBatis mapper session
+   * @param identifier entity name identifier (logical naming when wrapped by hierarchical ops)
+   * @param ops storage operations delegate
+   * @param entityType entity type for {@code identifier} (used to resolve the parent id)
+   * @param <PO> persistent object type
+   * @param <Mapper> mapper type
+   * @return loaded PO or null when the delegate returns null
+   */
+  public static <PO, Mapper> PO getPO(
+      Mapper mapper,
+      NameIdentifier identifier,
+      BasePOStorageOps<PO, Mapper> ops,
+      Entity.EntityType entityType) {
+    return getPO(mapper, identifier, ops, entityType, GravitinoEnv.getInstance().cacheEnabled());
+  }
+
+  /**
+   * Same as the four-argument {@code getPO} but uses an explicit cache flag (typically for tests)
+   * instead of reading it from {@link GravitinoEnv}.
+   *
+   * @param mapper MyBatis mapper session
+   * @param identifier entity name identifier
+   * @param ops storage operations delegate
+   * @param entityType entity type for {@code identifier} (used to resolve the parent id)
+   * @param cacheEnabled when true, prefer parent-id based reads when supported
+   * @param <PO> persistent object type
+   * @param <Mapper> mapper type
+   * @return loaded PO or null when the delegate returns null
+   */
+  public static <PO, Mapper> PO getPO(
+      Mapper mapper,
+      NameIdentifier identifier,
+      BasePOStorageOps<PO, Mapper> ops,
+      Entity.EntityType entityType,
+      boolean cacheEnabled) {
+    if (cacheEnabled && ops.supportsParentIdRelationalRead()) {
+      Long parentId = resolveParentId(identifier.namespace(), entityType);
+      return ops.getPO(mapper, parentId, identifier.name());
+    }
+    return ops.getPOByFullName(mapper, identifier);
+  }
+
+  /**
+   * Lists POs under a namespace using parent-id based SQL when the entity-id cache is enabled and
+   * {@code ops} supports that path; otherwise uses fully qualified namespace SQL.
+   *
+   * @param mapper MyBatis mapper session
+   * @param namespace parent namespace for the listed entities
+   * @param ops storage operations delegate
+   * @param entityType entity type stored under {@code namespace} (used to resolve the parent id)
+   * @param <PO> persistent object type
+   * @param <Mapper> mapper type
+   * @return list from the delegate (may be empty)
+   */
+  public static <PO, Mapper> List<PO> listPOs(
+      Mapper mapper,
+      Namespace namespace,
+      BasePOStorageOps<PO, Mapper> ops,
+      Entity.EntityType entityType) {
+    return listPOs(mapper, namespace, ops, entityType, GravitinoEnv.getInstance().cacheEnabled());
+  }
+
+  /**
+   * Same as the four-argument {@code listPOs} but uses an explicit cache flag (typically for
+   * tests) instead of reading it from {@link GravitinoEnv}.
+   *
+   * @param mapper MyBatis mapper session
+   * @param namespace parent namespace for the listed entities
+   * @param ops storage operations delegate
+   * @param entityType entity type stored under {@code namespace} (used to resolve the parent id)
+   * @param cacheEnabled when true, prefer parent-id based reads when supported
+   * @param <PO> persistent object type
+   * @param <Mapper> mapper type
+   * @return list from the delegate (may be empty)
+   */
+  public static <PO, Mapper> List<PO> listPOs(
+      Mapper mapper,
+      Namespace namespace,
+      BasePOStorageOps<PO, Mapper> ops,
+      Entity.EntityType entityType,
+      boolean cacheEnabled) {
+    if (cacheEnabled && ops.supportsParentIdRelationalRead()) {
+      Long parentId = resolveParentId(namespace, entityType);
+      return ops.listPOs(mapper, parentId);
+    }
+    return ops.listPOsByNSFullName(mapper, namespace);
+  }
+
+  /**
+   * Resolves the id of the entity that owns {@code namespace}.
+   *
+   * <p>The parent identifier is built directly from the namespace levels rather than by parsing
+   * the namespace's string form: a string round-trip only works while no level contains the
+   * separator character, whereas the level array is passed through unchanged.
+   *
+   * @param namespace namespace whose levels name the parent entity
+   * @param entityType type of the child entity; its parent type selects the id lookup
+   * @return the parent entity id as returned by {@link EntityIdService}
+   */
+  private static Long resolveParentId(Namespace namespace, Entity.EntityType entityType) {
+    return EntityIdService.getEntityId(
+        NameIdentifier.of(namespace.levels()), NameIdentifierUtil.parentEntityType(entityType));
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
index c7edb6d26f..c8e6972834 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaMetaService.java
@@ -33,7 +33,6 @@ import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.gravitino.Entity;
-import org.apache.gravitino.Entity.EntityType;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.HasIdentifier;
import org.apache.gravitino.MetadataObject;
@@ -80,29 +79,18 @@ import org.apache.gravitino.utils.NamespaceUtil;
/** The service class for schema metadata. It provides the basic database
operations for schema. */
public class SchemaMetaService {
private static final SchemaMetaService INSTANCE = new SchemaMetaService();
+ private BasePOStorageOps<SchemaPO, SchemaMetaMapper> ops;
public static SchemaMetaService getInstance() {
return INSTANCE;
}
- private SchemaMetaService() {}
-
- @Monitored(
- metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
- baseMetricName = "getSchemaPOByCatalogIdAndName")
- public SchemaPO getSchemaPOByCatalogIdAndName(Long catalogId, String
schemaName) {
- SchemaPO schemaPO =
- SessionUtils.getWithoutCommit(
- SchemaMetaMapper.class,
- mapper -> mapper.selectSchemaMetaByCatalogIdAndName(catalogId,
schemaName));
-
- if (schemaPO == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- Entity.EntityType.SCHEMA.name().toLowerCase(),
- schemaName);
- }
- return schemaPO;
+ private SchemaMetaService() {
+ this.ops =
+ new HierarchicalConversionPOStorageOps<>(
+ new SchemaPOStorageOps(),
+ SchemaMetaService::physicalToLogicalSchemaPO,
+ SchemaMetaService::logicalToPhysicalSchemaPO);
}
@Monitored(
@@ -110,39 +98,21 @@ public class SchemaMetaService {
baseMetricName = "getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName")
public SchemaIds getSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
String metalakeName, String catalogName, String schemaName) {
- SchemaIds schemaIds =
+ NameIdentifier identifier = NameIdentifier.of(metalakeName, catalogName,
schemaName);
+ SchemaPO schemaPO =
SessionUtils.getWithoutCommit(
SchemaMetaMapper.class,
mapper ->
- mapper.selectSchemaIdByMetalakeNameAndCatalogNameAndSchemaName(
- metalakeName, catalogName, schemaName));
+ POStorageReadRouting.getPO(mapper, identifier, ops,
Entity.EntityType.SCHEMA));
- if (schemaIds == null) {
+ if (schemaPO == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.SCHEMA.name().toLowerCase(),
schemaName);
}
- return schemaIds;
- }
-
- @Monitored(
- metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
- baseMetricName = "getSchemaIdByCatalogIdAndName")
- public Long getSchemaIdByCatalogIdAndName(Long catalogId, String schemaName)
{
- Long schemaId =
- SessionUtils.getWithoutCommit(
- SchemaMetaMapper.class,
- mapper -> mapper.selectSchemaIdByCatalogIdAndName(catalogId,
schemaName));
-
- if (schemaId == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- Entity.EntityType.SCHEMA.name().toLowerCase(),
- schemaName);
- }
- return schemaId;
+ return new SchemaIds(schemaPO.getMetalakeId(), schemaPO.getCatalogId(),
schemaPO.getSchemaId());
}
@Monitored(
@@ -169,23 +139,24 @@ public class SchemaMetaService {
public void insertSchema(SchemaEntity schemaEntity, boolean overwrite)
throws IOException {
try {
NameIdentifierUtil.checkSchema(schemaEntity.nameIdentifier());
- // Callers above this service (e.g. JDBCBackend + naming bridge) pass
storage-form schema
- // names: nested paths use the internal physical separator, not the
external logical one.
- String physicalSep = HierarchicalSchemaUtil.physicalSeparator();
+ // SchemaEntity arrives in API/logical form (separator =
HierarchicalSchemaUtil
+ // .schemaSeparator()). We split here on the logical separator and build
ancestor rows in
+ // logical form. HierarchicalConversionPOStorageOps.batchInsertPOs
applies its write
+ // rewriter to translate each PO's name to storage form before SQL
execution.
+ String logicalSep = HierarchicalSchemaUtil.schemaSeparator();
String schemaName = schemaEntity.name();
List<SchemaEntity> rowsToInsert = new ArrayList<>();
- if (schemaName == null || !schemaName.contains(physicalSep)) {
+ if (schemaName == null || !schemaName.contains(logicalSep)) {
rowsToInsert.add(schemaEntity);
} else {
- // Segments of the storage-form name; e.g. [A, B, C] -> ancestor rows
"A", "A"+sep+"B", then
- // leaf.
- String[] parts = schemaName.split(Pattern.quote(physicalSep), -1);
+ // Segments of the logical name; e.g. "A:B:C" -> ancestor rows "A",
"A:B", then leaf.
+ String[] parts = schemaName.split(Pattern.quote(logicalSep), -1);
for (int nSeg = 1; nSeg < parts.length; nSeg++) {
- String ancestorPhysical = String.join(physicalSep,
Arrays.copyOf(parts, nSeg));
+ String ancestorLogical = String.join(logicalSep,
Arrays.copyOf(parts, nSeg));
SchemaEntity ancestor =
SchemaEntity.builder()
.withId(nextIdForNestedAncestor())
- .withName(ancestorPhysical)
+ .withName(ancestorLogical)
.withNamespace(schemaEntity.namespace())
.withComment(null)
.withProperties(Collections.emptyMap())
@@ -204,19 +175,16 @@ public class SchemaMetaService {
if (n > 1) {
SchemaEntity firstAncestor = rowsToInsert.get(0);
Namespace ancestorNs = firstAncestor.namespace();
- List<String> ancestorPhysicalNames =
+ List<String> ancestorNames =
rowsToInsert.subList(0, n - 1).stream()
.map(SchemaEntity::name)
.collect(Collectors.toList());
- Set<String> existingAncestorNames =
- mapper
- .batchSelectSchemaByIdentifier(
- ancestorNs.level(0), ancestorNs.level(1),
ancestorPhysicalNames)
- .stream()
+ Set<String> existingLogicalNames =
+ ops.listPOs(mapper, ancestorNs, ancestorNames).stream()
.map(SchemaPO::getSchemaName)
.collect(Collectors.toSet());
for (SchemaEntity row : rowsToInsert.subList(0, n - 1)) {
- if (existingAncestorNames.contains(row.name())) {
+ if (existingLogicalNames.contains(row.name())) {
continue;
}
SchemaPO.Builder builder = SchemaPO.builder();
@@ -230,11 +198,7 @@ public class SchemaMetaService {
SchemaPO leafPO =
POConverters.initializeSchemaPOWithVersion(leafRow, leafBuilder);
List<SchemaPO> schemaPosToInsert = new
ArrayList<>(missingAncestorPOs);
schemaPosToInsert.add(leafPO);
- if (overwrite) {
-
mapper.batchInsertSchemaMetaOnDuplicateKeyUpdate(schemaPosToInsert);
- } else {
- mapper.batchInsertSchemaMeta(schemaPosToInsert);
- }
+ ops.batchInsertPOs(mapper, schemaPosToInsert, overwrite);
});
} catch (RuntimeException re) {
ExceptionUtils.checkSQLException(
@@ -271,7 +235,8 @@ public class SchemaMetaService {
SessionUtils.getWithoutCommit(
SchemaMetaMapper.class,
mapper ->
- mapper.updateSchemaMeta(
+ ops.updatePO(
+ mapper,
POConverters.updateSchemaPOWithVersion(oldSchemaPO, newEntity),
oldSchemaPO))),
() -> {
@@ -494,96 +459,24 @@ public class SchemaMetaService {
private SchemaPO getSchemaPOByIdentifier(NameIdentifier identifier) {
NameIdentifierUtil.checkSchema(identifier);
- return schemaPOFetcher().apply(identifier);
- }
-
- private SchemaPO getSchemaByFullQualifiedName(
- String metalakeName, String catalogName, String schemaName) {
SchemaPO schemaPO =
SessionUtils.getWithoutCommit(
SchemaMetaMapper.class,
mapper ->
- mapper.selectSchemaByFullQualifiedName(metalakeName,
catalogName, schemaName));
+ POStorageReadRouting.getPO(mapper, identifier, ops,
Entity.EntityType.SCHEMA));
if (schemaPO == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- EntityType.CATALOG.name().toLowerCase(),
- schemaName);
- }
-
- return schemaPO;
- }
-
- private List<SchemaPO> listSchemaPOs(Namespace namespace) {
- return schemaListFetcher().apply(namespace);
- }
-
- private List<SchemaPO> listSchemaPOsByCatalogId(Namespace namespace) {
- Long catalogId =
- EntityIdService.getEntityId(
- NameIdentifier.of(namespace.levels()), Entity.EntityType.CATALOG);
-
- return SessionUtils.getWithoutCommit(
- SchemaMetaMapper.class, mapper ->
mapper.listSchemaPOsByCatalogId(catalogId));
- }
-
- private List<SchemaPO> listSchemaPOsByFullQualifiedName(Namespace namespace)
{
- String[] namespaceLevels = namespace.levels();
- List<SchemaPO> schemaPOs =
- SessionUtils.getWithoutCommit(
- SchemaMetaMapper.class,
- mapper ->
- mapper.listSchemaPOsByFullQualifiedName(namespaceLevels[0],
namespaceLevels[1]));
- if (schemaPOs.isEmpty() || schemaPOs.get(0).getCatalogId() == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- Entity.EntityType.CATALOG.name().toLowerCase(),
- namespaceLevels[1]);
- }
- return schemaPOs.stream().filter(po -> po.getSchemaId() !=
null).collect(Collectors.toList());
- }
-
- private SchemaPO getSchemaPOByCatalogId(NameIdentifier identifier) {
- Long catalogId =
- EntityIdService.getEntityId(
- NameIdentifier.of(identifier.namespace().levels()),
Entity.EntityType.CATALOG);
- return getSchemaPOByCatalogIdAndName(catalogId, identifier.name());
- }
-
- private SchemaPO getSchemaPOByFullQualifiedName(NameIdentifier identifier) {
- String[] namespaceLevels = identifier.namespace().levels();
- SchemaPO schemaPO =
- getSchemaByFullQualifiedName(namespaceLevels[0], namespaceLevels[1],
identifier.name());
-
- if (schemaPO.getCatalogId() == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- Entity.EntityType.CATALOG.name().toLowerCase(),
- namespaceLevels[1]);
- }
-
- if (schemaPO.getSchemaId() == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.SCHEMA.name().toLowerCase(),
identifier.name());
}
-
return schemaPO;
}
- private Function<Namespace, List<SchemaPO>> schemaListFetcher() {
- // If cache is enabled, we can use catalog id to fetch schemas faster or
else use full qualified
- // name to join several tables to get the schema list.
- return GravitinoEnv.getInstance().cacheEnabled()
- ? this::listSchemaPOsByCatalogId
- : this::listSchemaPOsByFullQualifiedName;
- }
-
- private Function<NameIdentifier, SchemaPO> schemaPOFetcher() {
- return GravitinoEnv.getInstance().cacheEnabled()
- ? this::getSchemaPOByCatalogId
- : this::getSchemaPOByFullQualifiedName;
+ private List<SchemaPO> listSchemaPOs(Namespace namespace) {
+ return SessionUtils.getWithoutCommit(
+ SchemaMetaMapper.class,
+ mapper -> POStorageReadRouting.listPOs(mapper, namespace, ops,
Entity.EntityType.SCHEMA));
}
private void fillSchemaPOBuilderParentEntityId(SchemaPO.Builder builder,
Namespace namespace) {
@@ -605,16 +498,22 @@ public class SchemaMetaService {
List<String> schemaNames =
identifiers.stream().map(NameIdentifier::name).collect(Collectors.toList());
- return SessionUtils.doWithCommitAndFetchResult(
+ return SessionUtils.getWithoutCommit(
SchemaMetaMapper.class,
mapper -> {
List<SchemaPO> schemaPOs =
- mapper.batchSelectSchemaByIdentifier(
- catalogIdent.namespace().level(0), catalogIdent.name(),
schemaNames);
+ ops.listPOs(
+ mapper,
+ Namespace.of(catalogIdent.namespace().levels()[0],
catalogIdent.name()),
+ schemaNames);
return POConverters.fromSchemaPOs(schemaPOs, firstIdent.namespace());
});
}
+ /**
+ * Returns the storage-ops delegate this service uses for schema POs (the instance assigned in
+ * the constructor).
+ *
+ * @return the schema storage operations
+ */
+ public BasePOStorageOps<SchemaPO, SchemaMetaMapper> ops() {
+ return ops;
+ }
+
private static long nextIdForNestedAncestor() {
IdGenerator generator = GravitinoEnv.getInstance().idGenerator();
if (generator == null) {
@@ -623,4 +522,39 @@ public class SchemaMetaService {
}
return generator.nextId();
}
+
+  /**
+   * Converts the schema name carried by {@code po} from storage (physical) form to logical form.
+   * Passed to {@code HierarchicalConversionPOStorageOps} by this service's constructor.
+   *
+   * @param po schema PO whose name may contain the physical separator
+   * @return {@code po} itself when its name is null or has no physical separator; otherwise a copy
+   *     of {@code po} carrying the converted name
+   */
+  private static SchemaPO physicalToLogicalSchemaPO(SchemaPO po) {
+    String storedName = po.getSchemaName();
+    boolean nested =
+        storedName != null && storedName.contains(HierarchicalSchemaUtil.physicalSeparator());
+    if (!nested) {
+      return po;
+    }
+    String logicalSep = HierarchicalSchemaUtil.schemaSeparator();
+    String logicalName = HierarchicalSchemaUtil.physicalToLogical(storedName, logicalSep);
+    return copySchemaPOWithName(po, logicalName);
+  }
+
+  /**
+   * Converts the schema name carried by {@code po} from logical form to storage (physical) form.
+   * Passed to {@code HierarchicalConversionPOStorageOps} by this service's constructor.
+   *
+   * @param po schema PO whose name may contain the logical schema separator
+   * @return {@code po} itself when its name is null or has no logical separator; otherwise a copy
+   *     of {@code po} carrying the converted name
+   */
+  private static SchemaPO logicalToPhysicalSchemaPO(SchemaPO po) {
+    String logicalName = po.getSchemaName();
+    boolean nested =
+        logicalName != null && logicalName.contains(HierarchicalSchemaUtil.schemaSeparator());
+    if (!nested) {
+      return po;
+    }
+    String logicalSep = HierarchicalSchemaUtil.schemaSeparator();
+    String storedName = HierarchicalSchemaUtil.logicalToPhysical(logicalName, logicalSep);
+    return copySchemaPOWithName(po, storedName);
+  }
+
+  /**
+   * Builds a new {@link SchemaPO} that carries {@code name} as its schema name and takes every
+   * other field set here (ids, comment, properties, audit info, versions, deleted-at) from
+   * {@code po}.
+   *
+   * @param po source PO; not modified
+   * @param name schema name for the copy
+   * @return the newly built PO
+   */
+  private static SchemaPO copySchemaPOWithName(SchemaPO po, String name) {
+    SchemaPO.Builder copy = SchemaPO.builder();
+    copy.withSchemaId(po.getSchemaId());
+    copy.withSchemaName(name);
+    copy.withMetalakeId(po.getMetalakeId());
+    copy.withCatalogId(po.getCatalogId());
+    copy.withSchemaComment(po.getSchemaComment());
+    copy.withProperties(po.getProperties());
+    copy.withAuditInfo(po.getAuditInfo());
+    copy.withCurrentVersion(po.getCurrentVersion());
+    copy.withLastVersion(po.getLastVersion());
+    copy.withDeletedAt(po.getDeletedAt());
+    return copy.build();
+  }
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaPOStorageOps.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaPOStorageOps.java
new file mode 100644
index 0000000000..7102c4976d
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/SchemaPOStorageOps.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.storage.relational.mapper.SchemaMetaMapper;
+import org.apache.gravitino.storage.relational.po.SchemaPO;
+
+/**
+ * {@link BasePOStorageOps} implementation for schema POs. Every method is a thin delegate to the
+ * matching {@link SchemaMetaMapper} statement; the two fully-qualified-name reads additionally
+ * translate "parent not found" results into {@link NoSuchEntityException}.
+ */
+public class SchemaPOStorageOps extends BasePOStorageOps<SchemaPO, SchemaMetaMapper> {
+
+  public SchemaPOStorageOps() {}
+
+  /**
+   * Inserts the given schema POs in one batch.
+   *
+   * @param mapper schema mapper
+   * @param schemaPOs rows to insert
+   * @param overwrite when true, uses the on-duplicate-key-update statement; otherwise a plain
+   *     insert
+   */
+  @Override
+  public void batchInsertPOs(SchemaMetaMapper mapper, List<SchemaPO> schemaPOs, boolean overwrite) {
+    if (overwrite) {
+      mapper.batchInsertSchemaMetaOnDuplicateKeyUpdate(schemaPOs);
+    } else {
+      mapper.batchInsertSchemaMeta(schemaPOs);
+    }
+  }
+
+  /**
+   * Updates a schema row.
+   *
+   * <p>Parameter order is new-then-old, matching how the schema service invokes this method and
+   * the order used by the table storage ops.
+   *
+   * @param mapper schema mapper
+   * @param newPO the PO holding the values to write
+   * @param oldPO the PO holding the currently stored values
+   * @return the value returned by {@code updateSchemaMeta}
+   */
+  @Override
+  public Integer updatePO(SchemaMetaMapper mapper, SchemaPO newPO, SchemaPO oldPO) {
+    return mapper.updateSchemaMeta(newPO, oldPO);
+  }
+
+  /**
+   * Loads a schema by its catalog id and name.
+   *
+   * @param mapper schema mapper
+   * @param parentId catalog id
+   * @param name schema name
+   * @return the mapper result (no null check is applied here)
+   */
+  @Override
+  public SchemaPO getPO(SchemaMetaMapper mapper, Long parentId, String name) {
+    return mapper.selectSchemaMetaByCatalogIdAndName(parentId, name);
+  }
+
+  /**
+   * Loads a schema by metalake name, catalog name and schema name.
+   *
+   * @param mapper schema mapper
+   * @param identifier schema identifier; namespace level 0 is the metalake, level 1 the catalog
+   * @return the schema PO, or null when the parent exists but the schema does not
+   * @throws NoSuchEntityException (reported against the catalog) when the query returns no row
+   */
+  @Override
+  public SchemaPO getPOByFullName(SchemaMetaMapper mapper, NameIdentifier identifier) {
+    Namespace namespace = identifier.namespace();
+    SchemaPO po =
+        mapper.selectSchemaByFullQualifiedName(
+            namespace.level(0), namespace.level(1), identifier.name());
+    // INNER JOIN on metalake/catalog: a null PO means the metalake or catalog does not exist.
+    if (po == null) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.CATALOG.name().toLowerCase(),
+          namespace.level(1));
+    }
+    // LEFT JOIN on schema_meta: a row with non-null catalogId but null schemaId means
+    // the catalog exists but the schema does not.
+    if (po.getSchemaId() == null) {
+      return null;
+    }
+    return po;
+  }
+
+  /**
+   * Lists the schemas stored under a catalog id.
+   *
+   * @param schemaMetaMapper schema mapper
+   * @param parentId catalog id
+   * @return the mapper result
+   */
+  @Override
+  public List<SchemaPO> listPOs(SchemaMetaMapper schemaMetaMapper, Long parentId) {
+    return schemaMetaMapper.listSchemaPOsByCatalogId(parentId);
+  }
+
+  /**
+   * Batch-loads schemas by name under one catalog.
+   *
+   * @param schemaMetaMapper schema mapper
+   * @param namespace catalog namespace; level 0 is the metalake, level 1 the catalog
+   * @param names schema names to look up
+   * @return the mapper result
+   */
+  @Override
+  public List<SchemaPO> listPOs(
+      SchemaMetaMapper schemaMetaMapper, Namespace namespace, List<String> names) {
+    return schemaMetaMapper.batchSelectSchemaByIdentifier(
+        namespace.level(0), namespace.level(1), names);
+  }
+
+  /**
+   * Loads schemas by their ids.
+   *
+   * @param schemaMetaMapper schema mapper
+   * @param entityIds schema ids
+   * @return the mapper result
+   */
+  @Override
+  public List<SchemaPO> listPOs(SchemaMetaMapper schemaMetaMapper, List<Long> entityIds) {
+    return schemaMetaMapper.listSchemaPOsBySchemaIds(entityIds);
+  }
+
+  /**
+   * Lists the schemas under a catalog addressed by metalake name and catalog name.
+   *
+   * @param schemaMetaMapper schema mapper
+   * @param namespace catalog namespace; level 0 is the metalake, level 1 the catalog
+   * @return the schemas found; rows without a schema id are dropped
+   * @throws NoSuchEntityException (reported against the catalog) when the query returns no row
+   */
+  @Override
+  public List<SchemaPO> listPOsByNSFullName(
+      SchemaMetaMapper schemaMetaMapper, Namespace namespace) {
+    List<SchemaPO> pos =
+        schemaMetaMapper.listSchemaPOsByFullQualifiedName(namespace.level(0), namespace.level(1));
+    // An empty result means the parent metalake or catalog does not exist.
+    if (pos.isEmpty()) {
+      throw new NoSuchEntityException(
+          NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+          Entity.EntityType.CATALOG.name().toLowerCase(),
+          namespace.level(1));
+    }
+    // Same LEFT JOIN behavior as getPOByFullName: filter out the placeholder row that
+    // represents an existing catalog without any matching schema.
+    return pos.stream().filter(po -> po.getSchemaId() != null).collect(Collectors.toList());
+  }
+
+  /** Schema reads can be served by catalog id, so the parent-id read path is supported. */
+  @Override
+  public boolean supportsParentIdRelationalRead() {
+    return true;
+  }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
index 02c094a509..72391bc31b 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TableMetaService.java
@@ -28,10 +28,7 @@ import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
-import java.util.stream.Collectors;
import org.apache.gravitino.Entity;
-import org.apache.gravitino.Entity.EntityType;
-import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.HasIdentifier;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.NameIdentifier;
@@ -60,29 +57,31 @@ import org.apache.gravitino.utils.NamespaceUtil;
/** The service class for table metadata. It provides the basic database
operations for table. */
public class TableMetaService {
private static final TableMetaService INSTANCE = new TableMetaService();
+ private BasePOStorageOps<TablePO, TableMetaMapper> ops;
public static TableMetaService getInstance() {
return INSTANCE;
}
- private TableMetaService() {}
+ private TableMetaService() {
+ this.ops = new HierarchicalConversionPOStorageOps<>(new
TablePOStorageOps());
+ }
@Monitored(
metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
baseMetricName = "getTableIdBySchemaIdAndName")
public Long getTableIdBySchemaIdAndName(Long schemaId, String tableName) {
- Long tableId =
+ TablePO tablePO =
SessionUtils.getWithoutCommit(
- TableMetaMapper.class,
- mapper -> mapper.selectTableIdBySchemaIdAndName(schemaId,
tableName));
+ TableMetaMapper.class, mapper -> ops.getPO(mapper, schemaId,
tableName));
- if (tableId == null) {
+ if (tablePO == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.TABLE.name().toLowerCase(),
tableName);
}
- return tableId;
+ return tablePO.getTableId();
}
@Monitored(
@@ -124,11 +123,7 @@ public class TableMetaService {
TableMetaMapper.class,
mapper -> {
tablePORef.set(po);
- if (overwrite) {
- mapper.insertTableMetaOnDuplicateKeyUpdate(po);
- } else {
- mapper.insertTableMeta(po);
- }
+ ops.insertPO(mapper, po, overwrite);
}),
() ->
SessionUtils.doWithoutCommit(
@@ -204,7 +199,7 @@ public class TableMetaService {
updateResult.set(
SessionUtils.getWithoutCommit(
TableMetaMapper.class,
- mapper -> mapper.updateTableMeta(newTablePO, oldTablePO,
newSchemaId))),
+ mapper -> ops.updatePO(mapper, newTablePO, oldTablePO))),
() ->
SessionUtils.doWithoutCommit(
TableVersionMapper.class,
@@ -347,131 +342,47 @@ public class TableMetaService {
Objects.equals(schemaIdent,
NameIdentifierUtil.getSchemaIdentifier(ident)));
tableNames.add(ident.name());
}
- Long schemaId = EntityIdService.getEntityId(schemaIdent,
Entity.EntityType.SCHEMA);
return SessionUtils.doWithCommitAndFetchResult(
TableMetaMapper.class,
mapper -> {
- List<TablePO> tableList =
mapper.batchSelectTableByIdentifier(schemaId, tableNames);
+ List<TablePO> tableList = ops.listPOs(mapper,
firstIdent.namespace(), tableNames);
return POConverters.fromTablePOs(tableList, firstIdent.namespace());
});
}
- private void fillTablePOBuilderParentEntityId(TablePO.Builder builder,
Namespace namespace) {
- NamespaceUtil.checkTable(namespace);
- NamespacedEntityId namespacedEntityId =
- EntityIdService.getEntityIds(
- NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
- builder.withMetalakeId(namespacedEntityId.namespaceIds()[0]);
- builder.withCatalogId(namespacedEntityId.namespaceIds()[1]);
- builder.withSchemaId(namespacedEntityId.entityId());
+ public BasePOStorageOps<TablePO, TableMetaMapper> ops() {
+ return ops;
}
private TablePO getTablePOByIdentifier(NameIdentifier identifier) {
NameIdentifierUtil.checkTable(identifier);
-
- return tablePOFetcher().apply(identifier);
- }
-
- private TablePO getTablePOBySchemaIdAndName(Long schemaId, String tableName)
{
TablePO tablePO =
SessionUtils.getWithoutCommit(
TableMetaMapper.class,
- mapper -> mapper.selectTableMetaBySchemaIdAndName(schemaId,
tableName));
+ mapper -> POStorageReadRouting.getPO(mapper, identifier, ops,
Entity.EntityType.TABLE));
if (tablePO == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.TABLE.name().toLowerCase(),
- tableName);
- }
- return tablePO;
- }
-
- private TablePO getTableByFullQualifiedName(
- String metalakeName, String catalogName, String schemaName, String
tableName) {
- TablePO tablePO =
- SessionUtils.getWithoutCommit(
- TableMetaMapper.class,
- mapper ->
- mapper.selectTableByFullQualifiedName(
- metalakeName, catalogName, schemaName, tableName));
- if (tablePO == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- Entity.EntityType.TABLE.name().toLowerCase(),
- tableName);
+ identifier.name());
}
return tablePO;
}
private List<TablePO> listTablePOs(Namespace namespace) {
- return tableListFetcher().apply(namespace);
- }
-
- private List<TablePO> listTablePOsBySchemaId(Namespace namespace) {
- Long schemaId =
- EntityIdService.getEntityId(
- NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
return SessionUtils.getWithoutCommit(
- TableMetaMapper.class, mapper ->
mapper.listTablePOsBySchemaId(schemaId));
- }
-
- private List<TablePO> listTablePOsByFullQualifiedName(Namespace namespace) {
- String[] namespaceLevels = namespace.levels();
- List<TablePO> tablePOs =
- SessionUtils.getWithoutCommit(
- TableMetaMapper.class,
- mapper ->
- mapper.listTablePOsByFullQualifiedName(
- namespaceLevels[0], namespaceLevels[1],
namespaceLevels[2]));
- if (tablePOs.isEmpty() || tablePOs.get(0).getSchemaId() == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- EntityType.SCHEMA.name().toLowerCase(),
- namespaceLevels[2]);
- }
- return tablePOs.stream().filter(po -> po.getTableId() !=
null).collect(Collectors.toList());
- }
-
- private TablePO getTablePOBySchemaId(NameIdentifier identifier) {
- Long schemaId =
- EntityIdService.getEntityId(
- NameIdentifier.of(identifier.namespace().levels()),
Entity.EntityType.SCHEMA);
- return getTablePOBySchemaIdAndName(schemaId, identifier.name());
- }
-
- private TablePO getTablePOByFullQualifiedName(NameIdentifier identifier) {
- String[] namespaceLevels = identifier.namespace().levels();
- TablePO tablePO =
- getTableByFullQualifiedName(
- namespaceLevels[0], namespaceLevels[1], namespaceLevels[2],
identifier.name());
-
- if (tablePO.getSchemaId() == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- EntityType.SCHEMA.name().toLowerCase(),
- namespaceLevels[2]);
- }
-
- if (tablePO.getTableId() == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- EntityType.TABLE.name().toLowerCase(),
- identifier.name());
- }
-
- return tablePO;
- }
-
- private Function<Namespace, List<TablePO>> tableListFetcher() {
- return GravitinoEnv.getInstance().cacheEnabled()
- ? this::listTablePOsBySchemaId
- : this::listTablePOsByFullQualifiedName;
+ TableMetaMapper.class,
+ mapper -> POStorageReadRouting.listPOs(mapper, namespace, ops,
Entity.EntityType.TABLE));
}
- private Function<NameIdentifier, TablePO> tablePOFetcher() {
- return GravitinoEnv.getInstance().cacheEnabled()
- ? this::getTablePOBySchemaId
- : this::getTablePOByFullQualifiedName;
+ private void fillTablePOBuilderParentEntityId(TablePO.Builder builder,
Namespace namespace) {
+ NamespaceUtil.checkTable(namespace);
+ NamespacedEntityId namespacedEntityId =
+ EntityIdService.getEntityIds(
+ NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
+ builder.withMetalakeId(namespacedEntityId.namespaceIds()[0]);
+ builder.withCatalogId(namespacedEntityId.namespaceIds()[1]);
+ builder.withSchemaId(namespacedEntityId.entityId());
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/TablePOStorageOps.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TablePOStorageOps.java
new file mode 100644
index 0000000000..599a25fded
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/TablePOStorageOps.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.storage.relational.mapper.TableMetaMapper;
+import org.apache.gravitino.storage.relational.po.TablePO;
+
+/**
+ * {@link BasePOStorageOps} implementation for {@link TablePO} rows, backed by
+ * {@link TableMetaMapper}.
+ *
+ * <p>Every method delegates to one mapper statement. The two full-qualified-name readers
+ * additionally translate the join result into either a {@link NoSuchEntityException} (missing
+ * parent) or an absent/empty result (missing table).
+ */
+public class TablePOStorageOps extends BasePOStorageOps<TablePO,
TableMetaMapper> {
+
+ /** Creates an instance; this class declares no fields of its own. */
+ public TablePOStorageOps() {}
+
+ /**
+ * Inserts a table row.
+ *
+ * @param mapper the mapper used to run the statement
+ * @param tablePO the row to insert
+ * @param overwrite when {@code true} the on-duplicate-key-update insert statement is used,
+ *     otherwise the plain insert statement
+ */
+ @Override
+ public void insertPO(TableMetaMapper mapper, TablePO tablePO, boolean
overwrite) {
+ if (overwrite) {
+ mapper.insertTableMetaOnDuplicateKeyUpdate(tablePO);
+ } else {
+ mapper.insertTableMeta(tablePO);
+ }
+ }
+
+ /**
+ * Updates a table row by delegating to {@code updateTableMeta}, passing the new PO, the old PO
+ * and the schema ID taken from the new PO.
+ *
+ * @param mapper the mapper used to run the statement
+ * @param newPO the row state to write
+ * @param oldPO the row state expected to be currently stored
+ * @return the mapper's result; presumably the number of updated rows (TODO confirm against
+ *     {@code TableMetaMapper})
+ */
+ @Override
+ public Integer updatePO(TableMetaMapper mapper, TablePO newPO, TablePO
oldPO) {
+ return mapper.updateTableMeta(newPO, oldPO, newPO.getSchemaId());
+ }
+
+ /**
+ * Reads a table by its parent schema ID and table name.
+ *
+ * @param mapper the mapper used to run the statement
+ * @param parentId the ID passed to the mapper as the schema ID
+ * @param name the table name
+ * @return whatever {@code selectTableMetaBySchemaIdAndName} returns for that pair
+ */
+ @Override
+ public TablePO getPO(TableMetaMapper mapper, Long parentId, String name) {
+ return mapper.selectTableMetaBySchemaIdAndName(parentId, name);
+ }
+
+ /**
+ * Reads a table by metalake, catalog, schema and table name with a single mapper call.
+ *
+ * <p>The identifier's namespace levels 0, 1 and 2 are used as metalake, catalog and schema
+ * names. Note that when the mapper returns no row the exception always names the catalog
+ * (namespace level 1), even though the missing parent may be the metalake.
+ *
+ * @param mapper the mapper used to run the statement
+ * @param identifier the table identifier; its namespace must have at least three levels
+ * @return the table PO, or {@code null} when the returned row carries no table ID
+ * @throws NoSuchEntityException naming the catalog when no row is returned, or naming the
+ *     schema when the returned row carries no schema ID
+ */
+ @Override
+ public TablePO getPOByFullName(TableMetaMapper mapper, NameIdentifier
identifier) {
+ Namespace namespace = identifier.namespace();
+ TablePO po =
+ mapper.selectTableByFullQualifiedName(
+ namespace.level(0), namespace.level(1), namespace.level(2),
identifier.name());
+ // INNER JOIN on metalake/catalog: a null PO means the metalake or catalog
does not exist.
+ if (po == null) {
+ throw new NoSuchEntityException(
+ NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+ Entity.EntityType.CATALOG.name().toLowerCase(),
+ namespace.level(1));
+ }
+ // LEFT JOIN on schema_meta: a row with non-null catalogId but null
schemaId means
+ // the catalog exists but the schema does not.
+ if (po.getSchemaId() == null) {
+ throw new NoSuchEntityException(
+ NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+ Entity.EntityType.SCHEMA.name().toLowerCase(),
+ namespace.level(2));
+ }
+ // LEFT JOIN on table_meta: a row with non-null schemaId but null tableId
means
+ // the schema exists but the table does not.
+ if (po.getTableId() == null) {
+ return null;
+ }
+ return po;
+ }
+
+ /**
+ * Lists the tables of one schema.
+ *
+ * @param mapper the mapper used to run the statement
+ * @param parentId the ID passed to the mapper as the schema ID
+ * @return the rows returned by {@code listTablePOsBySchemaId}
+ */
+ @Override
+ public List<TablePO> listPOs(TableMetaMapper mapper, Long parentId) {
+ return mapper.listTablePOsBySchemaId(parentId);
+ }
+
+ /**
+ * Batch-reads tables of one schema by name.
+ *
+ * <p>The namespace levels are turned into a schema identifier and resolved to a schema ID via
+ * {@code EntityIdService.getEntityId} before the mapper is called.
+ *
+ * @param mapper the mapper used to run the statement
+ * @param namespace the table namespace whose levels identify the parent schema
+ * @param names the table names to look up
+ * @return the rows returned by {@code batchSelectTableByIdentifier}
+ */
+ @Override
+ public List<TablePO> listPOs(TableMetaMapper mapper, Namespace namespace,
List<String> names) {
+ Long schemaId =
+ EntityIdService.getEntityId(
+ NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
+ return mapper.batchSelectTableByIdentifier(schemaId, names);
+ }
+
+ /**
+ * Reads tables by their table IDs.
+ *
+ * @param mapper the mapper used to run the statement
+ * @param entityIds the table IDs to look up
+ * @return the rows returned by {@code listTablePOsByTableIds}
+ */
+ @Override
+ public List<TablePO> listPOs(TableMetaMapper mapper, List<Long> entityIds) {
+ return mapper.listTablePOsByTableIds(entityIds);
+ }
+
+ /**
+ * Lists the tables of a schema addressed by metalake, catalog and schema name.
+ *
+ * <p>Only the first returned row is inspected to decide whether the schema exists. Rows
+ * without a table ID are dropped from the result, so a schema without tables yields an empty
+ * list rather than an exception.
+ *
+ * @param mapper the mapper used to run the statement
+ * @param namespace the table namespace; levels 0, 1 and 2 are metalake, catalog and schema
+ * @return the rows that carry a table ID, possibly empty
+ * @throws NoSuchEntityException naming the catalog when the mapper returns no rows, or naming
+ *     the schema when the first row carries no schema ID
+ */
+ @Override
+ public List<TablePO> listPOsByNSFullName(TableMetaMapper mapper, Namespace
namespace) {
+ List<TablePO> pos =
+ mapper.listTablePOsByFullQualifiedName(
+ namespace.level(0), namespace.level(1), namespace.level(2));
+ // INNER JOIN on metalake/catalog: an empty result means the metalake or
catalog does not exist.
+ if (pos.isEmpty()) {
+ throw new NoSuchEntityException(
+ NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+ Entity.EntityType.CATALOG.name().toLowerCase(),
+ namespace.level(1));
+ }
+ // LEFT JOIN on schema_meta: rows with non-null catalogId but null
schemaId mean the
+ // catalog exists but the schema does not.
+ if (pos.get(0).getSchemaId() == null) {
+ throw new NoSuchEntityException(
+ NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+ Entity.EntityType.SCHEMA.name().toLowerCase(),
+ namespace.level(2));
+ }
+ // LEFT JOIN on table_meta: filter out the placeholder row for a schema
without tables.
+ return pos.stream().filter(po -> po.getTableId() !=
null).collect(Collectors.toList());
+ }
+
+ /**
+ * Always returns {@code true}.
+ *
+ * <p>NOTE(review): presumably this tells the read routing that lookups by parent (schema) ID
+ * are available for tables; the contract is defined in {@code BasePOStorageOps}, which is not
+ * visible here, so confirm there.
+ */
+ @Override
+ public boolean supportsParentIdRelationalRead() {
+ return true;
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewMetaService.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewMetaService.java
index 177118c811..a305e05645 100644
---
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewMetaService.java
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewMetaService.java
@@ -31,8 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.gravitino.Entity;
-import org.apache.gravitino.Entity.EntityType;
-import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.HasIdentifier;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.NameIdentifier;
@@ -57,29 +55,31 @@ import org.apache.gravitino.utils.NamespaceUtil;
public class ViewMetaService {
private static final ViewMetaService INSTANCE = new ViewMetaService();
+ private BasePOStorageOps<ViewPO, ViewMetaMapper> ops;
public static ViewMetaService getInstance() {
return INSTANCE;
}
- private ViewMetaService() {}
+ /**
+ * Private constructor backing the {@code INSTANCE} singleton. It wires {@code ops} as a
+ * {@code HierarchicalConversionPOStorageOps} that wraps a new {@code ViewPOStorageOps}.
+ */
+ private ViewMetaService() {
+ this.ops = new HierarchicalConversionPOStorageOps<>(new
ViewPOStorageOps());
+ }
+ /**
+ * Resolves the ID of a view from its parent schema ID and its name.
+ *
+ * <p>The whole view PO is loaded through {@code ops.getPO} inside a
+ * {@code SessionUtils.getWithoutCommit} call, and only its view ID is returned.
+ *
+ * @param schemaId the ID of the parent schema
+ * @param viewName the name of the view
+ * @return the view ID of the loaded PO
+ * @throws NoSuchEntityException naming the view when no PO is found
+ */
@Monitored(
metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
baseMetricName = "getViewIdBySchemaIdAndName")
public Long getViewIdBySchemaIdAndName(Long schemaId, String viewName) {
- Long viewId =
+ ViewPO viewPO =
SessionUtils.getWithoutCommit(
- ViewMetaMapper.class,
- mapper -> mapper.selectViewIdBySchemaIdAndName(schemaId,
viewName));
+ ViewMetaMapper.class, mapper -> ops.getPO(mapper, schemaId,
viewName));
- if (viewId == null) {
+ if (viewPO == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.VIEW.name().toLowerCase(),
viewName);
}
- return viewId;
+ return viewPO.getViewId();
}
@Monitored(
@@ -87,7 +87,7 @@ public class ViewMetaService {
baseMetricName = "listViewsByNamespace")
public List<ViewEntity> listViewsByNamespace(Namespace namespace) {
NamespaceUtil.checkView(namespace);
- List<ViewPO> viewPOs = listViewPOsByNamespace(namespace);
+ List<ViewPO> viewPOs = listViewPOs(namespace);
return viewPOs.stream().map(po -> fromViewPO(po,
namespace)).collect(Collectors.toList());
}
@@ -95,8 +95,7 @@ public class ViewMetaService {
metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
baseMetricName = "getViewByIdentifier")
public ViewEntity getViewByIdentifier(NameIdentifier identifier) {
- NameIdentifierUtil.checkView(identifier);
- ViewPO viewPO = viewPOFetcher().apply(identifier);
+ ViewPO viewPO = getViewPOByIdentifier(identifier);
return fromViewPO(viewPO, identifier.namespace());
}
@@ -111,14 +110,7 @@ public class ViewMetaService {
SessionUtils.doMultipleWithCommit(
() ->
SessionUtils.doWithoutCommit(
- ViewMetaMapper.class,
- mapper -> {
- if (overwrite) {
- mapper.insertViewMetaOnDuplicateKeyUpdate(po);
- } else {
- mapper.insertViewMeta(po);
- }
- }),
+ ViewMetaMapper.class, mapper -> ops.insertPO(mapper, po,
overwrite)),
() ->
SessionUtils.doWithoutCommit(
ViewVersionInfoMapper.class,
@@ -141,9 +133,7 @@ public class ViewMetaService {
baseMetricName = "updateViewByIdentifier")
public <E extends Entity & HasIdentifier> ViewEntity updateView(
NameIdentifier ident, Function<E, E> updater) throws IOException {
- NameIdentifierUtil.checkView(ident);
-
- ViewPO oldViewPO = viewPOFetcher().apply(ident);
+ ViewPO oldViewPO = getViewPOByIdentifier(ident);
ViewEntity oldViewEntity = fromViewPO(oldViewPO, ident.namespace());
ViewEntity newEntity = (ViewEntity) updater.apply((E) oldViewEntity);
Preconditions.checkArgument(
@@ -171,7 +161,7 @@ public class ViewMetaService {
() -> {
updateResult.set(
SessionUtils.getWithoutCommit(
- ViewMetaMapper.class, mapper ->
mapper.updateViewMeta(newViewPO, oldViewPO)));
+ ViewMetaMapper.class, mapper -> ops.updatePO(mapper,
newViewPO, oldViewPO)));
if (updateResult.get() == 0) {
throw new RuntimeException("Failed to update the entity: " +
ident);
}
@@ -203,8 +193,7 @@ public class ViewMetaService {
metricsSource = GRAVITINO_RELATIONAL_STORE_METRIC_NAME,
baseMetricName = "deleteViewByIdentifier")
public boolean deleteView(NameIdentifier ident) {
- NameIdentifierUtil.checkView(ident);
- ViewPO viewPO = viewPOFetcher().apply(ident);
+ ViewPO viewPO = getViewPOByIdentifier(ident);
String metalakeName = ident.namespace().level(0);
String catalogName = ident.namespace().level(1);
String schemaName = ident.namespace().level(2);
@@ -231,6 +220,10 @@ public class ViewMetaService {
return versionDeletedCount + metaDeletedCount;
}
+ /**
+ * Returns the PO storage operations this service delegates its view reads and writes to.
+ *
+ * @return the {@code ops} instance created in the constructor
+ */
+ public BasePOStorageOps<ViewPO, ViewMetaMapper> ops() {
+ return ops;
+ }
+
private boolean deleteView(Long viewId, String metalakeName, String
viewFullName) {
AtomicInteger deleteResult = new AtomicInteger(0);
SessionUtils.doMultipleWithCommit(
@@ -287,88 +280,25 @@ public class ViewMetaService {
return buildViewPO(newEntity, builder, newVersion.intValue());
}
- private List<ViewPO> listViewPOsByNamespace(Namespace namespace) {
- return viewListFetcher().apply(namespace);
- }
-
- private Function<Namespace, List<ViewPO>> viewListFetcher() {
- return GravitinoEnv.getInstance().cacheEnabled()
- ? this::listViewPOsBySchemaId
- : this::listViewPOsByFullQualifiedName;
- }
-
- private Function<NameIdentifier, ViewPO> viewPOFetcher() {
- return GravitinoEnv.getInstance().cacheEnabled()
- ? this::getViewPOBySchemaId
- : this::getViewPOByFullQualifiedName;
- }
-
- private List<ViewPO> listViewPOsBySchemaId(Namespace namespace) {
- Long schemaId =
- EntityIdService.getEntityId(
- NameIdentifier.of(namespace.levels()), Entity.EntityType.SCHEMA);
- return SessionUtils.getWithoutCommit(
- ViewMetaMapper.class, mapper ->
mapper.listViewPOsBySchemaId(schemaId));
- }
-
- private List<ViewPO> listViewPOsByFullQualifiedName(Namespace namespace) {
- String[] namespaceLevels = namespace.levels();
- List<ViewPO> viewPOs =
- SessionUtils.getWithoutCommit(
- ViewMetaMapper.class,
- mapper ->
- mapper.listViewPOsByFullQualifiedName(
- namespaceLevels[0], namespaceLevels[1],
namespaceLevels[2]));
- if (viewPOs.isEmpty() || viewPOs.get(0).getSchemaId() == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- EntityType.SCHEMA.name().toLowerCase(),
- namespaceLevels[2]);
- }
- return viewPOs.stream().filter(po -> po.getViewId() !=
null).collect(Collectors.toList());
- }
-
- private ViewPO getViewPOBySchemaId(NameIdentifier identifier) {
- Long schemaId =
- EntityIdService.getEntityId(
- NameIdentifier.of(identifier.namespace().levels()),
Entity.EntityType.SCHEMA);
+ /**
+ * Loads the PO of the view addressed by the given identifier.
+ *
+ * <p>The identifier is validated with {@code NameIdentifierUtil.checkView}; the read itself
+ * goes through {@code POStorageReadRouting.getPO} with this service's {@code ops}.
+ * NOTE(review): the routing presumably picks between the parent-ID and the
+ * full-qualified-name read path; confirm in {@code POStorageReadRouting}.
+ *
+ * @param identifier the view identifier
+ * @return the view PO, never {@code null}
+ * @throws NoSuchEntityException naming the view when the routed read returns {@code null}
+ */
+ private ViewPO getViewPOByIdentifier(NameIdentifier identifier) {
+ NameIdentifierUtil.checkView(identifier);
ViewPO viewPO =
SessionUtils.getWithoutCommit(
ViewMetaMapper.class,
- mapper -> mapper.selectViewMetaBySchemaIdAndName(schemaId,
identifier.name()));
-
+ mapper -> POStorageReadRouting.getPO(mapper, identifier, ops,
Entity.EntityType.VIEW));
if (viewPO == null) {
throw new NoSuchEntityException(
NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
Entity.EntityType.VIEW.name().toLowerCase(),
identifier.name());
}
+
return viewPO;
}
- private ViewPO getViewPOByFullQualifiedName(NameIdentifier identifier) {
- String[] namespaceLevels = identifier.namespace().levels();
- ViewPO viewPO =
- SessionUtils.getWithoutCommit(
- ViewMetaMapper.class,
- mapper ->
- mapper.selectViewByFullQualifiedName(
- namespaceLevels[0], namespaceLevels[1],
namespaceLevels[2], identifier.name()));
-
- if (viewPO == null || viewPO.getSchemaId() == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- EntityType.SCHEMA.name().toLowerCase(),
- namespaceLevels[2]);
- }
-
- if (viewPO.getViewId() == null) {
- throw new NoSuchEntityException(
- NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
- EntityType.VIEW.name().toLowerCase(),
- identifier.name());
- }
-
- return viewPO;
+ /**
+ * Lists the view POs under the given namespace by calling
+ * {@code POStorageReadRouting.listPOs} with this service's {@code ops} inside a
+ * {@code SessionUtils.getWithoutCommit} call.
+ *
+ * @param namespace the view namespace to list
+ * @return the POs returned by the routed read
+ */
+ private List<ViewPO> listViewPOs(Namespace namespace) {
+ return SessionUtils.getWithoutCommit(
+ ViewMetaMapper.class,
+ mapper -> POStorageReadRouting.listPOs(mapper, namespace, ops,
Entity.EntityType.VIEW));
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewPOStorageOps.java
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewPOStorageOps.java
new file mode 100644
index 0000000000..7294c50887
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/storage/relational/service/ViewPOStorageOps.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.exceptions.NoSuchEntityException;
+import org.apache.gravitino.storage.relational.mapper.ViewMetaMapper;
+import org.apache.gravitino.storage.relational.po.ViewPO;
+
+/**
+ * {@link BasePOStorageOps} implementation for {@link ViewPO} rows, backed by
+ * {@link ViewMetaMapper}.
+ *
+ * <p>Every method delegates to one mapper statement. The two full-qualified-name readers
+ * additionally translate the join result into either a {@link NoSuchEntityException} (missing
+ * parent) or an absent/empty result (missing view). The name-based batch lookup
+ * {@code listPOs(mapper, namespace, names)} is not overridden here, so the behavior inherited
+ * from {@code BasePOStorageOps} applies.
+ */
+public class ViewPOStorageOps extends BasePOStorageOps<ViewPO, ViewMetaMapper>
{
+
+ /** Creates an instance; this class declares no fields of its own. */
+ public ViewPOStorageOps() {}
+
+ /**
+ * Inserts a view row.
+ *
+ * @param mapper the mapper used to run the statement
+ * @param viewPO the row to insert
+ * @param overwrite when {@code true} the on-duplicate-key-update insert statement is used,
+ *     otherwise the plain insert statement
+ */
+ @Override
+ public void insertPO(ViewMetaMapper mapper, ViewPO viewPO, boolean
overwrite) {
+ if (overwrite) {
+ mapper.insertViewMetaOnDuplicateKeyUpdate(viewPO);
+ } else {
+ mapper.insertViewMeta(viewPO);
+ }
+ }
+
+ /**
+ * Updates a view row by delegating to {@code updateViewMeta}.
+ *
+ * @param mapper the mapper used to run the statement
+ * @param newPO the row state to write
+ * @param oldPO the row state expected to be currently stored
+ * @return the mapper's result; presumably the number of updated rows (TODO confirm against
+ *     {@code ViewMetaMapper})
+ */
+ @Override
+ public Integer updatePO(ViewMetaMapper mapper, ViewPO newPO, ViewPO oldPO) {
+ return mapper.updateViewMeta(newPO, oldPO);
+ }
+
+ /**
+ * Reads a view by its parent schema ID and view name.
+ *
+ * @param mapper the mapper used to run the statement
+ * @param parentId the ID passed to the mapper as the schema ID
+ * @param name the view name
+ * @return whatever {@code selectViewMetaBySchemaIdAndName} returns for that pair
+ */
+ @Override
+ public ViewPO getPO(ViewMetaMapper mapper, Long parentId, String name) {
+ return mapper.selectViewMetaBySchemaIdAndName(parentId, name);
+ }
+
+ /**
+ * Reads a view by metalake, catalog, schema and view name with a single mapper call.
+ *
+ * <p>The identifier's namespace levels 0, 1 and 2 are used as metalake, catalog and schema
+ * names. Note that when the mapper returns no row the exception always names the catalog
+ * (namespace level 1), even though the missing parent may be the metalake.
+ *
+ * @param mapper the mapper used to run the statement
+ * @param identifier the view identifier; its namespace must have at least three levels
+ * @return the view PO, or {@code null} when the returned row carries no view ID
+ * @throws NoSuchEntityException naming the catalog when no row is returned, or naming the
+ *     schema when the returned row carries no schema ID
+ */
+ @Override
+ public ViewPO getPOByFullName(ViewMetaMapper mapper, NameIdentifier
identifier) {
+ Namespace namespace = identifier.namespace();
+ ViewPO po =
+ mapper.selectViewByFullQualifiedName(
+ namespace.level(0), namespace.level(1), namespace.level(2),
identifier.name());
+ // INNER JOIN on metalake/catalog: a null PO means the metalake or catalog
does not exist.
+ if (po == null) {
+ throw new NoSuchEntityException(
+ NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+ Entity.EntityType.CATALOG.name().toLowerCase(),
+ namespace.level(1));
+ }
+ // LEFT JOIN on schema_meta: a row with non-null catalogId but null
schemaId means
+ // the catalog exists but the schema does not.
+ if (po.getSchemaId() == null) {
+ throw new NoSuchEntityException(
+ NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+ Entity.EntityType.SCHEMA.name().toLowerCase(),
+ namespace.level(2));
+ }
+ // LEFT JOIN on view_meta: a row with non-null schemaId but null viewId
means
+ // the schema exists but the view does not.
+ if (po.getViewId() == null) {
+ return null;
+ }
+ return po;
+ }
+
+ /**
+ * Lists the views of one schema.
+ *
+ * @param mapper the mapper used to run the statement
+ * @param parentId the ID passed to the mapper as the schema ID
+ * @return the rows returned by {@code listViewPOsBySchemaId}
+ */
+ @Override
+ public List<ViewPO> listPOs(ViewMetaMapper mapper, Long parentId) {
+ return mapper.listViewPOsBySchemaId(parentId);
+ }
+
+ /**
+ * Reads views by their view IDs.
+ *
+ * @param mapper the mapper used to run the statement
+ * @param entityIds the view IDs to look up
+ * @return the rows returned by {@code listViewPOsByViewIds}
+ */
+ @Override
+ public List<ViewPO> listPOs(ViewMetaMapper mapper, List<Long> entityIds) {
+ return mapper.listViewPOsByViewIds(entityIds);
+ }
+
+ /**
+ * Lists the views of a schema addressed by metalake, catalog and schema name.
+ *
+ * <p>Only the first returned row is inspected to decide whether the schema exists. Rows
+ * without a view ID are dropped from the result, so a schema without views yields an empty
+ * list rather than an exception.
+ *
+ * @param mapper the mapper used to run the statement
+ * @param namespace the view namespace; levels 0, 1 and 2 are metalake, catalog and schema
+ * @return the rows that carry a view ID, possibly empty
+ * @throws NoSuchEntityException naming the catalog when the mapper returns no rows, or naming
+ *     the schema when the first row carries no schema ID
+ */
+ @Override
+ public List<ViewPO> listPOsByNSFullName(ViewMetaMapper mapper, Namespace
namespace) {
+ List<ViewPO> pos =
+ mapper.listViewPOsByFullQualifiedName(
+ namespace.level(0), namespace.level(1), namespace.level(2));
+ // INNER JOIN on metalake/catalog: an empty result means the metalake or
catalog does not exist.
+ if (pos.isEmpty()) {
+ throw new NoSuchEntityException(
+ NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+ Entity.EntityType.CATALOG.name().toLowerCase(),
+ namespace.level(1));
+ }
+ // LEFT JOIN on schema_meta: rows with non-null catalogId but null
schemaId mean the
+ // catalog exists but the schema does not.
+ if (pos.get(0).getSchemaId() == null) {
+ throw new NoSuchEntityException(
+ NoSuchEntityException.NO_SUCH_ENTITY_MESSAGE,
+ Entity.EntityType.SCHEMA.name().toLowerCase(),
+ namespace.level(2));
+ }
+ // LEFT JOIN on view_meta: filter out the placeholder row for a schema
without views.
+ return pos.stream().filter(po -> po.getViewId() !=
null).collect(Collectors.toList());
+ }
+
+ /**
+ * Always returns {@code true}.
+ *
+ * <p>NOTE(review): presumably this tells the read routing that lookups by parent (schema) ID
+ * are available for views; the contract is defined in {@code BasePOStorageOps}, which is not
+ * visible here, so confirm there.
+ */
+ @Override
+ public boolean supportsParentIdRelationalRead() {
+ return true;
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackendBatchInsert.java
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackendBatchInsert.java
new file mode 100644
index 0000000000..ca822541f7
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/TestJDBCBackendBatchInsert.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.utils.NamespaceUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+
+/**
+ * Tests schema insert paths that use relational {@code batchInsertSchemaMeta}
(single or batch).
+ */
+public class TestJDBCBackendBatchInsert extends TestJDBCBackend {
+
+ /**
+ * Inserts one schema with a plain (separator-free) name and checks that it can be read back:
+ * {@code get} must return the same ID, name and namespace, and {@code list} on the catalog
+ * must return exactly that one schema.
+ */
+ @TestTemplate
+ public void testBatchInsertSingleSchemaViaBackend() throws IOException {
+ String metalakeName = "metalake_batch_insert_single";
+ String catalogName = "catalog_batch_insert_single";
+ createAndInsertMakeLake(metalakeName);
+ createAndInsertCatalog(metalakeName, catalogName);
+
+ SchemaEntity schema =
+ createSchemaEntity(
+ RandomIdGenerator.INSTANCE.nextId(),
+ NamespaceUtil.ofSchema(metalakeName, catalogName),
+ "flat_schema_batch",
+ AUDIT_INFO);
+ backend.insert(schema, false);
+
+ SchemaEntity loaded =
+ (SchemaEntity)
+ backend.get(
+ NameIdentifier.of(metalakeName, catalogName,
"flat_schema_batch"),
+ Entity.EntityType.SCHEMA);
+ Assertions.assertEquals(schema.id(), loaded.id());
+ Assertions.assertEquals("flat_schema_batch", loaded.name());
+ Assertions.assertEquals(schema.namespace(), loaded.namespace());
+
+ List<SchemaEntity> listed =
+ backend.list(
+ NamespaceUtil.ofSchema(metalakeName, catalogName),
Entity.EntityType.SCHEMA, true);
+ Assertions.assertEquals(1, listed.size());
+ Assertions.assertEquals("flat_schema_batch", listed.get(0).name());
+ }
+
+ /**
+ * Inserts only the schema named {@code ns_a:ns_b:leaf} and checks that listing the catalog
+ * afterwards also contains the ancestors {@code ns_a} and {@code ns_a:ns_b}, and that the leaf
+ * can be loaded by its full logical name with the comment it was inserted with.
+ */
+ @TestTemplate
+ public void
testBatchInsertHierarchicalSchemaCreatesAncestorsAndLeafViaBackend()
+ throws IOException {
+ String metalakeName = "metalake_batch_insert_nested";
+ String catalogName = "catalog_batch_insert_nested";
+ createAndInsertMakeLake(metalakeName);
+ createAndInsertCatalog(metalakeName, catalogName);
+
+ String logicalLeaf = "ns_a:ns_b:leaf";
+ SchemaEntity hierarchical =
+ SchemaEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName(logicalLeaf)
+ .withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName))
+ .withComment("nested")
+ .withProperties(Collections.emptyMap())
+ .withAuditInfo(AUDIT_INFO)
+ .build();
+ backend.insert(hierarchical, false);
+
+ List<SchemaEntity> schemas =
+ backend.list(
+ NamespaceUtil.ofSchema(metalakeName, catalogName),
Entity.EntityType.SCHEMA, true);
+ Set<String> logicalNames =
schemas.stream().map(SchemaEntity::name).collect(Collectors.toSet());
+
+ Assertions.assertTrue(logicalNames.contains("ns_a"));
+ Assertions.assertTrue(logicalNames.contains("ns_a:ns_b"));
+ Assertions.assertTrue(logicalNames.contains(logicalLeaf));
+
+ SchemaEntity loaded =
+ (SchemaEntity)
+ backend.get(
+ NameIdentifier.of(metalakeName, catalogName, logicalLeaf),
+ Entity.EntityType.SCHEMA);
+ Assertions.assertEquals(logicalLeaf, loaded.name());
+ Assertions.assertEquals("nested", loaded.comment());
+ }
+
+ /**
+ * Inserts two leaves that share the ancestors {@code ns_a} and {@code ns_a:ns_b}. The ancestor
+ * IDs are captured after the first insert and must be unchanged after the second one, i.e.
+ * the second insert must not replace the existing ancestor schemas.
+ */
+ @TestTemplate
+ public void testBatchInsertHierarchicalSecondLeafReusesAncestorsViaBackend()
throws IOException {
+ String metalakeName = "metalake_batch_insert_reuse";
+ String catalogName = "catalog_batch_insert_reuse";
+ createAndInsertMakeLake(metalakeName);
+ createAndInsertCatalog(metalakeName, catalogName);
+
+ String leaf1 = "ns_a:ns_b:leaf1";
+ String leaf2 = "ns_a:ns_b:leaf2";
+ String ancestorA = "ns_a";
+ String ancestorAB = "ns_a:ns_b";
+
+ SchemaEntity first =
+ SchemaEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName(leaf1)
+ .withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName))
+ .withComment("first")
+ .withProperties(Collections.emptyMap())
+ .withAuditInfo(AUDIT_INFO)
+ .build();
+ backend.insert(first, false);
+
+ long idA =
+ ((SchemaEntity)
+ backend.get(
+ NameIdentifier.of(metalakeName, catalogName, ancestorA),
+ Entity.EntityType.SCHEMA))
+ .id();
+ long idAB =
+ ((SchemaEntity)
+ backend.get(
+ NameIdentifier.of(metalakeName, catalogName, ancestorAB),
+ Entity.EntityType.SCHEMA))
+ .id();
+
+ SchemaEntity second =
+ SchemaEntity.builder()
+ .withId(RandomIdGenerator.INSTANCE.nextId())
+ .withName(leaf2)
+ .withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName))
+ .withComment("second")
+ .withProperties(Collections.emptyMap())
+ .withAuditInfo(AUDIT_INFO)
+ .build();
+ backend.insert(second, false);
+
+ Assertions.assertEquals(
+ idA,
+ ((SchemaEntity)
+ backend.get(
+ NameIdentifier.of(metalakeName, catalogName, ancestorA),
+ Entity.EntityType.SCHEMA))
+ .id());
+ Assertions.assertEquals(
+ idAB,
+ ((SchemaEntity)
+ backend.get(
+ NameIdentifier.of(metalakeName, catalogName, ancestorAB),
+ Entity.EntityType.SCHEMA))
+ .id());
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestHierarchicalConversionPOStorageOps.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestHierarchicalConversionPOStorageOps.java
new file mode 100644
index 0000000000..ec69b86369
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestHierarchicalConversionPOStorageOps.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.UnaryOperator;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.utils.HierarchicalSchemaUtil;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestHierarchicalConversionPOStorageOps {
+
+ private static final String SEP = HierarchicalSchemaUtil.schemaSeparator();
+ private static final String PHYS =
HierarchicalSchemaUtil.physicalSeparator();
+
+ private BasePOStorageOps<String, Object> delegate;
+ private Object mapper;
+
+ @BeforeEach
+ @SuppressWarnings("unchecked")
+ public void setUp() {
+ delegate = mock(BasePOStorageOps.class);
+ mapper = new Object();
+ }
+
+ // ---------- Input name conversion ----------
+
+ @Test
+ public void getPOByParentIdConvertsHierarchicalName() {
+ when(delegate.getPO(eq(mapper), eq(7L), eq("ns_a" + PHYS +
"ns_b"))).thenReturn("found");
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(delegate);
+
+ String result = wrapper.getPO(mapper, 7L, "ns_a" + SEP + "ns_b");
+
+ assertEquals("found", result);
+ verify(delegate).getPO(mapper, 7L, "ns_a" + PHYS + "ns_b");
+ }
+
+ @Test
+ public void getPOByParentIdLeavesSimpleNameUnchanged() {
+ when(delegate.getPO(eq(mapper), eq(7L), eq("plain"))).thenReturn("po");
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(delegate);
+
+ String result = wrapper.getPO(mapper, 7L, "plain");
+
+ assertEquals("po", result);
+ verify(delegate).getPO(mapper, 7L, "plain");
+ }
+
+ @Test
+ public void getPOByParentIdReturnsNullWhenDelegateMisses() {
+ when(delegate.getPO(any(), any(Long.class),
any(String.class))).thenReturn(null);
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(
+ delegate, s -> s + "-rewritten", UnaryOperator.identity());
+
+ assertNull(wrapper.getPO(mapper, 1L, "missing"));
+ }
+
+ @Test
+ public void listPOsByNamespaceAndNamesConvertsEachHierarchicalName() {
+ List<String> names = Arrays.asList("plain", "ns_a" + SEP + "ns_b",
"other");
+ Namespace ns = Namespace.of("ml", "cat");
+ when(delegate.listPOs(eq(mapper), eq(ns), any(List.class)))
+ .thenReturn(Collections.singletonList("po"));
+
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(delegate);
+
+ wrapper.listPOs(mapper, ns, names);
+
+ ArgumentCaptor<List<String>> namesCaptor =
ArgumentCaptor.forClass(List.class);
+ verify(delegate).listPOs(eq(mapper), eq(ns), namesCaptor.capture());
+ assertEquals(Arrays.asList("plain", "ns_a" + PHYS + "ns_b", "other"),
namesCaptor.getValue());
+ }
+
+ // ---------- Identifier / namespace logical → physical translation
----------
+
+ @Test
+ public void getPOByFullNameConvertsSchemaIdentifierName() {
+ NameIdentifier ident = NameIdentifier.of(Namespace.of("ml", "cat"), "ns_a"
+ SEP + "ns_b");
+ when(delegate.getPOByFullName(any(),
any(NameIdentifier.class))).thenReturn("po");
+
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(delegate);
+
+ wrapper.getPOByFullName(mapper, ident);
+
+ ArgumentCaptor<NameIdentifier> captor =
ArgumentCaptor.forClass(NameIdentifier.class);
+ verify(delegate).getPOByFullName(eq(mapper), captor.capture());
+ NameIdentifier converted = captor.getValue();
+ assertEquals("ns_a" + PHYS + "ns_b", converted.name());
+ assertEquals(Namespace.of("ml", "cat"), converted.namespace());
+ }
+
+ @Test
+ public void getPOByFullNameConvertsSchemaSegmentForChildIdentifier() {
+ NameIdentifier ident =
+ NameIdentifier.of(Namespace.of("ml", "cat", "ns_a" + SEP + "ns_b"),
"tbl");
+ when(delegate.getPOByFullName(any(),
any(NameIdentifier.class))).thenReturn("po");
+
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(delegate);
+
+ wrapper.getPOByFullName(mapper, ident);
+
+ ArgumentCaptor<NameIdentifier> captor =
ArgumentCaptor.forClass(NameIdentifier.class);
+ verify(delegate).getPOByFullName(eq(mapper), captor.capture());
+ NameIdentifier converted = captor.getValue();
+ assertEquals("tbl", converted.name());
+ assertEquals(Namespace.of("ml", "cat", "ns_a" + PHYS + "ns_b"),
converted.namespace());
+ }
+
+ @Test
+ public void getPOByFullNamePassesThroughWhenNoSeparatorPresent() {
+ NameIdentifier ident = NameIdentifier.of(Namespace.of("ml", "cat",
"schema"), "tbl");
+ when(delegate.getPOByFullName(any(),
any(NameIdentifier.class))).thenReturn("po");
+
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(delegate);
+
+ wrapper.getPOByFullName(mapper, ident);
+
+ verify(delegate).getPOByFullName(mapper, ident);
+ }
+
+ @Test
+ public void listPOsByNSFullNameConvertsSchemaSegment() {
+ Namespace ns = Namespace.of("ml", "cat", "ns_a" + SEP + "ns_b");
+ when(delegate.listPOsByNSFullName(any(), any(Namespace.class)))
+ .thenReturn(Collections.singletonList("po"));
+
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(delegate);
+
+ wrapper.listPOsByNSFullName(mapper, ns);
+
+ ArgumentCaptor<Namespace> nsCaptor =
ArgumentCaptor.forClass(Namespace.class);
+ verify(delegate).listPOsByNSFullName(eq(mapper), nsCaptor.capture());
+ assertEquals(Namespace.of("ml", "cat", "ns_a" + PHYS + "ns_b"),
nsCaptor.getValue());
+ }
+
+ @Test
+ public void listPOsByNSFullNameLeavesShortNamespaceUnchanged() {
+ Namespace ns = Namespace.of("ml", "cat");
+ when(delegate.listPOsByNSFullName(any(), any(Namespace.class)))
+ .thenReturn(Collections.emptyList());
+
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(delegate);
+
+ wrapper.listPOsByNSFullName(mapper, ns);
+
+ verify(delegate).listPOsByNSFullName(mapper, ns);
+ }
+
+ // ---------- physicalToLogicalRewriter (read path) ----------
+
+ @Test // The second constructor argument (read-path rewriter) must be applied to the value returned by delegate.getPO.
+ public void physicalToLogicalRewriterAppliedToGetPOByParentId() {
+ when(delegate.getPO(any(), any(Long.class),
any(String.class))).thenReturn("raw");
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(
+ delegate, s -> s + "-rewritten", UnaryOperator.identity());
+
+ assertEquals("raw-rewritten", wrapper.getPO(mapper, 1L, "plain"));
+ }
+
+ @Test // The read-path rewriter must be applied to every element returned by delegate.listPOs(mapper, parentId).
+ public void physicalToLogicalRewriterAppliedToListPOsByParentId() {
+ when(delegate.listPOs(any(),
any(Long.class))).thenReturn(Arrays.asList("a", "b"));
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(
+ delegate, s -> s.toUpperCase(), UnaryOperator.identity());
+
+ assertEquals(Arrays.asList("A", "B"), wrapper.listPOs(mapper, 1L)); // element order is expected to be kept
+ }
+
+ @Test // The read-path rewriter must also be applied to the results of the id-list overload of listPOs.
+ public void physicalToLogicalRewriterAppliedToListPOsByIds() {
+ when(delegate.listPOs(any(),
any(List.class))).thenReturn(Arrays.asList("x", "y"));
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(delegate, s -> s + "!",
UnaryOperator.identity());
+
+ List<Long> ids = Arrays.asList(1L, 2L); // typed as List<Long> so the call targets the id-list overload
+ assertEquals(Arrays.asList("x!", "y!"), wrapper.listPOs(mapper, ids));
+ }
+
+ @Test // When the delegate returns null, the wrapper must return null without calling the read-path rewriter.
+ public void physicalToLogicalRewriterIsNotInvokedOnNullResult() {
+ when(delegate.getPO(any(), any(Long.class),
any(String.class))).thenReturn(null);
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(
+ delegate,
+ s -> { // tripwire: any invocation of the rewriter fails the test
+ throw new AssertionError("rewriter must not be invoked on null");
+ },
+ UnaryOperator.identity());
+
+ assertNull(wrapper.getPO(mapper, 1L, "x"));
+ }
+
+ @Test // When the delegate returns an empty list, the wrapper must return an empty list without calling the read-path rewriter.
+ public void physicalToLogicalRewriterIsNotInvokedOnEmptyList() {
+ when(delegate.listPOs(any(),
any(Long.class))).thenReturn(Collections.emptyList());
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(
+ delegate,
+ s -> { // tripwire: any invocation of the rewriter fails the test
+ throw new AssertionError("rewriter must not be invoked on empty
list");
+ },
+ UnaryOperator.identity());
+
+ assertEquals(Collections.emptyList(), wrapper.listPOs(mapper, 1L));
+ }
+
+ // ---------- logicalToPhysicalRewriter (write path) ----------
+
+ @Test // The third constructor argument (write-path rewriter) must be applied to the PO before delegate.insertPO is called.
+ public void logicalToPhysicalRewriterAppliedToInsertPO() {
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(
+ delegate, UnaryOperator.identity(), s -> s + "-written");
+
+ wrapper.insertPO(mapper, "logical", true);
+
+ verify(delegate).insertPO(mapper, "logical-written", true); // the boolean flag is expected to pass through unchanged
+ }
+
+ @Test // The write-path rewriter must be applied to every element handed to delegate.batchInsertPOs.
+ public void logicalToPhysicalRewriterAppliedToBatchInsertPOs() {
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(delegate,
UnaryOperator.identity(), s -> s + "-W");
+
+ wrapper.batchInsertPOs(mapper, Arrays.asList("a", "b"), false);
+
+ ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class); // raw List.class: generic type is erased at runtime
+ verify(delegate).batchInsertPOs(eq(mapper), captor.capture(), eq(false));
+ assertEquals(Arrays.asList("a-W", "b-W"), captor.getValue());
+ }
+
+ @Test // The write-path rewriter must be applied to both PO arguments of updatePO, and the delegate's return value must be returned.
+ public void logicalToPhysicalRewriterAppliedToBothPOsInUpdate() {
+ when(delegate.updatePO(any(), eq("new-W"), eq("old-W"))).thenReturn(1); // stub matches only already-rewritten arguments
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(delegate,
UnaryOperator.identity(), s -> s + "-W");
+
+ assertEquals(1, wrapper.updatePO(mapper, "new", "old"));
+ verify(delegate).updatePO(mapper, "new-W", "old-W");
+ }
+
+ @Test // With the single-argument constructor, values must pass through unchanged on getPO, insertPO and batchInsertPOs.
+ public void singleArgConstructorUsesIdentityRewritersForBothDirections() {
+ when(delegate.getPO(any(), any(Long.class),
any(String.class))).thenReturn("po");
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(delegate);
+
+ assertEquals("po", wrapper.getPO(mapper, 1L, "plain")); // read result is returned unmodified
+ wrapper.insertPO(mapper, "raw", false);
+ verify(delegate).insertPO(mapper, "raw", false); // single write is forwarded unmodified
+
+ wrapper.batchInsertPOs(mapper, Arrays.asList("a", "b"), true);
+ ArgumentCaptor<List<String>> captor = ArgumentCaptor.forClass(List.class);
+ verify(delegate).batchInsertPOs(eq(mapper), captor.capture(), eq(true));
+ assertEquals(Arrays.asList("a", "b"), captor.getValue()); // batch elements are forwarded unmodified
+ }
+
+ // ---------- Delegation ----------
+
+ @Test // supportsParentIdRelationalRead() on the wrapper must return what the delegate returns.
+ public void supportsParentIdRelationalReadAndEntityTypeDelegated() { // NOTE(review): the name mentions the entity type, but only supportsParentIdRelationalRead() is asserted here
+ when(delegate.supportsParentIdRelationalRead()).thenReturn(true);
+
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(delegate);
+
+ assertTrue(wrapper.supportsParentIdRelationalRead());
+ }
+
+ @Test // An empty batch must still be forwarded to the delegate, and the write-path rewriter must not be called.
+ public void batchInsertEmptyListIsForwarded() {
+ HierarchicalConversionPOStorageOps<String, Object> wrapper =
+ new HierarchicalConversionPOStorageOps<>(
+ delegate,
+ UnaryOperator.identity(),
+ s -> { // tripwire: any invocation of the rewriter fails the test
+ throw new AssertionError("rewriter must not be invoked on empty
list");
+ });
+
+ wrapper.batchInsertPOs(mapper, new ArrayList<>(), false);
+
+ verify(delegate).batchInsertPOs(eq(mapper), eq(Collections.emptyList()),
eq(false));
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPOStorageReadRouting.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPOStorageReadRouting.java
new file mode 100644
index 0000000000..b0e09a0780
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestPOStorageReadRouting.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.storage.relational.service;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mockStatic;
+
+import java.util.Collections;
+import java.util.List;
+import org.apache.gravitino.Entity;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+
+public class TestPOStorageReadRouting { // Tests which BasePOStorageOps read method POStorageReadRouting.getPO/listPOs calls, given the trailing boolean flag and supportsParentIdRelationalRead().
+
+ private static final Object MAPPER = new Object(); // opaque placeholder; RecordingOps never inspects it
+
+ @Test // flag=false: the full-name lookup is used even though the ops support the parent-id path
+ public void getPOUsesFullNameWhenCacheDisabled() {
+ RecordingOps ops = new RecordingOps(true);
+ NameIdentifier id = NameIdentifier.of("ml", "cat", "schema");
+ String result = POStorageReadRouting.getPO(MAPPER, id, ops,
Entity.EntityType.SCHEMA, false);
+ assertEquals("by-full", result);
+ assertFalse(ops.getPOByParentCalled);
+ assertTrue(ops.getPOByFullNameCalled);
+ }
+
+ @Test // flag=true but supportsParentIdRelationalRead() is false: the full-name lookup is still used
+ public void getPOUsesFullNameWhenCacheEnabledButParentPathUnsupported() {
+ RecordingOps ops = new RecordingOps(false);
+ String result =
+ POStorageReadRouting.getPO(
+ MAPPER, NameIdentifier.of("ml", "cat", "schema"), ops,
Entity.EntityType.SCHEMA, true);
+ assertEquals("by-full", result);
+ assertFalse(ops.getPOByParentCalled);
+ assertTrue(ops.getPOByFullNameCalled);
+ }
+
+ @Test // flag=true and parent-id path supported: getPO(mapper, parentId, name) is used with the id from EntityIdService
+ public void getPOUsesParentIdWhenCacheEnabledAndSupported() {
+ try (MockedStatic<EntityIdService> entityIds =
mockStatic(EntityIdService.class)) { // static mock is closed by try-with-resources
+ entityIds
+ .when(() -> EntityIdService.getEntityId(any(),
eq(Entity.EntityType.CATALOG)))
+ .thenReturn(42L);
+ RecordingOps ops = new RecordingOps(true);
+ NameIdentifier id = NameIdentifier.of("ml", "cat", "schema");
+ String result = POStorageReadRouting.getPO(MAPPER, id, ops,
Entity.EntityType.SCHEMA, true);
+ assertEquals("by-parent", result);
+ assertTrue(ops.getPOByParentCalled);
+ assertFalse(ops.getPOByFullNameCalled);
+ assertEquals(Long.valueOf(42), ops.seenParentId); // the stubbed CATALOG id is passed as parentId
+ assertEquals("schema", ops.seenShortName); // the last identifier level is passed as the name
+ }
+ }
+
+ @Test // flag=false: listing goes through listPOsByNSFullName
+ public void listPOsUsesFullNamespaceWhenCacheDisabled() {
+ RecordingOps ops = new RecordingOps(true);
+ Namespace ns = Namespace.of("ml", "cat", "sch");
+ List<String> result =
+ POStorageReadRouting.listPOs(MAPPER, ns, ops, Entity.EntityType.TABLE,
false);
+ assertTrue(result.isEmpty()); // RecordingOps.listPOsByNSFullName returns an empty list
+ assertFalse(ops.listByParentCalled);
+ assertTrue(ops.listByFullNsCalled);
+ }
+
+ @Test // flag=true and parent-id path supported: listing goes through listPOs(mapper, parentId)
+ public void listPOsUsesParentIdWhenCacheEnabledAndSupported() {
+ try (MockedStatic<EntityIdService> entityIds =
mockStatic(EntityIdService.class)) { // static mock is closed by try-with-resources
+ entityIds
+ .when(() -> EntityIdService.getEntityId(any(),
eq(Entity.EntityType.SCHEMA)))
+ .thenReturn(7L);
+ RecordingOps ops = new RecordingOps(true);
+ Namespace ns = Namespace.of("ml", "cat", "sch");
+ List<String> out =
+ POStorageReadRouting.listPOs(MAPPER, ns, ops,
Entity.EntityType.TABLE, true);
+ assertEquals(Collections.singletonList("row"), out);
+ assertTrue(ops.listByParentCalled);
+ assertFalse(ops.listByFullNsCalled);
+ assertEquals(Long.valueOf(7), ops.seenListParentId); // the stubbed SCHEMA id is passed as parentId
+ }
+ }
+
+ private static final class RecordingOps extends BasePOStorageOps<String,
Object> { // hand-written stub: records which read method was called and returns canned values
+ private final boolean supportsParent; // value reported by supportsParentIdRelationalRead()
+ boolean getPOByParentCalled; // set by getPO(mapper, parentId, name)
+ boolean getPOByFullNameCalled; // set by getPOByFullName
+ boolean listByParentCalled; // set by listPOs(mapper, parentId)
+ boolean listByFullNsCalled; // set by listPOsByNSFullName
+ Long seenParentId; // parentId received by getPO
+ String seenShortName; // name received by getPO
+ Long seenListParentId; // parentId received by listPOs
+
+ RecordingOps(boolean supportsParent) {
+ this.supportsParent = supportsParent;
+ }
+
+ @Override
+ public boolean supportsParentIdRelationalRead() {
+ return supportsParent;
+ }
+
+ @Override
+ public String getPO(Object mapper, Long parentId, String name) {
+ getPOByParentCalled = true;
+ seenParentId = parentId;
+ seenShortName = name;
+ return "by-parent"; // marker value the tests use to identify this path
+ }
+
+ @Override
+ public String getPOByFullName(Object mapper, NameIdentifier identifier) {
+ getPOByFullNameCalled = true;
+ return "by-full"; // marker value the tests use to identify this path
+ }
+
+ @Override
+ public List<String> listPOs(Object mapper, Long parentId) {
+ listByParentCalled = true;
+ seenListParentId = parentId;
+ return Collections.singletonList("row"); // non-empty, so it differs from the full-namespace result
+ }
+
+ @Override
+ public List<String> listPOsByNSFullName(Object mapper, Namespace
namespace) {
+ listByFullNsCalled = true;
+ return Collections.emptyList();
+ }
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java
index 2c19502caa..599c32886e 100644
---
a/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java
+++
b/core/src/test/java/org/apache/gravitino/storage/relational/service/TestSchemaMetaService.java
@@ -24,11 +24,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.io.IOException;
import java.time.Instant;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
-import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityAlreadyExistsException;
@@ -38,7 +36,6 @@ import org.apache.gravitino.meta.SchemaEntity;
import org.apache.gravitino.meta.TopicEntity;
import org.apache.gravitino.storage.RandomIdGenerator;
import org.apache.gravitino.storage.relational.TestJDBCBackend;
-import org.apache.gravitino.utils.HierarchicalSchemaUtil;
import org.apache.gravitino.utils.NameIdentifierUtil;
import org.apache.gravitino.utils.NamespaceUtil;
import org.junit.jupiter.api.Assertions;
@@ -220,12 +217,10 @@ public class TestSchemaMetaService extends
TestJDBCBackend {
SchemaMetaService schemaMetaService = SchemaMetaService.getInstance();
String logicalLeaf = "ns_a:ns_b:leaf";
- String sep = HierarchicalSchemaUtil.schemaSeparator();
- String physicalLeaf =
HierarchicalSchemaUtil.logicalToPhysical(logicalLeaf, sep);
SchemaEntity hierarchical =
SchemaEntity.builder()
.withId(RandomIdGenerator.INSTANCE.nextId())
- .withName(physicalLeaf)
+ .withName(logicalLeaf)
.withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName))
.withComment("nested")
.withProperties(Collections.emptyMap())
@@ -235,17 +230,7 @@ public class TestSchemaMetaService extends TestJDBCBackend
{
List<SchemaEntity> schemas =
schemaMetaService.listSchemasByNamespace(NamespaceUtil.ofSchema(metalakeName,
catalogName));
- Set<String> logicalNames =
- schemas.stream()
- .map(SchemaEntity::name)
- .map(
- n -> {
- if (n != null &&
n.contains(HierarchicalSchemaUtil.physicalSeparator())) {
- return HierarchicalSchemaUtil.physicalToLogical(n, sep);
- }
- return n;
- })
- .collect(Collectors.toSet());
+ Set<String> logicalNames =
schemas.stream().map(SchemaEntity::name).collect(Collectors.toSet());
Assertions.assertTrue(logicalNames.contains("ns_a"));
Assertions.assertTrue(logicalNames.contains("ns_a:ns_b"));
@@ -253,8 +238,8 @@ public class TestSchemaMetaService extends TestJDBCBackend {
SchemaEntity loaded =
schemaMetaService.getSchemaByIdentifier(
- NameIdentifier.of(metalakeName, catalogName, physicalLeaf));
- Assertions.assertEquals(physicalLeaf, loaded.name());
+ NameIdentifier.of(metalakeName, catalogName, logicalLeaf));
+ Assertions.assertEquals(logicalLeaf, loaded.name());
Assertions.assertEquals("nested", loaded.comment());
}
@@ -264,18 +249,15 @@ public class TestSchemaMetaService extends
TestJDBCBackend {
createAndInsertCatalog(metalakeName, catalogName);
SchemaMetaService schemaMetaService = SchemaMetaService.getInstance();
- String sep = HierarchicalSchemaUtil.schemaSeparator();
- String physSep = HierarchicalSchemaUtil.physicalSeparator();
- String physicalLeaf1 =
HierarchicalSchemaUtil.logicalToPhysical("ns_a:ns_b:leaf1", sep);
- String physicalLeaf2 =
HierarchicalSchemaUtil.logicalToPhysical("ns_a:ns_b:leaf2", sep);
- String[] parts = physicalLeaf1.split(Pattern.quote(physSep), -1);
- String ancestorA = parts[0];
- String ancestorAB = String.join(physSep, Arrays.copyOfRange(parts, 0, 2));
+ String leaf1 = "ns_a:ns_b:leaf1";
+ String leaf2 = "ns_a:ns_b:leaf2";
+ String ancestorA = "ns_a";
+ String ancestorAB = "ns_a:ns_b";
SchemaEntity first =
SchemaEntity.builder()
.withId(RandomIdGenerator.INSTANCE.nextId())
- .withName(physicalLeaf1)
+ .withName(leaf1)
.withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName))
.withComment("first")
.withProperties(Collections.emptyMap())
@@ -295,7 +277,7 @@ public class TestSchemaMetaService extends TestJDBCBackend {
SchemaEntity second =
SchemaEntity.builder()
.withId(RandomIdGenerator.INSTANCE.nextId())
- .withName(physicalLeaf2)
+ .withName(leaf2)
.withNamespace(NamespaceUtil.ofSchema(metalakeName, catalogName))
.withComment("second")
.withProperties(Collections.emptyMap())