This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new a6a69cc9f [#4812] fix(core): Make ensure the schema exists before 
creating tables and topics (#4818)
a6a69cc9f is described below

commit a6a69cc9fcb88a0ea069711e1b7711da598e1f7c
Author: roryqi <[email protected]>
AuthorDate: Fri Aug 30 22:10:13 2024 +0800

    [#4812] fix(core): Make ensure the schema exists before creating tables and 
topics (#4818)
    
    ### What changes were proposed in this pull request?
    If the schema is not created by Gravitino, the Gravitino will lack the
    metadata in the backend storage.
    If we create a table in this schema, the storage won't contain the
    metadata. So it will fail to set owner. Because the storage won't store
    the table. Because the storage won't contain schema id.
    
    This won't bring too much performance cost. Because loadSchema will use
    read lock after first loading. If we have cache, we could be more quick.
    
    ### Why are the changes needed?
    
    Fix: #4812
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a ut.
---
 .../catalog/TableOperationDispatcher.java          | 156 ++++++++++++---------
 .../catalog/TopicOperationDispatcher.java          | 114 ++++++++-------
 .../catalog/TestPartitionNormalizeDispatcher.java  |   2 +-
 .../catalog/TestPartitionOperationDispatcher.java  |  22 ++-
 .../catalog/TestTableOperationDispatcher.java      |  26 ++++
 .../catalog/TestTopicOperationDispatcher.java      |  29 +++-
 .../gravitino/server/web/rest/TableOperations.java |  22 ++-
 .../gravitino/server/web/rest/TopicOperations.java |  14 +-
 8 files changed, 241 insertions(+), 144 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 0562d99b7..4472859d9 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -143,70 +143,25 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
       SortOrder[] sortOrders,
       Index[] indexes)
       throws NoSuchSchemaException, TableAlreadyExistsException {
-    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
-    doWithCatalog(
-        catalogIdent,
-        c ->
-            c.doWithPropertiesMeta(
-                p -> {
-                  validatePropertyForCreate(p.tablePropertiesMetadata(), 
properties);
-                  return null;
-                }),
-        IllegalArgumentException.class);
-    long uid = idGenerator.nextId();
-    // Add StringIdentifier to the properties, the specific catalog will 
handle this
-    // StringIdentifier to make sure only when the operation is successful, 
the related
-    // TableEntity will be visible.
-    StringIdentifier stringId = StringIdentifier.fromId(uid);
-    Map<String, String> updatedProperties =
-        StringIdentifier.newPropertiesWithId(stringId, properties);
-
-    // we do not retrieve the table again (to obtain some values generated by 
underlying catalog)
-    // since some catalogs' API is async and the table may not be created 
immediately
-    Table table =
-        doWithCatalog(
-            catalogIdent,
-            c ->
-                c.doWithTableOps(
-                    t ->
-                        t.createTable(
-                            ident,
-                            columns,
-                            comment,
-                            updatedProperties,
-                            partitions == null ? EMPTY_TRANSFORM : partitions,
-                            distribution == null ? Distributions.NONE : 
distribution,
-                            sortOrders == null ? new SortOrder[0] : sortOrders,
-                            indexes == null ? Indexes.EMPTY_INDEXES : 
indexes)),
-            NoSuchSchemaException.class,
-            TableAlreadyExistsException.class);
-
-    TableEntity tableEntity =
-        TableEntity.builder()
-            .withId(uid)
-            .withName(ident.name())
-            .withNamespace(ident.namespace())
-            .withAuditInfo(
-                AuditInfo.builder()
-                    
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
-                    .withCreateTime(Instant.now())
-                    .build())
-            .build();
-
-    try {
-      store.put(tableEntity, true /* overwrite */);
-    } catch (Exception e) {
-      LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
-      return EntityCombinedTable.of(table)
-          .withHiddenPropertiesSet(
-              getHiddenPropertyNames(
-                  catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
-    }
 
-    return EntityCombinedTable.of(table, tableEntity)
-        .withHiddenPropertiesSet(
-            getHiddenPropertyNames(
-                catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
+    // Load the schema to make sure the schema exists.
+    SchemaDispatcher schemaDispatcher = 
GravitinoEnv.getInstance().schemaDispatcher();
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    schemaDispatcher.loadSchema(schemaIdent);
+
+    return TreeLockUtils.doWithTreeLock(
+        NameIdentifier.of(ident.namespace().levels()),
+        LockType.WRITE,
+        () ->
+            internalCreateTable(
+                ident,
+                columns,
+                comment,
+                properties,
+                partitions,
+                distribution,
+                sortOrders,
+                indexes));
   }
 
   /**
@@ -462,4 +417,79 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
                 table.properties()))
         .withImported(tableEntity != null);
   }
+
+  private Table internalCreateTable(
+      NameIdentifier ident,
+      Column[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitions,
+      Distribution distribution,
+      SortOrder[] sortOrders,
+      Index[] indexes) {
+    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+    doWithCatalog(
+        catalogIdent,
+        c ->
+            c.doWithPropertiesMeta(
+                p -> {
+                  validatePropertyForCreate(p.tablePropertiesMetadata(), 
properties);
+                  return null;
+                }),
+        IllegalArgumentException.class);
+    long uid = idGenerator.nextId();
+    // Add StringIdentifier to the properties, the specific catalog will 
handle this
+    // StringIdentifier to make sure only when the operation is successful, 
the related
+    // TableEntity will be visible.
+    StringIdentifier stringId = StringIdentifier.fromId(uid);
+    Map<String, String> updatedProperties =
+        StringIdentifier.newPropertiesWithId(stringId, properties);
+
+    // we do not retrieve the table again (to obtain some values generated by 
underlying catalog)
+    // since some catalogs' API is async and the table may not be created 
immediately
+    Table table =
+        doWithCatalog(
+            catalogIdent,
+            c ->
+                c.doWithTableOps(
+                    t ->
+                        t.createTable(
+                            ident,
+                            columns,
+                            comment,
+                            updatedProperties,
+                            partitions == null ? EMPTY_TRANSFORM : partitions,
+                            distribution == null ? Distributions.NONE : 
distribution,
+                            sortOrders == null ? new SortOrder[0] : sortOrders,
+                            indexes == null ? Indexes.EMPTY_INDEXES : 
indexes)),
+            NoSuchSchemaException.class,
+            TableAlreadyExistsException.class);
+
+    TableEntity tableEntity =
+        TableEntity.builder()
+            .withId(uid)
+            .withName(ident.name())
+            .withNamespace(ident.namespace())
+            .withAuditInfo(
+                AuditInfo.builder()
+                    
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                    .withCreateTime(Instant.now())
+                    .build())
+            .build();
+
+    try {
+      store.put(tableEntity, true /* overwrite */);
+    } catch (Exception e) {
+      LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
+      return EntityCombinedTable.of(table)
+          .withHiddenPropertiesSet(
+              getHiddenPropertyNames(
+                  catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
+    }
+
+    return EntityCombinedTable.of(table, tableEntity)
+        .withHiddenPropertiesSet(
+            getHiddenPropertyNames(
+                catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
index ec3c08b33..3b6e4a478 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
@@ -128,58 +128,16 @@ public class TopicOperationDispatcher extends 
OperationDispatcher implements Top
   public Topic createTopic(
       NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, 
String> properties)
       throws NoSuchSchemaException, TopicAlreadyExistsException {
-    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
-    doWithCatalog(
-        catalogIdent,
-        c ->
-            c.doWithPropertiesMeta(
-                p -> {
-                  validatePropertyForCreate(p.topicPropertiesMetadata(), 
properties);
-                  return null;
-                }),
-        IllegalArgumentException.class);
-    Long uid = idGenerator.nextId();
-    StringIdentifier stringId = StringIdentifier.fromId(uid);
-    Map<String, String> updatedProperties =
-        StringIdentifier.newPropertiesWithId(stringId, properties);
-
-    // we do not retrieve the topic again (to obtain some values generated by 
underlying catalog)
-    // since some catalogs' API is async and the table may not be created 
immediately
-    Topic topic =
-        doWithCatalog(
-            catalogIdent,
-            c ->
-                c.doWithTopicOps(t -> t.createTopic(ident, comment, 
dataLayout, updatedProperties)),
-            NoSuchSchemaException.class,
-            TopicAlreadyExistsException.class);
-
-    TopicEntity topicEntity =
-        TopicEntity.builder()
-            .withId(fromProperties(topic.properties()).id())
-            .withName(ident.name())
-            .withComment(comment)
-            .withNamespace(ident.namespace())
-            .withAuditInfo(
-                AuditInfo.builder()
-                    
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
-                    .withCreateTime(Instant.now())
-                    .build())
-            .build();
 
-    try {
-      store.put(topicEntity, true /* overwrite */);
-    } catch (Exception e) {
-      LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, 
"put", ident, e);
-      return EntityCombinedTopic.of(topic)
-          .withHiddenPropertiesSet(
-              getHiddenPropertyNames(
-                  catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, 
topic.properties()));
-    }
+    // Load the schema to make sure the schema exists.
+    SchemaDispatcher schemaDispatcher = 
GravitinoEnv.getInstance().schemaDispatcher();
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    schemaDispatcher.loadSchema(schemaIdent);
 
-    return EntityCombinedTopic.of(topic, topicEntity)
-        .withHiddenPropertiesSet(
-            getHiddenPropertyNames(
-                catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, 
topic.properties()));
+    return TreeLockUtils.doWithTreeLock(
+        NameIdentifier.of(ident.namespace().levels()),
+        LockType.WRITE,
+        () -> internalCreateTopic(ident, comment, dataLayout, properties));
   }
 
   /**
@@ -364,4 +322,60 @@ public class TopicOperationDispatcher extends 
OperationDispatcher implements Top
                 catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, 
topic.properties()))
         .withImported(topicEntity != null);
   }
+
+  private Topic internalCreateTopic(
+      NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, 
String> properties) {
+    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+    doWithCatalog(
+        catalogIdent,
+        c ->
+            c.doWithPropertiesMeta(
+                p -> {
+                  validatePropertyForCreate(p.topicPropertiesMetadata(), 
properties);
+                  return null;
+                }),
+        IllegalArgumentException.class);
+    Long uid = idGenerator.nextId();
+    StringIdentifier stringId = StringIdentifier.fromId(uid);
+    Map<String, String> updatedProperties =
+        StringIdentifier.newPropertiesWithId(stringId, properties);
+
+    // we do not retrieve the topic again (to obtain some values generated by 
underlying catalog)
+    // since some catalogs' API is async and the table may not be created 
immediately
+    Topic topic =
+        doWithCatalog(
+            catalogIdent,
+            c ->
+                c.doWithTopicOps(t -> t.createTopic(ident, comment, 
dataLayout, updatedProperties)),
+            NoSuchSchemaException.class,
+            TopicAlreadyExistsException.class);
+
+    TopicEntity topicEntity =
+        TopicEntity.builder()
+            .withId(fromProperties(topic.properties()).id())
+            .withName(ident.name())
+            .withComment(comment)
+            .withNamespace(ident.namespace())
+            .withAuditInfo(
+                AuditInfo.builder()
+                    
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                    .withCreateTime(Instant.now())
+                    .build())
+            .build();
+
+    try {
+      store.put(topicEntity, true /* overwrite */);
+    } catch (Exception e) {
+      LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, 
"put", ident, e);
+      return EntityCombinedTopic.of(topic)
+          .withHiddenPropertiesSet(
+              getHiddenPropertyNames(
+                  catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, 
topic.properties()));
+    }
+
+    return EntityCombinedTopic.of(topic, topicEntity)
+        .withHiddenPropertiesSet(
+            getHiddenPropertyNames(
+                catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, 
topic.properties()));
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
index 2c47d69bc..0151dcf24 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
@@ -39,7 +39,7 @@ public class TestPartitionNormalizeDispatcher extends 
TestOperationDispatcher {
       NameIdentifierUtil.ofTable(metalake, catalog, SCHEMA, 
"TEST_PARTITION_NORMALIZE_TABLE");
 
   @BeforeAll
-  public static void initialize() {
+  public static void initialize() throws IllegalAccessException {
     TestPartitionOperationDispatcher.prepareTable();
     partitionNormalizeDispatcher =
         new PartitionNormalizeDispatcher(
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
index 3d81ee557..9ddc3b1d3 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
@@ -18,9 +18,19 @@
  */
 package org.apache.gravitino.catalog;
 
+import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
+import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
+import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
 import com.google.common.collect.Maps;
 import java.util.Arrays;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.expressions.literals.Literal;
 import org.apache.gravitino.rel.expressions.literals.Literals;
@@ -50,7 +60,7 @@ public class TestPartitionOperationDispatcher extends 
TestOperationDispatcher {
           Maps.newHashMap());
 
   @BeforeAll
-  public static void initialize() {
+  public static void initialize() throws IllegalAccessException {
     prepareTable();
     partitionOperationDispatcher.addPartition(TABLE_IDENT, PARTITION);
 
@@ -66,7 +76,7 @@ public class TestPartitionOperationDispatcher extends 
TestOperationDispatcher {
         "Custom class loader is not used");
   }
 
-  protected static void prepareTable() {
+  protected static void prepareTable() throws IllegalAccessException {
     schemaOperationDispatcher =
         new SchemaOperationDispatcher(catalogManager, entityStore, 
idGenerator);
     tableOperationDispatcher =
@@ -74,6 +84,14 @@ public class TestPartitionOperationDispatcher extends 
TestOperationDispatcher {
     partitionOperationDispatcher =
         new PartitionOperationDispatcher(catalogManager, entityStore, 
idGenerator);
 
+    Config config = mock(Config.class);
+    doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
+    doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
+    doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new 
LockManager(config), true);
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "schemaDispatcher", 
schemaOperationDispatcher, true);
+
     NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema(metalake, 
catalog, SCHEMA);
     schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
     Column[] columns =
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
index f5f69b77a..1dd31fd33 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
@@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -44,8 +45,10 @@ import org.apache.gravitino.Config;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.TestCatalog;
 import org.apache.gravitino.TestColumn;
 import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.connector.TestCatalogOperations;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.meta.AuditInfo;
@@ -306,6 +309,9 @@ public class TestTableOperationDispatcher extends 
TestOperationDispatcher {
           
TestColumn.builder().withName("col2").withType(Types.StringType.get()).build()
         };
 
+    schemaOperationDispatcher.createSchema(
+        NameIdentifier.of(tableIdent.namespace().levels()), "comment", props);
+
     tableOperationDispatcher.createTable(tableIdent, columns, "comment", 
props, new Transform[0]);
 
     boolean dropped = tableOperationDispatcher.dropTable(tableIdent);
@@ -319,4 +325,24 @@ public class TestTableOperationDispatcher extends 
TestOperationDispatcher {
     Assertions.assertThrows(
         RuntimeException.class, () -> 
tableOperationDispatcher.dropTable(tableIdent));
   }
+
+  @Test
+  public void testCreateTableNeedImportingSchema() throws IOException {
+    Namespace tableNs = Namespace.of(metalake, catalog, "schema181");
+    NameIdentifier tableIdent = NameIdentifier.of(tableNs, "topic81");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    TestCatalog testCatalog =
+        (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake, 
catalog));
+    TestCatalogOperations testCatalogOperations = (TestCatalogOperations) 
testCatalog.ops();
+    testCatalogOperations.createSchema(
+        NameIdentifier.of(tableNs.levels()), "", Collections.emptyMap());
+    Column[] columns =
+        new Column[] {
+          
TestColumn.builder().withName("col1").withType(Types.StringType.get()).build(),
+          
TestColumn.builder().withName("col2").withType(Types.StringType.get()).build()
+        };
+    tableOperationDispatcher.createTable(tableIdent, columns, "comment", 
props);
+    
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(tableNs.levels()), 
SCHEMA));
+    Assertions.assertTrue(entityStore.exists(tableIdent, TABLE));
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
index 27afc5090..ac694883d 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.catalog;
 import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
 import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
 import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.apache.gravitino.Entity.EntityType.SCHEMA;
 import static org.apache.gravitino.StringIdentifier.ID_KEY;
 import static org.apache.gravitino.TestBasePropertiesMetadata.COMMENT_KEY;
 import static org.mockito.ArgumentMatchers.any;
@@ -33,6 +34,7 @@ import static org.mockito.Mockito.reset;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.time.Instant;
+import java.util.Collections;
 import java.util.Map;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Config;
@@ -40,7 +42,9 @@ import org.apache.gravitino.Entity;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.TestCatalog;
 import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.connector.TestCatalogOperations;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.messaging.Topic;
@@ -122,25 +126,23 @@ public class TestTopicOperationDispatcher extends 
TestOperationDispatcher {
     // Case 2: Test if the topic entity is not found in the entity store
     reset(entityStore);
     entityStore.delete(topicIdent1, Entity.EntityType.TOPIC);
-    entityStore.delete(NameIdentifier.of(topicNs.levels()), 
Entity.EntityType.SCHEMA);
+    entityStore.delete(NameIdentifier.of(topicNs.levels()), SCHEMA);
     doThrow(new NoSuchEntityException("")).when(entityStore).get(any(), any(), 
any());
     Topic loadedTopic2 = topicOperationDispatcher.loadTopic(topicIdent1);
     // Succeed to import the topic entity
     Assertions.assertTrue(entityStore.exists(topicIdent1, 
Entity.EntityType.TOPIC));
-    Assertions.assertTrue(
-        entityStore.exists(NameIdentifier.of(topicNs.levels()), 
Entity.EntityType.SCHEMA));
+    
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()), 
SCHEMA));
     // Audit info is gotten from the catalog, not from the entity store
     Assertions.assertEquals("test", loadedTopic2.auditInfo().creator());
 
     // Case 3: Test if the entity store is failed to get the topic entity
     reset(entityStore);
     entityStore.delete(topicIdent1, Entity.EntityType.TOPIC);
-    entityStore.delete(NameIdentifier.of(topicNs.levels()), 
Entity.EntityType.SCHEMA);
+    entityStore.delete(NameIdentifier.of(topicNs.levels()), SCHEMA);
     doThrow(new IOException()).when(entityStore).get(any(), any(), any());
     Topic loadedTopic3 = topicOperationDispatcher.loadTopic(topicIdent1);
     // Succeed to import the topic entity
-    Assertions.assertTrue(
-        entityStore.exists(NameIdentifier.of(topicNs.levels()), 
Entity.EntityType.SCHEMA));
+    
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()), 
SCHEMA));
     Assertions.assertTrue(entityStore.exists(topicIdent1, 
Entity.EntityType.TOPIC));
     // Audit info is gotten from the catalog, not from the entity store
     Assertions.assertEquals("test", loadedTopic3.auditInfo().creator());
@@ -250,4 +252,19 @@ public class TestTopicOperationDispatcher extends 
TestOperationDispatcher {
     Assertions.assertThrows(
         RuntimeException.class, () -> 
topicOperationDispatcher.dropTopic(topicIdent));
   }
+
+  @Test
+  public void testCreateTopicNeedImportingSchema() throws IOException {
+    Namespace topicNs = Namespace.of(metalake, catalog, "schema161");
+    NameIdentifier topicIdent = NameIdentifier.of(topicNs, "topic61");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    TestCatalog testCatalog =
+        (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake, 
catalog));
+    TestCatalogOperations testCatalogOperations = (TestCatalogOperations) 
testCatalog.ops();
+    testCatalogOperations.createSchema(
+        NameIdentifier.of(topicNs.levels()), "", Collections.emptyMap());
+    topicOperationDispatcher.createTopic(topicIdent, "comment", null, props);
+    
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()), 
SCHEMA));
+    Assertions.assertTrue(entityStore.exists(topicIdent, 
Entity.EntityType.TOPIC));
+  }
 }
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
index e8ad56401..d5cf1ffc7 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
@@ -121,19 +121,15 @@ public class TableOperations {
                 NameIdentifierUtil.ofTable(metalake, catalog, schema, 
request.getName());
 
             Table table =
-                TreeLockUtils.doWithTreeLock(
-                    NameIdentifier.of(metalake, catalog, schema),
-                    LockType.WRITE,
-                    () ->
-                        dispatcher.createTable(
-                            ident,
-                            fromDTOs(request.getColumns()),
-                            request.getComment(),
-                            request.getProperties(),
-                            fromDTOs(request.getPartitioning()),
-                            fromDTO(request.getDistribution()),
-                            fromDTOs(request.getSortOrders()),
-                            fromDTOs(request.getIndexes())));
+                dispatcher.createTable(
+                    ident,
+                    fromDTOs(request.getColumns()),
+                    request.getComment(),
+                    request.getProperties(),
+                    fromDTOs(request.getPartitioning()),
+                    fromDTO(request.getDistribution()),
+                    fromDTOs(request.getSortOrders()),
+                    fromDTOs(request.getIndexes()));
             Response response = Utils.ok(new 
TableResponse(DTOConverters.toDTO(table)));
             LOG.info("Table created: {}.{}.{}.{}", metalake, catalog, schema, 
request.getName());
             return response;
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
index 5e32ff6d2..4e9bcd550 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
@@ -119,15 +119,11 @@ public class TopicOperations {
                 NameIdentifierUtil.ofTopic(metalake, catalog, schema, 
request.getName());
 
             Topic topic =
-                TreeLockUtils.doWithTreeLock(
-                    NameIdentifierUtil.ofSchema(metalake, catalog, schema),
-                    LockType.WRITE,
-                    () ->
-                        dispatcher.createTopic(
-                            ident,
-                            request.getComment(),
-                            null /* dataLayout, always null because it's not 
supported yet.*/,
-                            request.getProperties()));
+                dispatcher.createTopic(
+                    ident,
+                    request.getComment(),
+                    null /* dataLayout, always null because it's not supported 
yet.*/,
+                    request.getProperties());
             Response response = Utils.ok(new 
TopicResponse(DTOConverters.toDTO(topic)));
             LOG.info("Topic created: {}.{}.{}.{}", metalake, catalog, schema, 
topic.name());
             return response;

Reply via email to