This is an automated email from the ASF dual-hosted git repository.

jinsongzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/amoro.git


The following commit(s) were added to refs/heads/master by this push:
     new 19282f9cb [AMORO-3533] Avoid bulk table removal when synchronizing external catalogs and fetching database table listings fails (#3534)
19282f9cb is described below

commit 19282f9cb1e0fe68adb36c7216404a0faae1fd8e
Author: Jzjsnow <[email protected]>
AuthorDate: Tue Apr 29 14:20:45 2025 +0800

    [AMORO-3533] Avoid bulk table removal when synchronizing external catalogs and fetching database table listings fails (#3534)
    
    * [AMORO-3533] Avoid bulk table removal when synchronizing external catalogs and fetching database table listings fails
    
    * fixup! [AMORO-3533] Avoid bulk table removal when synchronizing external catalogs and fetching database table listings fails
    
    ---------
    
    Co-authored-by: jzjsnow <[email protected]>
---
 .../server/catalog/DefaultCatalogManager.java      |   6 +
 .../amoro/server/table/DefaultTableService.java    |  24 ++-
 .../table/TestSyncTableOfExternalCatalog.java      | 206 +++++++++++++++++++++
 3 files changed, 223 insertions(+), 13 deletions(-)

diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java
index 668b9a5fc..fbd03eb6d 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/catalog/DefaultCatalogManager.java
@@ -28,6 +28,7 @@ import org.apache.amoro.properties.CatalogMetaProperties;
 import org.apache.amoro.server.AmoroManagementConf;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.server.persistence.mapper.CatalogMetaMapper;
+import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting;
 import org.apache.amoro.shade.guava32.com.google.common.cache.CacheBuilder;
 import org.apache.amoro.shade.guava32.com.google.common.cache.CacheLoader;
 import org.apache.amoro.shade.guava32.com.google.common.cache.LoadingCache;
@@ -124,6 +125,11 @@ public class DefaultCatalogManager extends PersistentBase implements CatalogMana
     return serverCatalog;
   }
 
+  @VisibleForTesting // public only so tests can inject a catalog; not intended for production callers
+  public void setServerCatalog(ServerCatalog catalog) {
+    serverCatalogMap.put(catalog.name(), catalog); // stored under the key returned by catalog.name()
+  }
+
   @Override
   public InternalCatalog getInternalCatalog(String catalogName) {
     ServerCatalog serverCatalog = getServerCatalog(catalogName);
diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
index 614a1abc7..7dbe0dd59 100644
--- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
+++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java
@@ -48,7 +48,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -275,18 +274,17 @@ public class DefaultTableService extends PersistentBase implements TableService
               try {
                 tableIdentifiersFutures.add(
                     CompletableFuture.supplyAsync(
-                        () -> {
-                          try {
-                            return externalCatalog.listTables(database).stream()
-                                .map(TableIdentity::new)
-                                .collect(Collectors.toSet());
-                          } catch (Exception e) {
-                            LOG.error(
-                                "TableExplorer list tables in database {} error", database, e);
-                            return new HashSet<>();
-                          }
-                        },
-                        tableExplorerExecutors));
+                            () ->
+                                externalCatalog.listTables(database).stream()
+                                    .map(TableIdentity::new)
+                                    .collect(Collectors.toSet()),
+                            tableExplorerExecutors)
+                        .exceptionally(
+                            ex -> {
+                              LOG.error(
+                                  "TableExplorer list tables in database {} error", database, ex);
+                              throw new RuntimeException(ex);
+                            }));
               } catch (RejectedExecutionException e) {
                 LOG.error(
                     "The queue of table explorer is full, please increase the queue size or thread count.");
diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
index a5e8b1053..b8d2a32c5 100644
--- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
+++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestSyncTableOfExternalCatalog.java
@@ -21,17 +21,25 @@ package org.apache.amoro.server.table;
 import static org.apache.amoro.TableTestHelper.TEST_DB_NAME;
 import static org.apache.amoro.TableTestHelper.TEST_TABLE_NAME;
 import static org.apache.amoro.catalog.CatalogTestHelper.TEST_CATALOG_NAME;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import org.apache.amoro.BasicTableTestHelper;
+import org.apache.amoro.CommonUnifiedCatalog;
 import org.apache.amoro.ServerTableIdentifier;
 import org.apache.amoro.TableFormat;
 import org.apache.amoro.TableIDWithFormat;
 import org.apache.amoro.TableTestHelper;
 import org.apache.amoro.TestedCatalogs;
+import org.apache.amoro.UnifiedCatalog;
+import org.apache.amoro.api.CatalogMeta;
 import org.apache.amoro.api.TableIdentifier;
 import org.apache.amoro.catalog.CatalogTestHelper;
+import org.apache.amoro.catalog.CatalogTestHelpers;
 import org.apache.amoro.hive.catalog.HiveCatalogTestHelper;
 import org.apache.amoro.hive.catalog.HiveTableTestHelper;
+import org.apache.amoro.properties.CatalogMetaProperties;
 import org.apache.amoro.server.catalog.ExternalCatalog;
 import org.apache.amoro.server.catalog.ServerCatalog;
 import org.apache.amoro.server.manager.MetricManager;
@@ -39,12 +47,22 @@ import org.apache.amoro.server.metrics.MetricRegistry;
 import org.apache.amoro.server.persistence.PersistentBase;
 import org.apache.amoro.server.persistence.TableRuntimeMeta;
 import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 @RunWith(Parameterized.class)
 public class TestSyncTableOfExternalCatalog extends AMSTableTestBase {
@@ -239,6 +257,194 @@ public class TestSyncTableOfExternalCatalog extends AMSTableTestBase {
     dropDatabase();
   }
 
+  @Mock private ExternalCatalog externalCatalog;
+
+  @Test
+  public void exploreExternalCatalog_ListDatabasesException() {
+    createTable();
+    ServerCatalog originalCatalog = 
catalogManager().getServerCatalog(TEST_CATALOG_NAME);
+
+    // test list tables
+    List<TableRuntimeMeta> tableRuntimeMetaListAfterAddTable = 
persistency.getTableRuntimeMetas();
+    Assert.assertEquals(1, tableRuntimeMetaListAfterAddTable.size());
+
+    MockitoAnnotations.openMocks(this);
+    when(externalCatalog.name()).thenReturn(TEST_CATALOG_NAME);
+    when(externalCatalog.toString()).thenReturn(originalCatalog.toString());
+    
when(externalCatalog.getMetadata()).thenReturn(originalCatalog.getMetadata());
+    when(externalCatalog.listDatabases()).thenThrow(new RuntimeException("List 
databases error"));
+
+    // This should throw an exception
+    Assert.assertThrows(
+        "List databases error",
+        RuntimeException.class,
+        () -> tableService().exploreExternalCatalog(externalCatalog));
+
+    verify(externalCatalog, times(1)).listDatabases();
+
+    // test tableRuntime not removed
+    List<TableRuntimeMeta> tableRuntimeMetaListAfterExploreCatalog =
+        persistency.getTableRuntimeMetas();
+    Assert.assertEquals(1, tableRuntimeMetaListAfterExploreCatalog.size());
+
+    dropTable();
+    dropDatabase();
+  }
+
+  @Test
+  public void exploreExternalCatalog_ListTablesException() {
+    createTable();
+    ServerCatalog originalCatalog = 
catalogManager().getServerCatalog(TEST_CATALOG_NAME);
+
+    // test list tables
+    List<TableRuntimeMeta> tableRuntimeMetaListAfterAddTable = 
persistency.getTableRuntimeMetas();
+    Assert.assertEquals(1, tableRuntimeMetaListAfterAddTable.size());
+
+    MockitoAnnotations.openMocks(this);
+    when(externalCatalog.name()).thenReturn(TEST_CATALOG_NAME);
+    when(externalCatalog.toString()).thenReturn(originalCatalog.toString());
+    
when(externalCatalog.getMetadata()).thenReturn(originalCatalog.getMetadata());
+    
when(externalCatalog.listDatabases()).thenReturn(originalCatalog.listDatabases());
+    when(externalCatalog.listTables(TEST_DB_NAME))
+        .thenThrow(new RuntimeException("List tables in " + TEST_DB_NAME + " 
error"));
+
+    // This should throw an exception
+    Assert.assertThrows(
+        "List tables in " + TEST_DB_NAME + " error",
+        RuntimeException.class,
+        () -> tableService().exploreExternalCatalog(externalCatalog));
+
+    verify(externalCatalog, times(1)).listTables(TEST_DB_NAME);
+
+    // test tableRuntime not removed
+    List<TableRuntimeMeta> tableRuntimeMetaListAfterExploreCatalog =
+        persistency.getTableRuntimeMetas();
+    Assert.assertEquals(1, tableRuntimeMetaListAfterExploreCatalog.size());
+
+    dropTable();
+    dropDatabase();
+  }
+
+  @Test
+  public void exploreMultipleExternalCatalog_ListTablesException() throws 
TException, IOException {
+    createTable();
+    ServerCatalog originalCatalog = 
catalogManager().getServerCatalog(TEST_CATALOG_NAME);
+
+    // test list tables
+    List<TableRuntimeMeta> tableRuntimeMetaListAfterAddTable = 
persistency.getTableRuntimeMetas();
+    Assert.assertEquals(1, tableRuntimeMetaListAfterAddTable.size());
+
+    MockitoAnnotations.openMocks(this);
+    when(externalCatalog.name()).thenReturn(TEST_CATALOG_NAME);
+    when(externalCatalog.toString()).thenReturn(originalCatalog.toString());
+    
when(externalCatalog.getMetadata()).thenReturn(originalCatalog.getMetadata());
+    
when(externalCatalog.listDatabases()).thenReturn(originalCatalog.listDatabases());
+    when(externalCatalog.listTables(TEST_DB_NAME))
+        .thenThrow(new RuntimeException("List tables in " + TEST_DB_NAME + " 
error"));
+
+    catalogManager().setServerCatalog(externalCatalog);
+
+    // create a new catalog new_catalog with a new db and a new table new_table
+    String newCatalogName = "new_catalog";
+    String newDbName = "new_db";
+    String newTableName = "new_table";
+    UnifiedCatalog newCatalog = createNewCatalogTable(newCatalogName, 
newDbName, newTableName);
+
+    // This should not throw an exception, and log an error message 
"TableExplorer error when
+    // explore table runtimes for catalog:test_catalog". Meanwhile, the newly 
added catalog can be
+    // synchronized normally
+    // without being affected.
+    tableService().exploreTableRuntimes();
+
+    // test tableRuntime of test_table not removed and tableRuntime of 
new_table added
+    List<TableRuntimeMeta> tableRuntimeMetaListAfterExploreCatalog =
+        persistency.getTableRuntimeMetas();
+    Assert.assertEquals(2, tableRuntimeMetaListAfterExploreCatalog.size());
+    Assert.assertTrue(
+        tableRuntimeMetaListAfterExploreCatalog.stream()
+            .anyMatch(tableRuntimeMeta -> 
tableRuntimeMeta.getTableName().equals(TEST_TABLE_NAME)));
+    Assert.assertTrue(
+        tableRuntimeMetaListAfterExploreCatalog.stream()
+            .anyMatch(tableRuntimeMeta -> 
tableRuntimeMeta.getTableName().equals(newTableName)));
+
+    // test tableRuntime of new_table removed
+    disposeNewCatalogTable(newCatalog, newCatalogName, newDbName, 
newTableName);
+    // This should not throw an exception, but log an error message 
"TableExplorer error when
+    // explore table runtimes for catalog:test_catalog". Meanwhile, the 
removed catalog and
+    // tableRuntime of new_table can
+    // be synchronized normally
+    // without being affected.
+    tableService().exploreTableRuntimes();
+
+    List<TableRuntimeMeta> tableRuntimeMetaListAfterDisposeNewCatalog =
+        persistency.getTableRuntimeMetas();
+    Assert.assertEquals(1, tableRuntimeMetaListAfterDisposeNewCatalog.size());
+    Assert.assertTrue(
+        tableRuntimeMetaListAfterExploreCatalog.stream()
+            .anyMatch(tableRuntimeMeta -> 
tableRuntimeMeta.getTableName().equals(TEST_TABLE_NAME)));
+    Assert.assertFalse(
+        tableRuntimeMetaListAfterDisposeNewCatalog.stream()
+            .anyMatch(tableRuntimeMeta -> 
tableRuntimeMeta.getTableName().equals(newTableName)));
+
+    catalogManager().setServerCatalog(originalCatalog);
+    dropTable();
+    dropDatabase();
+  }
+
+  private UnifiedCatalog createNewCatalogTable(String catalogName, String dbName, String tableName)
+      throws IOException, TException {
+    // create new catalog: Hadoop-type, Iceberg format, warehouse in a fresh temp folder
+    String warehouseDir = temp.newFolder().getPath();
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put(CatalogMetaProperties.KEY_WAREHOUSE, warehouseDir);
+    CatalogMeta catalogMeta =
+        CatalogTestHelpers.buildCatalogMeta(
+            catalogName,
+            CatalogMetaProperties.CATALOG_TYPE_HADOOP,
+            properties,
+            TableFormat.ICEBERG);
+    UnifiedCatalog externalCatalog = new CommonUnifiedCatalog(() -> catalogMeta, Maps.newHashMap());
+    catalogManager().createCatalog(catalogMeta);
+
+    // create new database, first through the Hive client and then in the new catalog if absent
+    try {
+      Database database = new Database();
+      database.setName(dbName);
+      TEST_HMS.getHiveClient().createDatabase(database);
+    } catch (AlreadyExistsException e) {
+      // deliberately ignored: the database is already present in the metastore
+    }
+    List<String> databases = externalCatalog.listDatabases();
+    if (!(databases != null && databases.contains(dbName))) {
+      externalCatalog.createDatabase(dbName);
+    }
+    // create new table directly through an Iceberg catalog built from the same properties
+    Map<String, String> catalogProperties = Maps.newHashMap(catalogMeta.getCatalogProperties());
+    catalogProperties.put(
+        CatalogUtil.ICEBERG_CATALOG_TYPE, CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
+    org.apache.iceberg.catalog.Catalog catalog =
+        CatalogUtil.buildIcebergCatalog(catalogName, catalogProperties, new Configuration());
+    catalog.createTable(
+        org.apache.iceberg.catalog.TableIdentifier.of(dbName, tableName),
+        tableTestHelper().tableSchema(),
+        tableTestHelper().partitionSpec(),
+        tableTestHelper().tableProperties());
+
+    return externalCatalog;
+  }
+
+  private void disposeNewCatalogTable(
+      UnifiedCatalog externalCatalog, String catalogName, String dbName, String tableName)
+      throws TException {
+    // drop table (third argument true — presumably purge; confirm against UnifiedCatalog)
+    externalCatalog.dropTable(dbName, tableName, true);
+    // drop database from the catalog and then through the Hive client
+    externalCatalog.dropDatabase(dbName);
+    TEST_HMS.getHiveClient().dropDatabase(dbName, false, true);
+    // drop catalog registration from the catalog manager
+    catalogManager().dropCatalog(catalogName);
+  }
+
   private static class Persistency extends PersistentBase {
     public void addTableIdentifier(
         String catalog, String database, String tableName, TableFormat format) {

Reply via email to