This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 9a3759f51 [#4812] fix(core): Make ensure the schema exists before
creating tables and topics (#4826)
9a3759f51 is described below
commit 9a3759f51dc9e51b05449cacfb08eaebcf05f2b4
Author: roryqi <[email protected]>
AuthorDate: Sat Aug 31 02:23:28 2024 +0800
[#4812] fix(core): Make ensure the schema exists before creating tables and
topics (#4826)
### What changes were proposed in this pull request?
If a schema was not created through Gravitino, Gravitino has no metadata
for it in the backend storage.
If we then create a table in such a schema, the storage cannot store the
table's metadata either, because it has no schema id to store the table
under. As a result, setting the owner of the table fails.
This change loads the schema before creating a table or topic, so that the
schema is known to the storage first.
This should not add much performance cost, because loadSchema only uses a
read lock after the first load. With a cache, it could be even faster.
### Why are the changes needed?
Fix: #4812
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests that create a table and a topic in a schema that exists
only in the underlying catalog.
---
.../catalog/TableOperationDispatcher.java | 164 ++++++++++++---------
.../catalog/TopicOperationDispatcher.java | 120 ++++++++-------
.../catalog/TestPartitionNormalizeDispatcher.java | 2 +-
.../catalog/TestPartitionOperationDispatcher.java | 22 ++-
.../catalog/TestTableOperationDispatcher.java | 26 ++++
.../catalog/TestTopicOperationDispatcher.java | 29 +++-
.../gravitino/server/web/rest/TableOperations.java | 22 ++-
.../gravitino/server/web/rest/TopicOperations.java | 14 +-
8 files changed, 248 insertions(+), 151 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 404af695a..9794a644e 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -143,74 +143,25 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
- NameIdentifier catalogIdent = getCatalogIdentifier(ident);
- doWithCatalog(
- catalogIdent,
- c ->
- c.doWithPropertiesMeta(
- p -> {
- validatePropertyForCreate(p.tablePropertiesMetadata(),
properties);
- return null;
- }),
- IllegalArgumentException.class);
- long uid = idGenerator.nextId();
- // Add StringIdentifier to the properties, the specific catalog will
handle this
- // StringIdentifier to make sure only when the operation is successful,
the related
- // TableEntity will be visible.
- StringIdentifier stringId = StringIdentifier.fromId(uid);
- Map<String, String> updatedProperties =
- StringIdentifier.newPropertiesWithId(stringId, properties);
-
- doWithCatalog(
- catalogIdent,
- c ->
- c.doWithTableOps(
- t ->
- t.createTable(
- ident,
- columns,
- comment,
- updatedProperties,
- partitions == null ? EMPTY_TRANSFORM : partitions,
- distribution == null ? Distributions.NONE :
distribution,
- sortOrders == null ? new SortOrder[0] : sortOrders,
- indexes == null ? Indexes.EMPTY_INDEXES : indexes)),
- NoSuchSchemaException.class,
- TableAlreadyExistsException.class);
-
- // Retrieve the Table again to obtain some values generated by underlying
catalog
- Table table =
- doWithCatalog(
- catalogIdent,
- c -> c.doWithTableOps(t -> t.loadTable(ident)),
- NoSuchTableException.class);
-
- TableEntity tableEntity =
- TableEntity.builder()
- .withId(uid)
- .withName(ident.name())
- .withNamespace(ident.namespace())
- .withAuditInfo(
- AuditInfo.builder()
-
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
- .withCreateTime(Instant.now())
- .build())
- .build();
-
- try {
- store.put(tableEntity, true /* overwrite */);
- } catch (Exception e) {
- LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
- return EntityCombinedTable.of(table)
- .withHiddenPropertiesSet(
- getHiddenPropertyNames(
- catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
- }
- return EntityCombinedTable.of(table, tableEntity)
- .withHiddenPropertiesSet(
- getHiddenPropertyNames(
- catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
+ // Load the schema to make sure the schema exists.
+ SchemaDispatcher schemaDispatcher =
GravitinoEnv.getInstance().schemaDispatcher();
+ NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+ schemaDispatcher.loadSchema(schemaIdent);
+
+ return TreeLockUtils.doWithTreeLock(
+ NameIdentifier.of(ident.namespace().levels()),
+ LockType.WRITE,
+ () ->
+ internalCreateTable(
+ ident,
+ columns,
+ comment,
+ properties,
+ partitions,
+ distribution,
+ sortOrders,
+ indexes));
}
/**
@@ -476,4 +427,83 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
table.properties()))
.withImported(tableEntity != null);
}
+
+ private Table internalCreateTable(
+ NameIdentifier ident,
+ Column[] columns,
+ String comment,
+ Map<String, String> properties,
+ Transform[] partitions,
+ Distribution distribution,
+ SortOrder[] sortOrders,
+ Index[] indexes) {
+ NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+ doWithCatalog(
+ catalogIdent,
+ c ->
+ c.doWithPropertiesMeta(
+ p -> {
+ validatePropertyForCreate(p.tablePropertiesMetadata(),
properties);
+ return null;
+ }),
+ IllegalArgumentException.class);
+ long uid = idGenerator.nextId();
+ // Add StringIdentifier to the properties, the specific catalog will
handle this
+ // StringIdentifier to make sure only when the operation is successful,
the related
+ // TableEntity will be visible.
+ StringIdentifier stringId = StringIdentifier.fromId(uid);
+ Map<String, String> updatedProperties =
+ StringIdentifier.newPropertiesWithId(stringId, properties);
+
+ doWithCatalog(
+ catalogIdent,
+ c ->
+ c.doWithTableOps(
+ t ->
+ t.createTable(
+ ident,
+ columns,
+ comment,
+ updatedProperties,
+ partitions == null ? EMPTY_TRANSFORM : partitions,
+ distribution == null ? Distributions.NONE :
distribution,
+ sortOrders == null ? new SortOrder[0] : sortOrders,
+ indexes == null ? Indexes.EMPTY_INDEXES : indexes)),
+ NoSuchSchemaException.class,
+ TableAlreadyExistsException.class);
+
+ TableEntity tableEntity =
+ TableEntity.builder()
+ .withId(uid)
+ .withName(ident.name())
+ .withNamespace(ident.namespace())
+ .withAuditInfo(
+ AuditInfo.builder()
+
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+ .withCreateTime(Instant.now())
+ .build())
+ .build();
+
+ // Retrieve the Table again to obtain some values generated by underlying
catalog
+ Table table =
+ doWithCatalog(
+ catalogIdent,
+ c -> c.doWithTableOps(t -> t.loadTable(ident)),
+ NoSuchTableException.class);
+
+ try {
+ store.put(tableEntity, true /* overwrite */);
+ } catch (Exception e) {
+ LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
+ return EntityCombinedTable.of(table)
+ .withHiddenPropertiesSet(
+ getHiddenPropertyNames(
+ catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
+ }
+
+ return EntityCombinedTable.of(table, tableEntity)
+ .withHiddenPropertiesSet(
+ getHiddenPropertyNames(
+ catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
+ }
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
index 981d999f9..a84e3b567 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
@@ -128,61 +128,16 @@ public class TopicOperationDispatcher extends
OperationDispatcher implements Top
public Topic createTopic(
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String,
String> properties)
throws NoSuchSchemaException, TopicAlreadyExistsException {
- NameIdentifier catalogIdent = getCatalogIdentifier(ident);
- doWithCatalog(
- catalogIdent,
- c ->
- c.doWithPropertiesMeta(
- p -> {
- validatePropertyForCreate(p.topicPropertiesMetadata(),
properties);
- return null;
- }),
- IllegalArgumentException.class);
- Long uid = idGenerator.nextId();
- StringIdentifier stringId = StringIdentifier.fromId(uid);
- Map<String, String> updatedProperties =
- StringIdentifier.newPropertiesWithId(stringId, properties);
- doWithCatalog(
- catalogIdent,
- c -> c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout,
updatedProperties)),
- NoSuchSchemaException.class,
- TopicAlreadyExistsException.class);
-
- // Retrieve the Topic again to obtain some values generated by underlying
catalog
- Topic topic =
- doWithCatalog(
- catalogIdent,
- c -> c.doWithTopicOps(t -> t.loadTopic(ident)),
- NoSuchTopicException.class);
+ // Load the schema to make sure the schema exists.
+ SchemaDispatcher schemaDispatcher =
GravitinoEnv.getInstance().schemaDispatcher();
+ NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+ schemaDispatcher.loadSchema(schemaIdent);
- TopicEntity topicEntity =
- TopicEntity.builder()
- .withId(fromProperties(topic.properties()).id())
- .withName(ident.name())
- .withComment(comment)
- .withNamespace(ident.namespace())
- .withAuditInfo(
- AuditInfo.builder()
-
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
- .withCreateTime(Instant.now())
- .build())
- .build();
-
- try {
- store.put(topicEntity, true /* overwrite */);
- } catch (Exception e) {
- LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE,
"put", ident, e);
- return EntityCombinedTopic.of(topic)
- .withHiddenPropertiesSet(
- getHiddenPropertyNames(
- catalogIdent, HasPropertyMetadata::topicPropertiesMetadata,
topic.properties()));
- }
-
- return EntityCombinedTopic.of(topic, topicEntity)
- .withHiddenPropertiesSet(
- getHiddenPropertyNames(
- catalogIdent, HasPropertyMetadata::topicPropertiesMetadata,
topic.properties()));
+ return TreeLockUtils.doWithTreeLock(
+ NameIdentifier.of(ident.namespace().levels()),
+ LockType.WRITE,
+ () -> internalCreateTopic(ident, comment, dataLayout, properties));
}
/**
@@ -374,4 +329,63 @@ public class TopicOperationDispatcher extends
OperationDispatcher implements Top
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata,
topic.properties()))
.withImported(topicEntity != null);
}
+
+ private Topic internalCreateTopic(
+ NameIdentifier ident, String comment, DataLayout dataLayout, Map<String,
String> properties) {
+ NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+ doWithCatalog(
+ catalogIdent,
+ c ->
+ c.doWithPropertiesMeta(
+ p -> {
+ validatePropertyForCreate(p.topicPropertiesMetadata(),
properties);
+ return null;
+ }),
+ IllegalArgumentException.class);
+ Long uid = idGenerator.nextId();
+ StringIdentifier stringId = StringIdentifier.fromId(uid);
+ Map<String, String> updatedProperties =
+ StringIdentifier.newPropertiesWithId(stringId, properties);
+
+ doWithCatalog(
+ catalogIdent,
+ c -> c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout,
updatedProperties)),
+ NoSuchSchemaException.class,
+ TopicAlreadyExistsException.class);
+
+ // Retrieve the Topic again to obtain some values generated by underlying
catalog
+ Topic topic =
+ doWithCatalog(
+ catalogIdent,
+ c -> c.doWithTopicOps(t -> t.loadTopic(ident)),
+ NoSuchTopicException.class);
+
+ TopicEntity topicEntity =
+ TopicEntity.builder()
+ .withId(fromProperties(topic.properties()).id())
+ .withName(ident.name())
+ .withComment(comment)
+ .withNamespace(ident.namespace())
+ .withAuditInfo(
+ AuditInfo.builder()
+
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+ .withCreateTime(Instant.now())
+ .build())
+ .build();
+
+ try {
+ store.put(topicEntity, true /* overwrite */);
+ } catch (Exception e) {
+ LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE,
"put", ident, e);
+ return EntityCombinedTopic.of(topic)
+ .withHiddenPropertiesSet(
+ getHiddenPropertyNames(
+ catalogIdent, HasPropertyMetadata::topicPropertiesMetadata,
topic.properties()));
+ }
+
+ return EntityCombinedTopic.of(topic, topicEntity)
+ .withHiddenPropertiesSet(
+ getHiddenPropertyNames(
+ catalogIdent, HasPropertyMetadata::topicPropertiesMetadata,
topic.properties()));
+ }
}
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
index 2c47d69bc..0151dcf24 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
@@ -39,7 +39,7 @@ public class TestPartitionNormalizeDispatcher extends
TestOperationDispatcher {
NameIdentifierUtil.ofTable(metalake, catalog, SCHEMA,
"TEST_PARTITION_NORMALIZE_TABLE");
@BeforeAll
- public static void initialize() {
+ public static void initialize() throws IllegalAccessException {
TestPartitionOperationDispatcher.prepareTable();
partitionNormalizeDispatcher =
new PartitionNormalizeDispatcher(
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
index 3d81ee557..9ddc3b1d3 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
@@ -18,9 +18,19 @@
*/
package org.apache.gravitino.catalog;
+import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
+import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
+import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
import com.google.common.collect.Maps;
import java.util.Arrays;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.expressions.literals.Literal;
import org.apache.gravitino.rel.expressions.literals.Literals;
@@ -50,7 +60,7 @@ public class TestPartitionOperationDispatcher extends
TestOperationDispatcher {
Maps.newHashMap());
@BeforeAll
- public static void initialize() {
+ public static void initialize() throws IllegalAccessException {
prepareTable();
partitionOperationDispatcher.addPartition(TABLE_IDENT, PARTITION);
@@ -66,7 +76,7 @@ public class TestPartitionOperationDispatcher extends
TestOperationDispatcher {
"Custom class loader is not used");
}
- protected static void prepareTable() {
+ protected static void prepareTable() throws IllegalAccessException {
schemaOperationDispatcher =
new SchemaOperationDispatcher(catalogManager, entityStore,
idGenerator);
tableOperationDispatcher =
@@ -74,6 +84,14 @@ public class TestPartitionOperationDispatcher extends
TestOperationDispatcher {
partitionOperationDispatcher =
new PartitionOperationDispatcher(catalogManager, entityStore,
idGenerator);
+ Config config = mock(Config.class);
+ doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
+ doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
+ doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new
LockManager(config), true);
+ FieldUtils.writeField(
+ GravitinoEnv.getInstance(), "schemaDispatcher",
schemaOperationDispatcher, true);
+
NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema(metalake,
catalog, SCHEMA);
schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
Column[] columns =
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
index f5f69b77a..1dd31fd33 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
@@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -44,8 +45,10 @@ import org.apache.gravitino.Config;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.TestCatalog;
import org.apache.gravitino.TestColumn;
import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.connector.TestCatalogOperations;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.meta.AuditInfo;
@@ -306,6 +309,9 @@ public class TestTableOperationDispatcher extends
TestOperationDispatcher {
TestColumn.builder().withName("col2").withType(Types.StringType.get()).build()
};
+ schemaOperationDispatcher.createSchema(
+ NameIdentifier.of(tableIdent.namespace().levels()), "comment", props);
+
tableOperationDispatcher.createTable(tableIdent, columns, "comment",
props, new Transform[0]);
boolean dropped = tableOperationDispatcher.dropTable(tableIdent);
@@ -319,4 +325,24 @@ public class TestTableOperationDispatcher extends
TestOperationDispatcher {
Assertions.assertThrows(
RuntimeException.class, () ->
tableOperationDispatcher.dropTable(tableIdent));
}
+
+ @Test
+ public void testCreateTableNeedImportingSchema() throws IOException {
+ Namespace tableNs = Namespace.of(metalake, catalog, "schema181");
+ NameIdentifier tableIdent = NameIdentifier.of(tableNs, "topic81");
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+ TestCatalog testCatalog =
+ (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake,
catalog));
+ TestCatalogOperations testCatalogOperations = (TestCatalogOperations)
testCatalog.ops();
+ testCatalogOperations.createSchema(
+ NameIdentifier.of(tableNs.levels()), "", Collections.emptyMap());
+ Column[] columns =
+ new Column[] {
+
TestColumn.builder().withName("col1").withType(Types.StringType.get()).build(),
+
TestColumn.builder().withName("col2").withType(Types.StringType.get()).build()
+ };
+ tableOperationDispatcher.createTable(tableIdent, columns, "comment",
props);
+
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(tableNs.levels()),
SCHEMA));
+ Assertions.assertTrue(entityStore.exists(tableIdent, TABLE));
+ }
}
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
index 27afc5090..ac694883d 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.catalog;
import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.apache.gravitino.Entity.EntityType.SCHEMA;
import static org.apache.gravitino.StringIdentifier.ID_KEY;
import static org.apache.gravitino.TestBasePropertiesMetadata.COMMENT_KEY;
import static org.mockito.ArgumentMatchers.any;
@@ -33,6 +34,7 @@ import static org.mockito.Mockito.reset;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.time.Instant;
+import java.util.Collections;
import java.util.Map;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Config;
@@ -40,7 +42,9 @@ import org.apache.gravitino.Entity;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.TestCatalog;
import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.connector.TestCatalogOperations;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.messaging.Topic;
@@ -122,25 +126,23 @@ public class TestTopicOperationDispatcher extends
TestOperationDispatcher {
// Case 2: Test if the topic entity is not found in the entity store
reset(entityStore);
entityStore.delete(topicIdent1, Entity.EntityType.TOPIC);
- entityStore.delete(NameIdentifier.of(topicNs.levels()),
Entity.EntityType.SCHEMA);
+ entityStore.delete(NameIdentifier.of(topicNs.levels()), SCHEMA);
doThrow(new NoSuchEntityException("")).when(entityStore).get(any(), any(),
any());
Topic loadedTopic2 = topicOperationDispatcher.loadTopic(topicIdent1);
// Succeed to import the topic entity
Assertions.assertTrue(entityStore.exists(topicIdent1,
Entity.EntityType.TOPIC));
- Assertions.assertTrue(
- entityStore.exists(NameIdentifier.of(topicNs.levels()),
Entity.EntityType.SCHEMA));
+
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()),
SCHEMA));
// Audit info is gotten from the catalog, not from the entity store
Assertions.assertEquals("test", loadedTopic2.auditInfo().creator());
// Case 3: Test if the entity store is failed to get the topic entity
reset(entityStore);
entityStore.delete(topicIdent1, Entity.EntityType.TOPIC);
- entityStore.delete(NameIdentifier.of(topicNs.levels()),
Entity.EntityType.SCHEMA);
+ entityStore.delete(NameIdentifier.of(topicNs.levels()), SCHEMA);
doThrow(new IOException()).when(entityStore).get(any(), any(), any());
Topic loadedTopic3 = topicOperationDispatcher.loadTopic(topicIdent1);
// Succeed to import the topic entity
- Assertions.assertTrue(
- entityStore.exists(NameIdentifier.of(topicNs.levels()),
Entity.EntityType.SCHEMA));
+
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()),
SCHEMA));
Assertions.assertTrue(entityStore.exists(topicIdent1,
Entity.EntityType.TOPIC));
// Audit info is gotten from the catalog, not from the entity store
Assertions.assertEquals("test", loadedTopic3.auditInfo().creator());
@@ -250,4 +252,19 @@ public class TestTopicOperationDispatcher extends
TestOperationDispatcher {
Assertions.assertThrows(
RuntimeException.class, () ->
topicOperationDispatcher.dropTopic(topicIdent));
}
+
+ @Test
+ public void testCreateTopicNeedImportingSchema() throws IOException {
+ Namespace topicNs = Namespace.of(metalake, catalog, "schema161");
+ NameIdentifier topicIdent = NameIdentifier.of(topicNs, "topic61");
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+ TestCatalog testCatalog =
+ (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake,
catalog));
+ TestCatalogOperations testCatalogOperations = (TestCatalogOperations)
testCatalog.ops();
+ testCatalogOperations.createSchema(
+ NameIdentifier.of(topicNs.levels()), "", Collections.emptyMap());
+ topicOperationDispatcher.createTopic(topicIdent, "comment", null, props);
+
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()),
SCHEMA));
+ Assertions.assertTrue(entityStore.exists(topicIdent,
Entity.EntityType.TOPIC));
+ }
}
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
index e8ad56401..d5cf1ffc7 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
@@ -121,19 +121,15 @@ public class TableOperations {
NameIdentifierUtil.ofTable(metalake, catalog, schema,
request.getName());
Table table =
- TreeLockUtils.doWithTreeLock(
- NameIdentifier.of(metalake, catalog, schema),
- LockType.WRITE,
- () ->
- dispatcher.createTable(
- ident,
- fromDTOs(request.getColumns()),
- request.getComment(),
- request.getProperties(),
- fromDTOs(request.getPartitioning()),
- fromDTO(request.getDistribution()),
- fromDTOs(request.getSortOrders()),
- fromDTOs(request.getIndexes())));
+ dispatcher.createTable(
+ ident,
+ fromDTOs(request.getColumns()),
+ request.getComment(),
+ request.getProperties(),
+ fromDTOs(request.getPartitioning()),
+ fromDTO(request.getDistribution()),
+ fromDTOs(request.getSortOrders()),
+ fromDTOs(request.getIndexes()));
Response response = Utils.ok(new
TableResponse(DTOConverters.toDTO(table)));
LOG.info("Table created: {}.{}.{}.{}", metalake, catalog, schema,
request.getName());
return response;
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
index 5e32ff6d2..4e9bcd550 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
@@ -119,15 +119,11 @@ public class TopicOperations {
NameIdentifierUtil.ofTopic(metalake, catalog, schema,
request.getName());
Topic topic =
- TreeLockUtils.doWithTreeLock(
- NameIdentifierUtil.ofSchema(metalake, catalog, schema),
- LockType.WRITE,
- () ->
- dispatcher.createTopic(
- ident,
- request.getComment(),
- null /* dataLayout, always null because it's not
supported yet.*/,
- request.getProperties()));
+ dispatcher.createTopic(
+ ident,
+ request.getComment(),
+ null /* dataLayout, always null because it's not supported
yet.*/,
+ request.getProperties());
Response response = Utils.ok(new
TopicResponse(DTOConverters.toDTO(topic)));
LOG.info("Topic created: {}.{}.{}.{}", metalake, catalog, schema,
topic.name());
return response;