This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 9a3759f51 [#4812] fix(core): Make ensure the schema exists before 
creating tables and topics  (#4826)
9a3759f51 is described below

commit 9a3759f51dc9e51b05449cacfb08eaebcf05f2b4
Author: roryqi <[email protected]>
AuthorDate: Sat Aug 31 02:23:28 2024 +0800

    [#4812] fix(core): Make ensure the schema exists before creating tables and 
topics  (#4826)
    
    ### What changes were proposed in this pull request? If the schema is
    not created by Gravitino, the Gravitino will lack the metadata in the
    backend storage.
    If we create a table in this schema, the storage won't contain the
    metadata. So it will fail to set owner. Because the storage won't store
    the table. Because the storage won't contain schema id.
    
    This won't bring too much performance cost. Because loadSchema will use
    read lock after first loading. If we have cache, we could be more quick.
    
    ### Why are the changes needed?
    
    Fix: #4812
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added a ut.
---
 .../catalog/TableOperationDispatcher.java          | 164 ++++++++++++---------
 .../catalog/TopicOperationDispatcher.java          | 120 ++++++++-------
 .../catalog/TestPartitionNormalizeDispatcher.java  |   2 +-
 .../catalog/TestPartitionOperationDispatcher.java  |  22 ++-
 .../catalog/TestTableOperationDispatcher.java      |  26 ++++
 .../catalog/TestTopicOperationDispatcher.java      |  29 +++-
 .../gravitino/server/web/rest/TableOperations.java |  22 ++-
 .../gravitino/server/web/rest/TopicOperations.java |  14 +-
 8 files changed, 248 insertions(+), 151 deletions(-)

diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 404af695a..9794a644e 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -143,74 +143,25 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
       SortOrder[] sortOrders,
       Index[] indexes)
       throws NoSuchSchemaException, TableAlreadyExistsException {
-    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
-    doWithCatalog(
-        catalogIdent,
-        c ->
-            c.doWithPropertiesMeta(
-                p -> {
-                  validatePropertyForCreate(p.tablePropertiesMetadata(), 
properties);
-                  return null;
-                }),
-        IllegalArgumentException.class);
-    long uid = idGenerator.nextId();
-    // Add StringIdentifier to the properties, the specific catalog will 
handle this
-    // StringIdentifier to make sure only when the operation is successful, 
the related
-    // TableEntity will be visible.
-    StringIdentifier stringId = StringIdentifier.fromId(uid);
-    Map<String, String> updatedProperties =
-        StringIdentifier.newPropertiesWithId(stringId, properties);
-
-    doWithCatalog(
-        catalogIdent,
-        c ->
-            c.doWithTableOps(
-                t ->
-                    t.createTable(
-                        ident,
-                        columns,
-                        comment,
-                        updatedProperties,
-                        partitions == null ? EMPTY_TRANSFORM : partitions,
-                        distribution == null ? Distributions.NONE : 
distribution,
-                        sortOrders == null ? new SortOrder[0] : sortOrders,
-                        indexes == null ? Indexes.EMPTY_INDEXES : indexes)),
-        NoSuchSchemaException.class,
-        TableAlreadyExistsException.class);
-
-    // Retrieve the Table again to obtain some values generated by underlying 
catalog
-    Table table =
-        doWithCatalog(
-            catalogIdent,
-            c -> c.doWithTableOps(t -> t.loadTable(ident)),
-            NoSuchTableException.class);
-
-    TableEntity tableEntity =
-        TableEntity.builder()
-            .withId(uid)
-            .withName(ident.name())
-            .withNamespace(ident.namespace())
-            .withAuditInfo(
-                AuditInfo.builder()
-                    
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
-                    .withCreateTime(Instant.now())
-                    .build())
-            .build();
-
-    try {
-      store.put(tableEntity, true /* overwrite */);
-    } catch (Exception e) {
-      LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
-      return EntityCombinedTable.of(table)
-          .withHiddenPropertiesSet(
-              getHiddenPropertyNames(
-                  catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
-    }
 
-    return EntityCombinedTable.of(table, tableEntity)
-        .withHiddenPropertiesSet(
-            getHiddenPropertyNames(
-                catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
+    // Load the schema to make sure the schema exists.
+    SchemaDispatcher schemaDispatcher = 
GravitinoEnv.getInstance().schemaDispatcher();
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    schemaDispatcher.loadSchema(schemaIdent);
+
+    return TreeLockUtils.doWithTreeLock(
+        NameIdentifier.of(ident.namespace().levels()),
+        LockType.WRITE,
+        () ->
+            internalCreateTable(
+                ident,
+                columns,
+                comment,
+                properties,
+                partitions,
+                distribution,
+                sortOrders,
+                indexes));
   }
 
   /**
@@ -476,4 +427,83 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
                 table.properties()))
         .withImported(tableEntity != null);
   }
+
+  private Table internalCreateTable(
+      NameIdentifier ident,
+      Column[] columns,
+      String comment,
+      Map<String, String> properties,
+      Transform[] partitions,
+      Distribution distribution,
+      SortOrder[] sortOrders,
+      Index[] indexes) {
+    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+    doWithCatalog(
+        catalogIdent,
+        c ->
+            c.doWithPropertiesMeta(
+                p -> {
+                  validatePropertyForCreate(p.tablePropertiesMetadata(), 
properties);
+                  return null;
+                }),
+        IllegalArgumentException.class);
+    long uid = idGenerator.nextId();
+    // Add StringIdentifier to the properties, the specific catalog will 
handle this
+    // StringIdentifier to make sure only when the operation is successful, 
the related
+    // TableEntity will be visible.
+    StringIdentifier stringId = StringIdentifier.fromId(uid);
+    Map<String, String> updatedProperties =
+        StringIdentifier.newPropertiesWithId(stringId, properties);
+
+    doWithCatalog(
+        catalogIdent,
+        c ->
+            c.doWithTableOps(
+                t ->
+                    t.createTable(
+                        ident,
+                        columns,
+                        comment,
+                        updatedProperties,
+                        partitions == null ? EMPTY_TRANSFORM : partitions,
+                        distribution == null ? Distributions.NONE : 
distribution,
+                        sortOrders == null ? new SortOrder[0] : sortOrders,
+                        indexes == null ? Indexes.EMPTY_INDEXES : indexes)),
+        NoSuchSchemaException.class,
+        TableAlreadyExistsException.class);
+
+    TableEntity tableEntity =
+        TableEntity.builder()
+            .withId(uid)
+            .withName(ident.name())
+            .withNamespace(ident.namespace())
+            .withAuditInfo(
+                AuditInfo.builder()
+                    
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                    .withCreateTime(Instant.now())
+                    .build())
+            .build();
+
+    // Retrieve the Table again to obtain some values generated by underlying 
catalog
+    Table table =
+        doWithCatalog(
+            catalogIdent,
+            c -> c.doWithTableOps(t -> t.loadTable(ident)),
+            NoSuchTableException.class);
+
+    try {
+      store.put(tableEntity, true /* overwrite */);
+    } catch (Exception e) {
+      LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
+      return EntityCombinedTable.of(table)
+          .withHiddenPropertiesSet(
+              getHiddenPropertyNames(
+                  catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
+    }
+
+    return EntityCombinedTable.of(table, tableEntity)
+        .withHiddenPropertiesSet(
+            getHiddenPropertyNames(
+                catalogIdent, HasPropertyMetadata::tablePropertiesMetadata, 
table.properties()));
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
index 981d999f9..a84e3b567 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
@@ -128,61 +128,16 @@ public class TopicOperationDispatcher extends 
OperationDispatcher implements Top
   public Topic createTopic(
       NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, 
String> properties)
       throws NoSuchSchemaException, TopicAlreadyExistsException {
-    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
-    doWithCatalog(
-        catalogIdent,
-        c ->
-            c.doWithPropertiesMeta(
-                p -> {
-                  validatePropertyForCreate(p.topicPropertiesMetadata(), 
properties);
-                  return null;
-                }),
-        IllegalArgumentException.class);
-    Long uid = idGenerator.nextId();
-    StringIdentifier stringId = StringIdentifier.fromId(uid);
-    Map<String, String> updatedProperties =
-        StringIdentifier.newPropertiesWithId(stringId, properties);
 
-    doWithCatalog(
-        catalogIdent,
-        c -> c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout, 
updatedProperties)),
-        NoSuchSchemaException.class,
-        TopicAlreadyExistsException.class);
-
-    // Retrieve the Topic again to obtain some values generated by underlying 
catalog
-    Topic topic =
-        doWithCatalog(
-            catalogIdent,
-            c -> c.doWithTopicOps(t -> t.loadTopic(ident)),
-            NoSuchTopicException.class);
+    // Load the schema to make sure the schema exists.
+    SchemaDispatcher schemaDispatcher = 
GravitinoEnv.getInstance().schemaDispatcher();
+    NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+    schemaDispatcher.loadSchema(schemaIdent);
 
-    TopicEntity topicEntity =
-        TopicEntity.builder()
-            .withId(fromProperties(topic.properties()).id())
-            .withName(ident.name())
-            .withComment(comment)
-            .withNamespace(ident.namespace())
-            .withAuditInfo(
-                AuditInfo.builder()
-                    
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
-                    .withCreateTime(Instant.now())
-                    .build())
-            .build();
-
-    try {
-      store.put(topicEntity, true /* overwrite */);
-    } catch (Exception e) {
-      LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, 
"put", ident, e);
-      return EntityCombinedTopic.of(topic)
-          .withHiddenPropertiesSet(
-              getHiddenPropertyNames(
-                  catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, 
topic.properties()));
-    }
-
-    return EntityCombinedTopic.of(topic, topicEntity)
-        .withHiddenPropertiesSet(
-            getHiddenPropertyNames(
-                catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, 
topic.properties()));
+    return TreeLockUtils.doWithTreeLock(
+        NameIdentifier.of(ident.namespace().levels()),
+        LockType.WRITE,
+        () -> internalCreateTopic(ident, comment, dataLayout, properties));
   }
 
   /**
@@ -374,4 +329,63 @@ public class TopicOperationDispatcher extends 
OperationDispatcher implements Top
                 catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, 
topic.properties()))
         .withImported(topicEntity != null);
   }
+
+  private Topic internalCreateTopic(
+      NameIdentifier ident, String comment, DataLayout dataLayout, Map<String, 
String> properties) {
+    NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+    doWithCatalog(
+        catalogIdent,
+        c ->
+            c.doWithPropertiesMeta(
+                p -> {
+                  validatePropertyForCreate(p.topicPropertiesMetadata(), 
properties);
+                  return null;
+                }),
+        IllegalArgumentException.class);
+    Long uid = idGenerator.nextId();
+    StringIdentifier stringId = StringIdentifier.fromId(uid);
+    Map<String, String> updatedProperties =
+        StringIdentifier.newPropertiesWithId(stringId, properties);
+
+    doWithCatalog(
+        catalogIdent,
+        c -> c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout, 
updatedProperties)),
+        NoSuchSchemaException.class,
+        TopicAlreadyExistsException.class);
+
+    // Retrieve the Topic again to obtain some values generated by underlying 
catalog
+    Topic topic =
+        doWithCatalog(
+            catalogIdent,
+            c -> c.doWithTopicOps(t -> t.loadTopic(ident)),
+            NoSuchTopicException.class);
+
+    TopicEntity topicEntity =
+        TopicEntity.builder()
+            .withId(fromProperties(topic.properties()).id())
+            .withName(ident.name())
+            .withComment(comment)
+            .withNamespace(ident.namespace())
+            .withAuditInfo(
+                AuditInfo.builder()
+                    
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+                    .withCreateTime(Instant.now())
+                    .build())
+            .build();
+
+    try {
+      store.put(topicEntity, true /* overwrite */);
+    } catch (Exception e) {
+      LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE, 
"put", ident, e);
+      return EntityCombinedTopic.of(topic)
+          .withHiddenPropertiesSet(
+              getHiddenPropertyNames(
+                  catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, 
topic.properties()));
+    }
+
+    return EntityCombinedTopic.of(topic, topicEntity)
+        .withHiddenPropertiesSet(
+            getHiddenPropertyNames(
+                catalogIdent, HasPropertyMetadata::topicPropertiesMetadata, 
topic.properties()));
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
index 2c47d69bc..0151dcf24 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
@@ -39,7 +39,7 @@ public class TestPartitionNormalizeDispatcher extends 
TestOperationDispatcher {
       NameIdentifierUtil.ofTable(metalake, catalog, SCHEMA, 
"TEST_PARTITION_NORMALIZE_TABLE");
 
   @BeforeAll
-  public static void initialize() {
+  public static void initialize() throws IllegalAccessException {
     TestPartitionOperationDispatcher.prepareTable();
     partitionNormalizeDispatcher =
         new PartitionNormalizeDispatcher(
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
index 3d81ee557..9ddc3b1d3 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
@@ -18,9 +18,19 @@
  */
 package org.apache.gravitino.catalog;
 
+import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
+import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
+import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
 import com.google.common.collect.Maps;
 import java.util.Arrays;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.rel.Column;
 import org.apache.gravitino.rel.expressions.literals.Literal;
 import org.apache.gravitino.rel.expressions.literals.Literals;
@@ -50,7 +60,7 @@ public class TestPartitionOperationDispatcher extends 
TestOperationDispatcher {
           Maps.newHashMap());
 
   @BeforeAll
-  public static void initialize() {
+  public static void initialize() throws IllegalAccessException {
     prepareTable();
     partitionOperationDispatcher.addPartition(TABLE_IDENT, PARTITION);
 
@@ -66,7 +76,7 @@ public class TestPartitionOperationDispatcher extends 
TestOperationDispatcher {
         "Custom class loader is not used");
   }
 
-  protected static void prepareTable() {
+  protected static void prepareTable() throws IllegalAccessException {
     schemaOperationDispatcher =
         new SchemaOperationDispatcher(catalogManager, entityStore, 
idGenerator);
     tableOperationDispatcher =
@@ -74,6 +84,14 @@ public class TestPartitionOperationDispatcher extends 
TestOperationDispatcher {
     partitionOperationDispatcher =
         new PartitionOperationDispatcher(catalogManager, entityStore, 
idGenerator);
 
+    Config config = mock(Config.class);
+    doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
+    doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
+    doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+    FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new 
LockManager(config), true);
+    FieldUtils.writeField(
+        GravitinoEnv.getInstance(), "schemaDispatcher", 
schemaOperationDispatcher, true);
+
     NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema(metalake, 
catalog, SCHEMA);
     schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
     Column[] columns =
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
index f5f69b77a..1dd31fd33 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
@@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -44,8 +45,10 @@ import org.apache.gravitino.Config;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.TestCatalog;
 import org.apache.gravitino.TestColumn;
 import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.connector.TestCatalogOperations;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.meta.AuditInfo;
@@ -306,6 +309,9 @@ public class TestTableOperationDispatcher extends 
TestOperationDispatcher {
           
TestColumn.builder().withName("col2").withType(Types.StringType.get()).build()
         };
 
+    schemaOperationDispatcher.createSchema(
+        NameIdentifier.of(tableIdent.namespace().levels()), "comment", props);
+
     tableOperationDispatcher.createTable(tableIdent, columns, "comment", 
props, new Transform[0]);
 
     boolean dropped = tableOperationDispatcher.dropTable(tableIdent);
@@ -319,4 +325,24 @@ public class TestTableOperationDispatcher extends 
TestOperationDispatcher {
     Assertions.assertThrows(
         RuntimeException.class, () -> 
tableOperationDispatcher.dropTable(tableIdent));
   }
+
+  @Test
+  public void testCreateTableNeedImportingSchema() throws IOException {
+    Namespace tableNs = Namespace.of(metalake, catalog, "schema181");
+    NameIdentifier tableIdent = NameIdentifier.of(tableNs, "topic81");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    TestCatalog testCatalog =
+        (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake, 
catalog));
+    TestCatalogOperations testCatalogOperations = (TestCatalogOperations) 
testCatalog.ops();
+    testCatalogOperations.createSchema(
+        NameIdentifier.of(tableNs.levels()), "", Collections.emptyMap());
+    Column[] columns =
+        new Column[] {
+          
TestColumn.builder().withName("col1").withType(Types.StringType.get()).build(),
+          
TestColumn.builder().withName("col2").withType(Types.StringType.get()).build()
+        };
+    tableOperationDispatcher.createTable(tableIdent, columns, "comment", 
props);
+    
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(tableNs.levels()), 
SCHEMA));
+    Assertions.assertTrue(entityStore.exists(tableIdent, TABLE));
+  }
 }
diff --git 
a/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
 
b/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
index 27afc5090..ac694883d 100644
--- 
a/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
+++ 
b/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.catalog;
 import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
 import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
 import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.apache.gravitino.Entity.EntityType.SCHEMA;
 import static org.apache.gravitino.StringIdentifier.ID_KEY;
 import static org.apache.gravitino.TestBasePropertiesMetadata.COMMENT_KEY;
 import static org.mockito.ArgumentMatchers.any;
@@ -33,6 +34,7 @@ import static org.mockito.Mockito.reset;
 import com.google.common.collect.ImmutableMap;
 import java.io.IOException;
 import java.time.Instant;
+import java.util.Collections;
 import java.util.Map;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Config;
@@ -40,7 +42,9 @@ import org.apache.gravitino.Entity;
 import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
+import org.apache.gravitino.TestCatalog;
 import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.connector.TestCatalogOperations;
 import org.apache.gravitino.exceptions.NoSuchEntityException;
 import org.apache.gravitino.lock.LockManager;
 import org.apache.gravitino.messaging.Topic;
@@ -122,25 +126,23 @@ public class TestTopicOperationDispatcher extends 
TestOperationDispatcher {
     // Case 2: Test if the topic entity is not found in the entity store
     reset(entityStore);
     entityStore.delete(topicIdent1, Entity.EntityType.TOPIC);
-    entityStore.delete(NameIdentifier.of(topicNs.levels()), 
Entity.EntityType.SCHEMA);
+    entityStore.delete(NameIdentifier.of(topicNs.levels()), SCHEMA);
     doThrow(new NoSuchEntityException("")).when(entityStore).get(any(), any(), 
any());
     Topic loadedTopic2 = topicOperationDispatcher.loadTopic(topicIdent1);
     // Succeed to import the topic entity
     Assertions.assertTrue(entityStore.exists(topicIdent1, 
Entity.EntityType.TOPIC));
-    Assertions.assertTrue(
-        entityStore.exists(NameIdentifier.of(topicNs.levels()), 
Entity.EntityType.SCHEMA));
+    
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()), 
SCHEMA));
     // Audit info is gotten from the catalog, not from the entity store
     Assertions.assertEquals("test", loadedTopic2.auditInfo().creator());
 
     // Case 3: Test if the entity store is failed to get the topic entity
     reset(entityStore);
     entityStore.delete(topicIdent1, Entity.EntityType.TOPIC);
-    entityStore.delete(NameIdentifier.of(topicNs.levels()), 
Entity.EntityType.SCHEMA);
+    entityStore.delete(NameIdentifier.of(topicNs.levels()), SCHEMA);
     doThrow(new IOException()).when(entityStore).get(any(), any(), any());
     Topic loadedTopic3 = topicOperationDispatcher.loadTopic(topicIdent1);
     // Succeed to import the topic entity
-    Assertions.assertTrue(
-        entityStore.exists(NameIdentifier.of(topicNs.levels()), 
Entity.EntityType.SCHEMA));
+    
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()), 
SCHEMA));
     Assertions.assertTrue(entityStore.exists(topicIdent1, 
Entity.EntityType.TOPIC));
     // Audit info is gotten from the catalog, not from the entity store
     Assertions.assertEquals("test", loadedTopic3.auditInfo().creator());
@@ -250,4 +252,19 @@ public class TestTopicOperationDispatcher extends 
TestOperationDispatcher {
     Assertions.assertThrows(
         RuntimeException.class, () -> 
topicOperationDispatcher.dropTopic(topicIdent));
   }
+
+  @Test
+  public void testCreateTopicNeedImportingSchema() throws IOException {
+    Namespace topicNs = Namespace.of(metalake, catalog, "schema161");
+    NameIdentifier topicIdent = NameIdentifier.of(topicNs, "topic61");
+    Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+    TestCatalog testCatalog =
+        (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake, 
catalog));
+    TestCatalogOperations testCatalogOperations = (TestCatalogOperations) 
testCatalog.ops();
+    testCatalogOperations.createSchema(
+        NameIdentifier.of(topicNs.levels()), "", Collections.emptyMap());
+    topicOperationDispatcher.createTopic(topicIdent, "comment", null, props);
+    
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()), 
SCHEMA));
+    Assertions.assertTrue(entityStore.exists(topicIdent, 
Entity.EntityType.TOPIC));
+  }
 }
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
index e8ad56401..d5cf1ffc7 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
@@ -121,19 +121,15 @@ public class TableOperations {
                 NameIdentifierUtil.ofTable(metalake, catalog, schema, 
request.getName());
 
             Table table =
-                TreeLockUtils.doWithTreeLock(
-                    NameIdentifier.of(metalake, catalog, schema),
-                    LockType.WRITE,
-                    () ->
-                        dispatcher.createTable(
-                            ident,
-                            fromDTOs(request.getColumns()),
-                            request.getComment(),
-                            request.getProperties(),
-                            fromDTOs(request.getPartitioning()),
-                            fromDTO(request.getDistribution()),
-                            fromDTOs(request.getSortOrders()),
-                            fromDTOs(request.getIndexes())));
+                dispatcher.createTable(
+                    ident,
+                    fromDTOs(request.getColumns()),
+                    request.getComment(),
+                    request.getProperties(),
+                    fromDTOs(request.getPartitioning()),
+                    fromDTO(request.getDistribution()),
+                    fromDTOs(request.getSortOrders()),
+                    fromDTOs(request.getIndexes()));
             Response response = Utils.ok(new 
TableResponse(DTOConverters.toDTO(table)));
             LOG.info("Table created: {}.{}.{}.{}", metalake, catalog, schema, 
request.getName());
             return response;
diff --git 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
index 5e32ff6d2..4e9bcd550 100644
--- 
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
+++ 
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
@@ -119,15 +119,11 @@ public class TopicOperations {
                 NameIdentifierUtil.ofTopic(metalake, catalog, schema, 
request.getName());
 
             Topic topic =
-                TreeLockUtils.doWithTreeLock(
-                    NameIdentifierUtil.ofSchema(metalake, catalog, schema),
-                    LockType.WRITE,
-                    () ->
-                        dispatcher.createTopic(
-                            ident,
-                            request.getComment(),
-                            null /* dataLayout, always null because it's not 
supported yet.*/,
-                            request.getProperties()));
+                dispatcher.createTopic(
+                    ident,
+                    request.getComment(),
+                    null /* dataLayout, always null because it's not supported 
yet.*/,
+                    request.getProperties());
             Response response = Utils.ok(new 
TopicResponse(DTOConverters.toDTO(topic)));
             LOG.info("Topic created: {}.{}.{}.{}", metalake, catalog, schema, 
topic.name());
             return response;

Reply via email to