This is an automated email from the ASF dual-hosted git repository.
yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new a6a69cc9f [#4812] fix(core): Make ensure the schema exists before
creating tables and topics (#4818)
a6a69cc9f is described below
commit a6a69cc9fcb88a0ea069711e1b7711da598e1f7c
Author: roryqi <[email protected]>
AuthorDate: Fri Aug 30 22:10:13 2024 +0800
[#4812] fix(core): Make ensure the schema exists before creating tables and
topics (#4818)
### What changes were proposed in this pull request?
If a schema was not created through Gravitino, Gravitino has no metadata
for it in the backend storage.
If we create a table in such a schema, the storage cannot store the
table because it does not contain the schema id. The storage therefore
won't contain the table's metadata, and setting the owner will fail.
This won't add much performance cost, because loadSchema will use a
read lock after the first load. If we have a cache, it could be even faster.
### Why are the changes needed?
Fix: #4812
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests (testCreateTableNeedImportingSchema and
testCreateTopicNeedImportingSchema).
---
.../catalog/TableOperationDispatcher.java | 156 ++++++++++++---------
.../catalog/TopicOperationDispatcher.java | 114 ++++++++-------
.../catalog/TestPartitionNormalizeDispatcher.java | 2 +-
.../catalog/TestPartitionOperationDispatcher.java | 22 ++-
.../catalog/TestTableOperationDispatcher.java | 26 ++++
.../catalog/TestTopicOperationDispatcher.java | 29 +++-
.../gravitino/server/web/rest/TableOperations.java | 22 ++-
.../gravitino/server/web/rest/TopicOperations.java | 14 +-
8 files changed, 241 insertions(+), 144 deletions(-)
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 0562d99b7..4472859d9 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -143,70 +143,25 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
SortOrder[] sortOrders,
Index[] indexes)
throws NoSuchSchemaException, TableAlreadyExistsException {
- NameIdentifier catalogIdent = getCatalogIdentifier(ident);
- doWithCatalog(
- catalogIdent,
- c ->
- c.doWithPropertiesMeta(
- p -> {
- validatePropertyForCreate(p.tablePropertiesMetadata(),
properties);
- return null;
- }),
- IllegalArgumentException.class);
- long uid = idGenerator.nextId();
- // Add StringIdentifier to the properties, the specific catalog will
handle this
- // StringIdentifier to make sure only when the operation is successful,
the related
- // TableEntity will be visible.
- StringIdentifier stringId = StringIdentifier.fromId(uid);
- Map<String, String> updatedProperties =
- StringIdentifier.newPropertiesWithId(stringId, properties);
-
- // we do not retrieve the table again (to obtain some values generated by
underlying catalog)
- // since some catalogs' API is async and the table may not be created
immediately
- Table table =
- doWithCatalog(
- catalogIdent,
- c ->
- c.doWithTableOps(
- t ->
- t.createTable(
- ident,
- columns,
- comment,
- updatedProperties,
- partitions == null ? EMPTY_TRANSFORM : partitions,
- distribution == null ? Distributions.NONE :
distribution,
- sortOrders == null ? new SortOrder[0] : sortOrders,
- indexes == null ? Indexes.EMPTY_INDEXES :
indexes)),
- NoSuchSchemaException.class,
- TableAlreadyExistsException.class);
-
- TableEntity tableEntity =
- TableEntity.builder()
- .withId(uid)
- .withName(ident.name())
- .withNamespace(ident.namespace())
- .withAuditInfo(
- AuditInfo.builder()
-
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
- .withCreateTime(Instant.now())
- .build())
- .build();
-
- try {
- store.put(tableEntity, true /* overwrite */);
- } catch (Exception e) {
- LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
- return EntityCombinedTable.of(table)
- .withHiddenPropertiesSet(
- getHiddenPropertyNames(
- catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
- }
- return EntityCombinedTable.of(table, tableEntity)
- .withHiddenPropertiesSet(
- getHiddenPropertyNames(
- catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
+ // Load the schema to make sure the schema exists.
+ SchemaDispatcher schemaDispatcher =
GravitinoEnv.getInstance().schemaDispatcher();
+ NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+ schemaDispatcher.loadSchema(schemaIdent);
+
+ return TreeLockUtils.doWithTreeLock(
+ NameIdentifier.of(ident.namespace().levels()),
+ LockType.WRITE,
+ () ->
+ internalCreateTable(
+ ident,
+ columns,
+ comment,
+ properties,
+ partitions,
+ distribution,
+ sortOrders,
+ indexes));
}
/**
@@ -462,4 +417,79 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
table.properties()))
.withImported(tableEntity != null);
}
+
+ private Table internalCreateTable(
+ NameIdentifier ident,
+ Column[] columns,
+ String comment,
+ Map<String, String> properties,
+ Transform[] partitions,
+ Distribution distribution,
+ SortOrder[] sortOrders,
+ Index[] indexes) {
+ NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+ doWithCatalog(
+ catalogIdent,
+ c ->
+ c.doWithPropertiesMeta(
+ p -> {
+ validatePropertyForCreate(p.tablePropertiesMetadata(),
properties);
+ return null;
+ }),
+ IllegalArgumentException.class);
+ long uid = idGenerator.nextId();
+ // Add StringIdentifier to the properties, the specific catalog will
handle this
+ // StringIdentifier to make sure only when the operation is successful,
the related
+ // TableEntity will be visible.
+ StringIdentifier stringId = StringIdentifier.fromId(uid);
+ Map<String, String> updatedProperties =
+ StringIdentifier.newPropertiesWithId(stringId, properties);
+
+ // we do not retrieve the table again (to obtain some values generated by
underlying catalog)
+ // since some catalogs' API is async and the table may not be created
immediately
+ Table table =
+ doWithCatalog(
+ catalogIdent,
+ c ->
+ c.doWithTableOps(
+ t ->
+ t.createTable(
+ ident,
+ columns,
+ comment,
+ updatedProperties,
+ partitions == null ? EMPTY_TRANSFORM : partitions,
+ distribution == null ? Distributions.NONE :
distribution,
+ sortOrders == null ? new SortOrder[0] : sortOrders,
+ indexes == null ? Indexes.EMPTY_INDEXES :
indexes)),
+ NoSuchSchemaException.class,
+ TableAlreadyExistsException.class);
+
+ TableEntity tableEntity =
+ TableEntity.builder()
+ .withId(uid)
+ .withName(ident.name())
+ .withNamespace(ident.namespace())
+ .withAuditInfo(
+ AuditInfo.builder()
+
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+ .withCreateTime(Instant.now())
+ .build())
+ .build();
+
+ try {
+ store.put(tableEntity, true /* overwrite */);
+ } catch (Exception e) {
+ LOG.error(FormattedErrorMessages.STORE_OP_FAILURE, "put", ident, e);
+ return EntityCombinedTable.of(table)
+ .withHiddenPropertiesSet(
+ getHiddenPropertyNames(
+ catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
+ }
+
+ return EntityCombinedTable.of(table, tableEntity)
+ .withHiddenPropertiesSet(
+ getHiddenPropertyNames(
+ catalogIdent, HasPropertyMetadata::tablePropertiesMetadata,
table.properties()));
+ }
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
index ec3c08b33..3b6e4a478 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
@@ -128,58 +128,16 @@ public class TopicOperationDispatcher extends
OperationDispatcher implements Top
public Topic createTopic(
NameIdentifier ident, String comment, DataLayout dataLayout, Map<String,
String> properties)
throws NoSuchSchemaException, TopicAlreadyExistsException {
- NameIdentifier catalogIdent = getCatalogIdentifier(ident);
- doWithCatalog(
- catalogIdent,
- c ->
- c.doWithPropertiesMeta(
- p -> {
- validatePropertyForCreate(p.topicPropertiesMetadata(),
properties);
- return null;
- }),
- IllegalArgumentException.class);
- Long uid = idGenerator.nextId();
- StringIdentifier stringId = StringIdentifier.fromId(uid);
- Map<String, String> updatedProperties =
- StringIdentifier.newPropertiesWithId(stringId, properties);
-
- // we do not retrieve the topic again (to obtain some values generated by
underlying catalog)
- // since some catalogs' API is async and the table may not be created
immediately
- Topic topic =
- doWithCatalog(
- catalogIdent,
- c ->
- c.doWithTopicOps(t -> t.createTopic(ident, comment,
dataLayout, updatedProperties)),
- NoSuchSchemaException.class,
- TopicAlreadyExistsException.class);
-
- TopicEntity topicEntity =
- TopicEntity.builder()
- .withId(fromProperties(topic.properties()).id())
- .withName(ident.name())
- .withComment(comment)
- .withNamespace(ident.namespace())
- .withAuditInfo(
- AuditInfo.builder()
-
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
- .withCreateTime(Instant.now())
- .build())
- .build();
- try {
- store.put(topicEntity, true /* overwrite */);
- } catch (Exception e) {
- LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE,
"put", ident, e);
- return EntityCombinedTopic.of(topic)
- .withHiddenPropertiesSet(
- getHiddenPropertyNames(
- catalogIdent, HasPropertyMetadata::topicPropertiesMetadata,
topic.properties()));
- }
+ // Load the schema to make sure the schema exists.
+ SchemaDispatcher schemaDispatcher =
GravitinoEnv.getInstance().schemaDispatcher();
+ NameIdentifier schemaIdent = NameIdentifier.of(ident.namespace().levels());
+ schemaDispatcher.loadSchema(schemaIdent);
- return EntityCombinedTopic.of(topic, topicEntity)
- .withHiddenPropertiesSet(
- getHiddenPropertyNames(
- catalogIdent, HasPropertyMetadata::topicPropertiesMetadata,
topic.properties()));
+ return TreeLockUtils.doWithTreeLock(
+ NameIdentifier.of(ident.namespace().levels()),
+ LockType.WRITE,
+ () -> internalCreateTopic(ident, comment, dataLayout, properties));
}
/**
@@ -364,4 +322,60 @@ public class TopicOperationDispatcher extends
OperationDispatcher implements Top
catalogIdent, HasPropertyMetadata::topicPropertiesMetadata,
topic.properties()))
.withImported(topicEntity != null);
}
+
+ private Topic internalCreateTopic(
+ NameIdentifier ident, String comment, DataLayout dataLayout, Map<String,
String> properties) {
+ NameIdentifier catalogIdent = getCatalogIdentifier(ident);
+ doWithCatalog(
+ catalogIdent,
+ c ->
+ c.doWithPropertiesMeta(
+ p -> {
+ validatePropertyForCreate(p.topicPropertiesMetadata(),
properties);
+ return null;
+ }),
+ IllegalArgumentException.class);
+ Long uid = idGenerator.nextId();
+ StringIdentifier stringId = StringIdentifier.fromId(uid);
+ Map<String, String> updatedProperties =
+ StringIdentifier.newPropertiesWithId(stringId, properties);
+
+ // we do not retrieve the topic again (to obtain some values generated by
underlying catalog)
+ // since some catalogs' API is async and the table may not be created
immediately
+ Topic topic =
+ doWithCatalog(
+ catalogIdent,
+ c ->
+ c.doWithTopicOps(t -> t.createTopic(ident, comment,
dataLayout, updatedProperties)),
+ NoSuchSchemaException.class,
+ TopicAlreadyExistsException.class);
+
+ TopicEntity topicEntity =
+ TopicEntity.builder()
+ .withId(fromProperties(topic.properties()).id())
+ .withName(ident.name())
+ .withComment(comment)
+ .withNamespace(ident.namespace())
+ .withAuditInfo(
+ AuditInfo.builder()
+
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
+ .withCreateTime(Instant.now())
+ .build())
+ .build();
+
+ try {
+ store.put(topicEntity, true /* overwrite */);
+ } catch (Exception e) {
+ LOG.error(OperationDispatcher.FormattedErrorMessages.STORE_OP_FAILURE,
"put", ident, e);
+ return EntityCombinedTopic.of(topic)
+ .withHiddenPropertiesSet(
+ getHiddenPropertyNames(
+ catalogIdent, HasPropertyMetadata::topicPropertiesMetadata,
topic.properties()));
+ }
+
+ return EntityCombinedTopic.of(topic, topicEntity)
+ .withHiddenPropertiesSet(
+ getHiddenPropertyNames(
+ catalogIdent, HasPropertyMetadata::topicPropertiesMetadata,
topic.properties()));
+ }
}
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
index 2c47d69bc..0151dcf24 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionNormalizeDispatcher.java
@@ -39,7 +39,7 @@ public class TestPartitionNormalizeDispatcher extends
TestOperationDispatcher {
NameIdentifierUtil.ofTable(metalake, catalog, SCHEMA,
"TEST_PARTITION_NORMALIZE_TABLE");
@BeforeAll
- public static void initialize() {
+ public static void initialize() throws IllegalAccessException {
TestPartitionOperationDispatcher.prepareTable();
partitionNormalizeDispatcher =
new PartitionNormalizeDispatcher(
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
index 3d81ee557..9ddc3b1d3 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestPartitionOperationDispatcher.java
@@ -18,9 +18,19 @@
*/
package org.apache.gravitino.catalog;
+import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
+import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
+import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
import com.google.common.collect.Maps;
import java.util.Arrays;
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.apache.gravitino.Config;
+import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.expressions.literals.Literal;
import org.apache.gravitino.rel.expressions.literals.Literals;
@@ -50,7 +60,7 @@ public class TestPartitionOperationDispatcher extends
TestOperationDispatcher {
Maps.newHashMap());
@BeforeAll
- public static void initialize() {
+ public static void initialize() throws IllegalAccessException {
prepareTable();
partitionOperationDispatcher.addPartition(TABLE_IDENT, PARTITION);
@@ -66,7 +76,7 @@ public class TestPartitionOperationDispatcher extends
TestOperationDispatcher {
"Custom class loader is not used");
}
- protected static void prepareTable() {
+ protected static void prepareTable() throws IllegalAccessException {
schemaOperationDispatcher =
new SchemaOperationDispatcher(catalogManager, entityStore,
idGenerator);
tableOperationDispatcher =
@@ -74,6 +84,14 @@ public class TestPartitionOperationDispatcher extends
TestOperationDispatcher {
partitionOperationDispatcher =
new PartitionOperationDispatcher(catalogManager, entityStore,
idGenerator);
+ Config config = mock(Config.class);
+ doReturn(100000L).when(config).get(TREE_LOCK_MAX_NODE_IN_MEMORY);
+ doReturn(1000L).when(config).get(TREE_LOCK_MIN_NODE_IN_MEMORY);
+ doReturn(36000L).when(config).get(TREE_LOCK_CLEAN_INTERVAL);
+ FieldUtils.writeField(GravitinoEnv.getInstance(), "lockManager", new
LockManager(config), true);
+ FieldUtils.writeField(
+ GravitinoEnv.getInstance(), "schemaDispatcher",
schemaOperationDispatcher, true);
+
NameIdentifier schemaIdent = NameIdentifierUtil.ofSchema(metalake,
catalog, SCHEMA);
schemaOperationDispatcher.createSchema(schemaIdent, "comment", null);
Column[] columns =
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
index f5f69b77a..1dd31fd33 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestTableOperationDispatcher.java
@@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -44,8 +45,10 @@ import org.apache.gravitino.Config;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.TestCatalog;
import org.apache.gravitino.TestColumn;
import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.connector.TestCatalogOperations;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.meta.AuditInfo;
@@ -306,6 +309,9 @@ public class TestTableOperationDispatcher extends
TestOperationDispatcher {
TestColumn.builder().withName("col2").withType(Types.StringType.get()).build()
};
+ schemaOperationDispatcher.createSchema(
+ NameIdentifier.of(tableIdent.namespace().levels()), "comment", props);
+
tableOperationDispatcher.createTable(tableIdent, columns, "comment",
props, new Transform[0]);
boolean dropped = tableOperationDispatcher.dropTable(tableIdent);
@@ -319,4 +325,24 @@ public class TestTableOperationDispatcher extends
TestOperationDispatcher {
Assertions.assertThrows(
RuntimeException.class, () ->
tableOperationDispatcher.dropTable(tableIdent));
}
+
+ @Test
+ public void testCreateTableNeedImportingSchema() throws IOException {
+ Namespace tableNs = Namespace.of(metalake, catalog, "schema181");
+ NameIdentifier tableIdent = NameIdentifier.of(tableNs, "topic81");
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+ TestCatalog testCatalog =
+ (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake,
catalog));
+ TestCatalogOperations testCatalogOperations = (TestCatalogOperations)
testCatalog.ops();
+ testCatalogOperations.createSchema(
+ NameIdentifier.of(tableNs.levels()), "", Collections.emptyMap());
+ Column[] columns =
+ new Column[] {
+
TestColumn.builder().withName("col1").withType(Types.StringType.get()).build(),
+
TestColumn.builder().withName("col2").withType(Types.StringType.get()).build()
+ };
+ tableOperationDispatcher.createTable(tableIdent, columns, "comment",
props);
+
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(tableNs.levels()),
SCHEMA));
+ Assertions.assertTrue(entityStore.exists(tableIdent, TABLE));
+ }
}
diff --git
a/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
b/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
index 27afc5090..ac694883d 100644
---
a/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
+++
b/core/src/test/java/org/apache/gravitino/catalog/TestTopicOperationDispatcher.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.catalog;
import static org.apache.gravitino.Configs.TREE_LOCK_CLEAN_INTERVAL;
import static org.apache.gravitino.Configs.TREE_LOCK_MAX_NODE_IN_MEMORY;
import static org.apache.gravitino.Configs.TREE_LOCK_MIN_NODE_IN_MEMORY;
+import static org.apache.gravitino.Entity.EntityType.SCHEMA;
import static org.apache.gravitino.StringIdentifier.ID_KEY;
import static org.apache.gravitino.TestBasePropertiesMetadata.COMMENT_KEY;
import static org.mockito.ArgumentMatchers.any;
@@ -33,6 +34,7 @@ import static org.mockito.Mockito.reset;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.time.Instant;
+import java.util.Collections;
import java.util.Map;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.gravitino.Config;
@@ -40,7 +42,9 @@ import org.apache.gravitino.Entity;
import org.apache.gravitino.GravitinoEnv;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.Namespace;
+import org.apache.gravitino.TestCatalog;
import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.connector.TestCatalogOperations;
import org.apache.gravitino.exceptions.NoSuchEntityException;
import org.apache.gravitino.lock.LockManager;
import org.apache.gravitino.messaging.Topic;
@@ -122,25 +126,23 @@ public class TestTopicOperationDispatcher extends
TestOperationDispatcher {
// Case 2: Test if the topic entity is not found in the entity store
reset(entityStore);
entityStore.delete(topicIdent1, Entity.EntityType.TOPIC);
- entityStore.delete(NameIdentifier.of(topicNs.levels()),
Entity.EntityType.SCHEMA);
+ entityStore.delete(NameIdentifier.of(topicNs.levels()), SCHEMA);
doThrow(new NoSuchEntityException("")).when(entityStore).get(any(), any(),
any());
Topic loadedTopic2 = topicOperationDispatcher.loadTopic(topicIdent1);
// Succeed to import the topic entity
Assertions.assertTrue(entityStore.exists(topicIdent1,
Entity.EntityType.TOPIC));
- Assertions.assertTrue(
- entityStore.exists(NameIdentifier.of(topicNs.levels()),
Entity.EntityType.SCHEMA));
+
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()),
SCHEMA));
// Audit info is gotten from the catalog, not from the entity store
Assertions.assertEquals("test", loadedTopic2.auditInfo().creator());
// Case 3: Test if the entity store is failed to get the topic entity
reset(entityStore);
entityStore.delete(topicIdent1, Entity.EntityType.TOPIC);
- entityStore.delete(NameIdentifier.of(topicNs.levels()),
Entity.EntityType.SCHEMA);
+ entityStore.delete(NameIdentifier.of(topicNs.levels()), SCHEMA);
doThrow(new IOException()).when(entityStore).get(any(), any(), any());
Topic loadedTopic3 = topicOperationDispatcher.loadTopic(topicIdent1);
// Succeed to import the topic entity
- Assertions.assertTrue(
- entityStore.exists(NameIdentifier.of(topicNs.levels()),
Entity.EntityType.SCHEMA));
+
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()),
SCHEMA));
Assertions.assertTrue(entityStore.exists(topicIdent1,
Entity.EntityType.TOPIC));
// Audit info is gotten from the catalog, not from the entity store
Assertions.assertEquals("test", loadedTopic3.auditInfo().creator());
@@ -250,4 +252,19 @@ public class TestTopicOperationDispatcher extends
TestOperationDispatcher {
Assertions.assertThrows(
RuntimeException.class, () ->
topicOperationDispatcher.dropTopic(topicIdent));
}
+
+ @Test
+ public void testCreateTopicNeedImportingSchema() throws IOException {
+ Namespace topicNs = Namespace.of(metalake, catalog, "schema161");
+ NameIdentifier topicIdent = NameIdentifier.of(topicNs, "topic61");
+ Map<String, String> props = ImmutableMap.of("k1", "v1", "k2", "v2");
+ TestCatalog testCatalog =
+ (TestCatalog) catalogManager.loadCatalog(NameIdentifier.of(metalake,
catalog));
+ TestCatalogOperations testCatalogOperations = (TestCatalogOperations)
testCatalog.ops();
+ testCatalogOperations.createSchema(
+ NameIdentifier.of(topicNs.levels()), "", Collections.emptyMap());
+ topicOperationDispatcher.createTopic(topicIdent, "comment", null, props);
+
Assertions.assertTrue(entityStore.exists(NameIdentifier.of(topicNs.levels()),
SCHEMA));
+ Assertions.assertTrue(entityStore.exists(topicIdent,
Entity.EntityType.TOPIC));
+ }
}
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
index e8ad56401..d5cf1ffc7 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/rest/TableOperations.java
@@ -121,19 +121,15 @@ public class TableOperations {
NameIdentifierUtil.ofTable(metalake, catalog, schema,
request.getName());
Table table =
- TreeLockUtils.doWithTreeLock(
- NameIdentifier.of(metalake, catalog, schema),
- LockType.WRITE,
- () ->
- dispatcher.createTable(
- ident,
- fromDTOs(request.getColumns()),
- request.getComment(),
- request.getProperties(),
- fromDTOs(request.getPartitioning()),
- fromDTO(request.getDistribution()),
- fromDTOs(request.getSortOrders()),
- fromDTOs(request.getIndexes())));
+ dispatcher.createTable(
+ ident,
+ fromDTOs(request.getColumns()),
+ request.getComment(),
+ request.getProperties(),
+ fromDTOs(request.getPartitioning()),
+ fromDTO(request.getDistribution()),
+ fromDTOs(request.getSortOrders()),
+ fromDTOs(request.getIndexes()));
Response response = Utils.ok(new
TableResponse(DTOConverters.toDTO(table)));
LOG.info("Table created: {}.{}.{}.{}", metalake, catalog, schema,
request.getName());
return response;
diff --git
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
index 5e32ff6d2..4e9bcd550 100644
---
a/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
+++
b/server/src/main/java/org/apache/gravitino/server/web/rest/TopicOperations.java
@@ -119,15 +119,11 @@ public class TopicOperations {
NameIdentifierUtil.ofTopic(metalake, catalog, schema,
request.getName());
Topic topic =
- TreeLockUtils.doWithTreeLock(
- NameIdentifierUtil.ofSchema(metalake, catalog, schema),
- LockType.WRITE,
- () ->
- dispatcher.createTopic(
- ident,
- request.getComment(),
- null /* dataLayout, always null because it's not
supported yet.*/,
- request.getProperties()));
+ dispatcher.createTopic(
+ ident,
+ request.getComment(),
+ null /* dataLayout, always null because it's not supported
yet.*/,
+ request.getProperties());
Response response = Utils.ok(new
TopicResponse(DTOConverters.toDTO(topic)));
LOG.info("Topic created: {}.{}.{}.{}", metalake, catalog, schema,
topic.name());
return response;