This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new f7a0c11cc [#4572][#3729][#3496] refactor(core): remove reload action 
from create and alter schema/table/topic (#4590)
f7a0c11cc is described below

commit f7a0c11cc62fb1b1343c1e5e5c82ecb303d869e6
Author: noidname01 <[email protected]>
AuthorDate: Wed Aug 21 03:00:25 2024 -0400

    [#4572][#3729][#3496] refactor(core): remove reload action from create and 
alter schema/table/topic (#4590)
    
    ### What changes were proposed in this pull request?
    
    Remove the reload action and fix the affected logic in the ITs
    
    ### Why are the changes needed?
    
    Some APIs in the underlying catalog are async, which can make the
    reload action fail.
    
    Fix: #3729, #4572, #3496
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. The create/alter schema/table/topic functions will no longer
    automatically return the object with default values. Users must now
    manually call the load API to retrieve this information.
    
    
    
    ### How was this patch tested?
    
    Modified ITs
    
    ---------
    
    Co-authored-by: TimWang <[email protected]>
    Co-authored-by: mchades <[email protected]>
---
 .../java/org/apache/gravitino/SupportsSchemas.java |   8 +-
 .../apache/gravitino/messaging/TopicCatalog.java   |   8 +-
 .../org/apache/gravitino/rel/TableCatalog.java     |   8 +-
 .../hive/integration/test/CatalogHiveIT.java       |  62 +++++------
 .../hive/integration/test/ProxyCatalogHiveIT.java  |  14 +--
 .../integration/test/CatalogDorisDriverIT.java     |  28 -----
 .../doris/integration/test/CatalogDorisIT.java     |  86 +++++----------
 .../mysql/integration/test/CatalogMysqlIT.java     |  75 +++++--------
 .../integration/test/CatalogMysqlVersion5IT.java   |  21 ++--
 .../integration/test/CatalogPostgreSqlIT.java      | 120 +++++++++------------
 .../catalog/kafka/KafkaCatalogOperations.java      |  18 +++-
 .../kafka/integration/test/CatalogKafkaIT.java     |   7 +-
 .../org/apache/gravitino/client/GenericTopic.java  |   5 +
 .../catalog/SchemaOperationDispatcher.java         |  28 ++---
 .../catalog/TableOperationDispatcher.java          |  48 +++------
 .../catalog/TopicOperationDispatcher.java          |  34 +++---
 16 files changed, 230 insertions(+), 340 deletions(-)

diff --git a/api/src/main/java/org/apache/gravitino/SupportsSchemas.java 
b/api/src/main/java/org/apache/gravitino/SupportsSchemas.java
index 521934c0b..96fe10da0 100644
--- a/api/src/main/java/org/apache/gravitino/SupportsSchemas.java
+++ b/api/src/main/java/org/apache/gravitino/SupportsSchemas.java
@@ -65,12 +65,16 @@ public interface SupportsSchemas {
   }
 
   /**
-   * Create a schema in the catalog.
+   * Creates a schema in the catalog based on the provided details.
+   *
+   * <p>This method returns the schema as defined by the user without applying 
all defaults. If you
+   * need the schema with default values applied, use the {@link 
#loadSchema(String)} method after
+   * creation.
    *
    * @param schemaName The name of the schema.
    * @param comment The comment of the schema.
    * @param properties The properties of the schema.
-   * @return The created schema.
+   * @return The schema as defined by the caller, without all default values.
    * @throws NoSuchCatalogException If the catalog does not exist.
    * @throws SchemaAlreadyExistsException If the schema already exists.
    */
diff --git a/api/src/main/java/org/apache/gravitino/messaging/TopicCatalog.java 
b/api/src/main/java/org/apache/gravitino/messaging/TopicCatalog.java
index 362e5f342..b07155f8f 100644
--- a/api/src/main/java/org/apache/gravitino/messaging/TopicCatalog.java
+++ b/api/src/main/java/org/apache/gravitino/messaging/TopicCatalog.java
@@ -67,7 +67,11 @@ public interface TopicCatalog {
   }
 
   /**
-   * Create a topic in the catalog.
+   * Create a topic in the catalog based on the provided details.
+   *
+   * <p>This method returns the topic as defined by the user without applying 
all defaults. If you
+   * need the topic with default values applied, use the {@link 
#loadTopic(NameIdentifier)} method
+   * after creation.
    *
    * @param ident A topic identifier.
    * @param comment The comment of the topic object. Null is set if no comment 
is specified.
@@ -75,7 +79,7 @@ public interface TopicCatalog {
    *     supported yet.
    * @param properties The properties of the topic object. Empty map is set if 
no properties are
    *     specified.
-   * @return The topic metadata.
+   * @return The topic as defined by the caller, without all default values.
    * @throws NoSuchSchemaException If the schema does not exist.
    * @throws TopicAlreadyExistsException If the topic already exists.
    */
diff --git a/api/src/main/java/org/apache/gravitino/rel/TableCatalog.java 
b/api/src/main/java/org/apache/gravitino/rel/TableCatalog.java
index 5a82d38bc..f2eaafb76 100644
--- a/api/src/main/java/org/apache/gravitino/rel/TableCatalog.java
+++ b/api/src/main/java/org/apache/gravitino/rel/TableCatalog.java
@@ -75,13 +75,17 @@ public interface TableCatalog {
   }
 
   /**
-   * Create a table in the catalog.
+   * Create a table in the catalog based on the provided details.
+   *
+   * <p>This method returns the table as defined by the user without applying 
all defaults. If you
+   * need the table with default values applied, use the {@link 
#loadTable(NameIdentifier)} method
+   * after creation.
    *
    * @param ident A table identifier.
    * @param columns The columns of the new table.
    * @param comment The table comment.
    * @param properties The table properties.
-   * @return The created table metadata.
+   * @return The table as defined by the caller, without all default values.
    * @throws NoSuchSchemaException If the schema does not exist.
    * @throws TableAlreadyExistsException If the table already exists.
    */
diff --git 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
index 0018088a4..68e3b4e96 100644
--- 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
+++ 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
@@ -557,43 +557,39 @@ public class CatalogHiveIT extends AbstractIT {
     Column[] columns = createColumns();
     NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
     // test default properties
-    Table createdTable =
-        catalog
-            .asTableCatalog()
-            .createTable(
-                nameIdentifier,
-                columns,
-                TABLE_COMMENT,
-                ImmutableMap.of(),
-                Transforms.EMPTY_TRANSFORM);
+    catalog
+        .asTableCatalog()
+        .createTable(
+            nameIdentifier, columns, TABLE_COMMENT, ImmutableMap.of(), 
Transforms.EMPTY_TRANSFORM);
+    Table loadedTable1 = catalog.asTableCatalog().loadTable(nameIdentifier);
     HiveTablePropertiesMetadata tablePropertiesMetadata = new 
HiveTablePropertiesMetadata();
     org.apache.hadoop.hive.metastore.api.Table actualTable =
         hiveClientPool.run(client -> client.getTable(schemaName, tableName));
-    assertDefaultTableProperties(createdTable, actualTable);
+    assertDefaultTableProperties(loadedTable1, actualTable);
     checkTableReadWrite(actualTable);
 
     // test set properties
     String table2 = GravitinoITUtils.genRandomName(TABLE_PREFIX);
-    Table createdTable2 =
-        catalog
-            .asTableCatalog()
-            .createTable(
-                NameIdentifier.of(schemaName, table2),
-                columns,
-                TABLE_COMMENT,
-                ImmutableMap.of(
-                    TABLE_TYPE,
-                    "external_table",
-                    LOCATION,
-                    String.format(
-                        "hdfs://%s:%d/tmp",
-                        
containerSuite.getHiveContainer().getContainerIpAddress(),
-                        HiveContainer.HDFS_DEFAULTFS_PORT),
-                    FORMAT,
-                    "textfile",
-                    SERDE_LIB,
-                    HiveStorageConstants.OPENCSV_SERDE_CLASS),
-                Transforms.EMPTY_TRANSFORM);
+    catalog
+        .asTableCatalog()
+        .createTable(
+            NameIdentifier.of(schemaName, table2),
+            columns,
+            TABLE_COMMENT,
+            ImmutableMap.of(
+                TABLE_TYPE,
+                "external_table",
+                LOCATION,
+                String.format(
+                    "hdfs://%s:%d/tmp",
+                    containerSuite.getHiveContainer().getContainerIpAddress(),
+                    HiveContainer.HDFS_DEFAULTFS_PORT),
+                FORMAT,
+                "textfile",
+                SERDE_LIB,
+                HiveStorageConstants.OPENCSV_SERDE_CLASS),
+            Transforms.EMPTY_TRANSFORM);
+    Table loadedTable2 = 
catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, table2));
     org.apache.hadoop.hive.metastore.api.Table actualTable2 =
         hiveClientPool.run(client -> client.getTable(schemaName, table2));
 
@@ -612,9 +608,9 @@ public class CatalogHiveIT extends AbstractIT {
         ((Boolean) 
tablePropertiesMetadata.getDefaultValue(EXTERNAL)).toString().toUpperCase(),
         actualTable.getParameters().get(EXTERNAL));
     Assertions.assertTrue(actualTable2.getSd().getLocation().endsWith("/tmp"));
-    
Assertions.assertNotNull(createdTable2.properties().get(TRANSIENT_LAST_DDL_TIME));
-    Assertions.assertNotNull(createdTable2.properties().get(NUM_FILES));
-    Assertions.assertNotNull(createdTable2.properties().get(TOTAL_SIZE));
+    
Assertions.assertNotNull(loadedTable2.properties().get(TRANSIENT_LAST_DDL_TIME));
+    Assertions.assertNotNull(loadedTable2.properties().get(NUM_FILES));
+    Assertions.assertNotNull(loadedTable2.properties().get(TOTAL_SIZE));
     checkTableReadWrite(actualTable2);
 
     // test alter properties exception
diff --git 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/ProxyCatalogHiveIT.java
 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/ProxyCatalogHiveIT.java
index 0bdb1ad3f..a8a003fcf 100644
--- 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/ProxyCatalogHiveIT.java
+++ 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/ProxyCatalogHiveIT.java
@@ -181,15 +181,11 @@ public class ProxyCatalogHiveIT extends AbstractIT {
     String comment = "comment";
     createSchema(schemaName, comment);
 
-    Table createdTable =
-        catalog
-            .asTableCatalog()
-            .createTable(
-                nameIdentifier,
-                columns,
-                comment,
-                ImmutableMap.of(),
-                Partitioning.EMPTY_PARTITIONING);
+    catalog
+        .asTableCatalog()
+        .createTable(
+            nameIdentifier, columns, comment, ImmutableMap.of(), 
Partitioning.EMPTY_PARTITIONING);
+    Table createdTable = catalog.asTableCatalog().loadTable(nameIdentifier);
     String location = createdTable.properties().get("location");
     Assertions.assertEquals(EXPECT_USER, hdfs.getFileStatus(new 
Path(location)).getOwner());
     org.apache.hadoop.hive.metastore.api.Table hiveTab =
diff --git 
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisDriverIT.java
 
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisDriverIT.java
deleted file mode 100644
index b185264e5..000000000
--- 
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisDriverIT.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.catalog.doris.integration.test;
-
-import org.junit.jupiter.api.Tag;
-
-@Tag("gravitino-docker-test")
-public class CatalogDorisDriverIT extends CatalogDorisIT {
-  public CatalogDorisDriverIT() {
-    super();
-  }
-}
diff --git 
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
 
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
index fcfad401d..98478ad23 100644
--- 
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
+++ 
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
@@ -332,25 +332,15 @@ public class CatalogDorisIT extends AbstractIT {
 
     Map<String, String> properties = createTableProperties();
     TableCatalog tableCatalog = catalog.asTableCatalog();
-    Table createdTable =
-        tableCatalog.createTable(
-            tableIdentifier,
-            columns,
-            table_comment,
-            properties,
-            Transforms.EMPTY_TRANSFORM,
-            distribution,
-            null,
-            indexes);
-
-    ITUtils.assertionsTableInfo(
-        tableName,
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
         table_comment,
-        Arrays.asList(columns),
         properties,
-        indexes,
         Transforms.EMPTY_TRANSFORM,
-        createdTable);
+        distribution,
+        null,
+        indexes);
 
     // load table
     Table loadTable = tableCatalog.loadTable(tableIdentifier);
@@ -489,16 +479,16 @@ public class CatalogDorisIT extends AbstractIT {
 
     Map<String, String> properties = createTableProperties();
     TableCatalog tableCatalog = catalog.asTableCatalog();
-    Table createdTable =
-        tableCatalog.createTable(
-            tableIdentifier,
-            columns,
-            table_comment,
-            properties,
-            Transforms.EMPTY_TRANSFORM,
-            distribution,
-            null,
-            indexes);
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        distribution,
+        null,
+        indexes);
+    Table loadedTable = tableCatalog.loadTable(tableIdentifier);
 
     ITUtils.assertionsTableInfo(
         tableName,
@@ -507,7 +497,7 @@ public class CatalogDorisIT extends AbstractIT {
         properties,
         indexes,
         Transforms.EMPTY_TRANSFORM,
-        createdTable);
+        loadedTable);
 
     // Alter column type
     tableCatalog.alterTable(
@@ -641,24 +631,15 @@ public class CatalogDorisIT extends AbstractIT {
     Map<String, String> properties = createTableProperties();
     Transform[] partitioning = {Transforms.list(new String[][] 
{{DORIS_COL_NAME1}})};
     TableCatalog tableCatalog = catalog.asTableCatalog();
-    Table createdTable =
-        tableCatalog.createTable(
-            tableIdentifier,
-            columns,
-            table_comment,
-            properties,
-            partitioning,
-            distribution,
-            null,
-            indexes);
-    ITUtils.assertionsTableInfo(
-        tableName,
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
         table_comment,
-        Arrays.asList(columns),
         properties,
-        indexes,
         partitioning,
-        createdTable);
+        distribution,
+        null,
+        indexes);
 
     // load table
     Table loadTable = tableCatalog.loadTable(tableIdentifier);
@@ -872,24 +853,15 @@ public class CatalogDorisIT extends AbstractIT {
     Map<String, String> properties = createTableProperties();
     Transform[] partitioning = Transforms.EMPTY_TRANSFORM;
     TableCatalog tableCatalog = catalog.asTableCatalog();
-    Table createdTable =
-        tableCatalog.createTable(
-            tableIdentifier,
-            columns,
-            table_comment,
-            properties,
-            partitioning,
-            distribution,
-            null,
-            indexes);
-    ITUtils.assertionsTableInfo(
-        tableName,
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
         table_comment,
-        Arrays.asList(columns),
         properties,
-        indexes,
         partitioning,
-        createdTable);
+        distribution,
+        null,
+        indexes);
 
     // load table
     Table loadTable = tableCatalog.loadTable(tableIdentifier);
diff --git 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
index 6404283d8..bfa529452 100644
--- 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
+++ 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
@@ -338,31 +338,19 @@ public class CatalogMysqlIT extends AbstractIT {
 
     Map<String, String> properties = createProperties();
     TableCatalog tableCatalog = catalog.asTableCatalog();
-    Table createdTable =
-        tableCatalog.createTable(
-            tableIdentifier,
-            columns,
-            table_comment,
-            properties,
-            partitioning,
-            distribution,
-            sortOrders);
-    Assertions.assertEquals(createdTable.name(), tableName);
-    Map<String, String> resultProp = createdTable.properties();
-    for (Map.Entry<String, String> entry : properties.entrySet()) {
-      Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
-      Assertions.assertEquals(entry.getValue(), 
resultProp.get(entry.getKey()));
-    }
-    Assertions.assertEquals(createdTable.columns().length, columns.length);
-
-    for (int i = 0; i < columns.length; i++) {
-      ITUtils.assertColumn(columns[i], createdTable.columns()[i]);
-    }
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        partitioning,
+        distribution,
+        sortOrders);
 
     Table loadTable = tableCatalog.loadTable(tableIdentifier);
     Assertions.assertEquals(tableName, loadTable.name());
     Assertions.assertEquals(table_comment, loadTable.comment());
-    resultProp = loadTable.properties();
+    Map<String, String> resultProp = loadTable.properties();
     for (Map.Entry<String, String> entry : properties.entrySet()) {
       Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
       Assertions.assertEquals(entry.getValue(), 
resultProp.get(entry.getKey()));
@@ -450,15 +438,10 @@ public class CatalogMysqlIT extends AbstractIT {
 
     Column[] newColumns = new Column[] {col1, col2, col3, col4, col5};
 
-    Table createdTable =
-        catalog
-            .asTableCatalog()
-            .createTable(
-                NameIdentifier.of(schemaName, 
GravitinoITUtils.genRandomName("mysql_it_table")),
-                newColumns,
-                null,
-                ImmutableMap.of());
-
+    NameIdentifier tableIdent =
+        NameIdentifier.of(schemaName, 
GravitinoITUtils.genRandomName("mysql_it_table"));
+    catalog.asTableCatalog().createTable(tableIdent, newColumns, null, 
ImmutableMap.of());
+    Table createdTable = catalog.asTableCatalog().loadTable(tableIdent);
     Assertions.assertEquals(
         UnparsedExpression.of("rand()"), 
createdTable.columns()[0].defaultValue());
     Assertions.assertEquals(
@@ -972,22 +955,22 @@ public class CatalogMysqlIT extends AbstractIT {
             illegalArgumentException.getMessage(),
             "Index does not support complex fields in MySQL"));
 
-    table =
-        tableCatalog.createTable(
-            NameIdentifier.of(schemaName, "test_null_key"),
-            newColumns,
-            table_comment,
-            properties,
-            Transforms.EMPTY_TRANSFORM,
-            Distributions.NONE,
-            new SortOrder[0],
-            new Index[] {
-              Indexes.of(
-                  Index.IndexType.UNIQUE_KEY,
-                  null,
-                  new String[][] {{"col_1"}, {"col_3"}, {"col_4"}}),
-              Indexes.of(Index.IndexType.UNIQUE_KEY, null, new String[][] 
{{"col_4"}}),
-            });
+    NameIdentifier tableIdent = NameIdentifier.of(schemaName, "test_null_key");
+    tableCatalog.createTable(
+        tableIdent,
+        newColumns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        new SortOrder[0],
+        new Index[] {
+          Indexes.of(
+              Index.IndexType.UNIQUE_KEY, null, new String[][] {{"col_1"}, 
{"col_3"}, {"col_4"}}),
+          Indexes.of(Index.IndexType.UNIQUE_KEY, null, new String[][] 
{{"col_4"}}),
+        });
+    table = tableCatalog.loadTable(tableIdent);
+
     Assertions.assertEquals(2, table.index().length);
     Assertions.assertNotNull(table.index()[0].name());
     Assertions.assertNotNull(table.index()[1].name());
diff --git 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlVersion5IT.java
 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlVersion5IT.java
index 9922c6dfd..4f022a35e 100644
--- 
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlVersion5IT.java
+++ 
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlVersion5IT.java
@@ -75,21 +75,16 @@ public class CatalogMysqlVersion5IT extends CatalogMysqlIT {
 
     Column[] newColumns = new Column[] {col1, col2, col3, col4};
 
-    Table createdTable =
-        catalog
-            .asTableCatalog()
-            .createTable(
-                NameIdentifier.of(schemaName, 
GravitinoITUtils.genRandomName("mysql_it_table")),
-                newColumns,
-                null,
-                ImmutableMap.of());
+    NameIdentifier tableIdent =
+        NameIdentifier.of(schemaName, 
GravitinoITUtils.genRandomName("mysql_it_table"));
+    catalog.asTableCatalog().createTable(tableIdent, newColumns, null, 
ImmutableMap.of());
+    Table loadedTable = catalog.asTableCatalog().loadTable(tableIdent);
 
     Assertions.assertEquals(
-        DEFAULT_VALUE_OF_CURRENT_TIMESTAMP, 
createdTable.columns()[0].defaultValue());
-    Assertions.assertEquals(Literals.NULL, 
createdTable.columns()[1].defaultValue());
-    Assertions.assertEquals(Column.DEFAULT_VALUE_NOT_SET, 
createdTable.columns()[2].defaultValue());
+        DEFAULT_VALUE_OF_CURRENT_TIMESTAMP, 
loadedTable.columns()[0].defaultValue());
+    Assertions.assertEquals(Literals.NULL, 
loadedTable.columns()[1].defaultValue());
+    Assertions.assertEquals(Column.DEFAULT_VALUE_NOT_SET, 
loadedTable.columns()[2].defaultValue());
     Assertions.assertEquals(
-        Literals.varcharLiteral(255, "current_timestamp"),
-        createdTable.columns()[3].defaultValue());
+        Literals.varcharLiteral(255, "current_timestamp"), 
loadedTable.columns()[3].defaultValue());
   }
 }
diff --git 
a/catalogs/catalog-jdbc-postgresql/src/test/java/org/apache/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java
 
b/catalogs/catalog-jdbc-postgresql/src/test/java/org/apache/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java
index 685e04eb3..5b7ec298f 100644
--- 
a/catalogs/catalog-jdbc-postgresql/src/test/java/org/apache/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java
+++ 
b/catalogs/catalog-jdbc-postgresql/src/test/java/org/apache/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java
@@ -247,8 +247,8 @@ public class CatalogPostgreSqlIT extends AbstractIT {
     NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
 
     TableCatalog tableCatalog = catalog.asTableCatalog();
-    Table createdTable =
-        tableCatalog.createTable(tableIdentifier, columns, null, 
ImmutableMap.of());
+    tableCatalog.createTable(tableIdentifier, columns, null, 
ImmutableMap.of());
+    Table createdTable = tableCatalog.loadTable(tableIdentifier);
 
     Assertions.assertEquals(tableName, createdTable.name());
     Assertions.assertEquals(columns.length, createdTable.columns().length);
@@ -494,31 +494,19 @@ public class CatalogPostgreSqlIT extends AbstractIT {
 
     Map<String, String> properties = createProperties();
     TableCatalog tableCatalog = catalog.asTableCatalog();
-    Table createdTable =
-        tableCatalog.createTable(
-            tableIdentifier,
-            columns,
-            table_comment,
-            properties,
-            partitioning,
-            distribution,
-            sortOrders);
-    Assertions.assertEquals(createdTable.name(), tableName);
-    Map<String, String> resultProp = createdTable.properties();
-    for (Map.Entry<String, String> entry : properties.entrySet()) {
-      Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
-      Assertions.assertEquals(entry.getValue(), 
resultProp.get(entry.getKey()));
-    }
-    Assertions.assertEquals(createdTable.columns().length, columns.length);
-
-    for (int i = 0; i < columns.length; i++) {
-      ITUtils.assertColumn(columns[i], createdTable.columns()[i]);
-    }
+    tableCatalog.createTable(
+        tableIdentifier,
+        columns,
+        table_comment,
+        properties,
+        partitioning,
+        distribution,
+        sortOrders);
 
     Table loadTable = tableCatalog.loadTable(tableIdentifier);
     Assertions.assertEquals(tableName, loadTable.name());
     Assertions.assertEquals(table_comment, loadTable.comment());
-    resultProp = loadTable.properties();
+    Map<String, String> resultProp = loadTable.properties();
     for (Map.Entry<String, String> entry : properties.entrySet()) {
       Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
       Assertions.assertEquals(entry.getValue(), 
resultProp.get(entry.getKey()));
@@ -806,40 +794,42 @@ public class CatalogPostgreSqlIT extends AbstractIT {
             "Index does not support complex fields in PostgreSQL"));
 
     // Test create index with empty name success.
-    table =
-        tableCatalog.createTable(
-            NameIdentifier.of(schemaName, "test_null_key"),
-            newColumns,
-            table_comment,
-            properties,
-            Transforms.EMPTY_TRANSFORM,
-            Distributions.NONE,
-            new SortOrder[0],
-            new Index[] {
-              Indexes.of(
-                  Index.IndexType.UNIQUE_KEY,
-                  null,
-                  new String[][] {{"col_1"}, {"col_3"}, {"col_4"}}),
-              Indexes.of(Index.IndexType.UNIQUE_KEY, null, new String[][] 
{{"col_4"}}),
-            });
+    NameIdentifier tableIdent = NameIdentifier.of(schemaName, "test_null_key");
+    tableCatalog.createTable(
+        tableIdent,
+        newColumns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        new SortOrder[0],
+        new Index[] {
+          Indexes.of(
+              Index.IndexType.UNIQUE_KEY, null, new String[][] {{"col_1"}, 
{"col_3"}, {"col_4"}}),
+          Indexes.of(Index.IndexType.UNIQUE_KEY, null, new String[][] 
{{"col_4"}}),
+        });
+    table = tableCatalog.loadTable(tableIdent);
+
     Assertions.assertEquals(2, table.index().length);
     Assertions.assertNotNull(table.index()[0].name());
     Assertions.assertNotNull(table.index()[1].name());
 
     // Test create index with same col success.
-    table =
-        tableCatalog.createTable(
-            NameIdentifier.of(schemaName, "many_index"),
-            newColumns,
-            table_comment,
-            properties,
-            Transforms.EMPTY_TRANSFORM,
-            Distributions.NONE,
-            new SortOrder[0],
-            new Index[] {
-              Indexes.unique("u4_key_2", new String[][] {{"col_2"}, {"col_3"}, 
{"col_4"}}),
-              Indexes.unique("u5_key_3", new String[][] {{"col_2"}, {"col_3"}, 
{"col_4"}}),
-            });
+    tableIdent = NameIdentifier.of(schemaName, "many_index");
+    tableCatalog.createTable(
+        tableIdent,
+        newColumns,
+        table_comment,
+        properties,
+        Transforms.EMPTY_TRANSFORM,
+        Distributions.NONE,
+        new SortOrder[0],
+        new Index[] {
+          Indexes.unique("u4_key_2", new String[][] {{"col_2"}, {"col_3"}, 
{"col_4"}}),
+          Indexes.unique("u5_key_3", new String[][] {{"col_2"}, {"col_3"}, 
{"col_4"}}),
+        });
+    table = tableCatalog.loadTable(tableIdent);
+
     Assertions.assertEquals(1, table.index().length);
     Assertions.assertEquals("u4_key_2", table.index()[0].name());
   }
@@ -884,26 +874,20 @@ public class CatalogPostgreSqlIT extends AbstractIT {
 
     Column[] newColumns = new Column[] {col1, col2, col3, col4, col5, col6};
 
-    Table createdTable =
-        catalog
-            .asTableCatalog()
-            .createTable(
-                NameIdentifier.of(schemaName, 
GravitinoITUtils.genRandomName("pg_it_table")),
-                newColumns,
-                null,
-                ImmutableMap.of());
+    NameIdentifier tableIdent =
+        NameIdentifier.of(schemaName, 
GravitinoITUtils.genRandomName("pg_it_table"));
+    catalog.asTableCatalog().createTable(tableIdent, newColumns, null, 
ImmutableMap.of());
+    Table loadedTable = catalog.asTableCatalog().loadTable(tableIdent);
 
     Assertions.assertEquals(
-        UnparsedExpression.of("random()"), 
createdTable.columns()[0].defaultValue());
-    Assertions.assertEquals(
-        DEFAULT_VALUE_OF_CURRENT_TIMESTAMP, 
createdTable.columns()[1].defaultValue());
-    Assertions.assertEquals(Literals.NULL, 
createdTable.columns()[2].defaultValue());
-    Assertions.assertEquals(Column.DEFAULT_VALUE_NOT_SET, 
createdTable.columns()[3].defaultValue());
+        UnparsedExpression.of("random()"), 
loadedTable.columns()[0].defaultValue());
     Assertions.assertEquals(
-        Literals.varcharLiteral(255, "current_timestamp"),
-        createdTable.columns()[4].defaultValue());
+        DEFAULT_VALUE_OF_CURRENT_TIMESTAMP, 
loadedTable.columns()[1].defaultValue());
+    Assertions.assertEquals(Literals.NULL, 
loadedTable.columns()[2].defaultValue());
+    Assertions.assertEquals(Column.DEFAULT_VALUE_NOT_SET, 
loadedTable.columns()[3].defaultValue());
     Assertions.assertEquals(
-        Literals.integerLiteral(1000), 
createdTable.columns()[5].defaultValue());
+        Literals.varcharLiteral(255, "current_timestamp"), 
loadedTable.columns()[4].defaultValue());
+    Assertions.assertEquals(Literals.integerLiteral(1000), 
loadedTable.columns()[5].defaultValue());
   }
 
   @Test
diff --git 
a/catalogs/catalog-kafka/src/main/java/org/apache/gravitino/catalog/kafka/KafkaCatalogOperations.java
 
b/catalogs/catalog-kafka/src/main/java/org/apache/gravitino/catalog/kafka/KafkaCatalogOperations.java
index d3901218d..66f49a02c 100644
--- 
a/catalogs/catalog-kafka/src/main/java/org/apache/gravitino/catalog/kafka/KafkaCatalogOperations.java
+++ 
b/catalogs/catalog-kafka/src/main/java/org/apache/gravitino/catalog/kafka/KafkaCatalogOperations.java
@@ -260,19 +260,31 @@ public class KafkaCatalogOperations implements 
CatalogOperations, SupportsSchema
       CreateTopicsResult createTopicsResult =
           adminClient.createTopics(Collections.singleton(buildNewTopic(ident, 
properties)));
       Uuid topicId = createTopicsResult.topicId(ident.name()).get();
+      Integer numPartitions = 
createTopicsResult.numPartitions(ident.name()).get();
+      Integer replicationFactor = 
createTopicsResult.replicationFactor(ident.name()).get();
+      Config topicConfigs = createTopicsResult.config(ident.name()).get();
+
+      Map<String, String> created_properties = Maps.newHashMap();
+
       LOG.info(
           "Created topic {}[id: {}] with {} partitions and replication factor 
{}",
           ident,
           topicId,
-          createTopicsResult.numPartitions(ident.name()).get(),
-          createTopicsResult.replicationFactor(ident.name()).get());
+          numPartitions,
+          replicationFactor);
+
+      created_properties.put(
+          KafkaTopicPropertiesMetadata.PARTITION_COUNT, 
String.valueOf(numPartitions));
+      created_properties.put(
+          KafkaTopicPropertiesMetadata.REPLICATION_FACTOR, 
String.valueOf(replicationFactor));
+      topicConfigs.entries().forEach(e -> created_properties.put(e.name(), 
e.value()));
 
       return KafkaTopic.builder()
           .withName(ident.name())
           .withComment(comment)
           // Because there is no way to store the Gravitino ID in Kafka, 
therefor we use the topic
           // ID as the Gravitino ID
-          .withProperties(newPropertiesWithId(convertToGravitinoId(topicId), 
properties))
+          .withProperties(newPropertiesWithId(convertToGravitinoId(topicId), 
created_properties))
           .withAuditInfo(
               AuditInfo.builder()
                   .withCreator(PrincipalUtils.getCurrentPrincipal().getName())
diff --git 
a/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java
 
b/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java
index 84b5379fb..b73a6c1b8 100644
--- 
a/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java
+++ 
b/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java
@@ -358,12 +358,15 @@ public class CatalogKafkaIT extends AbstractIT {
     Topic loadedTopic =
         
catalog.asTopicCatalog().loadTopic(NameIdentifier.of(DEFAULT_SCHEMA_NAME, 
topicName));
 
-    Assertions.assertEquals(alteredTopic, loadedTopic);
     Assertions.assertEquals("new comment", alteredTopic.comment());
     Assertions.assertEquals("3", 
alteredTopic.properties().get(PARTITION_COUNT));
+    
Assertions.assertNull(alteredTopic.properties().get(TopicConfig.RETENTION_MS_CONFIG));
+
+    Assertions.assertEquals("new comment", loadedTopic.comment());
+    Assertions.assertEquals("3", 
loadedTopic.properties().get(PARTITION_COUNT));
     // retention.ms overridden was removed, so it should be the default value
     Assertions.assertEquals(
-        "604800000", 
alteredTopic.properties().get(TopicConfig.RETENTION_MS_CONFIG));
+        "604800000", 
loadedTopic.properties().get(TopicConfig.RETENTION_MS_CONFIG));
     checkTopicReadWrite(topicName);
   }
 
diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/GenericTopic.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GenericTopic.java
index e317df069..55edfdd54 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/GenericTopic.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/GenericTopic.java
@@ -109,4 +109,9 @@ class GenericTopic implements Topic, SupportsTags {
   public int hashCode() {
     return topicDTO.hashCode();
   }
+
+  @Override
+  public String toString() {
+    return "GenericTopic{" + "topicDTO=" + topicDTO.toString() + '}';
+  }
 }
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
 
b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
index 758d19737..c59b36177 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
@@ -111,7 +111,9 @@ public class SchemaOperationDispatcher extends 
OperationDispatcher implements Sc
     Map<String, String> updatedProperties =
         StringIdentifier.newPropertiesWithId(stringId, properties);
 
-    Schema createdSchema =
+    // we do not retrieve the schema again (to obtain some values generated by 
underlying catalog)
+    // since some catalogs' API is async and the schema may not be created 
immediately
+    Schema schema =
         doWithCatalog(
             catalogIdent,
             c -> c.doWithSchemaOps(s -> s.createSchema(ident, comment, 
updatedProperties)),
@@ -121,21 +123,14 @@ public class SchemaOperationDispatcher extends 
OperationDispatcher implements Sc
     // If the Schema is maintained by the Gravitino's store, we don't have to 
store again.
     boolean isManagedSchema = isManagedEntity(catalogIdent, 
Capability.Scope.SCHEMA);
     if (isManagedSchema) {
-      return EntityCombinedSchema.of(createdSchema)
+      return EntityCombinedSchema.of(schema)
           .withHiddenPropertiesSet(
               getHiddenPropertyNames(
                   catalogIdent,
                   HasPropertyMetadata::schemaPropertiesMetadata,
-                  createdSchema.properties()));
+                  schema.properties()));
     }
 
-    // Retrieve the Schema again to obtain some values generated by underlying 
catalog
-    Schema schema =
-        doWithCatalog(
-            catalogIdent,
-            c -> c.doWithSchemaOps(s -> s.loadSchema(ident)),
-            NoSuchSchemaException.class);
-
     SchemaEntity schemaEntity =
         SchemaEntity.builder()
             .withId(uid)
@@ -208,21 +203,10 @@ public class SchemaOperationDispatcher extends 
OperationDispatcher implements Sc
     validateAlterProperties(ident, 
HasPropertyMetadata::schemaPropertiesMetadata, changes);
 
     NameIdentifier catalogIdent = getCatalogIdentifier(ident);
-    Schema tempAlteredSchema =
-        doWithCatalog(
-            catalogIdent,
-            c -> c.doWithSchemaOps(s -> s.alterSchema(ident, changes)),
-            NoSuchSchemaException.class);
-
-    // Retrieve the Schema again to obtain some values generated by underlying 
catalog
     Schema alteredSchema =
         doWithCatalog(
             catalogIdent,
-            c ->
-                c.doWithSchemaOps(
-                    s ->
-                        s.loadSchema(
-                            NameIdentifier.of(ident.namespace(), 
tempAlteredSchema.name()))),
+            c -> c.doWithSchemaOps(s -> s.alterSchema(ident, changes)),
             NoSuchSchemaException.class);
 
     // If the Schema is maintained by the Gravitino's store, we don't have to 
alter again.
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 404af695a..0562d99b7 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -161,29 +161,25 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
     Map<String, String> updatedProperties =
         StringIdentifier.newPropertiesWithId(stringId, properties);
 
-    doWithCatalog(
-        catalogIdent,
-        c ->
-            c.doWithTableOps(
-                t ->
-                    t.createTable(
-                        ident,
-                        columns,
-                        comment,
-                        updatedProperties,
-                        partitions == null ? EMPTY_TRANSFORM : partitions,
-                        distribution == null ? Distributions.NONE : 
distribution,
-                        sortOrders == null ? new SortOrder[0] : sortOrders,
-                        indexes == null ? Indexes.EMPTY_INDEXES : indexes)),
-        NoSuchSchemaException.class,
-        TableAlreadyExistsException.class);
-
-    // Retrieve the Table again to obtain some values generated by underlying 
catalog
+    // we do not retrieve the table again (to obtain some values generated by 
underlying catalog)
+    // since some catalogs' API is async and the table may not be created 
immediately
     Table table =
         doWithCatalog(
             catalogIdent,
-            c -> c.doWithTableOps(t -> t.loadTable(ident)),
-            NoSuchTableException.class);
+            c ->
+                c.doWithTableOps(
+                    t ->
+                        t.createTable(
+                            ident,
+                            columns,
+                            comment,
+                            updatedProperties,
+                            partitions == null ? EMPTY_TRANSFORM : partitions,
+                            distribution == null ? Distributions.NONE : 
distribution,
+                            sortOrders == null ? new SortOrder[0] : sortOrders,
+                            indexes == null ? Indexes.EMPTY_INDEXES : 
indexes)),
+            NoSuchSchemaException.class,
+            TableAlreadyExistsException.class);
 
     TableEntity tableEntity =
         TableEntity.builder()
@@ -229,7 +225,7 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
     validateAlterProperties(ident, 
HasPropertyMetadata::tablePropertiesMetadata, changes);
 
     NameIdentifier catalogIdent = getCatalogIdentifier(ident);
-    Table tempAlteredTable =
+    Table alteredTable =
         doWithCatalog(
             catalogIdent,
             c ->
@@ -238,16 +234,6 @@ public class TableOperationDispatcher extends 
OperationDispatcher implements Tab
             NoSuchTableException.class,
             IllegalArgumentException.class);
 
-    // Retrieve the Table again to obtain some values generated by underlying 
catalog
-    Table alteredTable =
-        doWithCatalog(
-            catalogIdent,
-            c ->
-                c.doWithTableOps(
-                    t ->
-                        t.loadTable(NameIdentifier.of(ident.namespace(), 
tempAlteredTable.name()))),
-            NoSuchTableException.class);
-
     StringIdentifier stringId = 
getStringIdFromProperties(alteredTable.properties());
     // Case 1: The table is not created by Gravitino.
     if (stringId == null) {
diff --git 
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java 
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
index 981d999f9..ec3c08b33 100644
--- 
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
+++ 
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
@@ -143,18 +143,15 @@ public class TopicOperationDispatcher extends 
OperationDispatcher implements Top
     Map<String, String> updatedProperties =
         StringIdentifier.newPropertiesWithId(stringId, properties);
 
-    doWithCatalog(
-        catalogIdent,
-        c -> c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout, 
updatedProperties)),
-        NoSuchSchemaException.class,
-        TopicAlreadyExistsException.class);
-
-    // Retrieve the Topic again to obtain some values generated by underlying 
catalog
+    // we do not retrieve the topic again (to obtain some values generated by 
underlying catalog)
+    // since some catalogs' API is async and the topic may not be created 
immediately
     Topic topic =
         doWithCatalog(
             catalogIdent,
-            c -> c.doWithTopicOps(t -> t.loadTopic(ident)),
-            NoSuchTopicException.class);
+            c ->
+                c.doWithTopicOps(t -> t.createTopic(ident, comment, 
dataLayout, updatedProperties)),
+            NoSuchSchemaException.class,
+            TopicAlreadyExistsException.class);
 
     TopicEntity topicEntity =
         TopicEntity.builder()
@@ -200,23 +197,16 @@ public class TopicOperationDispatcher extends 
OperationDispatcher implements Top
     validateAlterProperties(ident, 
HasPropertyMetadata::topicPropertiesMetadata, changes);
 
     NameIdentifier catalogIdent = getCatalogIdentifier(ident);
-    Topic tempAlteredTopic =
+
+    // we do not retrieve the topic again (to obtain some values generated by 
underlying catalog)
+    // since some catalogs' API is async and the topic may not be updated 
immediately
+    Topic alteredTopic =
         doWithCatalog(
             catalogIdent,
             c -> c.doWithTopicOps(t -> t.alterTopic(ident, changes)),
             NoSuchTopicException.class,
             IllegalArgumentException.class);
 
-    // Retrieve the Topic again to obtain some values generated by underlying 
catalog
-    Topic alteredTopic =
-        doWithCatalog(
-            catalogIdent,
-            c ->
-                c.doWithTopicOps(
-                    t ->
-                        t.loadTopic(NameIdentifier.of(ident.namespace(), 
tempAlteredTopic.name()))),
-            NoSuchTopicException.class);
-
     TopicEntity updatedTopicEntity =
         operateOnEntity(
             ident,
@@ -231,9 +221,9 @@ public class TopicOperationDispatcher extends 
OperationDispatcher implements Top
                             .withName(topicEntity.name())
                             .withNamespace(ident.namespace())
                             .withComment(
-                                StringUtils.isBlank(tempAlteredTopic.comment())
+                                StringUtils.isBlank(alteredTopic.comment())
                                     ? topicEntity.comment()
-                                    : tempAlteredTopic.comment())
+                                    : alteredTopic.comment())
                             .withAuditInfo(
                                 AuditInfo.builder()
                                     
.withCreator(topicEntity.auditInfo().creator())


Reply via email to