This is an automated email from the ASF dual-hosted git repository.
jshao pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new f7a0c11cc [#4572][#3729][#3496] refactor(core): remove reload action
from create and alter schema/table/topic (#4590)
f7a0c11cc is described below
commit f7a0c11cc62fb1b1343c1e5e5c82ecb303d869e6
Author: noidname01 <[email protected]>
AuthorDate: Wed Aug 21 03:00:25 2024 -0400
[#4572][#3729][#3496] refactor(core): remove reload action from create and
alter schema/table/topic (#4590)
### What changes were proposed in this pull request?
Remove reload action and fix the following logic of ITs
### Why are the changes needed?
some APIs in the underlying catalog are async, which will make the
reload action failed
Fix: #3729, #4572, #3496
### Does this PR introduce _any_ user-facing change?
yes, The create/alter schema/table/topic functions will no longer
automatically return the object with default values. Users must now
manually call the load API to retrieve this information.
### How was this patch tested?
Modified ITs
---------
Co-authored-by: TimWang <[email protected]>
Co-authored-by: mchades <[email protected]>
---
.../java/org/apache/gravitino/SupportsSchemas.java | 8 +-
.../apache/gravitino/messaging/TopicCatalog.java | 8 +-
.../org/apache/gravitino/rel/TableCatalog.java | 8 +-
.../hive/integration/test/CatalogHiveIT.java | 62 +++++------
.../hive/integration/test/ProxyCatalogHiveIT.java | 14 +--
.../integration/test/CatalogDorisDriverIT.java | 28 -----
.../doris/integration/test/CatalogDorisIT.java | 86 +++++----------
.../mysql/integration/test/CatalogMysqlIT.java | 75 +++++--------
.../integration/test/CatalogMysqlVersion5IT.java | 21 ++--
.../integration/test/CatalogPostgreSqlIT.java | 120 +++++++++------------
.../catalog/kafka/KafkaCatalogOperations.java | 18 +++-
.../kafka/integration/test/CatalogKafkaIT.java | 7 +-
.../org/apache/gravitino/client/GenericTopic.java | 5 +
.../catalog/SchemaOperationDispatcher.java | 28 ++---
.../catalog/TableOperationDispatcher.java | 48 +++------
.../catalog/TopicOperationDispatcher.java | 34 +++---
16 files changed, 230 insertions(+), 340 deletions(-)
diff --git a/api/src/main/java/org/apache/gravitino/SupportsSchemas.java
b/api/src/main/java/org/apache/gravitino/SupportsSchemas.java
index 521934c0b..96fe10da0 100644
--- a/api/src/main/java/org/apache/gravitino/SupportsSchemas.java
+++ b/api/src/main/java/org/apache/gravitino/SupportsSchemas.java
@@ -65,12 +65,16 @@ public interface SupportsSchemas {
}
/**
- * Create a schema in the catalog.
+ * Creates a schema in the catalog based on the provided details.
+ *
+ * <p>This method returns the schema as defined by the user without applying
all defaults. If you
+ * need the schema with default values applied, use the {@link
#loadSchema(String)} method after
+ * creation.
*
* @param schemaName The name of the schema.
* @param comment The comment of the schema.
* @param properties The properties of the schema.
- * @return The created schema.
+ * @return The schema as defined by the caller, without all default values.
* @throws NoSuchCatalogException If the catalog does not exist.
* @throws SchemaAlreadyExistsException If the schema already exists.
*/
diff --git a/api/src/main/java/org/apache/gravitino/messaging/TopicCatalog.java
b/api/src/main/java/org/apache/gravitino/messaging/TopicCatalog.java
index 362e5f342..b07155f8f 100644
--- a/api/src/main/java/org/apache/gravitino/messaging/TopicCatalog.java
+++ b/api/src/main/java/org/apache/gravitino/messaging/TopicCatalog.java
@@ -67,7 +67,11 @@ public interface TopicCatalog {
}
/**
- * Create a topic in the catalog.
+ * Create a topic in the catalog based on the provided details.
+ *
+ * <p>This method returns the topic as defined by the user without applying
all defaults. If you
+ * need the topic with default values applied, use the {@link
#loadTopic(NameIdentifier)} method
+ * after creation.
*
* @param ident A topic identifier.
* @param comment The comment of the topic object. Null is set if no comment
is specified.
@@ -75,7 +79,7 @@ public interface TopicCatalog {
* supported yet.
* @param properties The properties of the topic object. Empty map is set if
no properties are
* specified.
- * @return The topic metadata.
+ * @return The topic as defined by the caller, without all default values.
* @throws NoSuchSchemaException If the schema does not exist.
* @throws TopicAlreadyExistsException If the topic already exists.
*/
diff --git a/api/src/main/java/org/apache/gravitino/rel/TableCatalog.java
b/api/src/main/java/org/apache/gravitino/rel/TableCatalog.java
index 5a82d38bc..f2eaafb76 100644
--- a/api/src/main/java/org/apache/gravitino/rel/TableCatalog.java
+++ b/api/src/main/java/org/apache/gravitino/rel/TableCatalog.java
@@ -75,13 +75,17 @@ public interface TableCatalog {
}
/**
- * Create a table in the catalog.
+ * Create a table in the catalog based on the provided details.
+ *
+ * <p>This method returns the table as defined by the user without applying
all defaults. If you
+ * need the table with default values applied, use the {@link
#loadTable(NameIdentifier)} method
+ * after creation.
*
* @param ident A table identifier.
* @param columns The columns of the new table.
* @param comment The table comment.
* @param properties The table properties.
- * @return The created table metadata.
+ * @return The table as defined by the caller, without all default values.
* @throws NoSuchSchemaException If the schema does not exist.
* @throws TableAlreadyExistsException If the table already exists.
*/
diff --git
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
index 0018088a4..68e3b4e96 100644
---
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
+++
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/CatalogHiveIT.java
@@ -557,43 +557,39 @@ public class CatalogHiveIT extends AbstractIT {
Column[] columns = createColumns();
NameIdentifier nameIdentifier = NameIdentifier.of(schemaName, tableName);
// test default properties
- Table createdTable =
- catalog
- .asTableCatalog()
- .createTable(
- nameIdentifier,
- columns,
- TABLE_COMMENT,
- ImmutableMap.of(),
- Transforms.EMPTY_TRANSFORM);
+ catalog
+ .asTableCatalog()
+ .createTable(
+ nameIdentifier, columns, TABLE_COMMENT, ImmutableMap.of(),
Transforms.EMPTY_TRANSFORM);
+ Table loadedTable1 = catalog.asTableCatalog().loadTable(nameIdentifier);
HiveTablePropertiesMetadata tablePropertiesMetadata = new
HiveTablePropertiesMetadata();
org.apache.hadoop.hive.metastore.api.Table actualTable =
hiveClientPool.run(client -> client.getTable(schemaName, tableName));
- assertDefaultTableProperties(createdTable, actualTable);
+ assertDefaultTableProperties(loadedTable1, actualTable);
checkTableReadWrite(actualTable);
// test set properties
String table2 = GravitinoITUtils.genRandomName(TABLE_PREFIX);
- Table createdTable2 =
- catalog
- .asTableCatalog()
- .createTable(
- NameIdentifier.of(schemaName, table2),
- columns,
- TABLE_COMMENT,
- ImmutableMap.of(
- TABLE_TYPE,
- "external_table",
- LOCATION,
- String.format(
- "hdfs://%s:%d/tmp",
-
containerSuite.getHiveContainer().getContainerIpAddress(),
- HiveContainer.HDFS_DEFAULTFS_PORT),
- FORMAT,
- "textfile",
- SERDE_LIB,
- HiveStorageConstants.OPENCSV_SERDE_CLASS),
- Transforms.EMPTY_TRANSFORM);
+ catalog
+ .asTableCatalog()
+ .createTable(
+ NameIdentifier.of(schemaName, table2),
+ columns,
+ TABLE_COMMENT,
+ ImmutableMap.of(
+ TABLE_TYPE,
+ "external_table",
+ LOCATION,
+ String.format(
+ "hdfs://%s:%d/tmp",
+ containerSuite.getHiveContainer().getContainerIpAddress(),
+ HiveContainer.HDFS_DEFAULTFS_PORT),
+ FORMAT,
+ "textfile",
+ SERDE_LIB,
+ HiveStorageConstants.OPENCSV_SERDE_CLASS),
+ Transforms.EMPTY_TRANSFORM);
+ Table loadedTable2 =
catalog.asTableCatalog().loadTable(NameIdentifier.of(schemaName, table2));
org.apache.hadoop.hive.metastore.api.Table actualTable2 =
hiveClientPool.run(client -> client.getTable(schemaName, table2));
@@ -612,9 +608,9 @@ public class CatalogHiveIT extends AbstractIT {
((Boolean)
tablePropertiesMetadata.getDefaultValue(EXTERNAL)).toString().toUpperCase(),
actualTable.getParameters().get(EXTERNAL));
Assertions.assertTrue(actualTable2.getSd().getLocation().endsWith("/tmp"));
-
Assertions.assertNotNull(createdTable2.properties().get(TRANSIENT_LAST_DDL_TIME));
- Assertions.assertNotNull(createdTable2.properties().get(NUM_FILES));
- Assertions.assertNotNull(createdTable2.properties().get(TOTAL_SIZE));
+
Assertions.assertNotNull(loadedTable2.properties().get(TRANSIENT_LAST_DDL_TIME));
+ Assertions.assertNotNull(loadedTable2.properties().get(NUM_FILES));
+ Assertions.assertNotNull(loadedTable2.properties().get(TOTAL_SIZE));
checkTableReadWrite(actualTable2);
// test alter properties exception
diff --git
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/ProxyCatalogHiveIT.java
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/ProxyCatalogHiveIT.java
index 0bdb1ad3f..a8a003fcf 100644
---
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/ProxyCatalogHiveIT.java
+++
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/ProxyCatalogHiveIT.java
@@ -181,15 +181,11 @@ public class ProxyCatalogHiveIT extends AbstractIT {
String comment = "comment";
createSchema(schemaName, comment);
- Table createdTable =
- catalog
- .asTableCatalog()
- .createTable(
- nameIdentifier,
- columns,
- comment,
- ImmutableMap.of(),
- Partitioning.EMPTY_PARTITIONING);
+ catalog
+ .asTableCatalog()
+ .createTable(
+ nameIdentifier, columns, comment, ImmutableMap.of(),
Partitioning.EMPTY_PARTITIONING);
+ Table createdTable = catalog.asTableCatalog().loadTable(nameIdentifier);
String location = createdTable.properties().get("location");
Assertions.assertEquals(EXPECT_USER, hdfs.getFileStatus(new
Path(location)).getOwner());
org.apache.hadoop.hive.metastore.api.Table hiveTab =
diff --git
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisDriverIT.java
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisDriverIT.java
deleted file mode 100644
index b185264e5..000000000
---
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisDriverIT.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.gravitino.catalog.doris.integration.test;
-
-import org.junit.jupiter.api.Tag;
-
-@Tag("gravitino-docker-test")
-public class CatalogDorisDriverIT extends CatalogDorisIT {
- public CatalogDorisDriverIT() {
- super();
- }
-}
diff --git
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
index fcfad401d..98478ad23 100644
---
a/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
+++
b/catalogs/catalog-jdbc-doris/src/test/java/org/apache/gravitino/catalog/doris/integration/test/CatalogDorisIT.java
@@ -332,25 +332,15 @@ public class CatalogDorisIT extends AbstractIT {
Map<String, String> properties = createTableProperties();
TableCatalog tableCatalog = catalog.asTableCatalog();
- Table createdTable =
- tableCatalog.createTable(
- tableIdentifier,
- columns,
- table_comment,
- properties,
- Transforms.EMPTY_TRANSFORM,
- distribution,
- null,
- indexes);
-
- ITUtils.assertionsTableInfo(
- tableName,
+ tableCatalog.createTable(
+ tableIdentifier,
+ columns,
table_comment,
- Arrays.asList(columns),
properties,
- indexes,
Transforms.EMPTY_TRANSFORM,
- createdTable);
+ distribution,
+ null,
+ indexes);
// load table
Table loadTable = tableCatalog.loadTable(tableIdentifier);
@@ -489,16 +479,16 @@ public class CatalogDorisIT extends AbstractIT {
Map<String, String> properties = createTableProperties();
TableCatalog tableCatalog = catalog.asTableCatalog();
- Table createdTable =
- tableCatalog.createTable(
- tableIdentifier,
- columns,
- table_comment,
- properties,
- Transforms.EMPTY_TRANSFORM,
- distribution,
- null,
- indexes);
+ tableCatalog.createTable(
+ tableIdentifier,
+ columns,
+ table_comment,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ distribution,
+ null,
+ indexes);
+ Table loadedTable = tableCatalog.loadTable(tableIdentifier);
ITUtils.assertionsTableInfo(
tableName,
@@ -507,7 +497,7 @@ public class CatalogDorisIT extends AbstractIT {
properties,
indexes,
Transforms.EMPTY_TRANSFORM,
- createdTable);
+ loadedTable);
// Alter column type
tableCatalog.alterTable(
@@ -641,24 +631,15 @@ public class CatalogDorisIT extends AbstractIT {
Map<String, String> properties = createTableProperties();
Transform[] partitioning = {Transforms.list(new String[][]
{{DORIS_COL_NAME1}})};
TableCatalog tableCatalog = catalog.asTableCatalog();
- Table createdTable =
- tableCatalog.createTable(
- tableIdentifier,
- columns,
- table_comment,
- properties,
- partitioning,
- distribution,
- null,
- indexes);
- ITUtils.assertionsTableInfo(
- tableName,
+ tableCatalog.createTable(
+ tableIdentifier,
+ columns,
table_comment,
- Arrays.asList(columns),
properties,
- indexes,
partitioning,
- createdTable);
+ distribution,
+ null,
+ indexes);
// load table
Table loadTable = tableCatalog.loadTable(tableIdentifier);
@@ -872,24 +853,15 @@ public class CatalogDorisIT extends AbstractIT {
Map<String, String> properties = createTableProperties();
Transform[] partitioning = Transforms.EMPTY_TRANSFORM;
TableCatalog tableCatalog = catalog.asTableCatalog();
- Table createdTable =
- tableCatalog.createTable(
- tableIdentifier,
- columns,
- table_comment,
- properties,
- partitioning,
- distribution,
- null,
- indexes);
- ITUtils.assertionsTableInfo(
- tableName,
+ tableCatalog.createTable(
+ tableIdentifier,
+ columns,
table_comment,
- Arrays.asList(columns),
properties,
- indexes,
partitioning,
- createdTable);
+ distribution,
+ null,
+ indexes);
// load table
Table loadTable = tableCatalog.loadTable(tableIdentifier);
diff --git
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
index 6404283d8..bfa529452 100644
---
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
+++
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlIT.java
@@ -338,31 +338,19 @@ public class CatalogMysqlIT extends AbstractIT {
Map<String, String> properties = createProperties();
TableCatalog tableCatalog = catalog.asTableCatalog();
- Table createdTable =
- tableCatalog.createTable(
- tableIdentifier,
- columns,
- table_comment,
- properties,
- partitioning,
- distribution,
- sortOrders);
- Assertions.assertEquals(createdTable.name(), tableName);
- Map<String, String> resultProp = createdTable.properties();
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
- Assertions.assertEquals(entry.getValue(),
resultProp.get(entry.getKey()));
- }
- Assertions.assertEquals(createdTable.columns().length, columns.length);
-
- for (int i = 0; i < columns.length; i++) {
- ITUtils.assertColumn(columns[i], createdTable.columns()[i]);
- }
+ tableCatalog.createTable(
+ tableIdentifier,
+ columns,
+ table_comment,
+ properties,
+ partitioning,
+ distribution,
+ sortOrders);
Table loadTable = tableCatalog.loadTable(tableIdentifier);
Assertions.assertEquals(tableName, loadTable.name());
Assertions.assertEquals(table_comment, loadTable.comment());
- resultProp = loadTable.properties();
+ Map<String, String> resultProp = loadTable.properties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
Assertions.assertEquals(entry.getValue(),
resultProp.get(entry.getKey()));
@@ -450,15 +438,10 @@ public class CatalogMysqlIT extends AbstractIT {
Column[] newColumns = new Column[] {col1, col2, col3, col4, col5};
- Table createdTable =
- catalog
- .asTableCatalog()
- .createTable(
- NameIdentifier.of(schemaName,
GravitinoITUtils.genRandomName("mysql_it_table")),
- newColumns,
- null,
- ImmutableMap.of());
-
+ NameIdentifier tableIdent =
+ NameIdentifier.of(schemaName,
GravitinoITUtils.genRandomName("mysql_it_table"));
+ catalog.asTableCatalog().createTable(tableIdent, newColumns, null,
ImmutableMap.of());
+ Table createdTable = catalog.asTableCatalog().loadTable(tableIdent);
Assertions.assertEquals(
UnparsedExpression.of("rand()"),
createdTable.columns()[0].defaultValue());
Assertions.assertEquals(
@@ -972,22 +955,22 @@ public class CatalogMysqlIT extends AbstractIT {
illegalArgumentException.getMessage(),
"Index does not support complex fields in MySQL"));
- table =
- tableCatalog.createTable(
- NameIdentifier.of(schemaName, "test_null_key"),
- newColumns,
- table_comment,
- properties,
- Transforms.EMPTY_TRANSFORM,
- Distributions.NONE,
- new SortOrder[0],
- new Index[] {
- Indexes.of(
- Index.IndexType.UNIQUE_KEY,
- null,
- new String[][] {{"col_1"}, {"col_3"}, {"col_4"}}),
- Indexes.of(Index.IndexType.UNIQUE_KEY, null, new String[][]
{{"col_4"}}),
- });
+ NameIdentifier tableIdent = NameIdentifier.of(schemaName, "test_null_key");
+ tableCatalog.createTable(
+ tableIdent,
+ newColumns,
+ table_comment,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ new SortOrder[0],
+ new Index[] {
+ Indexes.of(
+ Index.IndexType.UNIQUE_KEY, null, new String[][] {{"col_1"},
{"col_3"}, {"col_4"}}),
+ Indexes.of(Index.IndexType.UNIQUE_KEY, null, new String[][]
{{"col_4"}}),
+ });
+ table = tableCatalog.loadTable(tableIdent);
+
Assertions.assertEquals(2, table.index().length);
Assertions.assertNotNull(table.index()[0].name());
Assertions.assertNotNull(table.index()[1].name());
diff --git
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlVersion5IT.java
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlVersion5IT.java
index 9922c6dfd..4f022a35e 100644
---
a/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlVersion5IT.java
+++
b/catalogs/catalog-jdbc-mysql/src/test/java/org/apache/gravitino/catalog/mysql/integration/test/CatalogMysqlVersion5IT.java
@@ -75,21 +75,16 @@ public class CatalogMysqlVersion5IT extends CatalogMysqlIT {
Column[] newColumns = new Column[] {col1, col2, col3, col4};
- Table createdTable =
- catalog
- .asTableCatalog()
- .createTable(
- NameIdentifier.of(schemaName,
GravitinoITUtils.genRandomName("mysql_it_table")),
- newColumns,
- null,
- ImmutableMap.of());
+ NameIdentifier tableIdent =
+ NameIdentifier.of(schemaName,
GravitinoITUtils.genRandomName("mysql_it_table"));
+ catalog.asTableCatalog().createTable(tableIdent, newColumns, null,
ImmutableMap.of());
+ Table loadedTable = catalog.asTableCatalog().loadTable(tableIdent);
Assertions.assertEquals(
- DEFAULT_VALUE_OF_CURRENT_TIMESTAMP,
createdTable.columns()[0].defaultValue());
- Assertions.assertEquals(Literals.NULL,
createdTable.columns()[1].defaultValue());
- Assertions.assertEquals(Column.DEFAULT_VALUE_NOT_SET,
createdTable.columns()[2].defaultValue());
+ DEFAULT_VALUE_OF_CURRENT_TIMESTAMP,
loadedTable.columns()[0].defaultValue());
+ Assertions.assertEquals(Literals.NULL,
loadedTable.columns()[1].defaultValue());
+ Assertions.assertEquals(Column.DEFAULT_VALUE_NOT_SET,
loadedTable.columns()[2].defaultValue());
Assertions.assertEquals(
- Literals.varcharLiteral(255, "current_timestamp"),
- createdTable.columns()[3].defaultValue());
+ Literals.varcharLiteral(255, "current_timestamp"),
loadedTable.columns()[3].defaultValue());
}
}
diff --git
a/catalogs/catalog-jdbc-postgresql/src/test/java/org/apache/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java
b/catalogs/catalog-jdbc-postgresql/src/test/java/org/apache/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java
index 685e04eb3..5b7ec298f 100644
---
a/catalogs/catalog-jdbc-postgresql/src/test/java/org/apache/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java
+++
b/catalogs/catalog-jdbc-postgresql/src/test/java/org/apache/gravitino/catalog/postgresql/integration/test/CatalogPostgreSqlIT.java
@@ -247,8 +247,8 @@ public class CatalogPostgreSqlIT extends AbstractIT {
NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
TableCatalog tableCatalog = catalog.asTableCatalog();
- Table createdTable =
- tableCatalog.createTable(tableIdentifier, columns, null,
ImmutableMap.of());
+ tableCatalog.createTable(tableIdentifier, columns, null,
ImmutableMap.of());
+ Table createdTable = tableCatalog.loadTable(tableIdentifier);
Assertions.assertEquals(tableName, createdTable.name());
Assertions.assertEquals(columns.length, createdTable.columns().length);
@@ -494,31 +494,19 @@ public class CatalogPostgreSqlIT extends AbstractIT {
Map<String, String> properties = createProperties();
TableCatalog tableCatalog = catalog.asTableCatalog();
- Table createdTable =
- tableCatalog.createTable(
- tableIdentifier,
- columns,
- table_comment,
- properties,
- partitioning,
- distribution,
- sortOrders);
- Assertions.assertEquals(createdTable.name(), tableName);
- Map<String, String> resultProp = createdTable.properties();
- for (Map.Entry<String, String> entry : properties.entrySet()) {
- Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
- Assertions.assertEquals(entry.getValue(),
resultProp.get(entry.getKey()));
- }
- Assertions.assertEquals(createdTable.columns().length, columns.length);
-
- for (int i = 0; i < columns.length; i++) {
- ITUtils.assertColumn(columns[i], createdTable.columns()[i]);
- }
+ tableCatalog.createTable(
+ tableIdentifier,
+ columns,
+ table_comment,
+ properties,
+ partitioning,
+ distribution,
+ sortOrders);
Table loadTable = tableCatalog.loadTable(tableIdentifier);
Assertions.assertEquals(tableName, loadTable.name());
Assertions.assertEquals(table_comment, loadTable.comment());
- resultProp = loadTable.properties();
+ Map<String, String> resultProp = loadTable.properties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
Assertions.assertTrue(resultProp.containsKey(entry.getKey()));
Assertions.assertEquals(entry.getValue(),
resultProp.get(entry.getKey()));
@@ -806,40 +794,42 @@ public class CatalogPostgreSqlIT extends AbstractIT {
"Index does not support complex fields in PostgreSQL"));
// Test create index with empty name success.
- table =
- tableCatalog.createTable(
- NameIdentifier.of(schemaName, "test_null_key"),
- newColumns,
- table_comment,
- properties,
- Transforms.EMPTY_TRANSFORM,
- Distributions.NONE,
- new SortOrder[0],
- new Index[] {
- Indexes.of(
- Index.IndexType.UNIQUE_KEY,
- null,
- new String[][] {{"col_1"}, {"col_3"}, {"col_4"}}),
- Indexes.of(Index.IndexType.UNIQUE_KEY, null, new String[][]
{{"col_4"}}),
- });
+ NameIdentifier tableIdent = NameIdentifier.of(schemaName, "test_null_key");
+ tableCatalog.createTable(
+ tableIdent,
+ newColumns,
+ table_comment,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ new SortOrder[0],
+ new Index[] {
+ Indexes.of(
+ Index.IndexType.UNIQUE_KEY, null, new String[][] {{"col_1"},
{"col_3"}, {"col_4"}}),
+ Indexes.of(Index.IndexType.UNIQUE_KEY, null, new String[][]
{{"col_4"}}),
+ });
+ table = tableCatalog.loadTable(tableIdent);
+
Assertions.assertEquals(2, table.index().length);
Assertions.assertNotNull(table.index()[0].name());
Assertions.assertNotNull(table.index()[1].name());
// Test create index with same col success.
- table =
- tableCatalog.createTable(
- NameIdentifier.of(schemaName, "many_index"),
- newColumns,
- table_comment,
- properties,
- Transforms.EMPTY_TRANSFORM,
- Distributions.NONE,
- new SortOrder[0],
- new Index[] {
- Indexes.unique("u4_key_2", new String[][] {{"col_2"}, {"col_3"},
{"col_4"}}),
- Indexes.unique("u5_key_3", new String[][] {{"col_2"}, {"col_3"},
{"col_4"}}),
- });
+ tableIdent = NameIdentifier.of(schemaName, "many_index");
+ tableCatalog.createTable(
+ tableIdent,
+ newColumns,
+ table_comment,
+ properties,
+ Transforms.EMPTY_TRANSFORM,
+ Distributions.NONE,
+ new SortOrder[0],
+ new Index[] {
+ Indexes.unique("u4_key_2", new String[][] {{"col_2"}, {"col_3"},
{"col_4"}}),
+ Indexes.unique("u5_key_3", new String[][] {{"col_2"}, {"col_3"},
{"col_4"}}),
+ });
+ table = tableCatalog.loadTable(tableIdent);
+
Assertions.assertEquals(1, table.index().length);
Assertions.assertEquals("u4_key_2", table.index()[0].name());
}
@@ -884,26 +874,20 @@ public class CatalogPostgreSqlIT extends AbstractIT {
Column[] newColumns = new Column[] {col1, col2, col3, col4, col5, col6};
- Table createdTable =
- catalog
- .asTableCatalog()
- .createTable(
- NameIdentifier.of(schemaName,
GravitinoITUtils.genRandomName("pg_it_table")),
- newColumns,
- null,
- ImmutableMap.of());
+ NameIdentifier tableIdent =
+ NameIdentifier.of(schemaName,
GravitinoITUtils.genRandomName("pg_it_table"));
+ catalog.asTableCatalog().createTable(tableIdent, newColumns, null,
ImmutableMap.of());
+ Table loadedTable = catalog.asTableCatalog().loadTable(tableIdent);
Assertions.assertEquals(
- UnparsedExpression.of("random()"),
createdTable.columns()[0].defaultValue());
- Assertions.assertEquals(
- DEFAULT_VALUE_OF_CURRENT_TIMESTAMP,
createdTable.columns()[1].defaultValue());
- Assertions.assertEquals(Literals.NULL,
createdTable.columns()[2].defaultValue());
- Assertions.assertEquals(Column.DEFAULT_VALUE_NOT_SET,
createdTable.columns()[3].defaultValue());
+ UnparsedExpression.of("random()"),
loadedTable.columns()[0].defaultValue());
Assertions.assertEquals(
- Literals.varcharLiteral(255, "current_timestamp"),
- createdTable.columns()[4].defaultValue());
+ DEFAULT_VALUE_OF_CURRENT_TIMESTAMP,
loadedTable.columns()[1].defaultValue());
+ Assertions.assertEquals(Literals.NULL,
loadedTable.columns()[2].defaultValue());
+ Assertions.assertEquals(Column.DEFAULT_VALUE_NOT_SET,
loadedTable.columns()[3].defaultValue());
Assertions.assertEquals(
- Literals.integerLiteral(1000),
createdTable.columns()[5].defaultValue());
+ Literals.varcharLiteral(255, "current_timestamp"),
loadedTable.columns()[4].defaultValue());
+ Assertions.assertEquals(Literals.integerLiteral(1000),
loadedTable.columns()[5].defaultValue());
}
@Test
diff --git
a/catalogs/catalog-kafka/src/main/java/org/apache/gravitino/catalog/kafka/KafkaCatalogOperations.java
b/catalogs/catalog-kafka/src/main/java/org/apache/gravitino/catalog/kafka/KafkaCatalogOperations.java
index d3901218d..66f49a02c 100644
---
a/catalogs/catalog-kafka/src/main/java/org/apache/gravitino/catalog/kafka/KafkaCatalogOperations.java
+++
b/catalogs/catalog-kafka/src/main/java/org/apache/gravitino/catalog/kafka/KafkaCatalogOperations.java
@@ -260,19 +260,31 @@ public class KafkaCatalogOperations implements
CatalogOperations, SupportsSchema
CreateTopicsResult createTopicsResult =
adminClient.createTopics(Collections.singleton(buildNewTopic(ident,
properties)));
Uuid topicId = createTopicsResult.topicId(ident.name()).get();
+ Integer numPartitions =
createTopicsResult.numPartitions(ident.name()).get();
+ Integer replicationFactor =
createTopicsResult.replicationFactor(ident.name()).get();
+ Config topicConfigs = createTopicsResult.config(ident.name()).get();
+
+ Map<String, String> created_properties = Maps.newHashMap();
+
LOG.info(
"Created topic {}[id: {}] with {} partitions and replication factor
{}",
ident,
topicId,
- createTopicsResult.numPartitions(ident.name()).get(),
- createTopicsResult.replicationFactor(ident.name()).get());
+ numPartitions,
+ replicationFactor);
+
+ created_properties.put(
+ KafkaTopicPropertiesMetadata.PARTITION_COUNT,
String.valueOf(numPartitions));
+ created_properties.put(
+ KafkaTopicPropertiesMetadata.REPLICATION_FACTOR,
String.valueOf(replicationFactor));
+ topicConfigs.entries().forEach(e -> created_properties.put(e.name(),
e.value()));
return KafkaTopic.builder()
.withName(ident.name())
.withComment(comment)
// Because there is no way to store the Gravitino ID in Kafka,
therefor we use the topic
// ID as the Gravitino ID
- .withProperties(newPropertiesWithId(convertToGravitinoId(topicId),
properties))
+ .withProperties(newPropertiesWithId(convertToGravitinoId(topicId),
created_properties))
.withAuditInfo(
AuditInfo.builder()
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
diff --git
a/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java
b/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java
index 84b5379fb..b73a6c1b8 100644
---
a/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java
+++
b/catalogs/catalog-kafka/src/test/java/org/apache/gravitino/catalog/kafka/integration/test/CatalogKafkaIT.java
@@ -358,12 +358,15 @@ public class CatalogKafkaIT extends AbstractIT {
Topic loadedTopic =
catalog.asTopicCatalog().loadTopic(NameIdentifier.of(DEFAULT_SCHEMA_NAME,
topicName));
- Assertions.assertEquals(alteredTopic, loadedTopic);
Assertions.assertEquals("new comment", alteredTopic.comment());
Assertions.assertEquals("3",
alteredTopic.properties().get(PARTITION_COUNT));
+
Assertions.assertNull(alteredTopic.properties().get(TopicConfig.RETENTION_MS_CONFIG));
+
+ Assertions.assertEquals("new comment", loadedTopic.comment());
+ Assertions.assertEquals("3",
loadedTopic.properties().get(PARTITION_COUNT));
// retention.ms overridden was removed, so it should be the default value
Assertions.assertEquals(
- "604800000",
alteredTopic.properties().get(TopicConfig.RETENTION_MS_CONFIG));
+ "604800000",
loadedTopic.properties().get(TopicConfig.RETENTION_MS_CONFIG));
checkTopicReadWrite(topicName);
}
diff --git
a/clients/client-java/src/main/java/org/apache/gravitino/client/GenericTopic.java
b/clients/client-java/src/main/java/org/apache/gravitino/client/GenericTopic.java
index e317df069..55edfdd54 100644
---
a/clients/client-java/src/main/java/org/apache/gravitino/client/GenericTopic.java
+++
b/clients/client-java/src/main/java/org/apache/gravitino/client/GenericTopic.java
@@ -109,4 +109,9 @@ class GenericTopic implements Topic, SupportsTags {
public int hashCode() {
return topicDTO.hashCode();
}
+
+ @Override
+ public String toString() {
+ return "GenericTopic{" + "topicDTO=" + topicDTO.toString() + '}';
+ }
}
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
index 758d19737..c59b36177 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/SchemaOperationDispatcher.java
@@ -111,7 +111,9 @@ public class SchemaOperationDispatcher extends
OperationDispatcher implements Sc
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);
- Schema createdSchema =
+ // we do not retrieve the schema again (to obtain some values generated by
underlying catalog)
+ // since some catalogs' API is async and the table may not be created
immediately
+ Schema schema =
doWithCatalog(
catalogIdent,
c -> c.doWithSchemaOps(s -> s.createSchema(ident, comment,
updatedProperties)),
@@ -121,21 +123,14 @@ public class SchemaOperationDispatcher extends
OperationDispatcher implements Sc
// If the Schema is maintained by the Gravitino's store, we don't have to
store again.
boolean isManagedSchema = isManagedEntity(catalogIdent,
Capability.Scope.SCHEMA);
if (isManagedSchema) {
- return EntityCombinedSchema.of(createdSchema)
+ return EntityCombinedSchema.of(schema)
.withHiddenPropertiesSet(
getHiddenPropertyNames(
catalogIdent,
HasPropertyMetadata::schemaPropertiesMetadata,
- createdSchema.properties()));
+ schema.properties()));
}
- // Retrieve the Schema again to obtain some values generated by underlying
catalog
- Schema schema =
- doWithCatalog(
- catalogIdent,
- c -> c.doWithSchemaOps(s -> s.loadSchema(ident)),
- NoSuchSchemaException.class);
-
SchemaEntity schemaEntity =
SchemaEntity.builder()
.withId(uid)
@@ -208,21 +203,10 @@ public class SchemaOperationDispatcher extends
OperationDispatcher implements Sc
validateAlterProperties(ident,
HasPropertyMetadata::schemaPropertiesMetadata, changes);
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
- Schema tempAlteredSchema =
- doWithCatalog(
- catalogIdent,
- c -> c.doWithSchemaOps(s -> s.alterSchema(ident, changes)),
- NoSuchSchemaException.class);
-
- // Retrieve the Schema again to obtain some values generated by underlying
catalog
Schema alteredSchema =
doWithCatalog(
catalogIdent,
- c ->
- c.doWithSchemaOps(
- s ->
- s.loadSchema(
- NameIdentifier.of(ident.namespace(),
tempAlteredSchema.name()))),
+ c -> c.doWithSchemaOps(s -> s.alterSchema(ident, changes)),
NoSuchSchemaException.class);
// If the Schema is maintained by the Gravitino's store, we don't have to
alter again.
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
index 404af695a..0562d99b7 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TableOperationDispatcher.java
@@ -161,29 +161,25 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);
- doWithCatalog(
- catalogIdent,
- c ->
- c.doWithTableOps(
- t ->
- t.createTable(
- ident,
- columns,
- comment,
- updatedProperties,
- partitions == null ? EMPTY_TRANSFORM : partitions,
- distribution == null ? Distributions.NONE :
distribution,
- sortOrders == null ? new SortOrder[0] : sortOrders,
- indexes == null ? Indexes.EMPTY_INDEXES : indexes)),
- NoSuchSchemaException.class,
- TableAlreadyExistsException.class);
-
- // Retrieve the Table again to obtain some values generated by underlying
catalog
+ // we do not retrieve the table again (to obtain some values generated by
underlying catalog)
+ // since some catalogs' API is async and the table may not be created
immediately
Table table =
doWithCatalog(
catalogIdent,
- c -> c.doWithTableOps(t -> t.loadTable(ident)),
- NoSuchTableException.class);
+ c ->
+ c.doWithTableOps(
+ t ->
+ t.createTable(
+ ident,
+ columns,
+ comment,
+ updatedProperties,
+ partitions == null ? EMPTY_TRANSFORM : partitions,
+ distribution == null ? Distributions.NONE :
distribution,
+ sortOrders == null ? new SortOrder[0] : sortOrders,
+ indexes == null ? Indexes.EMPTY_INDEXES :
indexes)),
+ NoSuchSchemaException.class,
+ TableAlreadyExistsException.class);
TableEntity tableEntity =
TableEntity.builder()
@@ -229,7 +225,7 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
validateAlterProperties(ident,
HasPropertyMetadata::tablePropertiesMetadata, changes);
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
- Table tempAlteredTable =
+ Table alteredTable =
doWithCatalog(
catalogIdent,
c ->
@@ -238,16 +234,6 @@ public class TableOperationDispatcher extends
OperationDispatcher implements Tab
NoSuchTableException.class,
IllegalArgumentException.class);
- // Retrieve the Table again to obtain some values generated by underlying
catalog
- Table alteredTable =
- doWithCatalog(
- catalogIdent,
- c ->
- c.doWithTableOps(
- t ->
- t.loadTable(NameIdentifier.of(ident.namespace(),
tempAlteredTable.name()))),
- NoSuchTableException.class);
-
StringIdentifier stringId =
getStringIdFromProperties(alteredTable.properties());
// Case 1: The table is not created by Gravitino.
if (stringId == null) {
diff --git
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
index 981d999f9..ec3c08b33 100644
---
a/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
+++
b/core/src/main/java/org/apache/gravitino/catalog/TopicOperationDispatcher.java
@@ -143,18 +143,15 @@ public class TopicOperationDispatcher extends
OperationDispatcher implements Top
Map<String, String> updatedProperties =
StringIdentifier.newPropertiesWithId(stringId, properties);
- doWithCatalog(
- catalogIdent,
- c -> c.doWithTopicOps(t -> t.createTopic(ident, comment, dataLayout,
updatedProperties)),
- NoSuchSchemaException.class,
- TopicAlreadyExistsException.class);
-
- // Retrieve the Topic again to obtain some values generated by underlying
catalog
+ // we do not retrieve the topic again (to obtain some values generated by
underlying catalog)
+ // since some catalogs' API is async and the table may not be created
immediately
Topic topic =
doWithCatalog(
catalogIdent,
- c -> c.doWithTopicOps(t -> t.loadTopic(ident)),
- NoSuchTopicException.class);
+ c ->
+ c.doWithTopicOps(t -> t.createTopic(ident, comment,
dataLayout, updatedProperties)),
+ NoSuchSchemaException.class,
+ TopicAlreadyExistsException.class);
TopicEntity topicEntity =
TopicEntity.builder()
@@ -200,23 +197,16 @@ public class TopicOperationDispatcher extends
OperationDispatcher implements Top
validateAlterProperties(ident,
HasPropertyMetadata::topicPropertiesMetadata, changes);
NameIdentifier catalogIdent = getCatalogIdentifier(ident);
- Topic tempAlteredTopic =
+
+ // we do not retrieve the topic again (to obtain some values generated by
underlying catalog)
+ // since some catalogs' API is async and the table may not be created
immediately
+ Topic alteredTopic =
doWithCatalog(
catalogIdent,
c -> c.doWithTopicOps(t -> t.alterTopic(ident, changes)),
NoSuchTopicException.class,
IllegalArgumentException.class);
- // Retrieve the Topic again to obtain some values generated by underlying
catalog
- Topic alteredTopic =
- doWithCatalog(
- catalogIdent,
- c ->
- c.doWithTopicOps(
- t ->
- t.loadTopic(NameIdentifier.of(ident.namespace(),
tempAlteredTopic.name()))),
- NoSuchTopicException.class);
-
TopicEntity updatedTopicEntity =
operateOnEntity(
ident,
@@ -231,9 +221,9 @@ public class TopicOperationDispatcher extends
OperationDispatcher implements Top
.withName(topicEntity.name())
.withNamespace(ident.namespace())
.withComment(
- StringUtils.isBlank(tempAlteredTopic.comment())
+ StringUtils.isBlank(alteredTopic.comment())
? topicEntity.comment()
- : tempAlteredTopic.comment())
+ : alteredTopic.comment())
.withAuditInfo(
AuditInfo.builder()
.withCreator(topicEntity.auditInfo().creator())