This is an automated email from the ASF dual-hosted git repository.
jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git
The following commit(s) were added to refs/heads/master by this push:
new 19282f9cb [AMORO-3533] Avoid bulk table removal when synchronizing
external catalogs and fetching database table listings fails (#3534)
19282f9cb is described below
commit 19282f9cb1e0fe68adb36c7216404a0faae1fd8e
Author: Jzjsnow <[email protected]>
AuthorDate: Tue Apr 29 14:20:45 2025 +0800
[AMORO-3533] Avoid bulk table removal when synchronizing external catalogs
and fetching database table listings fails (#3534)
* [AMORO-3533] Avoid bulk table removal when synchronizing external
catalogs and fetching database table listings fails
* fixup! [AMORO-3533] Avoid bulk table removal when synchronizing external
catalogs and fetching database table listings fails
---------
Co-authored-by: jzjsnow <[email protected]>
---
.../server/catalog/DefaultCatalogManager.java | 6 +
.../amoro/server/table/DefaultTableService.java | 24 ++-
.../table/TestSyncTableOfExternalCatalog.java | 206 +++++++++++++++++++++
3 files changed, 223 insertions(+), 13 deletions(-)
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java
index 668b9a5fc..fbd03eb6d 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java
@@ -28,6 +28,7 @@ import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.server.AmoroManagementConf;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
+import
org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
import org.apache.amoro.shade.guava32.com.google.common.cache.CacheBuilder;
import org.apache.amoro.shade.guava32.com.google.common.cache.CacheLoader;
import org.apache.amoro.shade.guava32.com.google.common.cache.LoadingCache;
@@ -124,6 +125,11 @@ public class DefaultCatalogManager extends PersistentBase
implements CatalogMana
return serverCatalog;
}
+  /**
+   * Registers {@code catalog} in the server catalog map under {@link ServerCatalog#name()},
+   * replacing whatever was registered under that name. Test-only hook used to swap a mocked
+   * catalog in for a real one.
+   */
+  @VisibleForTesting
+  public void setServerCatalog(ServerCatalog catalog) {
+    String catalogName = catalog.name();
+    serverCatalogMap.put(catalogName, catalog);
+  }
+
@Override
public InternalCatalog getInternalCatalog(String catalogName) {
ServerCatalog serverCatalog = getServerCatalog(catalogName);
diff --git
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index 614a1abc7..7dbe0dd59 100644
---
a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++
b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -48,7 +48,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -275,18 +274,17 @@ public class DefaultTableService extends PersistentBase
implements TableService
try {
tableIdentifiersFutures.add(
CompletableFuture.supplyAsync(
- () -> {
- try {
- return
externalCatalog.listTables(database).stream()
- .map(TableIdentity::new)
- .collect(Collectors.toSet());
- } catch (Exception e) {
- LOG.error(
- "TableExplorer list tables in database {}
error", database, e);
- return new HashSet<>();
- }
- },
- tableExplorerExecutors));
+ () ->
+ externalCatalog.listTables(database).stream()
+ .map(TableIdentity::new)
+ .collect(Collectors.toSet()),
+ tableExplorerExecutors)
+ .exceptionally(
+ ex -> {
+ LOG.error(
+ "TableExplorer list tables in database {}
error", database, ex);
+ throw new RuntimeException(ex);
+ }));
} catch (RejectedExecutionException e) {
LOG.error(
"The queue of table explorer is full, please increase the
queue size or thread count.");
diff --git
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
index a5e8b1053..b8d2a32c5 100644
---
a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
+++
b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
@@ -21,17 +21,25 @@ package org.apache.amoro.server.table;
import static org.apache.amoro.TableTestHelper.TEST_DB_NAME;
import static org.apache.amoro.TableTestHelper.TEST_TABLE_NAME;
import static org.apache.amoro.catalog.CatalogTestHelper.TEST_CATALOG_NAME;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.CommonUnifiedCatalog;
import org.apache.amoro.ServerTableIdentifier;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableIDWithFormat;
import org.apache.amoro.TableTestHelper;
import org.apache.amoro.TestedCatalogs;
+import org.apache.amoro.UnifiedCatalog;
+import org.apache.amoro.api.CatalogMeta;
import org.apache.amoro.api.TableIdentifier;
import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.catalog.CatalogTestHelpers;
import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
import org.apache.amoro.hive.catalog.HiveTableTestHelper;
+import org.apache.amoro.properties.CatalogMetaProperties;
import org.apache.amoro.server.catalog.ExternalCatalog;
import org.apache.amoro.server.catalog.ServerCatalog;
import org.apache.amoro.server.manager.MetricManager;
@@ -39,12 +47,22 @@ import org.apache.amoro.server.metrics.MetricRegistry;
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.server.persistence.TableRuntimeMeta;
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import java.io.IOException;
import java.util.List;
+import java.util.Map;
@RunWith(Parameterized.class)
public class TestSyncTableOfExternalCatalog extends AMSTableTestBase {
@@ -239,6 +257,194 @@ public class TestSyncTableOfExternalCatalog extends
AMSTableTestBase {
dropDatabase();
}
+  // Mocked catalog; initialized by MockitoAnnotations.openMocks(this) inside each test using it.
+  @Mock private ExternalCatalog externalCatalog;
+
+  /**
+   * A failure while listing the databases of an external catalog must abort the exploration
+   * instead of being treated as "the catalog is empty", so existing table runtimes survive.
+   */
+  @Test
+  public void exploreExternalCatalog_ListDatabasesException() throws Exception {
+    createTable();
+    // Close the mocks when done, as recommended for MockitoAnnotations.openMocks.
+    try (AutoCloseable ignored = MockitoAnnotations.openMocks(this)) {
+      ServerCatalog originalCatalog = catalogManager().getServerCatalog(TEST_CATALOG_NAME);
+
+      // Precondition: the table created above has a runtime.
+      List<TableRuntimeMeta> tableRuntimeMetaListAfterAddTable =
+          persistency.getTableRuntimeMetas();
+      Assert.assertEquals(1, tableRuntimeMetaListAfterAddTable.size());
+
+      when(externalCatalog.name()).thenReturn(TEST_CATALOG_NAME);
+      when(externalCatalog.toString()).thenReturn(originalCatalog.toString());
+      when(externalCatalog.getMetadata()).thenReturn(originalCatalog.getMetadata());
+      when(externalCatalog.listDatabases())
+          .thenThrow(new RuntimeException("List databases error"));
+
+      // The first argument is JUnit's failure message, not the expected exception message.
+      Assert.assertThrows(
+          "exploreExternalCatalog should propagate the listDatabases failure",
+          RuntimeException.class,
+          () -> tableService().exploreExternalCatalog(externalCatalog));
+
+      verify(externalCatalog, times(1)).listDatabases();
+
+      // The existing table runtime must not have been removed.
+      List<TableRuntimeMeta> tableRuntimeMetaListAfterExploreCatalog =
+          persistency.getTableRuntimeMetas();
+      Assert.assertEquals(1, tableRuntimeMetaListAfterExploreCatalog.size());
+    } finally {
+      // Clean up even when an assertion fails, so later parameterized runs start clean.
+      dropTable();
+      dropDatabase();
+    }
+  }
+
+  /**
+   * A failure while listing the tables of one database must abort the exploration instead of
+   * being treated as "the database is empty", so that database's table runtimes survive.
+   */
+  @Test
+  public void exploreExternalCatalog_ListTablesException() throws Exception {
+    createTable();
+    // Close the mocks when done, as recommended for MockitoAnnotations.openMocks.
+    try (AutoCloseable ignored = MockitoAnnotations.openMocks(this)) {
+      ServerCatalog originalCatalog = catalogManager().getServerCatalog(TEST_CATALOG_NAME);
+
+      // Precondition: the table created above has a runtime.
+      List<TableRuntimeMeta> tableRuntimeMetaListAfterAddTable =
+          persistency.getTableRuntimeMetas();
+      Assert.assertEquals(1, tableRuntimeMetaListAfterAddTable.size());
+
+      when(externalCatalog.name()).thenReturn(TEST_CATALOG_NAME);
+      when(externalCatalog.toString()).thenReturn(originalCatalog.toString());
+      when(externalCatalog.getMetadata()).thenReturn(originalCatalog.getMetadata());
+      when(externalCatalog.listDatabases()).thenReturn(originalCatalog.listDatabases());
+      when(externalCatalog.listTables(TEST_DB_NAME))
+          .thenThrow(new RuntimeException("List tables in " + TEST_DB_NAME + " error"));
+
+      // The first argument is JUnit's failure message, not the expected exception message.
+      Assert.assertThrows(
+          "exploreExternalCatalog should propagate the listTables failure",
+          RuntimeException.class,
+          () -> tableService().exploreExternalCatalog(externalCatalog));
+
+      verify(externalCatalog, times(1)).listTables(TEST_DB_NAME);
+
+      // The existing table runtime must not have been removed.
+      List<TableRuntimeMeta> tableRuntimeMetaListAfterExploreCatalog =
+          persistency.getTableRuntimeMetas();
+      Assert.assertEquals(1, tableRuntimeMetaListAfterExploreCatalog.size());
+    } finally {
+      // Clean up even when an assertion fails, so later parameterized runs start clean.
+      dropTable();
+      dropDatabase();
+    }
+  }
+
+  /**
+   * When one catalog fails during {@code exploreTableRuntimes}, the failure must stay isolated:
+   * that catalog's table runtimes are kept, and other catalogs are still synchronized (tables
+   * added and removed as usual).
+   */
+  @Test
+  public void exploreMultipleExternalCatalog_ListTablesException() throws Exception {
+    createTable();
+    ServerCatalog originalCatalog = catalogManager().getServerCatalog(TEST_CATALOG_NAME);
+    // Close the mocks when done, as recommended for MockitoAnnotations.openMocks.
+    try (AutoCloseable ignored = MockitoAnnotations.openMocks(this)) {
+      // Precondition: the table created above has a runtime.
+      List<TableRuntimeMeta> tableRuntimeMetaListAfterAddTable =
+          persistency.getTableRuntimeMetas();
+      Assert.assertEquals(1, tableRuntimeMetaListAfterAddTable.size());
+
+      // Replace test_catalog with a mock whose listTables(TEST_DB_NAME) always fails.
+      when(externalCatalog.name()).thenReturn(TEST_CATALOG_NAME);
+      when(externalCatalog.toString()).thenReturn(originalCatalog.toString());
+      when(externalCatalog.getMetadata()).thenReturn(originalCatalog.getMetadata());
+      when(externalCatalog.listDatabases()).thenReturn(originalCatalog.listDatabases());
+      when(externalCatalog.listTables(TEST_DB_NAME))
+          .thenThrow(new RuntimeException("List tables in " + TEST_DB_NAME + " error"));
+      catalogManager().setServerCatalog(externalCatalog);
+
+      // Add a second catalog, new_catalog, containing new_db.new_table.
+      String newCatalogName = "new_catalog";
+      String newDbName = "new_db";
+      String newTableName = "new_table";
+      UnifiedCatalog newCatalog = createNewCatalogTable(newCatalogName, newDbName, newTableName);
+
+      // Must not throw. The test_catalog failure is expected to be logged ("TableExplorer error
+      // when explore table runtimes for catalog:test_catalog"), while new_catalog is still
+      // synchronized normally.
+      tableService().exploreTableRuntimes();
+
+      // test_table's runtime is kept and new_table's runtime is added.
+      List<TableRuntimeMeta> tableRuntimeMetaListAfterExploreCatalog =
+          persistency.getTableRuntimeMetas();
+      Assert.assertEquals(2, tableRuntimeMetaListAfterExploreCatalog.size());
+      Assert.assertTrue(
+          tableRuntimeMetaListAfterExploreCatalog.stream()
+              .anyMatch(meta -> TEST_TABLE_NAME.equals(meta.getTableName())));
+      Assert.assertTrue(
+          tableRuntimeMetaListAfterExploreCatalog.stream()
+              .anyMatch(meta -> newTableName.equals(meta.getTableName())));
+
+      // Drop new_catalog and explore again. test_catalog keeps failing, yet new_table's runtime
+      // must be removed while test_table's runtime is still kept.
+      disposeNewCatalogTable(newCatalog, newCatalogName, newDbName, newTableName);
+      tableService().exploreTableRuntimes();
+
+      List<TableRuntimeMeta> tableRuntimeMetaListAfterDisposeNewCatalog =
+          persistency.getTableRuntimeMetas();
+      Assert.assertEquals(1, tableRuntimeMetaListAfterDisposeNewCatalog.size());
+      // Both checks use the post-disposal snapshot, not the list captured earlier.
+      Assert.assertTrue(
+          tableRuntimeMetaListAfterDisposeNewCatalog.stream()
+              .anyMatch(meta -> TEST_TABLE_NAME.equals(meta.getTableName())));
+      Assert.assertFalse(
+          tableRuntimeMetaListAfterDisposeNewCatalog.stream()
+              .anyMatch(meta -> newTableName.equals(meta.getTableName())));
+    } finally {
+      // Always restore the real catalog, so a failed assertion cannot leak the mock into
+      // later tests.
+      catalogManager().setServerCatalog(originalCatalog);
+      dropTable();
+      dropDatabase();
+    }
+  }
+
+  /**
+   * Registers a Hadoop-type Iceberg catalog named {@code catalogName} with the catalog manager,
+   * then creates database {@code dbName} and table {@code tableName} for it.
+   *
+   * @return a {@link UnifiedCatalog} over the new catalog, for use in later cleanup
+   */
+  private UnifiedCatalog createNewCatalogTable(String catalogName, String dbName, String tableName)
+      throws IOException, TException {
+    // Register the new catalog; its warehouse is a fresh temporary folder.
+    String warehouseDir = temp.newFolder().getPath();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(CatalogMetaProperties.KEY_WAREHOUSE, warehouseDir);
+    CatalogMeta catalogMeta =
+        CatalogTestHelpers.buildCatalogMeta(
+            catalogName,
+            CatalogMetaProperties.CATALOG_TYPE_HADOOP,
+            properties,
+            TableFormat.ICEBERG);
+    // Named unifiedCatalog (not externalCatalog) to avoid shadowing the @Mock field.
+    UnifiedCatalog unifiedCatalog = new CommonUnifiedCatalog(() -> catalogMeta, Maps.newHashMap());
+    catalogManager().createCatalog(catalogMeta);
+
+    // Create the database in the test HMS, tolerating it already being there.
+    try {
+      Database database = new Database();
+      database.setName(dbName);
+      TEST_HMS.getHiveClient().createDatabase(database);
+    } catch (AlreadyExistsException ignored) {
+      // Already present in the HMS: nothing to do.
+    }
+    // Ensure the database is also visible through the new catalog.
+    List<String> databases = unifiedCatalog.listDatabases();
+    if (databases == null || !databases.contains(dbName)) {
+      unifiedCatalog.createDatabase(dbName);
+    }
+
+    // Create the table through a plain Iceberg Hadoop catalog built from the same catalog
+    // properties.
+    Map<String, String> catalogProperties = Maps.newHashMap(catalogMeta.getCatalogProperties());
+    catalogProperties.put(
+        CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    org.apache.iceberg.catalog.Catalog catalog =
+        CatalogUtil.buildIcebergCatalog(catalogName, catalogProperties, new Configuration());
+    catalog.createTable(
+        org.apache.iceberg.catalog.TableIdentifier.of(dbName, tableName),
+        tableTestHelper().tableSchema(),
+        tableTestHelper().partitionSpec(),
+        tableTestHelper().tableProperties());
+
+    return unifiedCatalog;
+  }
+
+  /**
+   * Drops the table, database and catalog created by {@link #createNewCatalogTable}.
+   *
+   * @param unifiedCatalog the catalog handle returned by {@link #createNewCatalogTable}
+   */
+  private void disposeNewCatalogTable(
+      UnifiedCatalog unifiedCatalog, String catalogName, String dbName, String tableName)
+      throws TException {
+    // Drop the table, then the database, through the new catalog.
+    unifiedCatalog.dropTable(dbName, tableName, true);
+    unifiedCatalog.dropDatabase(dbName);
+    // The database was also created in the test HMS (deleteData=false, ignoreUnknownDb=true).
+    TEST_HMS.getHiveClient().dropDatabase(dbName, false, true);
+    // Finally unregister the catalog itself.
+    catalogManager().dropCatalog(catalogName);
+  }
+
private static class Persistency extends PersistentBase {
public void addTableIdentifier(
String catalog, String database, String tableName, TableFormat format)
{