This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new d6721459d [#4195] improvement(core): Decouple `OperationDispatcher`
from `NormalizeDispatcher` (#4196)
d6721459d is described below
commit d6721459d499d1237cfcb5b0527ed5252e2462ce
Author: mchades <[email protected]>
AuthorDate: Fri Jul 19 18:06:44 2024 +0800
[#4195] improvement(core): Decouple `OperationDispatcher` from
`NormalizeDispatcher` (#4196)
### What changes were proposed in this pull request?
- Decouple `OperationDispatcher` from `NormalizeDispatcher`
- Move the `getCatalogCapability` method from `OperationDispatcher` to
`CapabilityHelpers` (as the static `getCapability` helper)
### Why are the changes needed?
Fix: #4195
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Covered by existing tests.
---
.../java/org/apache/gravitino/GravitinoEnv.java | 11 ++---
.../gravitino/catalog/CapabilityHelpers.java | 23 ++++++----
.../catalog/FilesetNormalizeDispatcher.java | 48 ++++++++++++++-------
.../catalog/FilesetOperationDispatcher.java | 1 +
.../gravitino/catalog/OperationDispatcher.java | 40 +-----------------
.../catalog/PartitionNormalizeDispatcher.java | 47 ++++++++++-----------
.../catalog/SchemaNormalizeDispatcher.java | 38 ++++++++++++-----
.../catalog/SchemaOperationDispatcher.java | 1 +
.../catalog/TableNormalizeDispatcher.java | 49 +++++++++++++++-------
.../catalog/TableOperationDispatcher.java | 1 +
.../catalog/TopicNormalizeDispatcher.java | 47 +++++++++++++++------
.../catalog/TopicOperationDispatcher.java | 1 +
.../apache/gravitino/utils/NameIdentifierUtil.java | 32 ++++++++++++++
.../catalog/TestFilesetNormalizeDispatcher.java | 6 ++-
.../gravitino/catalog/TestOperationDispatcher.java | 13 +++---
.../catalog/TestPartitionNormalizeDispatcher.java | 2 +-
.../catalog/TestSchemaNormalizeDispatcher.java | 2 +-
.../catalog/TestTableNormalizeDispatcher.java | 6 ++-
.../catalog/TestTopicNormalizeDispatcher.java | 6 ++-
19 files changed, 232 insertions(+), 142 deletions(-)
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index b307cddbd..cf95dd7e7 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -154,30 +154,31 @@ public class GravitinoEnv {
SchemaOperationDispatcher schemaOperationDispatcher =
new SchemaOperationDispatcher(catalogManager, entityStore,
idGenerator);
SchemaNormalizeDispatcher schemaNormalizeDispatcher =
- new SchemaNormalizeDispatcher(schemaOperationDispatcher);
+ new SchemaNormalizeDispatcher(schemaOperationDispatcher,
catalogManager);
this.schemaDispatcher = new SchemaEventDispatcher(eventBus,
schemaNormalizeDispatcher);
TableOperationDispatcher tableOperationDispatcher =
new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
TableNormalizeDispatcher tableNormalizeDispatcher =
- new TableNormalizeDispatcher(tableOperationDispatcher);
+ new TableNormalizeDispatcher(tableOperationDispatcher, catalogManager);
this.tableDispatcher = new TableEventDispatcher(eventBus,
tableNormalizeDispatcher);
PartitionOperationDispatcher partitionOperationDispatcher =
new PartitionOperationDispatcher(catalogManager, entityStore,
idGenerator);
// todo: support PartitionEventDispatcher
- this.partitionDispatcher = new
PartitionNormalizeDispatcher(partitionOperationDispatcher);
+ this.partitionDispatcher =
+ new PartitionNormalizeDispatcher(partitionOperationDispatcher,
catalogManager);
FilesetOperationDispatcher filesetOperationDispatcher =
new FilesetOperationDispatcher(catalogManager, entityStore,
idGenerator);
FilesetNormalizeDispatcher filesetNormalizeDispatcher =
- new FilesetNormalizeDispatcher(filesetOperationDispatcher);
+ new FilesetNormalizeDispatcher(filesetOperationDispatcher,
catalogManager);
this.filesetDispatcher = new FilesetEventDispatcher(eventBus,
filesetNormalizeDispatcher);
TopicOperationDispatcher topicOperationDispatcher =
new TopicOperationDispatcher(catalogManager, entityStore, idGenerator);
TopicNormalizeDispatcher topicNormalizeDispatcher =
- new TopicNormalizeDispatcher(topicOperationDispatcher);
+ new TopicNormalizeDispatcher(topicOperationDispatcher, catalogManager);
this.topicDispatcher = new TopicEventDispatcher(eventBus,
topicNormalizeDispatcher);
// Create and initialize access control related modules
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
b/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
index 373aef049..266b4a401 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CapabilityHelpers.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.catalog;
import static org.apache.gravitino.rel.Column.DEFAULT_VALUE_NOT_SET;
import static
org.apache.gravitino.rel.expressions.transforms.Transforms.NAME_OF_IDENTITY;
+import static
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
import com.google.common.base.Preconditions;
import java.util.Arrays;
@@ -48,6 +49,16 @@ import org.apache.gravitino.rel.partitions.RangePartition;
public class CapabilityHelpers {
+ public static Capability getCapability(NameIdentifier ident, CatalogManager
catalogManager) {
+ NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+ CatalogManager.CatalogWrapper c =
catalogManager.loadCatalogAndWrap(catalogIdent);
+ try {
+ return c.capabilities();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to get capabilities for catalog: " +
catalogIdent, e);
+ }
+ }
+
public static Column[] applyCapabilities(Column[] columns, Capability
capabilities) {
return Arrays.stream(columns)
.map(c -> applyCapabilities(c, capabilities))
@@ -99,30 +110,28 @@ public class CapabilityHelpers {
}
public static NameIdentifier[] applyCaseSensitive(
- NameIdentifier[] idents, Capability.Scope scope, OperationDispatcher
operationDispatcher) {
+ NameIdentifier[] idents, Capability.Scope scope, Capability
capabilities) {
return Arrays.stream(idents)
- .map(ident -> applyCaseSensitive(ident, scope, operationDispatcher))
+ .map(ident -> applyCaseSensitive(ident, scope, capabilities))
.toArray(NameIdentifier[]::new);
}
public static NameIdentifier applyCaseSensitive(
- NameIdentifier ident, Capability.Scope scope, OperationDispatcher
operationDispatcher) {
- Capability capabilities = operationDispatcher.getCatalogCapability(ident);
- Namespace namespace = applyCaseSensitive(ident.namespace(), scope,
operationDispatcher);
+ NameIdentifier ident, Capability.Scope scope, Capability capabilities) {
+ Namespace namespace = applyCaseSensitive(ident.namespace(), scope,
capabilities);
String name = applyCaseSensitiveOnName(scope, ident.name(), capabilities);
return NameIdentifier.of(namespace, name);
}
public static Namespace applyCaseSensitive(
- Namespace namespace, Capability.Scope identScope, OperationDispatcher
operationDispatcher) {
+ Namespace namespace, Capability.Scope identScope, Capability
capabilities) {
String metalake = namespace.level(0);
String catalog = namespace.level(1);
if (identScope == Capability.Scope.TABLE
|| identScope == Capability.Scope.FILESET
|| identScope == Capability.Scope.TOPIC) {
String schema = namespace.level(namespace.length() - 1);
- Capability capabilities =
operationDispatcher.getCatalogCapability(namespace);
schema = applyCaseSensitiveOnName(Capability.Scope.SCHEMA, schema,
capabilities);
return Namespace.of(metalake, catalog, schema);
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
index 6698aeccf..d79630392 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/FilesetNormalizeDispatcher.java
@@ -20,8 +20,10 @@ package org.apache.gravitino.catalog;
import static org.apache.gravitino.catalog.CapabilityHelpers.applyCapabilities;
import static
org.apache.gravitino.catalog.CapabilityHelpers.applyCaseSensitive;
+import static org.apache.gravitino.catalog.CapabilityHelpers.getCapability;
import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.connector.capability.Capability;
@@ -32,35 +34,35 @@ import org.apache.gravitino.file.Fileset;
import org.apache.gravitino.file.FilesetChange;
public class FilesetNormalizeDispatcher implements FilesetDispatcher {
+ private final CatalogManager catalogManager;
+ private final FilesetDispatcher dispatcher;
- private final FilesetOperationDispatcher dispatcher;
-
- public FilesetNormalizeDispatcher(FilesetOperationDispatcher dispatcher) {
+ public FilesetNormalizeDispatcher(FilesetDispatcher dispatcher,
CatalogManager catalogManager) {
this.dispatcher = dispatcher;
+ this.catalogManager = catalogManager;
}
@Override
public NameIdentifier[] listFilesets(Namespace namespace) throws
NoSuchSchemaException {
// The constraints of the name spec may be more strict than underlying
catalog,
// and for compatibility reasons, we only apply case-sensitive
capabilities here.
- Namespace caseSensitiveNs = applyCaseSensitive(namespace,
Capability.Scope.FILESET, dispatcher);
+ Namespace caseSensitiveNs = normalizeCaseSensitive(namespace);
NameIdentifier[] identifiers = dispatcher.listFilesets(caseSensitiveNs);
- return applyCaseSensitive(identifiers, Capability.Scope.FILESET,
dispatcher);
+ return normalizeCaseSensitive(identifiers);
}
@Override
public Fileset loadFileset(NameIdentifier ident) throws
NoSuchFilesetException {
// The constraints of the name spec may be more strict than underlying
catalog,
// and for compatibility reasons, we only apply case-sensitive
capabilities here.
- return dispatcher.loadFileset(applyCaseSensitive(ident,
Capability.Scope.FILESET, dispatcher));
+ return dispatcher.loadFileset(normalizeCaseSensitive(ident));
}
@Override
public boolean filesetExists(NameIdentifier ident) {
// The constraints of the name spec may be more strict than underlying
catalog,
// and for compatibility reasons, we only apply case-sensitive
capabilities here.
- return dispatcher.filesetExists(
- applyCaseSensitive(ident, Capability.Scope.FILESET, dispatcher));
+ return dispatcher.filesetExists(normalizeCaseSensitive(ident));
}
@Override
@@ -78,23 +80,41 @@ public class FilesetNormalizeDispatcher implements
FilesetDispatcher {
@Override
public Fileset alterFileset(NameIdentifier ident, FilesetChange... changes)
throws NoSuchFilesetException, IllegalArgumentException {
- Capability capability = dispatcher.getCatalogCapability(ident);
+ Capability capability = getCapability(ident, catalogManager);
return dispatcher.alterFileset(
// The constraints of the name spec may be more strict than underlying
catalog,
// and for compatibility reasons, we only apply case-sensitive
capabilities here.
- applyCaseSensitive(ident, Capability.Scope.FILESET, dispatcher),
- applyCapabilities(capability, changes));
+ normalizeCaseSensitive(ident), applyCapabilities(capability, changes));
}
@Override
public boolean dropFileset(NameIdentifier ident) {
// The constraints of the name spec may be more strict than underlying
catalog,
// and for compatibility reasons, we only apply case-sensitive
capabilities here.
- return dispatcher.dropFileset(applyCaseSensitive(ident,
Capability.Scope.FILESET, dispatcher));
+ return dispatcher.dropFileset(normalizeCaseSensitive(ident));
}
private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) {
- Capability capability = dispatcher.getCatalogCapability(ident);
- return applyCapabilities(ident, Capability.Scope.FILESET, capability);
+ Capability capabilities = getCapability(ident, catalogManager);
+ return applyCapabilities(ident, Capability.Scope.FILESET, capabilities);
+ }
+
+ private Namespace normalizeCaseSensitive(Namespace namespace) {
+ Capability capabilities =
getCapability(NameIdentifier.of(namespace.levels()), catalogManager);
+ return applyCaseSensitive(namespace, Capability.Scope.FILESET,
capabilities);
+ }
+
+ private NameIdentifier normalizeCaseSensitive(NameIdentifier filesetIdent) {
+ Capability capabilities = getCapability(filesetIdent, catalogManager);
+ return applyCaseSensitive(filesetIdent, Capability.Scope.FILESET,
capabilities);
+ }
+
+ private NameIdentifier[] normalizeCaseSensitive(NameIdentifier[]
filesetIdents) {
+ if (ArrayUtils.isEmpty(filesetIdents)) {
+ return filesetIdents;
+ }
+
+ Capability capabilities = getCapability(filesetIdents[0], catalogManager);
+ return applyCaseSensitive(filesetIdents, Capability.Scope.FILESET,
capabilities);
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
index 1dbe3fcad..e28369f96 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/FilesetOperationDispatcher.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.catalog;
import static
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
+import static
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
import java.util.Map;
import org.apache.gravitino.EntityStore;
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/OperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/OperationDispatcher.java
index bf8b0746b..a9f875df6 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/OperationDispatcher.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/OperationDispatcher.java
@@ -19,25 +19,21 @@
package org.apache.gravitino.catalog;
import static
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForAlter;
+import static
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
import com.google.common.collect.Maps;
-import java.util.Arrays;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.gravitino.Entity;
import org.apache.gravitino.EntityStore;
import org.apache.gravitino.HasIdentifier;
import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.Namespace;
import org.apache.gravitino.SchemaChange;
import org.apache.gravitino.StringIdentifier;
import org.apache.gravitino.connector.HasPropertyMetadata;
import org.apache.gravitino.connector.PropertiesMetadata;
import org.apache.gravitino.connector.capability.Capability;
-import org.apache.gravitino.exceptions.IllegalNameIdentifierException;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.file.FilesetChange;
import org.apache.gravitino.messaging.TopicChange;
@@ -75,20 +71,6 @@ public abstract class OperationDispatcher {
this.idGenerator = idGenerator;
}
- protected Capability getCatalogCapability(NameIdentifier ident) {
- return doWithCatalog(
- getCatalogIdentifier(ident),
- CatalogManager.CatalogWrapper::capabilities,
- IllegalArgumentException.class);
- }
-
- protected Capability getCatalogCapability(Namespace namespace) {
- return doWithCatalog(
- getCatalogIdentifier(NameIdentifier.of(namespace.levels())),
- CatalogManager.CatalogWrapper::capabilities,
- IllegalArgumentException.class);
- }
-
protected <R, E extends Throwable> R doWithTable(
NameIdentifier tableIdent, ThrowableFunction<SupportsPartitions, R> fn,
Class<E> ex)
throws E {
@@ -239,26 +221,6 @@ public abstract class OperationDispatcher {
return ret;
}
- // TODO(xun): Remove this method when we implement a better way to get the
catalog identifier
- // [#257] Add an explicit get catalog functions in NameIdentifier
- protected NameIdentifier getCatalogIdentifier(NameIdentifier ident) {
- NameIdentifier.check(
- ident.name() != null, "The name variable in the NameIdentifier must
have value.");
- Namespace.check(
- ident.namespace() != null && ident.namespace().length() > 0,
- "Catalog namespace must be non-null and have 1 level, the input
namespace is %s",
- ident.namespace());
-
- List<String> allElems =
- Stream.concat(Arrays.stream(ident.namespace().levels()),
Stream.of(ident.name()))
- .collect(Collectors.toList());
- if (allElems.size() < 2) {
- throw new IllegalNameIdentifierException(
- "Cannot create a catalog NameIdentifier less than two elements.");
- }
- return NameIdentifier.of(allElems.get(0), allElems.get(1));
- }
-
boolean isManagedEntity(NameIdentifier catalogIdent, Capability.Scope scope)
{
return doWithCatalog(
catalogIdent,
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/PartitionNormalizeDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/PartitionNormalizeDispatcher.java
index 8d14e4210..899846fdb 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/PartitionNormalizeDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/PartitionNormalizeDispatcher.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.catalog;
import static
org.apache.gravitino.catalog.CapabilityHelpers.applyCaseSensitive;
import static
org.apache.gravitino.catalog.CapabilityHelpers.applyCaseSensitiveOnName;
+import static org.apache.gravitino.catalog.CapabilityHelpers.getCapability;
import java.util.Arrays;
import org.apache.gravitino.NameIdentifier;
@@ -29,19 +30,21 @@ import
org.apache.gravitino.exceptions.PartitionAlreadyExistsException;
import org.apache.gravitino.rel.partitions.Partition;
public class PartitionNormalizeDispatcher implements PartitionDispatcher {
+ private final CatalogManager catalogManager;
+ private final PartitionDispatcher dispatcher;
- private final PartitionOperationDispatcher dispatcher;
-
- public PartitionNormalizeDispatcher(PartitionOperationDispatcher dispatcher)
{
+ public PartitionNormalizeDispatcher(
+ PartitionDispatcher dispatcher, CatalogManager catalogManager) {
this.dispatcher = dispatcher;
+ this.catalogManager = catalogManager;
}
@Override
public String[] listPartitionNames(NameIdentifier tableIdent) {
+ Capability capabilities = getCapability(tableIdent, catalogManager);
String[] partitionNames =
dispatcher.listPartitionNames(
- applyCaseSensitive(tableIdent, Capability.Scope.TABLE,
dispatcher));
- Capability capabilities = dispatcher.getCatalogCapability(tableIdent);
+ applyCaseSensitive(tableIdent, Capability.Scope.TABLE,
capabilities));
return Arrays.stream(partitionNames)
.map(
partitionName ->
@@ -51,49 +54,45 @@ public class PartitionNormalizeDispatcher implements
PartitionDispatcher {
@Override
public Partition[] listPartitions(NameIdentifier tableIdent) {
+ Capability capabilities = getCapability(tableIdent, catalogManager);
Partition[] partitions =
dispatcher.listPartitions(
- CapabilityHelpers.applyCaseSensitive(tableIdent,
Capability.Scope.TABLE, dispatcher));
- return applyCaseSensitive(partitions,
dispatcher.getCatalogCapability(tableIdent));
+ CapabilityHelpers.applyCaseSensitive(tableIdent,
Capability.Scope.TABLE, capabilities));
+ return applyCaseSensitive(partitions, capabilities);
}
@Override
public Partition getPartition(NameIdentifier tableIdent, String
partitionName)
throws NoSuchPartitionException {
+ Capability capabilities = getCapability(tableIdent, catalogManager);
return dispatcher.getPartition(
- CapabilityHelpers.applyCaseSensitive(tableIdent,
Capability.Scope.TABLE, dispatcher),
- applyCaseSensitiveOnName(
- Capability.Scope.PARTITION,
- partitionName,
- dispatcher.getCatalogCapability(tableIdent)));
+ CapabilityHelpers.applyCaseSensitive(tableIdent,
Capability.Scope.TABLE, capabilities),
+ applyCaseSensitiveOnName(Capability.Scope.PARTITION, partitionName,
capabilities));
}
@Override
public Partition addPartition(NameIdentifier tableIdent, Partition partition)
throws PartitionAlreadyExistsException {
+ Capability capabilities = getCapability(tableIdent, catalogManager);
return dispatcher.addPartition(
- CapabilityHelpers.applyCaseSensitive(tableIdent,
Capability.Scope.TABLE, dispatcher),
- applyCaseSensitive(partition,
dispatcher.getCatalogCapability(tableIdent)));
+ CapabilityHelpers.applyCaseSensitive(tableIdent,
Capability.Scope.TABLE, capabilities),
+ applyCaseSensitive(partition, capabilities));
}
@Override
public boolean dropPartition(NameIdentifier tableIdent, String
partitionName) {
+ Capability capabilities = getCapability(tableIdent, catalogManager);
return dispatcher.dropPartition(
- CapabilityHelpers.applyCaseSensitive(tableIdent,
Capability.Scope.TABLE, dispatcher),
- applyCaseSensitiveOnName(
- Capability.Scope.PARTITION,
- partitionName,
- dispatcher.getCatalogCapability(tableIdent)));
+ CapabilityHelpers.applyCaseSensitive(tableIdent,
Capability.Scope.TABLE, capabilities),
+ applyCaseSensitiveOnName(Capability.Scope.PARTITION, partitionName,
capabilities));
}
@Override
public boolean purgePartition(NameIdentifier tableIdent, String
partitionName)
throws UnsupportedOperationException {
+ Capability capabilities = getCapability(tableIdent, catalogManager);
return dispatcher.purgePartition(
- CapabilityHelpers.applyCaseSensitive(tableIdent,
Capability.Scope.TABLE, dispatcher),
- applyCaseSensitiveOnName(
- Capability.Scope.PARTITION,
- partitionName,
- dispatcher.getCatalogCapability(tableIdent)));
+ CapabilityHelpers.applyCaseSensitive(tableIdent,
Capability.Scope.TABLE, capabilities),
+ applyCaseSensitiveOnName(Capability.Scope.PARTITION, partitionName,
capabilities));
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/SchemaNormalizeDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/SchemaNormalizeDispatcher.java
index 04e695fe6..25460bb5d 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/SchemaNormalizeDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/SchemaNormalizeDispatcher.java
@@ -20,8 +20,10 @@ package org.apache.gravitino.catalog;
import static org.apache.gravitino.catalog.CapabilityHelpers.applyCapabilities;
import static
org.apache.gravitino.catalog.CapabilityHelpers.applyCaseSensitive;
+import static org.apache.gravitino.catalog.CapabilityHelpers.getCapability;
import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.Schema;
@@ -33,11 +35,12 @@ import
org.apache.gravitino.exceptions.NonEmptySchemaException;
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
public class SchemaNormalizeDispatcher implements SchemaDispatcher {
+ private final CatalogManager catalogManager;
+ private final SchemaDispatcher dispatcher;
- private final SchemaOperationDispatcher dispatcher;
-
- public SchemaNormalizeDispatcher(SchemaOperationDispatcher dispatcher) {
+ public SchemaNormalizeDispatcher(SchemaDispatcher dispatcher, CatalogManager
catalogManager) {
this.dispatcher = dispatcher;
+ this.catalogManager = catalogManager;
}
@Override
@@ -45,14 +48,14 @@ public class SchemaNormalizeDispatcher implements
SchemaDispatcher {
NameIdentifier[] identifiers = dispatcher.listSchemas(namespace);
// The constraints of the name spec may be more strict than underlying
catalog,
// and for compatibility reasons, we only apply case-sensitive
capabilities here.
- return applyCaseSensitive(identifiers, Capability.Scope.SCHEMA,
dispatcher);
+ return normalizeCaseSensitive(identifiers);
}
@Override
public boolean schemaExists(NameIdentifier ident) {
// The constraints of the name spec may be more strict than underlying
catalog,
// and for compatibility reasons, we only apply case-sensitive
capabilities here.
- return dispatcher.schemaExists(applyCaseSensitive(ident,
Capability.Scope.SCHEMA, dispatcher));
+ return dispatcher.schemaExists(normalizeCaseSensitive(ident));
}
@Override
@@ -65,7 +68,7 @@ public class SchemaNormalizeDispatcher implements
SchemaDispatcher {
public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
// The constraints of the name spec may be more strict than underlying
catalog,
// and for compatibility reasons, we only apply case-sensitive
capabilities here.
- return dispatcher.loadSchema(applyCaseSensitive(ident,
Capability.Scope.SCHEMA, dispatcher));
+ return dispatcher.loadSchema(normalizeCaseSensitive(ident));
}
@Override
@@ -73,8 +76,7 @@ public class SchemaNormalizeDispatcher implements
SchemaDispatcher {
throws NoSuchSchemaException {
// The constraints of the name spec may be more strict than underlying
catalog,
// and for compatibility reasons, we only apply case-sensitive
capabilities here.
- return dispatcher.alterSchema(
- applyCaseSensitive(ident, Capability.Scope.SCHEMA, dispatcher),
changes);
+ return dispatcher.alterSchema(normalizeCaseSensitive(ident), changes);
}
@Override
@@ -82,8 +84,22 @@ public class SchemaNormalizeDispatcher implements
SchemaDispatcher {
return dispatcher.dropSchema(normalizeNameIdentifier(ident), cascade);
}
- private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) {
- Capability capability = dispatcher.getCatalogCapability(ident);
- return applyCapabilities(ident, Capability.Scope.SCHEMA, capability);
+ private NameIdentifier normalizeNameIdentifier(NameIdentifier schemaIdent) {
+ Capability capabilities = getCapability(schemaIdent, catalogManager);
+ return applyCapabilities(schemaIdent, Capability.Scope.SCHEMA,
capabilities);
+ }
+
+ private NameIdentifier normalizeCaseSensitive(NameIdentifier schemaIdent) {
+ Capability capabilities = getCapability(schemaIdent, catalogManager);
+ return applyCaseSensitive(schemaIdent, Capability.Scope.SCHEMA,
capabilities);
+ }
+
+ private NameIdentifier[] normalizeCaseSensitive(NameIdentifier[]
schemaIdents) {
+ if (ArrayUtils.isEmpty(schemaIdents)) {
+ return schemaIdents;
+ }
+
+ Capability capabilities = getCapability(schemaIdents[0], catalogManager);
+ return applyCaseSensitive(schemaIdents, Capability.Scope.SCHEMA,
capabilities);
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
index 267a68049..758d19737 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.catalog;
import static org.apache.gravitino.Entity.EntityType.SCHEMA;
import static
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
+import static
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
import java.time.Instant;
import java.util.Map;
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TableNormalizeDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TableNormalizeDispatcher.java
index f2b680e13..12fd1cb62 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TableNormalizeDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TableNormalizeDispatcher.java
@@ -20,8 +20,10 @@ package org.apache.gravitino.catalog;
import static org.apache.gravitino.catalog.CapabilityHelpers.applyCapabilities;
import static
org.apache.gravitino.catalog.CapabilityHelpers.applyCaseSensitive;
+import static org.apache.gravitino.catalog.CapabilityHelpers.getCapability;
import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.connector.capability.Capability;
@@ -37,10 +39,11 @@ import
org.apache.gravitino.rel.expressions.transforms.Transform;
import org.apache.gravitino.rel.indexes.Index;
public class TableNormalizeDispatcher implements TableDispatcher {
+ private final CatalogManager catalogManager;
+ private final TableDispatcher dispatcher;
- private final TableOperationDispatcher dispatcher;
-
- public TableNormalizeDispatcher(TableOperationDispatcher dispatcher) {
+ public TableNormalizeDispatcher(TableDispatcher dispatcher, CatalogManager
catalogManager) {
+ this.catalogManager = catalogManager;
this.dispatcher = dispatcher;
}
@@ -48,16 +51,16 @@ public class TableNormalizeDispatcher implements
TableDispatcher {
public NameIdentifier[] listTables(Namespace namespace) throws
NoSuchSchemaException {
// The constraints of the name spec may be more strict than underlying
catalog,
// and for compatibility reasons, we only apply case-sensitive
capabilities here.
- Namespace caseSensitiveNs = applyCaseSensitive(namespace,
Capability.Scope.TABLE, dispatcher);
+ Namespace caseSensitiveNs = normalizeCaseSensitive(namespace);
NameIdentifier[] identifiers = dispatcher.listTables(caseSensitiveNs);
- return applyCaseSensitive(identifiers, Capability.Scope.TABLE, dispatcher);
+ return normalizeCaseSensitive(identifiers);
}
@Override
public Table loadTable(NameIdentifier ident) throws NoSuchTableException {
// The constraints of the name spec may be more strict than underlying
catalog,
// and for compatibility reasons, we only apply case-sensitive
capabilities here.
- return dispatcher.loadTable(applyCaseSensitive(ident,
Capability.Scope.TABLE, dispatcher));
+ return dispatcher.loadTable(normalizeCaseSensitive(ident));
}
@Override
@@ -71,7 +74,7 @@ public class TableNormalizeDispatcher implements
TableDispatcher {
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
- Capability capability = dispatcher.getCatalogCapability(ident);
+ Capability capability = getCapability(ident, catalogManager);
return dispatcher.createTable(
applyCapabilities(ident, Capability.Scope.TABLE, capability),
applyCapabilities(columns, capability),
@@ -86,12 +89,11 @@ public class TableNormalizeDispatcher implements
TableDispatcher {
@Override
public Table alterTable(NameIdentifier ident, TableChange... changes)
throws NoSuchTableException, IllegalArgumentException {
- Capability capability = dispatcher.getCatalogCapability(ident);
+ Capability capability = getCapability(ident, catalogManager);
return dispatcher.alterTable(
// The constraints of the name spec may be more strict than underlying
catalog,
// and for compatibility reasons, we only apply case-sensitive
capabilities here.
- applyCaseSensitive(ident, Capability.Scope.TABLE, dispatcher),
- applyCapabilities(capability, changes));
+ normalizeCaseSensitive(ident), applyCapabilities(capability, changes));
}
@Override
@@ -108,11 +110,30 @@ public class TableNormalizeDispatcher implements
TableDispatcher {
public boolean tableExists(NameIdentifier ident) {
// The constraints of the name spec may be more strict than underlying
catalog,
// and for compatibility reasons, we only apply case-sensitive
capabilities here.
- return dispatcher.tableExists(applyCaseSensitive(ident,
Capability.Scope.TABLE, dispatcher));
+ return dispatcher.tableExists(normalizeCaseSensitive(ident));
+ }
+
+ private Namespace normalizeCaseSensitive(Namespace namespace) {
+ Capability capabilities =
getCapability(NameIdentifier.of(namespace.levels()), catalogManager);
+ return applyCaseSensitive(namespace, Capability.Scope.TABLE, capabilities);
+ }
+
+ private NameIdentifier normalizeCaseSensitive(NameIdentifier tableIdent) {
+ Capability capability = getCapability(tableIdent, catalogManager);
+ return applyCaseSensitive(tableIdent, Capability.Scope.TABLE, capability);
+ }
+
+ private NameIdentifier[] normalizeCaseSensitive(NameIdentifier[]
tableIdents) {
+ if (ArrayUtils.isEmpty(tableIdents)) {
+ return tableIdents;
+ }
+
+ Capability capabilities = getCapability(tableIdents[0], catalogManager);
+ return applyCaseSensitive(tableIdents, Capability.Scope.TABLE,
capabilities);
}
- private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) {
- Capability capability = dispatcher.getCatalogCapability(ident);
- return applyCapabilities(ident, Capability.Scope.TABLE, capability);
+ private NameIdentifier normalizeNameIdentifier(NameIdentifier tableIdent) {
+ Capability capability = getCapability(tableIdent, catalogManager);
+ return applyCapabilities(tableIdent, Capability.Scope.TABLE, capability);
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 8a0697a37..404af695a 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -22,6 +22,7 @@ import static org.apache.gravitino.Entity.EntityType.TABLE;
import static org.apache.gravitino.catalog.CapabilityHelpers.applyCapabilities;
import static
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
import static
org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;
+import static
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
import java.time.Instant;
import java.util.Arrays;
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TopicNormalizeDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TopicNormalizeDispatcher.java
index ebcaee981..a104b4f67 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TopicNormalizeDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TopicNormalizeDispatcher.java
@@ -20,8 +20,10 @@ package org.apache.gravitino.catalog;
import static org.apache.gravitino.catalog.CapabilityHelpers.applyCapabilities;
import static
org.apache.gravitino.catalog.CapabilityHelpers.applyCaseSensitive;
+import static org.apache.gravitino.catalog.CapabilityHelpers.getCapability;
import java.util.Map;
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
import org.apache.gravitino.connector.capability.Capability;
@@ -33,34 +35,35 @@ import org.apache.gravitino.messaging.Topic;
import org.apache.gravitino.messaging.TopicChange;
public class TopicNormalizeDispatcher implements TopicDispatcher {
+ private final CatalogManager catalogManager;
+ private final TopicDispatcher dispatcher;
- private final TopicOperationDispatcher dispatcher;
-
- public TopicNormalizeDispatcher(TopicOperationDispatcher dispatcher) {
+ public TopicNormalizeDispatcher(TopicDispatcher dispatcher, CatalogManager
catalogManager) {
this.dispatcher = dispatcher;
+ this.catalogManager = catalogManager;
}
@Override
public NameIdentifier[] listTopics(Namespace namespace) throws
NoSuchSchemaException {
// The constraints of the name spec may be more strict than underlying catalog,
// and for compatibility reasons, we only apply case-sensitive capabilities here.
- Namespace caseSensitiveNs = applyCaseSensitive(namespace,
Capability.Scope.TOPIC, dispatcher);
+ Namespace caseSensitiveNs = normalizeCaseSensitive(namespace);
NameIdentifier[] identifiers = dispatcher.listTopics(caseSensitiveNs);
- return applyCaseSensitive(identifiers, Capability.Scope.TOPIC, dispatcher);
+ return normalizeCaseSensitive(identifiers);
}
@Override
public Topic loadTopic(NameIdentifier ident) throws NoSuchTopicException {
// The constraints of the name spec may be more strict than underlying catalog,
// and for compatibility reasons, we only apply case-sensitive capabilities here.
- return dispatcher.loadTopic(applyCaseSensitive(ident,
Capability.Scope.TOPIC, dispatcher));
+ return dispatcher.loadTopic(normalizeCaseSensitive(ident));
}
@Override
public boolean topicExists(NameIdentifier ident) {
// The constraints of the name spec may be more strict than underlying catalog,
// and for compatibility reasons, we only apply case-sensitive capabilities here.
- return dispatcher.topicExists(applyCaseSensitive(ident,
Capability.Scope.TOPIC, dispatcher));
+ return dispatcher.topicExists(normalizeCaseSensitive(ident));
}
@Override
@@ -75,19 +78,37 @@ public class TopicNormalizeDispatcher implements
TopicDispatcher {
throws NoSuchTopicException, IllegalArgumentException {
// The constraints of the name spec may be more strict than underlying catalog,
// and for compatibility reasons, we only apply case-sensitive capabilities here.
- return dispatcher.alterTopic(
- applyCaseSensitive(ident, Capability.Scope.TOPIC, dispatcher),
changes);
+ return dispatcher.alterTopic(normalizeCaseSensitive(ident), changes);
}
@Override
public boolean dropTopic(NameIdentifier ident) {
// The constraints of the name spec may be more strict than underlying catalog,
// and for compatibility reasons, we only apply case-sensitive capabilities here.
- return dispatcher.dropTopic(applyCaseSensitive(ident,
Capability.Scope.TOPIC, dispatcher));
+ return dispatcher.dropTopic(normalizeCaseSensitive(ident));
+ }
+
+ private Namespace normalizeCaseSensitive(Namespace namespace) {
+ Capability capabilities =
getCapability(NameIdentifier.of(namespace.levels()), catalogManager);
+ return applyCaseSensitive(namespace, Capability.Scope.TOPIC, capabilities);
+ }
+
+ private NameIdentifier normalizeCaseSensitive(NameIdentifier topicIdent) {
+ Capability capabilities = getCapability(topicIdent, catalogManager);
+ return applyCaseSensitive(topicIdent, Capability.Scope.TOPIC,
capabilities);
+ }
+
+ private NameIdentifier[] normalizeCaseSensitive(NameIdentifier[]
topicIdents) {
+ if (ArrayUtils.isEmpty(topicIdents)) {
+ return topicIdents;
+ }
+
+ Capability capabilities = getCapability(topicIdents[0], catalogManager);
+ return applyCaseSensitive(topicIdents, Capability.Scope.TOPIC,
capabilities);
}
- private NameIdentifier normalizeNameIdentifier(NameIdentifier ident) {
- Capability capability = dispatcher.getCatalogCapability(ident);
- return applyCapabilities(ident, Capability.Scope.TOPIC, capability);
+ private NameIdentifier normalizeNameIdentifier(NameIdentifier topicIdent) {
+ Capability capability = getCapability(topicIdent, catalogManager);
+ return applyCapabilities(topicIdent, Capability.Scope.TOPIC, capability);
}
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
index e2ac7a4f6..981d999f9 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.catalog;
import static org.apache.gravitino.Entity.EntityType.TOPIC;
import static org.apache.gravitino.StringIdentifier.fromProperties;
import static
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
+import static
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
import java.time.Instant;
import java.util.Map;
diff --git
a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
index 616deb235..6afed3952 100644
--- a/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
+++ b/core/src/main/java/org/apache/gravitino/utils/NameIdentifierUtil.java
@@ -22,10 +22,15 @@ import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.FormatMethod;
import com.google.errorprone.annotations.FormatString;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.gravitino.Entity;
import org.apache.gravitino.MetadataObject;
import org.apache.gravitino.MetadataObjects;
import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Namespace;
import org.apache.gravitino.exceptions.IllegalNameIdentifierException;
import org.apache.gravitino.exceptions.IllegalNamespaceException;
@@ -117,6 +122,33 @@ public class NameIdentifierUtil {
return NameIdentifier.of(metalake, catalog, schema, topic);
}
+ /**
+ * Try to get the catalog {@link NameIdentifier} from the given {@link NameIdentifier}.
+ *
+ * @param ident The {@link NameIdentifier} to check.
+ * @return The catalog {@link NameIdentifier}
+ * @throws IllegalNameIdentifierException If the given {@link NameIdentifier} does not include
+ * catalog name
+ */
+ public static NameIdentifier getCatalogIdentifier(NameIdentifier ident)
+ throws IllegalNameIdentifierException {
+ NameIdentifier.check(
+ ident.name() != null, "The name variable in the NameIdentifier must have value.");
+ Namespace.check(
+ ident.namespace() != null && !ident.namespace().isEmpty(),
+ "Catalog namespace must be non-null and have 1 level, the input namespace is %s",
+ ident.namespace());
+
+ List<String> allElems =
+ Stream.concat(Arrays.stream(ident.namespace().levels()),
Stream.of(ident.name()))
+ .collect(Collectors.toList());
+ if (allElems.size() < 2) {
+ throw new IllegalNameIdentifierException(
+ "Cannot create a catalog NameIdentifier less than two elements.");
+ }
+ return NameIdentifier.of(allElems.get(0), allElems.get(1));
+ }
+
/**
* Check the given {@link NameIdentifier} is a metalake identifier. Throw an {@link
* IllegalNameIdentifierException} if it's not.
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestFilesetNormalizeDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestFilesetNormalizeDispatcher.java
index 7976dbe9b..8781a0299 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestFilesetNormalizeDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestFilesetNormalizeDispatcher.java
@@ -40,9 +40,11 @@ public class TestFilesetNormalizeDispatcher extends
TestOperationDispatcher {
public static void initialize() throws IOException {
TestFilesetOperationDispatcher.initialize();
filesetNormalizeDispatcher =
- new
FilesetNormalizeDispatcher(TestFilesetOperationDispatcher.filesetOperationDispatcher);
+ new FilesetNormalizeDispatcher(
+ TestFilesetOperationDispatcher.filesetOperationDispatcher,
catalogManager);
schemaNormalizeDispatcher =
- new
SchemaNormalizeDispatcher(TestFilesetOperationDispatcher.schemaOperationDispatcher);
+ new SchemaNormalizeDispatcher(
+ TestFilesetOperationDispatcher.schemaOperationDispatcher,
catalogManager);
}
@Test
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java
index eae426c8c..58cd8ce5f 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestOperationDispatcher.java
@@ -19,6 +19,7 @@
package org.apache.gravitino.catalog;
import static
org.apache.gravitino.TestFilesetPropertiesMetadata.TEST_FILESET_HIDDEN_KEY;
+import static
org.apache.gravitino.utils.NameIdentifierUtil.getCatalogIdentifier;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.reset;
@@ -108,22 +109,20 @@ public abstract class TestOperationDispatcher {
@Test
public void testGetCatalogIdentifier() {
- OperationDispatcher dispatcher = new OperationDispatcher(null, null, null)
{};
-
NameIdentifier id1 = NameIdentifier.of("a");
- assertThrows(IllegalNamespaceException.class, () ->
dispatcher.getCatalogIdentifier(id1));
+ assertThrows(IllegalNamespaceException.class, () ->
getCatalogIdentifier(id1));
NameIdentifier id2 = NameIdentifier.of("a", "b");
- assertEquals(dispatcher.getCatalogIdentifier(id2), NameIdentifier.of("a",
"b"));
+ assertEquals(getCatalogIdentifier(id2), NameIdentifier.of("a", "b"));
NameIdentifier id3 = NameIdentifier.of("a", "b", "c");
- assertEquals(dispatcher.getCatalogIdentifier(id3), NameIdentifier.of("a",
"b"));
+ assertEquals(getCatalogIdentifier(id3), NameIdentifier.of("a", "b"));
NameIdentifier id4 = NameIdentifier.of("a", "b", "c", "d");
- assertEquals(dispatcher.getCatalogIdentifier(id4), NameIdentifier.of("a",
"b"));
+ assertEquals(getCatalogIdentifier(id4), NameIdentifier.of("a", "b"));
NameIdentifier id5 = NameIdentifier.of("a", "b", "c", "d", "e");
- assertEquals(dispatcher.getCatalogIdentifier(id5), NameIdentifier.of("a",
"b"));
+ assertEquals(getCatalogIdentifier(id5), NameIdentifier.of("a", "b"));
}
void testProperties(Map<String, String> expectedProps, Map<String, String>
testProps) {
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
index fcd209b6c..2c47d69bc 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
@@ -43,7 +43,7 @@ public class TestPartitionNormalizeDispatcher extends
TestOperationDispatcher {
TestPartitionOperationDispatcher.prepareTable();
partitionNormalizeDispatcher =
new PartitionNormalizeDispatcher(
- TestPartitionOperationDispatcher.partitionOperationDispatcher);
+ TestPartitionOperationDispatcher.partitionOperationDispatcher,
catalogManager);
NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema(metalake,
catalog, SCHEMA);
TestPartitionOperationDispatcher.schemaOperationDispatcher.createSchema(
schemaIdent, "comment", null);
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestSchemaNormalizeDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestSchemaNormalizeDispatcher.java
index 539560e8e..e4fef5487 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestSchemaNormalizeDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestSchemaNormalizeDispatcher.java
@@ -37,7 +37,7 @@ public class TestSchemaNormalizeDispatcher extends
TestOperationDispatcher {
public static void initialize() throws IOException, IllegalAccessException {
TestSchemaOperationDispatcher.initialize();
schemaNormalizeDispatcher =
- new
SchemaNormalizeDispatcher(TestSchemaOperationDispatcher.dispatcher);
+ new
SchemaNormalizeDispatcher(TestSchemaOperationDispatcher.dispatcher,
catalogManager);
}
@Test
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java
index 25c9203a5..2c8938edc 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestTableNormalizeDispatcher.java
@@ -56,9 +56,11 @@ public class TestTableNormalizeDispatcher extends
TestOperationDispatcher {
public static void initialize() throws IOException, IllegalAccessException {
TestTableOperationDispatcher.initialize();
tableNormalizeDispatcher =
- new
TableNormalizeDispatcher(TestTableOperationDispatcher.tableOperationDispatcher);
+ new TableNormalizeDispatcher(
+ TestTableOperationDispatcher.tableOperationDispatcher,
catalogManager);
schemaNormalizeDispatcher =
- new
SchemaNormalizeDispatcher(TestTableOperationDispatcher.schemaOperationDispatcher);
+ new SchemaNormalizeDispatcher(
+ TestTableOperationDispatcher.schemaOperationDispatcher,
catalogManager);
}
@Test
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestTopicNormalizeDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestTopicNormalizeDispatcher.java
index 6d493b91c..ef2589241 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestTopicNormalizeDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestTopicNormalizeDispatcher.java
@@ -38,9 +38,11 @@ public class TestTopicNormalizeDispatcher extends
TestOperationDispatcher {
public static void initialize() throws IOException, IllegalAccessException {
TestTopicOperationDispatcher.initialize();
schemaNormalizeDispatcher =
- new
SchemaNormalizeDispatcher(TestTopicOperationDispatcher.schemaOperationDispatcher);
+ new SchemaNormalizeDispatcher(
+ TestTopicOperationDispatcher.schemaOperationDispatcher,
catalogManager);
topicNormalizeDispatcher =
- new
TopicNormalizeDispatcher(TestTopicOperationDispatcher.topicOperationDispatcher);
+ new TopicNormalizeDispatcher(
+ TestTopicOperationDispatcher.topicOperationDispatcher,
catalogManager);
}
@Test