This is an automated email from the ASF dual-hosted git repository.

mchades pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a2dbcee5c [#9586] improvement(core): optimize checking in use status 
of catalogs and metalakes (#9574)
3a2dbcee5c is described below

commit 3a2dbcee5c85b2c24fa84436cec3c18b357999d4
Author: Mini Yu <[email protected]>
AuthorDate: Thu Jan 8 14:29:19 2026 +0800

    [#9586] improvement(core): optimize checking in use status of catalogs and 
metalakes (#9574)
    
    ### What changes were proposed in this pull request?
    
    This pull request refactors catalog usage validation in the Gravitino
    codebase, improving how the system checks whether catalogs and their
    parent metalakes are enabled ("in use"). The main changes include
    removing the dynamic proxy-based `OperationDispatcherInterceptor`,
    centralizing usage checks within the catalog implementation, and
    introducing a new property to track metalake usage status. Additionally,
    the code now ensures that operations on catalogs consistently verify
    both catalog and metalake usage status.
    
    ### Why are the changes needed?
    
    To clean up and simplify the code path.
    
    Fix: #9574
    
    
    ### Does this PR introduce _any_ user-facing change?
    
    N/A.
    
    ### How was this patch tested?
    
    Unit tests (UTs), including TestCatalogManager and TestBaseCatalogInUse.
---
 .../catalog/hive/TestHiveCatalogOperations.java    |   2 +-
 .../client/integration/test/CatalogIT.java         |   3 +-
 .../java/org/apache/gravitino/GravitinoEnv.java    |  57 +-------
 .../apache/gravitino/catalog/CatalogManager.java   | 153 ++++++++++-----------
 .../catalog/OperationDispatcherInterceptor.java    | 116 ----------------
 .../apache/gravitino/connector/BaseCatalog.java    |  54 ++++++++
 .../connector/BaseCatalogPropertiesMetadata.java   |  13 +-
 .../apache/gravitino/metalake/MetalakeManager.java | 116 +++++++++++-----
 .../gravitino/catalog/TestCatalogManager.java      |  53 ++++++-
 .../gravitino/connector/TestBaseCatalogInUse.java  | 114 +++++++++++++++
 10 files changed, 392 insertions(+), 289 deletions(-)

diff --git 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
index 85ae07696e..b25ea4bbcc 100644
--- 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
+++ 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/TestHiveCatalogOperations.java
@@ -60,7 +60,7 @@ class TestHiveCatalogOperations {
     Map<String, PropertyEntry<?>> propertyEntryMap =
         HIVE_PROPERTIES_METADATA.catalogPropertiesMetadata().propertyEntries();
 
-    Assertions.assertEquals(17, propertyEntryMap.size());
+    Assertions.assertEquals(18, propertyEntryMap.size());
     Assertions.assertTrue(propertyEntryMap.containsKey(METASTORE_URIS));
     
Assertions.assertTrue(propertyEntryMap.containsKey(Catalog.PROPERTY_PACKAGE));
     
Assertions.assertTrue(propertyEntryMap.containsKey(BaseCatalog.CATALOG_OPERATION_IMPL));
diff --git 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/CatalogIT.java
 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/CatalogIT.java
index 9f7928633e..45e911bdff 100644
--- 
a/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/CatalogIT.java
+++ 
b/clients/client-java/src/test/java/org/apache/gravitino/client/integration/test/CatalogIT.java
@@ -227,8 +227,7 @@ public class CatalogIT extends BaseIT {
         Assertions.assertThrows(
             CatalogNotInUseException.class,
             () -> metalake.alterCatalog(catalogName, 
CatalogChange.updateComment("new comment")));
-    Assertions.assertTrue(
-        exception.getMessage().contains("please enable it first"), 
exception.getMessage());
+    Assertions.assertTrue(exception.getMessage().contains("is not in use"), 
exception.getMessage());
 
     // test schema operations under non-in-use catalog
     SupportsSchemas schemaOps = loadedCatalog.asSchemas();
diff --git a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java 
b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
index a9980df00a..b17360423e 100644
--- a/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
+++ b/core/src/main/java/org/apache/gravitino/GravitinoEnv.java
@@ -19,7 +19,6 @@
 package org.apache.gravitino;
 
 import com.google.common.base.Preconditions;
-import java.lang.reflect.Proxy;
 import org.apache.gravitino.audit.AuditLogManager;
 import org.apache.gravitino.authorization.AccessControlDispatcher;
 import org.apache.gravitino.authorization.AccessControlManager;
@@ -41,7 +40,6 @@ import 
org.apache.gravitino.catalog.FunctionOperationDispatcher;
 import org.apache.gravitino.catalog.ModelDispatcher;
 import org.apache.gravitino.catalog.ModelNormalizeDispatcher;
 import org.apache.gravitino.catalog.ModelOperationDispatcher;
-import org.apache.gravitino.catalog.OperationDispatcherInterceptor;
 import org.apache.gravitino.catalog.PartitionDispatcher;
 import org.apache.gravitino.catalog.PartitionNormalizeDispatcher;
 import org.apache.gravitino.catalog.PartitionOperationDispatcher;
@@ -555,28 +553,14 @@ public class GravitinoEnv {
 
     SchemaOperationDispatcher schemaOperationDispatcher =
         new SchemaOperationDispatcher(catalogManager, entityStore, 
idGenerator);
-    SchemaDispatcher schemaDispatcherProxy =
-        (SchemaDispatcher)
-            Proxy.newProxyInstance(
-                SchemaDispatcher.class.getClassLoader(),
-                new Class[] {SchemaDispatcher.class},
-                new OperationDispatcherInterceptor(
-                    schemaOperationDispatcher, catalogManager, entityStore));
-    SchemaHookDispatcher schemaHookDispatcher = new 
SchemaHookDispatcher(schemaDispatcherProxy);
+    SchemaHookDispatcher schemaHookDispatcher = new 
SchemaHookDispatcher(schemaOperationDispatcher);
     SchemaNormalizeDispatcher schemaNormalizeDispatcher =
         new SchemaNormalizeDispatcher(schemaHookDispatcher, catalogManager);
     this.schemaDispatcher = new SchemaEventDispatcher(eventBus, 
schemaNormalizeDispatcher);
 
     TableOperationDispatcher tableOperationDispatcher =
         new TableOperationDispatcher(catalogManager, entityStore, idGenerator);
-    TableDispatcher tableDispatcherProxy =
-        (TableDispatcher)
-            Proxy.newProxyInstance(
-                TableDispatcher.class.getClassLoader(),
-                new Class[] {TableDispatcher.class},
-                new OperationDispatcherInterceptor(
-                    tableOperationDispatcher, catalogManager, entityStore));
-    TableHookDispatcher tableHookDispatcher = new 
TableHookDispatcher(tableDispatcherProxy);
+    TableHookDispatcher tableHookDispatcher = new 
TableHookDispatcher(tableOperationDispatcher);
     TableNormalizeDispatcher tableNormalizeDispatcher =
         new TableNormalizeDispatcher(tableHookDispatcher, catalogManager);
     this.tableDispatcher = new TableEventDispatcher(eventBus, 
tableNormalizeDispatcher);
@@ -585,55 +569,28 @@ public class GravitinoEnv {
     //  partition doesn't have ownership, so we don't need it now.
     PartitionOperationDispatcher partitionOperationDispatcher =
         new PartitionOperationDispatcher(catalogManager, entityStore, 
idGenerator);
-    PartitionDispatcher partitionDispatcherProxy =
-        (PartitionDispatcher)
-            Proxy.newProxyInstance(
-                PartitionDispatcher.class.getClassLoader(),
-                new Class[] {PartitionDispatcher.class},
-                new OperationDispatcherInterceptor(
-                    partitionOperationDispatcher, catalogManager, 
entityStore));
     PartitionNormalizeDispatcher partitionNormalizeDispatcher =
-        new PartitionNormalizeDispatcher(partitionDispatcherProxy, 
catalogManager);
+        new PartitionNormalizeDispatcher(partitionOperationDispatcher, 
catalogManager);
     this.partitionDispatcher = new PartitionEventDispatcher(eventBus, 
partitionNormalizeDispatcher);
 
     FilesetOperationDispatcher filesetOperationDispatcher =
         new FilesetOperationDispatcher(catalogManager, entityStore, 
idGenerator);
-    FilesetDispatcher filesetDispatcherProxy =
-        (FilesetDispatcher)
-            Proxy.newProxyInstance(
-                FilesetDispatcher.class.getClassLoader(),
-                new Class[] {FilesetDispatcher.class},
-                new OperationDispatcherInterceptor(
-                    filesetOperationDispatcher, catalogManager, entityStore));
-    FilesetHookDispatcher filesetHookDispatcher = new 
FilesetHookDispatcher(filesetDispatcherProxy);
+    FilesetHookDispatcher filesetHookDispatcher =
+        new FilesetHookDispatcher(filesetOperationDispatcher);
     FilesetNormalizeDispatcher filesetNormalizeDispatcher =
         new FilesetNormalizeDispatcher(filesetHookDispatcher, catalogManager);
     this.filesetDispatcher = new FilesetEventDispatcher(eventBus, 
filesetNormalizeDispatcher);
 
     TopicOperationDispatcher topicOperationDispatcher =
         new TopicOperationDispatcher(catalogManager, entityStore, idGenerator);
-    TopicDispatcher topicDispatcherProxy =
-        (TopicDispatcher)
-            Proxy.newProxyInstance(
-                TopicDispatcher.class.getClassLoader(),
-                new Class[] {TopicDispatcher.class},
-                new OperationDispatcherInterceptor(
-                    topicOperationDispatcher, catalogManager, entityStore));
-    TopicHookDispatcher topicHookDispatcher = new 
TopicHookDispatcher(topicDispatcherProxy);
+    TopicHookDispatcher topicHookDispatcher = new 
TopicHookDispatcher(topicOperationDispatcher);
     TopicNormalizeDispatcher topicNormalizeDispatcher =
         new TopicNormalizeDispatcher(topicHookDispatcher, catalogManager);
     this.topicDispatcher = new TopicEventDispatcher(eventBus, 
topicNormalizeDispatcher);
 
     ModelOperationDispatcher modelOperationDispatcher =
         new ModelOperationDispatcher(catalogManager, entityStore, idGenerator);
-    ModelDispatcher modelDispatcherProxy =
-        (ModelDispatcher)
-            Proxy.newProxyInstance(
-                ModelDispatcher.class.getClassLoader(),
-                new Class[] {ModelDispatcher.class},
-                new OperationDispatcherInterceptor(
-                    modelOperationDispatcher, catalogManager, entityStore));
-    ModelHookDispatcher modelHookDispatcher = new 
ModelHookDispatcher(modelDispatcherProxy);
+    ModelHookDispatcher modelHookDispatcher = new 
ModelHookDispatcher(modelOperationDispatcher);
     ModelNormalizeDispatcher modelNormalizeDispatcher =
         new ModelNormalizeDispatcher(modelHookDispatcher, catalogManager);
     this.modelDispatcher = new ModelEventDispatcher(eventBus, 
modelNormalizeDispatcher);
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java 
b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
index 6835d5ac28..b9c21fcb1f 100644
--- a/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
+++ b/core/src/main/java/org/apache/gravitino/catalog/CatalogManager.java
@@ -23,9 +23,8 @@ import static org.apache.gravitino.Catalog.Type.FILESET;
 import static org.apache.gravitino.StringIdentifier.DUMMY_ID;
 import static 
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForAlter;
 import static 
org.apache.gravitino.catalog.PropertiesMetadataHelpers.validatePropertyForCreate;
-import static 
org.apache.gravitino.connector.BaseCatalogPropertiesMetadata.BASIC_CATALOG_PROPERTIES_METADATA;
+import static 
org.apache.gravitino.connector.BaseCatalogPropertiesMetadata.PROPERTY_METALAKE_IN_USE;
 import static org.apache.gravitino.metalake.MetalakeManager.checkMetalake;
-import static org.apache.gravitino.metalake.MetalakeManager.metalakeInUse;
 
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
@@ -76,7 +75,6 @@ import org.apache.gravitino.StringIdentifier;
 import org.apache.gravitino.connector.BaseCatalog;
 import org.apache.gravitino.connector.CatalogOperations;
 import org.apache.gravitino.connector.HasPropertyMetadata;
-import org.apache.gravitino.connector.PropertyEntry;
 import org.apache.gravitino.connector.SupportsSchemas;
 import org.apache.gravitino.connector.authorization.BaseAuthorization;
 import org.apache.gravitino.connector.capability.Capability;
@@ -84,7 +82,6 @@ import 
org.apache.gravitino.exceptions.CatalogAlreadyExistsException;
 import org.apache.gravitino.exceptions.CatalogInUseException;
 import org.apache.gravitino.exceptions.CatalogNotInUseException;
 import org.apache.gravitino.exceptions.GravitinoRuntimeException;
-import org.apache.gravitino.exceptions.MetalakeNotInUseException;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.exceptions.NoSuchMetalakeException;
@@ -116,17 +113,6 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(CatalogManager.class);
 
-  public void checkCatalogInUse(EntityStore store, NameIdentifier ident)
-      throws NoSuchMetalakeException, NoSuchCatalogException, 
CatalogNotInUseException,
-          MetalakeNotInUseException {
-    NameIdentifier metalakeIdent = 
NameIdentifier.of(ident.namespace().levels());
-    checkMetalake(metalakeIdent, store);
-
-    if (!getCatalogInUseValue(store, ident)) {
-      throw new CatalogNotInUseException("Catalog %s is not in use, please 
enable it first", ident);
-    }
-  }
-
   /** Wrapper class for a catalog instance and its class loader. */
   public static class CatalogWrapper {
 
@@ -388,13 +374,13 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
    */
   @Override
   public Catalog loadCatalog(NameIdentifier ident) throws 
NoSuchCatalogException {
-    NameIdentifier metalakeIdent = 
NameIdentifier.of(ident.namespace().levels());
     return TreeLockUtils.doWithTreeLock(
         ident,
         LockType.READ,
         () -> {
-          checkMetalake(metalakeIdent, store);
-          return loadCatalogAndWrap(ident).catalog;
+          BaseCatalog baseCatalog = loadCatalogAndWrap(ident).catalog();
+          baseCatalog.checkMetalakeInUse();
+          return baseCatalog;
         });
   }
 
@@ -555,18 +541,18 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
   public void enableCatalog(NameIdentifier ident)
       throws NoSuchCatalogException, CatalogNotInUseException {
     NameIdentifier metalakeIdent = 
NameIdentifier.of(ident.namespace().levels());
-
     TreeLockUtils.doWithTreeLock(
         metalakeIdent,
         LockType.WRITE,
         () -> {
-          checkMetalake(metalakeIdent, store);
+          BaseCatalog baseCatalog = loadCatalogAndWrap(ident).catalog();
+          baseCatalog.checkMetalakeInUse();
 
-          try {
-            if (catalogInUse(store, ident)) {
-              return null;
-            }
+          if (baseCatalog.catalogInUse()) {
+            return null;
+          }
 
+          try {
             store.update(
                 ident,
                 CatalogEntity.class,
@@ -595,17 +581,18 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
   @Override
   public void disableCatalog(NameIdentifier ident) throws 
NoSuchCatalogException {
     NameIdentifier metalakeIdent = 
NameIdentifier.of(ident.namespace().levels());
-
     TreeLockUtils.doWithTreeLock(
         metalakeIdent,
         LockType.WRITE,
         () -> {
-          checkMetalake(metalakeIdent, store);
+          BaseCatalog baseCatalog = loadCatalogAndWrap(ident).catalog();
+          baseCatalog.checkMetalakeInUse();
+
+          if (!baseCatalog.catalogInUse()) {
+            return null;
+          }
 
           try {
-            if (!catalogInUse(store, ident)) {
-              return null;
-            }
             store.update(
                 ident,
                 CatalogEntity.class,
@@ -643,20 +630,21 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
   @Override
   public Catalog alterCatalog(NameIdentifier ident, CatalogChange... changes)
       throws NoSuchCatalogException, IllegalArgumentException {
+
     TreeLockUtils.doWithTreeLock(
         ident,
         LockType.READ,
         () -> {
-          checkCatalogInUse(store, ident);
-
           // There could be a race issue that someone is using the catalog 
from cache while we are
           // updating it.
-
           CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
           if (catalogWrapper == null) {
             throw new NoSuchCatalogException(CATALOG_DOES_NOT_EXIST_MSG, 
ident);
           }
 
+          BaseCatalog catalog = catalogWrapper.catalog();
+          catalog.checkMetalakeAndCatalogInUse();
+
           try {
             catalogWrapper.doWithPropertiesMeta(
                 f -> {
@@ -739,9 +727,11 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
         metalakeIdent,
         LockType.WRITE,
         () -> {
-          checkMetalake(metalakeIdent, store);
           try {
-            boolean catalogInUse = getCatalogInUseValue(store, ident);
+            CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
+            catalogWrapper.catalog().checkMetalakeInUse();
+
+            boolean catalogInUse = catalogWrapper.catalog().catalogInUse();
             if (catalogInUse && !force) {
               throw new CatalogInUseException(
                   "Catalog %s is in use, please disable it first or use force 
option", ident);
@@ -750,8 +740,6 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
             Namespace schemaNs = Namespace.of(ident.namespace().level(0), 
ident.name());
             List<SchemaEntity> schemaEntities =
                 store.list(schemaNs, SchemaEntity.class, EntityType.SCHEMA);
-
-            CatalogWrapper catalogWrapper = loadCatalogAndWrap(ident);
             if (!force && containsUserCreatedSchemas(schemaEntities, 
catalogWrapper)) {
               throw new NonEmptyCatalogException(
                   "Catalog %s has schemas, please drop them first or use force 
option", ident);
@@ -865,35 +853,6 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
     return catalogCache.get(ident, this::loadCatalogInternal);
   }
 
-  private boolean catalogInUse(EntityStore store, NameIdentifier ident)
-      throws NoSuchMetalakeException, NoSuchCatalogException {
-    NameIdentifier metalakeIdent = 
NameIdentifier.of(ident.namespace().levels());
-    return metalakeInUse(store, metalakeIdent) && getCatalogInUseValue(store, 
ident);
-  }
-
-  private boolean getCatalogInUseValue(EntityStore store, NameIdentifier 
catalogIdent) {
-    try {
-      CatalogWrapper wrapper = catalogCache.getIfPresent(catalogIdent);
-      CatalogEntity catalogEntity;
-      if (wrapper != null) {
-        catalogEntity = wrapper.catalog.entity();
-      } else {
-        catalogEntity = store.get(catalogIdent, EntityType.CATALOG, 
CatalogEntity.class);
-      }
-      return (boolean)
-          BASIC_CATALOG_PROPERTIES_METADATA.getOrDefault(
-              catalogEntity.getProperties(), PROPERTY_IN_USE);
-
-    } catch (NoSuchEntityException e) {
-      LOG.warn("Catalog {} does not exist", catalogIdent, e);
-      throw new NoSuchCatalogException(CATALOG_DOES_NOT_EXIST_MSG, 
catalogIdent);
-
-    } catch (IOException e) {
-      LOG.error("Failed to do store operation", e);
-      throw new RuntimeException(e);
-    }
-  }
-
   private boolean isManagedStorageCatalog(CatalogWrapper catalogWrapper) {
     try {
       Capability capability = catalogWrapper.capabilities();
@@ -1026,22 +985,6 @@ public class CatalogManager implements CatalogDispatcher, 
Closeable {
     }
   }
 
-  private Set<String> getHiddenPropertyNames(CatalogEntity entity) {
-    Map<String, String> conf = entity.getProperties();
-    String provider = entity.getProvider();
-
-    try (IsolatedClassLoader classLoader = createClassLoader(provider, conf)) {
-      BaseCatalog<?> catalog = createBaseCatalog(classLoader, entity);
-      return classLoader.withClassLoader(
-          cl ->
-              
catalog.catalogPropertiesMetadata().propertyEntries().values().stream()
-                  .filter(PropertyEntry::isHidden)
-                  .map(PropertyEntry::getName)
-                  .collect(Collectors.toSet()),
-          RuntimeException.class);
-    }
-  }
-
   private BaseCatalog<?> createBaseCatalog(IsolatedClassLoader classLoader, 
CatalogEntity entity) {
     // Load Catalog class instance
     BaseCatalog<?> catalog = createCatalogInstance(classLoader, 
entity.getProvider());
@@ -1255,4 +1198,52 @@ public class CatalogManager implements 
CatalogDispatcher, Closeable {
     // If the provider is not "hadoop", we assume it is already a fileset 
catalog entity.
     return entity;
   }
+
+  /**
+   * Set the metalake in-use status in a specified catalog.
+   *
+   * @param nameIdentifier The name identifier of the catalog.
+   * @param status The in-use status to set.
+   */
+  public void setMetalakeInUseStatus(NameIdentifier nameIdentifier, boolean 
status) {
+    updateCatalogProperty(nameIdentifier, PROPERTY_METALAKE_IN_USE, 
String.valueOf(status));
+  }
+
+  private void updateCatalogProperty(
+      NameIdentifier nameIdentifier, String propertyKey, String propertyValue) 
{
+    try {
+      store.update(
+          nameIdentifier,
+          CatalogEntity.class,
+          EntityType.CATALOG,
+          catalog -> {
+            CatalogEntity.Builder newCatalogBuilder =
+                newCatalogBuilder(nameIdentifier.namespace(), catalog);
+
+            Map<String, String> newProps =
+                catalog.getProperties() == null
+                    ? new HashMap<>()
+                    : new HashMap<>(catalog.getProperties());
+            newProps.put(propertyKey, propertyValue);
+            newCatalogBuilder.withProperties(newProps);
+
+            return newCatalogBuilder.build();
+          });
+      catalogCache.invalidate(nameIdentifier);
+
+    } catch (NoSuchCatalogException e) {
+      LOG.error("Catalog {} does not exist", nameIdentifier, e);
+      throw new RuntimeException(e);
+    } catch (IllegalArgumentException e) {
+      LOG.error(
+          "Failed to update catalog {} property {} with unknown change",
+          nameIdentifier,
+          propertyKey,
+          e);
+      throw e;
+    } catch (IOException ioe) {
+      LOG.error("Failed to update catalog {} property {}", nameIdentifier, 
propertyKey, ioe);
+      throw new RuntimeException(ioe);
+    }
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/OperationDispatcherInterceptor.java
 
b/core/src/main/java/org/apache/gravitino/catalog/OperationDispatcherInterceptor.java
deleted file mode 100644
index 89d9ec958a..0000000000
--- 
a/core/src/main/java/org/apache/gravitino/catalog/OperationDispatcherInterceptor.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.catalog;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import org.apache.gravitino.EntityStore;
-import org.apache.gravitino.NameIdentifier;
-import org.apache.gravitino.Namespace;
-import org.apache.gravitino.lock.LockType;
-import org.apache.gravitino.lock.TreeLockUtils;
-import org.apache.gravitino.utils.NameIdentifierUtil;
-
-/**
- * {@code OperationDispatcherInterceptor} is an invocation handler that 
intercepts method calls to
- * an operation dispatcher to perform catalog usage checks before proceeding 
with the actual method
- * invocation.
- *
- * <p>Note: This interceptor will only intercept methods in
- *
- * <p>SchemaDispatcher
- *
- * <p>TableDispatch
- *
- * <p>FilesetDispatch
- *
- * <p>ModelDispatch
- *
- * <p>TopicDispatch
- *
- * <p>PartitionDispatch
- */
-public class OperationDispatcherInterceptor implements InvocationHandler {
-  private final Object dispatcher;
-  private final CatalogManager catalogManager;
-  private final EntityStore store;
-
-  /**
-   * An {@link InvocationHandler} implementation that intercepts method calls 
on dispatcher objects
-   * in the Gravitino catalog system. This class is used as part of the 
dynamic proxy pattern to
-   * wrap dispatcher instances, enabling pre-processing logic such as catalog 
existence checks and
-   * tree-based locking before delegating the actual method invocation to the 
underlying dispatcher.
-   *
-   * <p>For each intercepted method call, if the first argument is a {@link 
NameIdentifier} or
-   * {@link Namespace}, the interceptor extracts the catalog identifier and 
acquires a read lock on
-   * the catalog using {@link TreeLockUtils}. It then checks if the catalog is 
in use via the {@link
-   * CatalogManager}. Only after these checks and locks does it invoke the 
original method on the
-   * dispatcher.
-   *
-   * <p>This mechanism ensures that all dispatcher operations are performed 
safely and consistently
-   * with respect to catalog state and concurrency requirements.
-   */
-  public OperationDispatcherInterceptor(
-      Object dispatcher, CatalogManager catalogManager, EntityStore store) {
-    this.dispatcher = dispatcher;
-    this.catalogManager = catalogManager;
-    this.store = store;
-  }
-
-  @Override
-  public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
-    if (args != null && args.length > 0) {
-      NameIdentifier catalogIdent = null;
-      if (args[0] instanceof NameIdentifier ident) {
-        catalogIdent = NameIdentifierUtil.getCatalogIdentifier(ident);
-      } else if (args[0] instanceof Namespace ns) {
-        if (ns.length() >= 2) {
-          catalogIdent = NameIdentifier.of(ns.level(0), ns.level(1));
-        }
-      }
-
-      if (catalogIdent != null) {
-        final NameIdentifier finalCatalogIdent = catalogIdent;
-        // Note: In this implementation, the catalog-in-use check is performed 
separately
-        // under a tree lock before invoking the main operation. In the 
original code,
-        // this check may have been performed as part of a single, monolithic 
operation.
-        // This separation ensures that the catalog's state is validated under 
the appropriate
-        // lock, improving thread safety and consistency. However, it 
introduces a trade-off:
-        // the check and the main operation are not atomic with respect to 
each other, so there
-        // is a small window where the catalog's state could change between 
the check and the
-        // operation. This approach was chosen to avoid holding locks during 
potentially
-        // long-running operations, balancing safety and performance.
-        TreeLockUtils.doWithTreeLock(
-            catalogIdent,
-            LockType.READ,
-            () -> {
-              catalogManager.checkCatalogInUse(store, finalCatalogIdent);
-              return null;
-            });
-      }
-    }
-
-    try {
-      return method.invoke(dispatcher, args);
-    } catch (InvocationTargetException e) {
-      throw e.getTargetException();
-    }
-  }
-}
diff --git a/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java 
b/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java
index 6c749f903f..f681d36c66 100644
--- a/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java
+++ b/core/src/main/java/org/apache/gravitino/connector/BaseCatalog.java
@@ -18,6 +18,8 @@
  */
 package org.apache.gravitino.connector;
 
+import static 
org.apache.gravitino.connector.BaseCatalogPropertiesMetadata.PROPERTY_METALAKE_IN_USE;
+
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
 import java.io.Closeable;
@@ -32,6 +34,8 @@ import 
org.apache.gravitino.connector.authorization.AuthorizationPlugin;
 import org.apache.gravitino.connector.authorization.BaseAuthorization;
 import org.apache.gravitino.connector.capability.Capability;
 import org.apache.gravitino.credential.CatalogCredentialManager;
+import org.apache.gravitino.exceptions.CatalogNotInUseException;
+import org.apache.gravitino.exceptions.MetalakeNotInUseException;
 import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.utils.IsolatedClassLoader;
 import org.slf4j.Logger;
@@ -175,6 +179,8 @@ public abstract class BaseCatalog<T extends BaseCatalog>
           Preconditions.checkArgument(
               entity != null && conf != null, "entity and conf must be set 
before calling ops()");
           CatalogOperations newOps = createOps(conf);
+          // Check metalake and catalog in use
+          checkMetalakeAndCatalogInUse();
           newOps.initialize(conf, entity.toCatalogInfo(), this);
           ops =
               newProxyPlugin(conf)
@@ -190,6 +196,54 @@ public abstract class BaseCatalog<T extends BaseCatalog>
     return ops;
   }
 
+  public void checkMetalakeAndCatalogInUse() {
+    Map<String, String> catalogProperties = entity.getProperties();
+    if (catalogProperties == null) {
+      return;
+    }
+
+    boolean metalakeInuse =
+        
Boolean.parseBoolean(catalogProperties.getOrDefault(PROPERTY_METALAKE_IN_USE, 
"true"));
+    if (!metalakeInuse) {
+      throw new MetalakeNotInUseException(
+          "The metalake that holds catalog %s is not in use", entity.name());
+    }
+
+    // Check the stack and find whether its within method `dropCatalog` as we 
don't need to check
+    // the catalog in use status when dropping the catalog.
+    if (isInvokedBy("dropCatalog")) {
+      LOG.info("Skip checking catalog in use status since it's invoked by 
dropCatalog method.");
+      return;
+    }
+
+    boolean catalogInuse =
+        Boolean.parseBoolean(catalogProperties.getOrDefault(PROPERTY_IN_USE, 
"true"));
+    if (!catalogInuse) {
+      throw new CatalogNotInUseException("The catalog %s is not in use", 
entity.name());
+    }
+  }
+
+  public void checkMetalakeInUse() {
+    Map<String, String> catalogProperties = entity().getProperties();
+    String metaLakeInUseStr = 
catalogProperties.getOrDefault(PROPERTY_METALAKE_IN_USE, "true");
+    boolean metalakeInuse = Boolean.parseBoolean(metaLakeInUseStr);
+    if (!metalakeInuse) {
+      throw new MetalakeNotInUseException(
+          "Metalake %s is not in use, please enable it first", 
entity().name());
+    }
+  }
+
+  public boolean catalogInUse() {
+    Map<String, String> catalogProperties = entity().getProperties();
+    String catalogInUseStr = 
catalogProperties.getOrDefault(Catalog.PROPERTY_IN_USE, "true");
+    return Boolean.parseBoolean(catalogInUseStr);
+  }
+
+  private boolean isInvokedBy(String methodName) {
+    return StackWalker.getInstance()
+        .walk(frames -> frames.anyMatch(frame -> 
frame.getMethodName().equals(methodName)));
+  }
+
   public AuthorizationPlugin getAuthorizationPlugin() {
     if (authorizationPlugin == null) {
       synchronized (this) {
diff --git 
a/core/src/main/java/org/apache/gravitino/connector/BaseCatalogPropertiesMetadata.java
 
b/core/src/main/java/org/apache/gravitino/connector/BaseCatalogPropertiesMetadata.java
index 28a95432f9..d304c57b77 100644
--- 
a/core/src/main/java/org/apache/gravitino/connector/BaseCatalogPropertiesMetadata.java
+++ 
b/core/src/main/java/org/apache/gravitino/connector/BaseCatalogPropertiesMetadata.java
@@ -36,6 +36,12 @@ import org.apache.gravitino.annotation.Evolving;
 @Evolving
 public abstract class BaseCatalogPropertiesMetadata extends 
BasePropertiesMetadata {
 
+  /**
+   * This property indicates whether the metalake that contains this catalog is in use. When a
+   * metalake is disabled, this property is set to {@code false} on all its catalogs.
+   *
+   * <p>Declared {@code final} so the shared property key cannot be reassigned at runtime.
+   */
+  public static final String PROPERTY_METALAKE_IN_USE = "metalake-in-use";
+
   public static final PropertiesMetadata BASIC_CATALOG_PROPERTIES_METADATA =
       new BaseCatalogPropertiesMetadata() {
         @Override
@@ -88,7 +94,12 @@ public abstract class BaseCatalogPropertiesMetadata extends 
BasePropertiesMetada
                   PROPERTY_IN_USE,
                   "The property indicating the catalog is in use",
                   true /* default value */,
-                  false /* hidden */)),
+                  false /* hidden */),
+              PropertyEntry.booleanReservedPropertyEntry(
+                  PROPERTY_METALAKE_IN_USE,
+                  "The property indicating the metalake that holds the catalog 
is in use",
+                  true /* default value */,
+                  true /* hidden */)),
           PropertyEntry::getName);
 
   @Override
diff --git 
a/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java 
b/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
index fd6592d727..5d586f718e 100644
--- a/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
+++ b/core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.metalake;
 
 import static org.apache.gravitino.Metalake.PROPERTY_IN_USE;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import java.io.Closeable;
 import java.io.IOException;
@@ -32,6 +33,7 @@ import java.util.stream.Collectors;
 import org.apache.gravitino.Entity.EntityType;
 import org.apache.gravitino.EntityAlreadyExistsException;
 import org.apache.gravitino.EntityStore;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.MetalakeChange;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
@@ -367,25 +369,31 @@ public class MetalakeManager implements 
MetalakeDispatcher, Closeable {
         () -> {
           try {
             boolean inUse = metalakeInUse(store, ident);
-            if (!inUse) {
-              store.update(
-                  ident,
-                  BaseMetalake.class,
-                  EntityType.METALAKE,
-                  metalake -> {
-                    BaseMetalake.Builder builder = 
newMetalakeBuilder(metalake);
-
-                    Map<String, String> newProps =
-                        metalake.properties() == null
-                            ? Maps.newHashMap()
-                            : Maps.newHashMap(metalake.properties());
-                    newProps.put(PROPERTY_IN_USE, "true");
-                    builder.withProperties(newProps);
-
-                    return builder.build();
-                  });
+            if (inUse) {
+              return null;
             }
 
+            store.update(
+                ident,
+                BaseMetalake.class,
+                EntityType.METALAKE,
+                metalake -> {
+                  BaseMetalake.Builder builder = newMetalakeBuilder(metalake);
+
+                  Map<String, String> newProps =
+                      metalake.properties() == null
+                          ? Maps.newHashMap()
+                          : Maps.newHashMap(metalake.properties());
+                  newProps.put(PROPERTY_IN_USE, "true");
+                  builder.withProperties(newProps);
+
+                  return builder.build();
+                });
+
+            // The only problem is that we can't make sure we can change all 
catalog properties
+            // in a transaction. If any catalog property update fails, the 
metalake is already
+            // enabled but catalog properties remain inconsistent.
+            updateMetalakeInUseStatusInCatalog(ident, true);
             return null;
           } catch (IOException e) {
             throw new RuntimeException(e);
@@ -401,24 +409,31 @@ public class MetalakeManager implements 
MetalakeDispatcher, Closeable {
         () -> {
           try {
             boolean inUse = metalakeInUse(store, ident);
-            if (inUse) {
-              store.update(
-                  ident,
-                  BaseMetalake.class,
-                  EntityType.METALAKE,
-                  metalake -> {
-                    BaseMetalake.Builder builder = 
newMetalakeBuilder(metalake);
-
-                    Map<String, String> newProps =
-                        metalake.properties() == null
-                            ? Maps.newHashMap()
-                            : Maps.newHashMap(metalake.properties());
-                    newProps.put(PROPERTY_IN_USE, "false");
-                    builder.withProperties(newProps);
-
-                    return builder.build();
-                  });
+            if (!inUse) {
+              return null;
             }
+
+            store.update(
+                ident,
+                BaseMetalake.class,
+                EntityType.METALAKE,
+                metalake -> {
+                  BaseMetalake.Builder builder = newMetalakeBuilder(metalake);
+
+                  Map<String, String> newProps =
+                      metalake.properties() == null
+                          ? Maps.newHashMap()
+                          : Maps.newHashMap(metalake.properties());
+                  newProps.put(PROPERTY_IN_USE, "false");
+                  builder.withProperties(newProps);
+
+                  return builder.build();
+                });
+
+            // The only problem is that we can't make sure we can change all 
catalog properties
+            // in a transaction, if any of them fails, the metalake is already 
disabled and the value
+            // in catalog is inconsistent.
+            updateMetalakeInUseStatusInCatalog(ident, false);
             return null;
           } catch (IOException e) {
             throw new RuntimeException(e);
@@ -481,4 +496,37 @@ public class MetalakeManager implements 
MetalakeDispatcher, Closeable {
 
     return builder.withProperties(newProps);
   }
+
+  /**
+   * Propagates a metalake's in-use status to the metalake-in-use property of every catalog listed
+   * under that metalake. A failure on one catalog is logged and recorded, and the remaining
+   * catalogs are still processed.
+   *
+   * @param ident the identifier of the metalake whose catalogs are updated.
+   * @param value the in-use status to set on each catalog.
+   * @throws IOException declared for the entity store listing call.
+   * @throws RuntimeException if the update failed for at least one catalog; the message names the
+   *     catalogs that failed.
+   */
+  private void updateMetalakeInUseStatusInCatalog(NameIdentifier ident, boolean value)
+      throws IOException {
+    List<NameIdentifier> failedCatalogs = Lists.newArrayList();
+    store
+        .list(Namespace.of(ident.name()), CatalogEntity.class, EntityType.CATALOG)
+        .forEach(
+            catalogEntity -> {
+              try {
+                // Set the metalake-in-use property of the catalog to the requested value.
+                GravitinoEnv.getInstance()
+                    .catalogManager()
+                    .setMetalakeInUseStatus(catalogEntity.nameIdentifier(), value);
+                LOG.info(
+                    "Enabled or disable metalake-in-use property for catalog {} success",
+                    catalogEntity.nameIdentifier());
+              } catch (Exception e) {
+                // Only log the error and continue to update other catalogs
+                LOG.error(
+                    "Failed to enable or disable metalake-in-use property for catalog {}",
+                    catalogEntity.nameIdentifier(),
+                    e);
+                failedCatalogs.add(catalogEntity.nameIdentifier());
+              }
+            });
+
+    if (!failedCatalogs.isEmpty()) {
+      String failedCatalogNames =
+          failedCatalogs.stream().map(NameIdentifier::toString).collect(Collectors.joining(", "));
+      LOG.error("Total failed catalogs when update properties: {}", failedCatalogNames);
+      // Name the failed catalogs in the exception so callers see them without reading the log.
+      throw new RuntimeException(
+          "Failed to update in use status of the following catalogs: " + failedCatalogNames);
+    }
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java 
b/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
index 8a4f728af2..ba08eba42f 100644
--- a/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
+++ b/core/src/test/java/org/apache/gravitino/catalog/TestCatalogManager.java
@@ -39,6 +39,7 @@ import org.apache.gravitino.Catalog;
 import org.apache.gravitino.CatalogChange;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.Configs;
+import org.apache.gravitino.Entity.EntityType;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
@@ -52,6 +53,7 @@ import 
org.apache.gravitino.exceptions.NoSuchMetalakeException;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.meta.AuditInfo;
 import org.apache.gravitino.meta.BaseMetalake;
+import org.apache.gravitino.meta.CatalogEntity;
 import org.apache.gravitino.meta.SchemaEntity;
 import org.apache.gravitino.meta.SchemaVersion;
 import org.apache.gravitino.storage.RandomIdGenerator;
@@ -536,7 +538,7 @@ public class TestCatalogManager {
   }
 
   @Test
-  public void testDropCatalog() {
+  public void testDropCatalog() throws Exception {
     NameIdentifier ident = NameIdentifier.of("metalake", "test41");
     Map<String, String> props =
         ImmutableMap.of(
@@ -550,7 +552,8 @@ public class TestCatalogManager {
             "value3");
     String comment = "comment";
 
-    catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, 
comment, props);
+    Catalog catalog =
+        catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, 
comment, props);
 
     // Test drop catalog
     Exception exception =
@@ -559,6 +562,19 @@ public class TestCatalogManager {
     Assertions.assertTrue(exception.getMessage().contains("Catalog 
metalake.test41 is in use"));
 
     Assertions.assertDoesNotThrow(() -> catalogManager.disableCatalog(ident));
+
+    CatalogEntity oldEntity = entityStore.get(ident, EntityType.CATALOG, 
CatalogEntity.class);
+    FieldUtils.writeField(catalog, "entity", oldEntity, true);
+
+    CatalogManager.CatalogWrapper catalogWrapper =
+        Mockito.mock(CatalogManager.CatalogWrapper.class);
+    Capability capability = Mockito.mock(Capability.class);
+    CapabilityResult unsupportedResult = CapabilityResult.unsupported("Not 
managed");
+    
Mockito.doReturn(catalogWrapper).when(catalogManager).loadCatalogAndWrap(ident);
+    Mockito.doReturn(catalog).when(catalogWrapper).catalog();
+    Mockito.doReturn(capability).when(catalogWrapper).capabilities();
+    Mockito.doReturn(unsupportedResult).when(capability).managedStorage(any());
+
     boolean dropped = catalogManager.dropCatalog(ident);
     Assertions.assertTrue(dropped);
 
@@ -585,7 +601,8 @@ public class TestCatalogManager {
             PROPERTY_KEY5_PREFIX + "1",
             "value3");
     String comment = "comment";
-    catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, 
comment, props);
+    Catalog catalog =
+        catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, 
comment, props);
     SchemaEntity schemaEntity =
         SchemaEntity.builder()
             .withId(RandomIdGenerator.INSTANCE.nextId())
@@ -605,6 +622,7 @@ public class TestCatalogManager {
     
Mockito.doReturn(catalogWrapper).when(catalogManager).loadCatalogAndWrap(ident);
     Mockito.doReturn(capability).when(catalogWrapper).capabilities();
     Mockito.doReturn(unsupportedResult).when(capability).managedStorage(any());
+    Mockito.doReturn(catalog).when(catalogWrapper).catalog();
     Mockito.doThrow(new RuntimeException("Failed connect"))
         .when(catalogWrapper)
         .doWithSchemaOps(any());
@@ -613,7 +631,7 @@ public class TestCatalogManager {
 
   @Test
   void testAlterMutableProperties() {
-    NameIdentifier ident = NameIdentifier.of("metalake", "test41");
+    NameIdentifier ident = NameIdentifier.of("metalake", "test51");
     Map<String, String> props =
         ImmutableMap.of(
             "provider",
@@ -635,6 +653,33 @@ public class TestCatalogManager {
     Assertions.assertNotEquals(oldCatalog, newCatalog);
   }
 
+  /**
+   * Creates a catalog, disables and re-enables it, and checks after each step that the in-use
+   * property stored in the entity store matches.
+   */
+  @Test
+  public void testEnableAndDisableCatalog() throws Exception {
+    NameIdentifier ident = NameIdentifier.of("metalake", "enable_disable");
+    Map<String, String> props =
+        ImmutableMap.of(
+            "provider",
+            "test",
+            PROPERTY_KEY1,
+            "value1",
+            PROPERTY_KEY2,
+            "value2",
+            PROPERTY_KEY5_PREFIX + "1",
+            "value3");
+
+    catalogManager.createCatalog(ident, Catalog.Type.RELATIONAL, provider, 
"comment", props);
+
+    // Disabling must persist in-use = "false" on the stored catalog entity.
+    catalogManager.disableCatalog(ident);
+    CatalogEntity disabled = entityStore.get(ident, EntityType.CATALOG, 
CatalogEntity.class);
+    Assertions.assertEquals("false", 
disabled.getProperties().get(Catalog.PROPERTY_IN_USE));
+
+    // Enabling must persist in-use = "true" on the stored catalog entity.
+    catalogManager.enableCatalog(ident);
+    CatalogEntity enabled = entityStore.get(ident, EntityType.CATALOG, 
CatalogEntity.class);
+    Assertions.assertEquals("true", 
enabled.getProperties().get(Catalog.PROPERTY_IN_USE));
+
+    // After the enable call the catalog cache is expected to hold no entry for this catalog.
+    
Assertions.assertNull(catalogManager.getCatalogCache().getIfPresent(ident));
+  }
+
   private void testProperties(Map<String, String> expectedProps, Map<String, 
String> testProps) {
     expectedProps.forEach(
         (k, v) -> {
diff --git 
a/core/src/test/java/org/apache/gravitino/connector/TestBaseCatalogInUse.java 
b/core/src/test/java/org/apache/gravitino/connector/TestBaseCatalogInUse.java
new file mode 100644
index 0000000000..ff9577b8f0
--- /dev/null
+++ 
b/core/src/test/java/org/apache/gravitino/connector/TestBaseCatalogInUse.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.connector;
+
+import static 
org.apache.gravitino.connector.BaseCatalogPropertiesMetadata.PROPERTY_METALAKE_IN_USE;
+
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.Namespace;
+import org.apache.gravitino.connector.capability.Capability;
+import org.apache.gravitino.exceptions.CatalogNotInUseException;
+import org.apache.gravitino.exceptions.MetalakeNotInUseException;
+import org.apache.gravitino.meta.AuditInfo;
+import org.apache.gravitino.meta.CatalogEntity;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests for {@code checkMetalakeAndCatalogInUse} on {@link BaseCatalog}, driven by catalog
+ * entities whose metalake-in-use and catalog in-use properties are set explicitly.
+ */
+public class TestBaseCatalogInUse {
+
+  /** Both properties are {@code true}: the check is expected to pass without throwing. */
+  @Test
+  void testMetalakeAndCatalogInUse() throws IllegalAccessException {
+    CatalogEntity entity = buildCatalogEntity(props(true, true));
+    TestCatalogImpl catalog = new TestCatalogImpl();
+    // Inject the entity reflectively into the catalog's "entity" field.
+    FieldUtils.writeField(catalog, "entity", entity, true);
+    Assertions.assertDoesNotThrow(() -> 
catalog.checkMetalakeAndCatalogInUse());
+  }
+
+  /** Metalake-in-use is {@code false}: a {@link MetalakeNotInUseException} is expected. */
+  @Test
+  void testMetalakeNotInUseThrows() throws IllegalAccessException {
+    CatalogEntity entity = buildCatalogEntity(props(false, true));
+    TestCatalogImpl catalog = new TestCatalogImpl();
+    FieldUtils.writeField(catalog, "entity", entity, true);
+    Assertions.assertThrows(
+        MetalakeNotInUseException.class, () -> 
catalog.checkMetalakeAndCatalogInUse());
+  }
+
+  /** Catalog in-use is {@code false}: a {@link CatalogNotInUseException} is expected. */
+  @Test
+  void testCatalogNotInUseThrows() throws IllegalAccessException {
+    CatalogEntity entity = buildCatalogEntity(props(true, false));
+    TestCatalogImpl catalog = new TestCatalogImpl();
+    FieldUtils.writeField(catalog, "entity", entity, true);
+    Assertions.assertThrows(
+        CatalogNotInUseException.class, () -> 
catalog.checkMetalakeAndCatalogInUse());
+  }
+
+  /**
+   * Builds a mutable property map holding the two in-use flags as strings.
+   *
+   * @param metalakeInUse value stored under {@code PROPERTY_METALAKE_IN_USE}.
+   * @param catalogInUse value stored under {@code Catalog.PROPERTY_IN_USE}.
+   * @return the property map.
+   */
+  private Map<String, String> props(boolean metalakeInUse, boolean 
catalogInUse) {
+    Map<String, String> props = new HashMap<>();
+    props.put(PROPERTY_METALAKE_IN_USE, String.valueOf(metalakeInUse));
+    props.put(Catalog.PROPERTY_IN_USE, String.valueOf(catalogInUse));
+    return props;
+  }
+
+  /**
+   * Builds a relational catalog entity named "test" in namespace "metalake" carrying the given
+   * properties and fixed audit information.
+   *
+   * @param props the properties to attach to the entity.
+   * @return the catalog entity.
+   */
+  private CatalogEntity buildCatalogEntity(Map<String, String> props) {
+    return CatalogEntity.builder()
+        .withId(1L)
+        .withName("test")
+        .withNamespace(Namespace.of("metalake"))
+        .withType(Catalog.Type.RELATIONAL)
+        .withProvider("test")
+        .withComment("comment")
+        .withProperties(props)
+        .withAuditInfo(
+            AuditInfo.builder()
+                .withCreator("tester")
+                .withCreateTime(Instant.now())
+                .withLastModifier("tester")
+                .withLastModifiedTime(Instant.now())
+                .build())
+        .build();
+  }
+
+  /** Minimal {@link BaseCatalog} subclass used only as a target for the in-use checks. */
+  private static class TestCatalogImpl extends BaseCatalog<TestCatalogImpl> {
+
+    @Override
+    public String shortName() {
+      return "test";
+    }
+
+    // Operations are mocked; the tests in this class never exercise them.
+    @Override
+    protected CatalogOperations newOps(Map<String, String> config) {
+      return Mockito.mock(CatalogOperations.class);
+    }
+
+    @Override
+    protected Capability newCapability() {
+      return Capability.DEFAULT;
+    }
+
+    @Override
+    public PropertiesMetadata catalogPropertiesMetadata() throws 
UnsupportedOperationException {
+      return BaseCatalogPropertiesMetadata.BASIC_CATALOG_PROPERTIES_METADATA;
+    }
+  }
+}

Reply via email to