This is an automated email from the ASF dual-hosted git repository.
mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 1a22afee5 [#5794] feat(core): Add ModelOperationDispatcher logic
(#5908)
1a22afee5 is described below
commit 1a22afee5c54e4752816d26f8b0b4786cc0a520a
Author: Jerry Shao <[email protected]>
AuthorDate: Mon Dec 23 12:12:26 2024 +0800
[#5794] feat(core): Add ModelOperationDispatcher logic (#5908)
### What changes were proposed in this pull request?
This PR adds the ModelOperationDispatcher logic in core.
### Why are the changes needed?
This is a part of work to support model management.
Fix: #5794
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added UTs to test.
---
.../apache/gravitino/catalog/CatalogManager.java | 15 ++
.../gravitino/catalog/EntityCombinedFileset.java | 2 +-
.../gravitino/catalog/EntityCombinedModel.java | 94 ++++++++
.../catalog/EntityCombinedModelVersion.java | 101 ++++++++
.../gravitino/catalog/EntityCombinedSchema.java | 2 +-
.../gravitino/catalog/EntityCombinedTable.java | 2 +-
.../gravitino/catalog/EntityCombinedTopic.java | 2 +-
.../catalog/FilesetOperationDispatcher.java | 6 +-
.../catalog/ModelOperationDispatcher.java | 166 ++++++++++++-
.../catalog/SchemaOperationDispatcher.java | 18 +-
.../catalog/TableOperationDispatcher.java | 16 +-
.../catalog/TopicOperationDispatcher.java | 10 +-
.../java/org/apache/gravitino/TestCatalog.java | 5 +
.../test/java/org/apache/gravitino/TestModel.java | 44 ++++
.../org/apache/gravitino/TestModelVersion.java | 45 ++++
.../catalog/TestModelOperationDispatcher.java | 264 +++++++++++++++++++++
.../gravitino/connector/TestCatalogOperations.java | 248 ++++++++++++++++++-
17 files changed, 1000 insertions(+), 40 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
index 2e77b8e16..43bc74bb2 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
@@ -95,6 +95,7 @@ import org.apache.gravitino.messaging.TopicCatalog;
import org.apache.gravitino.meta.AuditInfo;
import org.apache.gravitino.meta.CatalogEntity;
import org.apache.gravitino.meta.SchemaEntity;
+import org.apache.gravitino.model.ModelCatalog;
import org.apache.gravitino.rel.SupportsPartitions;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
@@ -178,6 +179,16 @@ public class CatalogManager implements CatalogDispatcher,
Closeable {
});
}
+ public <R> R doWithModelOps(ThrowableFunction<ModelCatalog, R> fn) throws
Exception {
+ return classLoader.withClassLoader(
+ cl -> {
+ if (asModels() == null) {
+ throw new UnsupportedOperationException("Catalog does not
support model operations");
+ }
+ return fn.apply(asModels());
+ });
+ }
+
public <R> R doWithCatalogOps(ThrowableFunction<CatalogOperations, R> fn)
throws Exception {
return classLoader.withClassLoader(cl -> fn.apply(catalog.ops()));
}
@@ -236,6 +247,10 @@ public class CatalogManager implements CatalogDispatcher,
Closeable {
private TopicCatalog asTopics() {
return catalog.ops() instanceof TopicCatalog ? (TopicCatalog)
catalog.ops() : null;
}
+
+ private ModelCatalog asModels() {
+ return catalog.ops() instanceof ModelCatalog ? (ModelCatalog)
catalog.ops() : null;
+ }
}
private final Config config;
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java
index 2a6b55a2d..c7b847fc9 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedFileset.java
@@ -48,7 +48,7 @@ public final class EntityCombinedFileset implements Fileset {
return new EntityCombinedFileset(fileset, null);
}
- public EntityCombinedFileset withHiddenPropertiesSet(Set<String>
hiddenProperties) {
+ public EntityCombinedFileset withHiddenProperties(Set<String>
hiddenProperties) {
this.hiddenProperties = hiddenProperties;
return this;
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModel.java
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModel.java
new file mode 100644
index 000000000..4aeefa0be
--- /dev/null
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModel.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Audit;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.ModelEntity;
+import org.apache.gravitino.model.Model;
+
+public final class EntityCombinedModel implements Model {
+
+ private final Model model;
+
+ private final ModelEntity modelEntity;
+
+ private Set<String> hiddenProperties = Collections.emptySet();
+
+ private EntityCombinedModel(Model model, ModelEntity modelEntity) {
+ this.model = model;
+ this.modelEntity = modelEntity;
+ }
+
+ public static EntityCombinedModel of(Model model, ModelEntity modelEntity) {
+ return new EntityCombinedModel(model, modelEntity);
+ }
+
+ public static EntityCombinedModel of(Model model) {
+ return new EntityCombinedModel(model, null);
+ }
+
+ public EntityCombinedModel withHiddenProperties(Set<String>
hiddenProperties) {
+ this.hiddenProperties = hiddenProperties;
+ return this;
+ }
+
+ @Override
+ public String name() {
+ return model.name();
+ }
+
+ @Override
+ public String comment() {
+ return model.comment();
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return model.properties() == null
+ ? null
+ : model.properties().entrySet().stream()
+ .filter(e -> !hiddenProperties.contains(e.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ @Override
+ public int latestVersion() {
+ return model.latestVersion();
+ }
+
+ @Override
+ public Audit auditInfo() {
+ AuditInfo mergedAudit =
+ AuditInfo.builder()
+ .withCreator(model.auditInfo().creator())
+ .withCreateTime(model.auditInfo().createTime())
+ .withLastModifier(model.auditInfo().lastModifier())
+ .withLastModifiedTime(model.auditInfo().lastModifiedTime())
+ .build();
+
+ return modelEntity == null
+ ? mergedAudit
+ : mergedAudit.merge(modelEntity.auditInfo(), true /* overwrite */);
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModelVersion.java
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModelVersion.java
new file mode 100644
index 000000000..b41e2889d
--- /dev/null
+++
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedModelVersion.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.gravitino.Audit;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.ModelVersionEntity;
+import org.apache.gravitino.model.ModelVersion;
+
+public final class EntityCombinedModelVersion implements ModelVersion {
+
+ private final ModelVersion modelVersion;
+
+ private final ModelVersionEntity modelVersionEntity;
+
+ private Set<String> hiddenProperties = Collections.emptySet();
+
+ private EntityCombinedModelVersion(
+ ModelVersion modelVersion, ModelVersionEntity modelVersionEntity) {
+ this.modelVersion = modelVersion;
+ this.modelVersionEntity = modelVersionEntity;
+ }
+
+ public static EntityCombinedModelVersion of(
+ ModelVersion modelVersion, ModelVersionEntity modelVersionEntity) {
+ return new EntityCombinedModelVersion(modelVersion, modelVersionEntity);
+ }
+
+ public static EntityCombinedModelVersion of(ModelVersion modelVersion) {
+ return new EntityCombinedModelVersion(modelVersion, null);
+ }
+
+ public EntityCombinedModelVersion withHiddenProperties(Set<String>
hiddenProperties) {
+ this.hiddenProperties = hiddenProperties;
+ return this;
+ }
+
+ @Override
+ public int version() {
+ return modelVersion.version();
+ }
+
+ @Override
+ public String comment() {
+ return modelVersion.comment();
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ return modelVersion.properties() == null
+ ? null
+ : modelVersion.properties().entrySet().stream()
+ .filter(e -> !hiddenProperties.contains(e.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ @Override
+ public String uri() {
+ return modelVersion.uri();
+ }
+
+ @Override
+ public String[] aliases() {
+ return modelVersion.aliases();
+ }
+
+ @Override
+ public Audit auditInfo() {
+ AuditInfo mergedAudit =
+ AuditInfo.builder()
+ .withCreator(modelVersion.auditInfo().creator())
+ .withCreateTime(modelVersion.auditInfo().createTime())
+ .withLastModifier(modelVersion.auditInfo().lastModifier())
+ .withLastModifiedTime(modelVersion.auditInfo().lastModifiedTime())
+ .build();
+
+ return modelVersionEntity == null
+ ? mergedAudit
+ : mergedAudit.merge(modelVersionEntity.auditInfo(), true /* overwrite
*/);
+ }
+}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedSchema.java
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedSchema.java
index 79a4b12a1..ce3d0a3be 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedSchema.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedSchema.java
@@ -61,7 +61,7 @@ public final class EntityCombinedSchema implements Schema {
return of(schema, null);
}
- public EntityCombinedSchema withHiddenPropertiesSet(Set<String>
hiddenProperties) {
+ public EntityCombinedSchema withHiddenProperties(Set<String>
hiddenProperties) {
this.hiddenProperties = hiddenProperties;
return this;
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
index 4b0da1568..70cbd0ace 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTable.java
@@ -67,7 +67,7 @@ public final class EntityCombinedTable implements Table {
return new EntityCombinedTable(table, null);
}
- public EntityCombinedTable withHiddenPropertiesSet(Set<String>
hiddenProperties) {
+ public EntityCombinedTable withHiddenProperties(Set<String>
hiddenProperties) {
this.hiddenProperties = hiddenProperties;
return this;
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTopic.java
b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTopic.java
index 2360f31ae..972df622b 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTopic.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/EntityCombinedTopic.java
@@ -60,7 +60,7 @@ public class EntityCombinedTopic implements Topic {
return new EntityCombinedTopic(topic, null);
}
- public EntityCombinedTopic withHiddenPropertiesSet(Set<String>
hiddenProperties) {
+ public EntityCombinedTopic withHiddenProperties(Set<String>
hiddenProperties) {
this.hiddenProperties = hiddenProperties;
return this;
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
index 98c6311bd..828e98138 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
@@ -81,7 +81,7 @@ public class FilesetOperationDispatcher extends
OperationDispatcher implements F
// Currently we only support maintaining the Fileset in the Gravitino's
store.
return EntityCombinedFileset.of(fileset)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::filesetPropertiesMetadata,
@@ -137,7 +137,7 @@ public class FilesetOperationDispatcher extends
OperationDispatcher implements F
NoSuchSchemaException.class,
FilesetAlreadyExistsException.class);
return EntityCombinedFileset.of(createdFileset)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::filesetPropertiesMetadata,
@@ -172,7 +172,7 @@ public class FilesetOperationDispatcher extends
OperationDispatcher implements F
NoSuchFilesetException.class,
IllegalArgumentException.class);
return EntityCombinedFileset.of(alteredFileset)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::filesetPropertiesMetadata,
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java
index eb1f17c96..1c5291d51 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/ModelOperationDispatcher.java
@@ -18,15 +18,23 @@
*/
package org.apache.gravitino.catalog;
+import static
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
+import static
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
+
import java.util.Map;
+import java.util.function.Supplier;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.StringIdentifier;
+import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.exceptions.ModelAlreadyExistsException;
import
org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException;
import org.apache.gravitino.exceptions.NoSuchModelException;
import org.apache.gravitino.exceptions.NoSuchModelVersionException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
+import org.apache.gravitino.lock.LockType;
+import org.apache.gravitino.lock.TreeLockUtils;
import org.apache.gravitino.model.Model;
import org.apache.gravitino.model.ModelVersion;
import org.apache.gravitino.storage.IdGenerator;
@@ -40,40 +48,114 @@ public class ModelOperationDispatcher extends
OperationDispatcher implements Mod
@Override
public NameIdentifier[] listModels(Namespace namespace) throws
NoSuchSchemaException {
- throw new UnsupportedOperationException("Not implemented");
+ return TreeLockUtils.doWithTreeLock(
+ NameIdentifier.of(namespace.levels()),
+ LockType.READ,
+ () ->
+ doWithCatalog(
+ getCatalogIdentifier(NameIdentifier.of(namespace.levels())),
+ c -> c.doWithModelOps(m -> m.listModels(namespace)),
+ NoSuchSchemaException.class));
}
@Override
public Model getModel(NameIdentifier ident) throws NoSuchModelException {
- throw new UnsupportedOperationException("Not implemented");
+ NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+ Model model =
+ TreeLockUtils.doWithTreeLock(
+ ident,
+ LockType.READ,
+ () ->
+ doWithCatalog(
+ catalogIdent,
+ c -> c.doWithModelOps(m -> m.getModel(ident)),
+ NoSuchModelException.class));
+
+ return EntityCombinedModel.of(model)
+ .withHiddenProperties(
+ getHiddenPropertyNames(
+ catalogIdent, HasPropertyMetadata::modelPropertiesMetadata,
model.properties()));
}
@Override
public Model registerModel(NameIdentifier ident, String comment, Map<String,
String> properties)
throws NoSuchModelException, ModelAlreadyExistsException {
- throw new UnsupportedOperationException("Not implemented");
+ NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+ Map<String, String> updatedProperties =
checkAndUpdateProperties(catalogIdent, properties);
+
+ Model registeredModel =
+ TreeLockUtils.doWithTreeLock(
+ NameIdentifier.of(ident.namespace().levels()),
+ LockType.WRITE,
+ () ->
+ doWithCatalog(
+ catalogIdent,
+ c -> c.doWithModelOps(m -> m.registerModel(ident, comment,
updatedProperties)),
+ NoSuchSchemaException.class,
+ ModelAlreadyExistsException.class));
+
+ return EntityCombinedModel.of(registeredModel)
+ .withHiddenProperties(
+ getHiddenPropertyNames(
+ catalogIdent,
+ HasPropertyMetadata::modelPropertiesMetadata,
+ registeredModel.properties()));
}
@Override
public boolean deleteModel(NameIdentifier ident) {
- throw new UnsupportedOperationException("Not implemented");
+ return TreeLockUtils.doWithTreeLock(
+ NameIdentifier.of(ident.namespace().levels()),
+ LockType.WRITE,
+ () ->
+ doWithCatalog(
+ getCatalogIdentifier(ident),
+ c -> c.doWithModelOps(m -> m.deleteModel(ident)),
+ RuntimeException.class));
}
@Override
public int[] listModelVersions(NameIdentifier ident) throws
NoSuchModelException {
- throw new UnsupportedOperationException("Not implemented");
+ return TreeLockUtils.doWithTreeLock(
+ ident,
+ LockType.READ,
+ () ->
+ doWithCatalog(
+ getCatalogIdentifier(ident),
+ c -> c.doWithModelOps(m -> m.listModelVersions(ident)),
+ NoSuchModelException.class));
}
@Override
public ModelVersion getModelVersion(NameIdentifier ident, int version)
throws NoSuchModelVersionException {
- throw new UnsupportedOperationException("Not implemented");
+ return internalGetModelVersion(
+ ident,
+ () ->
+ TreeLockUtils.doWithTreeLock(
+ ident,
+ LockType.READ,
+ () ->
+ doWithCatalog(
+ getCatalogIdentifier(ident),
+ c -> c.doWithModelOps(m -> m.getModelVersion(ident,
version)),
+ NoSuchModelVersionException.class)));
}
@Override
public ModelVersion getModelVersion(NameIdentifier ident, String alias)
throws NoSuchModelVersionException {
- throw new UnsupportedOperationException("Not implemented");
+ return internalGetModelVersion(
+ ident,
+ () ->
+ TreeLockUtils.doWithTreeLock(
+ ident,
+ LockType.READ,
+ () ->
+ doWithCatalog(
+ getCatalogIdentifier(ident),
+ c -> c.doWithModelOps(m -> m.getModelVersion(ident,
alias)),
+ NoSuchModelVersionException.class)));
}
@Override
@@ -84,16 +166,80 @@ public class ModelOperationDispatcher extends
OperationDispatcher implements Mod
String comment,
Map<String, String> properties)
throws NoSuchModelException, ModelVersionAliasesAlreadyExistException {
- throw new UnsupportedOperationException("Not implemented");
+ NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+ Map<String, String> updatedProperties =
checkAndUpdateProperties(catalogIdent, properties);
+
+ TreeLockUtils.doWithTreeLock(
+ ident,
+ LockType.WRITE,
+ () ->
+ doWithCatalog(
+ catalogIdent,
+ c ->
+ c.doWithModelOps(
+ m -> {
+ m.linkModelVersion(ident, uri, aliases, comment,
updatedProperties);
+ return null;
+ }),
+ NoSuchModelException.class,
+ ModelVersionAliasesAlreadyExistException.class));
}
@Override
public boolean deleteModelVersion(NameIdentifier ident, int version) {
- throw new UnsupportedOperationException("Not implemented");
+ return TreeLockUtils.doWithTreeLock(
+ ident,
+ LockType.WRITE,
+ () ->
+ doWithCatalog(
+ getCatalogIdentifier(ident),
+ c -> c.doWithModelOps(m -> m.deleteModelVersion(ident,
version)),
+ RuntimeException.class));
}
@Override
public boolean deleteModelVersion(NameIdentifier ident, String alias) {
- throw new UnsupportedOperationException("Not implemented");
+ return TreeLockUtils.doWithTreeLock(
+ ident,
+ LockType.WRITE,
+ () ->
+ doWithCatalog(
+ getCatalogIdentifier(ident),
+ c -> c.doWithModelOps(m -> m.deleteModelVersion(ident, alias)),
+ RuntimeException.class));
+ }
+
+ private ModelVersion internalGetModelVersion(
+ NameIdentifier ident, Supplier<ModelVersion> supplier) {
+ NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+
+ ModelVersion modelVersion = supplier.get();
+ return EntityCombinedModelVersion.of(modelVersion)
+ .withHiddenProperties(
+ getHiddenPropertyNames(
+ catalogIdent,
+ HasPropertyMetadata::modelPropertiesMetadata,
+ modelVersion.properties()));
+ }
+
+ private Map<String, String> checkAndUpdateProperties(
+ NameIdentifier catalogIdent, Map<String, String> properties) {
+ TreeLockUtils.doWithTreeLock(
+ catalogIdent,
+ LockType.READ,
+ () ->
+ doWithCatalog(
+ catalogIdent,
+ c ->
+ c.doWithPropertiesMeta(
+ p -> {
+
validatePropertyForCreate(p.modelPropertiesMetadata(), properties);
+ return null;
+ }),
+ IllegalArgumentException.class));
+
+ long uid = idGenerator.nextId();
+ StringIdentifier stringId = StringIdentifier.fromId(uid);
+ return StringIdentifier.newPropertiesWithId(stringId, properties);
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
index ce870523a..789e5e471 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
@@ -125,7 +125,7 @@ public class SchemaOperationDispatcher extends
OperationDispatcher implements Sc
boolean isManagedSchema = isManagedEntity(catalogIdent,
Capability.Scope.SCHEMA);
if (isManagedSchema) {
return EntityCombinedSchema.of(schema)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::schemaPropertiesMetadata,
@@ -149,7 +149,7 @@ public class SchemaOperationDispatcher extends
OperationDispatcher implements Sc
} catch (Exception e) {
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedSchema.of(schema)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::schemaPropertiesMetadata,
@@ -158,7 +158,7 @@ public class SchemaOperationDispatcher extends
OperationDispatcher implements Sc
// Merge both the metadata from catalog operation and the metadata from
entity store.
return EntityCombinedSchema.of(schema, schemaEntity)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::schemaPropertiesMetadata,
schema.properties()));
}
@@ -214,7 +214,7 @@ public class SchemaOperationDispatcher extends
OperationDispatcher implements Sc
boolean isManagedSchema = isManagedEntity(catalogIdent,
Capability.Scope.SCHEMA);
if (isManagedSchema) {
return EntityCombinedSchema.of(alteredSchema)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::schemaPropertiesMetadata,
@@ -225,7 +225,7 @@ public class SchemaOperationDispatcher extends
OperationDispatcher implements Sc
// Case 1: The schema is not created by Gravitino.
if (stringId == null) {
return EntityCombinedSchema.of(alteredSchema)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::schemaPropertiesMetadata,
@@ -258,7 +258,7 @@ public class SchemaOperationDispatcher extends
OperationDispatcher implements Sc
stringId.id());
return EntityCombinedSchema.of(alteredSchema, updatedSchemaEntity)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::schemaPropertiesMetadata,
@@ -375,7 +375,7 @@ public class SchemaOperationDispatcher extends
OperationDispatcher implements Sc
boolean isManagedSchema = isManagedEntity(catalogIdentifier,
Capability.Scope.SCHEMA);
if (isManagedSchema) {
return EntityCombinedSchema.of(schema)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdentifier,
HasPropertyMetadata::schemaPropertiesMetadata,
@@ -389,7 +389,7 @@ public class SchemaOperationDispatcher extends
OperationDispatcher implements Sc
// storing string identifiers.
if (stringId == null) {
return EntityCombinedSchema.of(schema)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdentifier,
HasPropertyMetadata::schemaPropertiesMetadata,
@@ -408,7 +408,7 @@ public class SchemaOperationDispatcher extends
OperationDispatcher implements Sc
stringId.id());
return EntityCombinedSchema.of(schema, schemaEntity)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdentifier,
HasPropertyMetadata::schemaPropertiesMetadata,
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index de34e712a..7a4c5a565 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -126,7 +126,7 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
TableEntity updatedEntity = updateColumnsIfNecessaryWhenLoad(ident,
entityCombinedTable);
return EntityCombinedTable.of(entityCombinedTable.tableFromCatalog(),
updatedEntity)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
getCatalogIdentifier(ident),
HasPropertyMetadata::tablePropertiesMetadata,
@@ -208,7 +208,7 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
// Case 1: The table is not created by Gravitino.
if (stringId == null) {
return EntityCombinedTable.of(alteredTable)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
getCatalogIdentifier(ident),
HasPropertyMetadata::tablePropertiesMetadata,
@@ -252,7 +252,7 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
stringId.id());
return EntityCombinedTable.of(alteredTable, updatedTableEntity)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
getCatalogIdentifier(ident),
HasPropertyMetadata::tablePropertiesMetadata,
@@ -406,7 +406,7 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
}
return EntityCombinedTable.of(table.tableFromCatalog(), tableEntity)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
getCatalogIdentifier(identifier),
HasPropertyMetadata::tablePropertiesMetadata,
@@ -426,7 +426,7 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
// string identifier.
if (stringId == null) {
return EntityCombinedTable.of(table)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdentifier,
HasPropertyMetadata::tablePropertiesMetadata,
@@ -445,7 +445,7 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
stringId.id());
return EntityCombinedTable.of(table, tableEntity)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdentifier,
HasPropertyMetadata::tablePropertiesMetadata,
@@ -524,13 +524,13 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
} catch (Exception e) {
LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
return EntityCombinedTable.of(table)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
}
return EntityCombinedTable.of(table, tableEntity)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
index d44a09b15..eb4914b08 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
@@ -195,7 +195,7 @@ public class TopicOperationDispatcher extends
OperationDispatcher implements Top
getStringIdFromProperties(alteredTopic.properties()).id());
return EntityCombinedTopic.of(alteredTopic, updatedTopicEntity)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::topicPropertiesMetadata,
@@ -303,7 +303,7 @@ public class TopicOperationDispatcher extends
OperationDispatcher implements Top
StringIdentifier stringId = getStringIdFromProperties(topic.properties());
if (stringId == null) {
return EntityCombinedTopic.of(topic)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata,
topic.properties()))
.withImported(isEntityExist(ident, TOPIC));
@@ -317,7 +317,7 @@ public class TopicOperationDispatcher extends
OperationDispatcher implements Top
getStringIdFromProperties(topic.properties()).id());
return EntityCombinedTopic.of(topic, topicEntity)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata,
topic.properties()))
.withImported(topicEntity != null);
@@ -368,13 +368,13 @@ public class TopicOperationDispatcher extends
OperationDispatcher implements Top
} catch (Exception e) {
LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE,
"put", ident, e);
return EntityCombinedTopic.of(topic)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata,
topic.properties()));
}
return EntityCombinedTopic.of(topic, topicEntity)
- .withHiddenPropertiesSet(
+ .withHiddenProperties(
getHiddenPropertyNames(
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata,
topic.properties()));
}
diff --git a/core/src/test/java/org/apache/gravitino/TestCatalog.java
b/core/src/test/java/org/apache/gravitino/TestCatalog.java
index bdb409f20..420396559 100644
--- a/core/src/test/java/org/apache/gravitino/TestCatalog.java
+++ b/core/src/test/java/org/apache/gravitino/TestCatalog.java
@@ -134,4 +134,9 @@ public class TestCatalog extends BaseCatalog<TestCatalog> {
public PropertiesMetadata topicPropertiesMetadata() throws
UnsupportedOperationException {
return BASE_PROPERTIES_METADATA;
}
+
+ @Override
+ public PropertiesMetadata modelPropertiesMetadata() throws
UnsupportedOperationException {
+ return BASE_PROPERTIES_METADATA;
+ }
}
diff --git a/core/src/test/java/org/apache/gravitino/TestModel.java
b/core/src/test/java/org/apache/gravitino/TestModel.java
new file mode 100644
index 000000000..ee632192f
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/TestModel.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino;
+
+import org.apache.gravitino.connector.BaseModel;
+
+public class TestModel extends BaseModel {
+
+ public static class Builder extends BaseModelBuilder<Builder, TestModel> {
+
+ private Builder() {}
+
+ @Override
+ protected TestModel internalBuild() {
+ TestModel model = new TestModel();
+ model.name = name;
+ model.comment = comment;
+ model.properties = properties;
+ model.latestVersion = latestVersion;
+ model.auditInfo = auditInfo;
+ return model;
+ }
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+}
diff --git a/core/src/test/java/org/apache/gravitino/TestModelVersion.java
b/core/src/test/java/org/apache/gravitino/TestModelVersion.java
new file mode 100644
index 000000000..487496c5f
--- /dev/null
+++ b/core/src/test/java/org/apache/gravitino/TestModelVersion.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino;
+
+import org.apache.gravitino.connector.BaseModelVersion;
+
+public class TestModelVersion extends BaseModelVersion {
+
+ public static class Builder extends BaseModelVersionBuilder<Builder,
TestModelVersion> {
+
+ private Builder() {}
+
+ @Override
+ protected TestModelVersion internalBuild() {
+ TestModelVersion modelVersion = new TestModelVersion();
+ modelVersion.version = version;
+ modelVersion.comment = comment;
+ modelVersion.aliases = aliases;
+ modelVersion.uri = uri;
+ modelVersion.properties = properties;
+ modelVersion.auditInfo = auditInfo;
+ return modelVersion;
+ }
+ }
+
+ public static Builder builder() {
+ return new Builder();
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestModelOperationDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestModelOperationDispatcher.java
new file mode 100644
index 000000000..10bb85a1e
--- /dev/null
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestModelOperationDispatcher.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog;
+
+import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
+import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
+import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.apache.gravitino.StringIdentifier.ID_KEY;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.GravitinoEnv;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.exceptions.NoSuchModelException;
+import org.apache.gravitino.exceptions.NoSuchModelVersionException;
+import org.apache.gravitino.lock.LockManager;
+import org.apache.gravitino.model.Model;
+import org.apache.gravitino.model.ModelVersion;
+import org.apache.gravitino.utils.NameIdentifierUtil;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class TestModelOperationDispatcher extends TestOperationDispatcher {
+
+ static ModelOperationDispatcher modelOperationDispatcher;
+
+ static SchemaOperationDispatcher schemaOperationDispatcher;
+
+ @BeforeAll
+ public static void initialize() throws IOException, IllegalAccessException {
+ Config config = Mockito.mock(Config.class);
+ Mockito.doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
+ Mockito.doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
+ Mockito.doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new
LockManager(config), true);
+
+ modelOperationDispatcher =
+ new ModelOperationDispatcher(catalogManager, entityStore, idGenerator);
+ schemaOperationDispatcher =
+ new SchemaOperationDispatcher(catalogManager, entityStore,
idGenerator);
+ }
+
+ @Test
+ public void testRegisterAndGetModel() {
+ String schemaName = randomSchemaName();
+ NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog,
schemaName);
+ schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
+
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+ String modelName = randomModelName();
+ NameIdentifier modelIdent =
+ NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName);
+
+ Model model = modelOperationDispatcher.registerModel(modelIdent,
"comment", props);
+ Assertions.assertEquals(modelName, model.name());
+ Assertions.assertEquals("comment", model.comment());
+ Assertions.assertEquals(props, model.properties());
+ Assertions.assertFalse(model.properties().containsKey(ID_KEY));
+
+ Model registeredModel = modelOperationDispatcher.getModel(modelIdent);
+ Assertions.assertEquals(modelName, registeredModel.name());
+ Assertions.assertEquals("comment", registeredModel.comment());
+ Assertions.assertEquals(props, registeredModel.properties());
+ Assertions.assertFalse(registeredModel.properties().containsKey(ID_KEY));
+
+ // Test register model with illegal property
+ Map<String, String> illegalProps = ImmutableMap.of("k1", "v1", ID_KEY,
"test");
+ testPropertyException(
+ () -> modelOperationDispatcher.registerModel(modelIdent, "comment",
illegalProps),
+ "Properties are reserved and cannot be set",
+ ID_KEY);
+ }
+
+ @Test
+ public void testRegisterAndListModels() {
+ String schemaName = randomSchemaName();
+ NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog,
schemaName);
+ schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
+
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+ String modelName1 = randomModelName();
+ NameIdentifier modelIdent1 =
+ NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName1);
+ modelOperationDispatcher.registerModel(modelIdent1, "comment", props);
+
+ String modelName2 = randomModelName();
+ NameIdentifier modelIdent2 =
+ NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName2);
+ modelOperationDispatcher.registerModel(modelIdent2, "comment", props);
+
+ NameIdentifier[] modelIdents =
modelOperationDispatcher.listModels(modelIdent1.namespace());
+ Assertions.assertEquals(2, modelIdents.length);
+ Set<NameIdentifier> modelIdentSet = Sets.newHashSet(modelIdents);
+ Assertions.assertTrue(modelIdentSet.contains(modelIdent1));
+ Assertions.assertTrue(modelIdentSet.contains(modelIdent2));
+ }
+
+ @Test
+ public void testRegisterAndDeleteModel() {
+ String schemaName = randomSchemaName();
+ NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog,
schemaName);
+ schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
+
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+ String modelName = randomModelName();
+ NameIdentifier modelIdent =
+ NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName);
+
+ modelOperationDispatcher.registerModel(modelIdent, "comment", props);
+ Assertions.assertTrue(modelOperationDispatcher.deleteModel(modelIdent));
+ Assertions.assertFalse(modelOperationDispatcher.deleteModel(modelIdent));
+ Assertions.assertThrows(
+ NoSuchModelException.class, () ->
modelOperationDispatcher.getModel(modelIdent));
+
+ // Test delete in-existent model
+ Assertions.assertFalse(
+ modelOperationDispatcher.deleteModel(NameIdentifier.of(metalake,
catalog, "inexistent")));
+ }
+
+ @Test
+ public void testLinkAndGetModelVersion() {
+ String schemaName = randomSchemaName();
+ NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog,
schemaName);
+ schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
+
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+ String modelName = randomModelName();
+ NameIdentifier modelIdent =
+ NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName);
+
+ Model model = modelOperationDispatcher.registerModel(modelIdent,
"comment", props);
+ Assertions.assertEquals(0, model.latestVersion());
+
+ String[] aliases = new String[] {"alias1", "alias2"};
+ modelOperationDispatcher.linkModelVersion(modelIdent, "path", aliases,
"comment", props);
+
+ ModelVersion linkedModelVersion =
modelOperationDispatcher.getModelVersion(modelIdent, 0);
+ Assertions.assertEquals(0, linkedModelVersion.version());
+ Assertions.assertEquals("path", linkedModelVersion.uri());
+ Assertions.assertArrayEquals(aliases, linkedModelVersion.aliases());
+ Assertions.assertEquals("comment", linkedModelVersion.comment());
+ Assertions.assertEquals(props, linkedModelVersion.properties());
+
Assertions.assertFalse(linkedModelVersion.properties().containsKey(ID_KEY));
+
+ // Test get model version with alias
+ ModelVersion linkedModelVersionWithAlias =
+ modelOperationDispatcher.getModelVersion(modelIdent, "alias1");
+ Assertions.assertEquals(0, linkedModelVersionWithAlias.version());
+ Assertions.assertEquals("path", linkedModelVersionWithAlias.uri());
+ Assertions.assertArrayEquals(aliases,
linkedModelVersionWithAlias.aliases());
+
Assertions.assertFalse(linkedModelVersionWithAlias.properties().containsKey(ID_KEY));
+
+ ModelVersion linkedModelVersionWithAlias2 =
+ modelOperationDispatcher.getModelVersion(modelIdent, "alias2");
+ Assertions.assertEquals(0, linkedModelVersionWithAlias2.version());
+ Assertions.assertEquals("path", linkedModelVersionWithAlias2.uri());
+ Assertions.assertArrayEquals(aliases,
linkedModelVersionWithAlias2.aliases());
+
Assertions.assertFalse(linkedModelVersionWithAlias2.properties().containsKey(ID_KEY));
+
+ // Test Link model version with illegal property
+ Map<String, String> illegalProps = ImmutableMap.of("k1", "v1", ID_KEY,
"test");
+ testPropertyException(
+ () ->
+ modelOperationDispatcher.linkModelVersion(
+ modelIdent, "path", aliases, "comment", illegalProps),
+ "Properties are reserved and cannot be set",
+ ID_KEY);
+ }
+
+ @Test
+ public void testLinkAndListModelVersion() {
+ String schemaName = randomSchemaName();
+ NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog,
schemaName);
+ schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
+
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+ String modelName = randomModelName();
+ NameIdentifier modelIdent =
+ NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName);
+
+ Model model = modelOperationDispatcher.registerModel(modelIdent,
"comment", props);
+ Assertions.assertEquals(0, model.latestVersion());
+
+ String[] aliases1 = new String[] {"alias1"};
+ String[] aliases2 = new String[] {"alias2"};
+ modelOperationDispatcher.linkModelVersion(modelIdent, "path1", aliases1,
"comment", props);
+ modelOperationDispatcher.linkModelVersion(modelIdent, "path2", aliases2,
"comment", props);
+
+ int[] versions = modelOperationDispatcher.listModelVersions(modelIdent);
+ Assertions.assertEquals(2, versions.length);
+ Set<Integer> versionSet =
Arrays.stream(versions).boxed().collect(Collectors.toSet());
+ Assertions.assertTrue(versionSet.contains(0));
+ Assertions.assertTrue(versionSet.contains(1));
+ }
+
+ @Test
+ public void testLinkAndDeleteModelVersion() {
+ String schemaName = randomSchemaName();
+ NameIdentifier schemaIdent = NameIdentifier.of(metalake, catalog,
schemaName);
+ schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
+
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+ String modelName = randomModelName();
+ NameIdentifier modelIdent =
+ NameIdentifierUtil.ofModel(metalake, catalog, schemaName, modelName);
+
+ Model model = modelOperationDispatcher.registerModel(modelIdent,
"comment", props);
+ Assertions.assertEquals(0, model.latestVersion());
+
+ String[] aliases = new String[] {"alias1"};
+ modelOperationDispatcher.linkModelVersion(modelIdent, "path", aliases,
"comment", props);
+
Assertions.assertTrue(modelOperationDispatcher.deleteModelVersion(modelIdent,
0));
+
Assertions.assertFalse(modelOperationDispatcher.deleteModelVersion(modelIdent,
0));
+ Assertions.assertThrows(
+ NoSuchModelVersionException.class,
+ () -> modelOperationDispatcher.getModelVersion(modelIdent, 0));
+
+ // Test delete in-existent model version
+
Assertions.assertFalse(modelOperationDispatcher.deleteModelVersion(modelIdent,
1));
+
+ // Tet delete model version with alias
+ String[] aliases2 = new String[] {"alias2"};
+ modelOperationDispatcher.linkModelVersion(modelIdent, "path2", aliases2,
"comment", props);
+
Assertions.assertTrue(modelOperationDispatcher.deleteModelVersion(modelIdent,
"alias2"));
+
Assertions.assertFalse(modelOperationDispatcher.deleteModelVersion(modelIdent,
"alias2"));
+ Assertions.assertThrows(
+ NoSuchModelVersionException.class,
+ () -> modelOperationDispatcher.getModelVersion(modelIdent, "alias2"));
+ }
+
+ private String randomSchemaName() {
+ return "schema_" + UUID.randomUUID().toString().replace("-", "");
+ }
+
+ private String randomModelName() {
+ return "model_" + UUID.randomUUID().toString().replace("-", "");
+ }
+}
diff --git
a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
index 4fb98c596..f7775ef32 100644
---
a/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
+++
b/core/src/test/java/org/apache/gravitino/connector/TestCatalogOperations.java
@@ -26,11 +26,13 @@ import java.time.Instant;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
@@ -38,6 +40,8 @@ import org.apache.gravitino.Schema;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.TestColumn;
import org.apache.gravitino.TestFileset;
+import org.apache.gravitino.TestModel;
+import org.apache.gravitino.TestModelVersion;
import org.apache.gravitino.TestSchema;
import org.apache.gravitino.TestTable;
import org.apache.gravitino.TestTopic;
@@ -47,8 +51,12 @@ import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.exceptions.ConnectionFailedException;
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.gravitino.exceptions.ModelAlreadyExistsException;
+import
org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.exceptions.NoSuchFilesetException;
+import org.apache.gravitino.exceptions.NoSuchModelException;
+import org.apache.gravitino.exceptions.NoSuchModelVersionException;
import org.apache.gravitino.exceptions.NoSuchSchemaException;
import org.apache.gravitino.exceptions.NoSuchTableException;
import org.apache.gravitino.exceptions.NoSuchTopicException;
@@ -64,6 +72,9 @@ import org.apache.gravitino.messaging.Topic;
import org.apache.gravitino.messaging.TopicCatalog;
import org.apache.gravitino.messaging.TopicChange;
import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.model.Model;
+import org.apache.gravitino.model.ModelCatalog;
+import org.apache.gravitino.model.ModelVersion;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
import org.apache.gravitino.rel.TableCatalog;
@@ -76,7 +87,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestCatalogOperations
- implements CatalogOperations, TableCatalog, FilesetCatalog, TopicCatalog,
SupportsSchemas {
+ implements CatalogOperations,
+ TableCatalog,
+ FilesetCatalog,
+ TopicCatalog,
+ ModelCatalog,
+ SupportsSchemas {
private static final Logger LOG =
LoggerFactory.getLogger(TestCatalogOperations.class);
private final Map<NameIdentifier, TestTable> tables;
@@ -87,6 +103,12 @@ public class TestCatalogOperations
private final Map<NameIdentifier, TestTopic> topics;
+ private final Map<NameIdentifier, TestModel> models;
+
+ private final Map<Pair<NameIdentifier, Integer>, TestModelVersion>
modelVersions;
+
+ private final Map<Pair<NameIdentifier, String>, Integer> modelAliasToVersion;
+
public static final String FAIL_CREATE = "fail-create";
public static final String FAIL_TEST = "need-fail";
@@ -98,6 +120,9 @@ public class TestCatalogOperations
schemas = Maps.newHashMap();
filesets = Maps.newHashMap();
topics = Maps.newHashMap();
+ models = Maps.newHashMap();
+ modelVersions = Maps.newHashMap();
+ modelAliasToVersion = Maps.newHashMap();
}
@Override
@@ -649,6 +674,227 @@ public class TestCatalogOperations
}
}
+ @Override
+ public NameIdentifier[] listModels(Namespace namespace) throws
NoSuchSchemaException {
+ NameIdentifier modelSchemaIdent = NameIdentifier.of(namespace.levels());
+ if (!schemas.containsKey(modelSchemaIdent)) {
+ throw new NoSuchSchemaException("Schema %s does not exist",
modelSchemaIdent);
+ }
+
+ return models.keySet().stream()
+ .filter(ident -> ident.namespace().equals(namespace))
+ .toArray(NameIdentifier[]::new);
+ }
+
+ @Override
+ public Model getModel(NameIdentifier ident) throws NoSuchModelException {
+ if (models.containsKey(ident)) {
+ return models.get(ident);
+ } else {
+ throw new NoSuchModelException("Model %s does not exist", ident);
+ }
+ }
+
+ @Override
+ public Model registerModel(NameIdentifier ident, String comment, Map<String,
String> properties)
+ throws NoSuchSchemaException, ModelAlreadyExistsException {
+ NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+ if (!schemas.containsKey(schemaIdent)) {
+ throw new NoSuchSchemaException("Schema %s does not exist", schemaIdent);
+ }
+
+ AuditInfo auditInfo =
+
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build();
+ TestModel model =
+ TestModel.builder()
+ .withName(ident.name())
+ .withComment(comment)
+ .withProperties(properties)
+ .withLatestVersion(0)
+ .withAuditInfo(auditInfo)
+ .build();
+
+ if (models.containsKey(ident)) {
+ throw new ModelAlreadyExistsException("Model %s already exists", ident);
+ } else {
+ models.put(ident, model);
+ }
+
+ return model;
+ }
+
+ @Override
+ public boolean deleteModel(NameIdentifier ident) {
+ if (!models.containsKey(ident)) {
+ return false;
+ }
+
+ models.remove(ident);
+
+ List<Pair<NameIdentifier, Integer>> deletedVersions =
+ modelVersions.entrySet().stream()
+ .filter(e -> e.getKey().getLeft().equals(ident))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ deletedVersions.forEach(modelVersions::remove);
+
+ List<Pair<NameIdentifier, String>> deletedAliases =
+ modelAliasToVersion.entrySet().stream()
+ .filter(e -> e.getKey().getLeft().equals(ident))
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList());
+ deletedAliases.forEach(modelAliasToVersion::remove);
+
+ return true;
+ }
+
+ @Override
+ public int[] listModelVersions(NameIdentifier ident) throws
NoSuchModelException {
+ if (!models.containsKey(ident)) {
+ throw new NoSuchModelException("Model %s does not exist", ident);
+ }
+
+ return modelVersions.entrySet().stream()
+ .filter(e -> e.getKey().getLeft().equals(ident))
+ .mapToInt(e -> e.getValue().version())
+ .toArray();
+ }
+
+ @Override
+ public ModelVersion getModelVersion(NameIdentifier ident, int version)
+ throws NoSuchModelVersionException {
+ if (!models.containsKey(ident)) {
+ throw new NoSuchModelVersionException("Model %s does not exist", ident);
+ }
+
+ Pair<NameIdentifier, Integer> versionPair = Pair.of(ident, version);
+ if (!modelVersions.containsKey(versionPair)) {
+ throw new NoSuchModelVersionException("Model version %s does not exist",
versionPair);
+ }
+
+ return modelVersions.get(versionPair);
+ }
+
+ @Override
+ public ModelVersion getModelVersion(NameIdentifier ident, String alias)
+ throws NoSuchModelVersionException {
+ if (!models.containsKey(ident)) {
+ throw new NoSuchModelVersionException("Model %s does not exist", ident);
+ }
+
+ Pair<NameIdentifier, String> aliasPair = Pair.of(ident, alias);
+ if (!modelAliasToVersion.containsKey(aliasPair)) {
+ throw new NoSuchModelVersionException("Model version %s does not exist",
alias);
+ }
+
+ int version = modelAliasToVersion.get(aliasPair);
+ Pair<NameIdentifier, Integer> versionPair = Pair.of(ident, version);
+ if (!modelVersions.containsKey(versionPair)) {
+ throw new NoSuchModelVersionException("Model version %s does not exist",
versionPair);
+ }
+
+ return modelVersions.get(versionPair);
+ }
+
+ @Override
+ public void linkModelVersion(
+ NameIdentifier ident,
+ String uri,
+ String[] aliases,
+ String comment,
+ Map<String, String> properties)
+ throws NoSuchModelException, ModelVersionAliasesAlreadyExistException {
+ if (!models.containsKey(ident)) {
+ throw new NoSuchModelException("Model %s does not exist", ident);
+ }
+
+ String[] aliasArray = aliases != null ? aliases : new String[0];
+ for (String alias : aliasArray) {
+ Pair<NameIdentifier, String> aliasPair = Pair.of(ident, alias);
+ if (modelAliasToVersion.containsKey(aliasPair)) {
+ throw new ModelVersionAliasesAlreadyExistException(
+ "Model version alias %s already exists", alias);
+ }
+ }
+
+ int version = models.get(ident).latestVersion();
+ TestModelVersion modelVersion =
+ TestModelVersion.builder()
+ .withVersion(version)
+ .withAliases(aliases)
+ .withComment(comment)
+ .withUri(uri)
+ .withProperties(properties)
+ .withAuditInfo(
+
AuditInfo.builder().withCreator("test").withCreateTime(Instant.now()).build())
+ .build();
+ Pair<NameIdentifier, Integer> versionPair = Pair.of(ident, version);
+ modelVersions.put(versionPair, modelVersion);
+ for (String alias : aliasArray) {
+ Pair<NameIdentifier, String> aliasPair = Pair.of(ident, alias);
+ modelAliasToVersion.put(aliasPair, version);
+ }
+
+ TestModel model = models.get(ident);
+ TestModel updatedModel =
+ TestModel.builder()
+ .withName(model.name())
+ .withComment(model.comment())
+ .withProperties(model.properties())
+ .withLatestVersion(version + 1)
+ .withAuditInfo(model.auditInfo())
+ .build();
+ models.put(ident, updatedModel);
+ }
+
+ @Override
+ public boolean deleteModelVersion(NameIdentifier ident, int version) {
+ if (!models.containsKey(ident)) {
+ return false;
+ }
+
+ Pair<NameIdentifier, Integer> versionPair = Pair.of(ident, version);
+ if (!modelVersions.containsKey(versionPair)) {
+ return false;
+ }
+
+ TestModelVersion modelVersion = modelVersions.remove(versionPair);
+ if (modelVersion.aliases() != null) {
+ for (String alias : modelVersion.aliases()) {
+ Pair<NameIdentifier, String> aliasPair = Pair.of(ident, alias);
+ modelAliasToVersion.remove(aliasPair);
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean deleteModelVersion(NameIdentifier ident, String alias) {
+ if (!models.containsKey(ident)) {
+ return false;
+ }
+
+ Pair<NameIdentifier, String> aliasPair = Pair.of(ident, alias);
+ if (!modelAliasToVersion.containsKey(aliasPair)) {
+ return false;
+ }
+
+ int version = modelAliasToVersion.remove(aliasPair);
+ Pair<NameIdentifier, Integer> versionPair = Pair.of(ident, version);
+ if (!modelVersions.containsKey(versionPair)) {
+ return false;
+ }
+
+ TestModelVersion modelVersion = modelVersions.remove(versionPair);
+ for (String modelVersionAlias : modelVersion.aliases()) {
+ Pair<NameIdentifier, String> modelAliasPair = Pair.of(ident,
modelVersionAlias);
+ modelAliasToVersion.remove(modelAliasPair);
+ }
+
+ return true;
+ }
+
private boolean hasCallerContext() {
return CallerContext.CallerContextHolder.get() != null
&& CallerContext.CallerContextHolder.get().context() != null