This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 1cead451147fe4afd0e4c2c3a5d6e78da84c2025
Author: Riza Suminto <riza.sumi...@cloudera.com>
AuthorDate: Thu May 22 10:29:38 2025 -0700

    IMPALA-13947: Test local catalog mode by default
    
    Local catalog mode has been the default in downstream Impala for over 5
    years and has worked well there. This patch turns on local catalog mode by
    default (--catalog_topic_mode=minimal and --use_local_catalog=true) as the
    preferred mode going forward.
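
    As a minimal sketch (not part of this patch; the flag values are the ones
    used below to pin individual tests), a custom cluster test can still bring
    up a legacy-mode cluster explicitly:

      # Hypothetical test method; reverts both new defaults for its cluster.
      def test_runs_in_legacy_catalog_mode(self):
        self._start_impala_cluster(
            ["--impalad_args=--use_local_catalog=false",
             "--catalogd_args=--catalog_topic_mode=full"],
            cluster_size=3)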
    
    Implemented LocalCatalog.setIsReady() to facilitate using local catalog
    mode in FE tests. Some FE tests fail due to behavior differences in local
    catalog mode, such as IMPALA-7539. This is probably OK since Impala now
    largely hands over FileSystem permission checks to Apache Ranger.
    
    The following custom cluster tests are pinned to run under legacy catalog
    mode because their behavior differs in local catalog mode (a sketch of the
    pinning pattern follows the list):
    
    TestCalcitePlanner.test_calcite_frontend
    TestCoordinators.test_executor_only_lib_cache
    TestMetadataReplicas
    TestTupleCacheCluster
    TestWorkloadManagementSQLDetailsCalcite.test_tpcds_8_decimal
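
    A minimal sketch of the pinning pattern applied to those tests (the class
    name is illustrative; the decorator and flag values are the ones used in
    this patch):

      @CustomClusterTestSuite.with_args(
          impalad_args="--use_local_catalog=false",
          catalogd_args="--catalog_topic_mode=full")
      class TestPinnedToLegacyCatalog(CustomClusterTestSuite):
        # Everything in this class runs against a legacy-catalog cluster even
        # though local catalog mode is now the default.
        pass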
    
    In TestHBaseHmsColumnOrder.test_hbase_hms_column_order, set the
    --use_hms_column_order_for_hbase_tables=true flag for both impalad and
    catalogd to get a consistent column order in either local or legacy
    catalog mode.
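
    Roughly, as done in the updated test, the same flag is applied to both
    daemons so impalad and catalogd agree on the column order regardless of
    catalog mode:

      @CustomClusterTestSuite.with_args(
          impalad_args="--use_hms_column_order_for_hbase_tables=true",
          catalogd_args="--use_hms_column_order_for_hbase_tables=true")
      def test_hbase_hms_column_order(self, vector):
        self.run_test_case('QueryTest/hbase-hms-column-order', vector)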
    
    Changed the TestCatalogRpcErrors.test_register_subscriber_rpc_error
    assertions to be more fine-grained by matching on individual query ids.
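
    A minimal sketch of the new assertion style (pattern and helper taken from
    the updated test; the query shown is illustrative):

      log_pattern = ("{0}.*Injected RPC error.*"
                     "Debug Action: CATALOG_RPC_FIRST_ATTEMPT")
      result = self.execute_query("refresh {0}.tmp".format(unique_database))
      assert result.success
      # Match the injected-error log line for this specific query id instead
      # of asserting a fixed global count of injected errors.
      self.assert_impalad_log_contains(
          "INFO", log_pattern.format(result.query_id), -1)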
    
    Moved most of the test methods from TestRangerLegacyCatalog to
    TestRangerLocalCatalog, except for some that do need to run in legacy
    catalog mode. Also renamed TestRangerLocalCatalog to
    TestRangerDefaultCatalog. The table ownership issue in local catalog mode
    remains unresolved (see IMPALA-8937).
    
    Testing:
    Passed exhaustive tests.
    
    Change-Id: Ie303e294972d12b98f8354bf6bbc6d0cb920060f
    Reviewed-on: http://gerrit.cloudera.org:8080/23080
    Reviewed-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
    Tested-by: Impala Public Jenkins <impala-public-jenk...@cloudera.com>
---
 be/src/catalog/catalog-server.cc                   |   2 +-
 be/src/runtime/exec-env.cc                         |   2 +-
 be/src/testutil/in-process-servers.h               |   2 +-
 .../impala/catalog/local/CatalogdMetaProvider.java |   9 ++
 .../apache/impala/catalog/local/LocalCatalog.java  |   4 +-
 .../apache/impala/catalog/local/MetaProvider.java  |   6 +
 .../org/apache/impala/service/JniFrontend.java     |   1 +
 .../apache/impala/analysis/AnalyzeStmtsTest.java   |  36 +++---
 .../catalog/CatalogObjectToFromThriftTest.java     |   9 +-
 .../org/apache/impala/catalog/CatalogTest.java     |  18 ++-
 .../queries/QueryTest/kudu-timeouts-impalad.test   |   2 +-
 tests/authorization/test_ranger.py                 | 126 +++++++++++----------
 tests/common/impala_connection.py                  |   7 +-
 tests/custom_cluster/test_calcite_planner.py       |   6 +-
 tests/custom_cluster/test_coordinators.py          |   7 +-
 .../custom_cluster/test_hbase_hms_column_order.py  |   5 +-
 tests/custom_cluster/test_metadata_replicas.py     |   6 +-
 tests/custom_cluster/test_query_live.py            |   3 +-
 tests/custom_cluster/test_services_rpc_errors.py   |  13 ++-
 tests/custom_cluster/test_tuple_cache.py           |   7 +-
 tests/custom_cluster/test_web_pages.py             |   7 +-
 .../test_workload_mgmt_sql_details.py              |   9 +-
 22 files changed, 178 insertions(+), 109 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index e4fa671c7..63b054109 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -53,7 +53,7 @@ static const auto gt_0 = [](const char* name, int32_t val) {
 };
 
 DEFINE_int32(catalog_service_port, 26000, "port where the CatalogService is 
running");
-DEFINE_string(catalog_topic_mode, "full",
+DEFINE_string(catalog_topic_mode, "minimal",
     "The type of data that the catalog service will publish into the Catalog "
     "StateStore topic. Valid values are 'full', 'mixed', or 'minimal'.\n"
     "\n"
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index fd7658527..9bfa4beab 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -106,7 +106,7 @@ DEFINE_int32(admission_control_slots, 0,
 DEFINE_string(codegen_cache_capacity, "1GB",
     "Specify the capacity of the codegen cache. If set to 0, codegen cache is 
disabled.");
 
-DEFINE_bool(use_local_catalog, false,
+DEFINE_bool(use_local_catalog, true,
     "Use the on-demand metadata feature in coordinators. If this is set, 
coordinators "
     "pull metadata as needed from catalogd and cache it locally. The cached 
metadata "
     "gets evicted automatically under memory pressure or after an expiration 
time.");
diff --git a/be/src/testutil/in-process-servers.h b/be/src/testutil/in-process-servers.h
index 6279ba684..ca0a28e5a 100644
--- a/be/src/testutil/in-process-servers.h
+++ b/be/src/testutil/in-process-servers.h
@@ -49,7 +49,7 @@ class InProcessImpalaServer {
   /// ports used by a concurrently running normal Impala daemon. The hostname 
is set to
   /// "localhost" and the ports are picked from the ephemeral port range and 
exposed as
   /// member variables. Internally this will call StartWithClientservers() and
-  /// SetCatalogInitialized(). The default values for statestore_host and 
statestore_port
+  /// SetCatalogIsReady(). The default values for statestore_host and 
statestore_port
   /// indicate that a statestore connection should not be used. These values 
are directly
   /// forwarded to the ExecEnv. Returns ok and sets *server on success. On 
failure returns
   /// an error. *server may or may not be set on error, but is always invalid 
to use.
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 6734726bf..7db96fdcc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -446,6 +446,15 @@ public class CatalogdMetaProvider implements MetaProvider {
     return lastSeenCatalogVersion_.get() > Catalog.INITIAL_CATALOG_VERSION;
   }
 
+  // Only used for testing.
+  @Override
+  public void setIsReady(boolean isReady) {
+    lastSeenCatalogVersion_.incrementAndGet();
+    synchronized (catalogReadyNotifier_) {
+      catalogReadyNotifier_.notifyAll();
+    }
+  }
+
   public void setAuthzChecker(
       AtomicReference<? extends AuthorizationChecker> authzChecker) {
     authzChecker_ = authzChecker;
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
index b689d0edc..6122fb309 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalCatalog.java
@@ -329,10 +329,10 @@ public class LocalCatalog implements FeCatalog {
     return metaProvider_.isReady();
   }
 
+  // Only used for testing.
   @Override
   public void setIsReady(boolean isReady) {
-    // No-op for local catalog.
-    // This appears to only be used in some tests.
+    metaProvider_.setIsReady(isReady);
   }
 
   public MetaProvider getMetaProvider() {
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 992e7993b..0666d07d9 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -75,6 +75,12 @@ public interface MetaProvider {
    */
   void waitForIsReady(long timeoutMs);
 
+  /**
+   * Force the MetaProvider into a particular readiness state.
+   * Used only by tests.
+   */
+  default void setIsReady(boolean isReady) { /* NOOP */}
+
   ImmutableList<String> loadDbList() throws TException;
 
   Database loadDb(String dbName) throws TException;
diff --git a/fe/src/main/java/org/apache/impala/service/JniFrontend.java b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
index db419a78b..de476bbc0 100644
--- a/fe/src/main/java/org/apache/impala/service/JniFrontend.java
+++ b/fe/src/main/java/org/apache/impala/service/JniFrontend.java
@@ -661,6 +661,7 @@ public class JniFrontend {
 
   public void setCatalogIsReady() {
     Preconditions.checkNotNull(frontend_);
+    LOG.info("Forcing catalog to ready state for testing");
     frontend_.getCatalog().setIsReady(true);
   }
 
diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
index 87fafd003..4d9811bc3 100644
--- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java
@@ -4008,13 +4008,19 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
           "functional_text_gzip.jointbl",
           "/test-warehouse/alltypes_text_gzip/year=2009/month=4", overwrite));
 
-      // Verify with a read-only table
-      AnalysisError(String.format("load data inpath '%s' into table " +
-          "functional_seq.alltypes partition(year=2009, month=3)",
-          "/test-warehouse/alltypes_seq/year=2009/month=5", overwrite),
-          "Unable to LOAD DATA into target table (functional_seq.alltypes) 
because " +
-          "Impala does not have WRITE access to HDFS location: " +
-          
"hdfs://localhost:20500/test-warehouse/alltypes_seq/year=2009/month=3");
+      if (!BackendConfig.INSTANCE.isMinimalTopicMode()) {
+        // Verify with a read-only table.
+        // This test does not work in minimal topic mode because minimal topic 
mode
+        // always assumeReadWriteAccess=true (see IMPALA-7539).
+        String query =
+            String.format("load data inpath '%s' into table 
functional_seq.alltypes "
+                    + "partition(year=2009, month=3)",
+                "/test-warehouse/alltypes_seq/year=2009/month=5");
+        AnalysisError(query,
+            "Unable to LOAD DATA into target table (functional_seq.alltypes) 
because "
+                + "Impala does not have WRITE access to HDFS location: "
+                + 
"hdfs://localhost:20500/test-warehouse/alltypes_seq/year=2009/month=3");
+      }
     }
   }
 
@@ -4043,12 +4049,16 @@ public class AnalyzeStmtsTest extends AnalyzerTest {
       testInsertWithPermutation(qualifier);
     }
 
-    // Test INSERT into a table that Impala does not have WRITE access to.
-    AnalysisError("insert into functional_seq.alltypes partition(year, month)" 
+
-        "select * from functional.alltypes",
-        "Unable to INSERT into target table (functional_seq.alltypes) because 
Impala " +
-        "does not have WRITE access to HDFS location: " +
-        "hdfs://localhost:20500/test-warehouse/alltypes_seq");
+    if (!BackendConfig.INSTANCE.isMinimalTopicMode()) {
+      // Test INSERT into a table that Impala does not have WRITE access to.
+      // This test does not work in minimal topic mode because minimal topic 
mode
+      // always assumeReadWriteAccess=true (see IMPALA-7539).
+      AnalysisError("insert into functional_seq.alltypes partition(year, 
month)"
+              + "select * from functional.alltypes",
+          "Unable to INSERT into target table (functional_seq.alltypes) 
because Impala "
+              + "does not have WRITE access to HDFS location: "
+              + "hdfs://localhost:20500/test-warehouse/alltypes_seq");
+    }
 
     // Insert with a correlated inline view.
     AnalyzesOk("insert into table functional.alltypessmall " +
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
index 5e2959613..eef796b0a 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogObjectToFromThriftTest.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import org.apache.impala.analysis.LiteralExpr;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.SqlCastException;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.CatalogObjectsConstants;
@@ -107,9 +108,11 @@ public class CatalogObjectToFromThriftTest {
 
         // Verify the partition access level is getting set properly. The 
alltypes_seq
         // table has two partitions that are read_only.
-        if (dbName.equals("functional_seq") && (
-            hdfsPart.getPartitionName().equals("year=2009/month=1") ||
-            hdfsPart.getPartitionName().equals("year=2009/month=3"))) {
+        // TAccessLevel is always READ_WRITE in minimal topic mode (see 
IMPALA-7539).
+        if (!BackendConfig.INSTANCE.isMinimalTopicMode()
+            && dbName.equals("functional_seq")
+            && (hdfsPart.getPartitionName().equals("year=2009/month=1")
+                || hdfsPart.getPartitionName().equals("year=2009/month=3"))) {
           Assert.assertEquals(TAccessLevel.READ_ONLY, 
hdfsPart.getAccessLevel());
         } else {
           Assert.assertEquals(TAccessLevel.READ_WRITE, 
hdfsPart.getAccessLevel());
diff --git a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
index 01dc27605..661bbb8b5 100644
--- a/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/CatalogTest.java
@@ -57,6 +57,7 @@ import 
org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
 import org.apache.impala.common.ImpalaException;
 import org.apache.impala.common.Reference;
 import org.apache.impala.compat.MetastoreShim;
+import org.apache.impala.service.BackendConfig;
 import org.apache.impala.testutil.CatalogServiceTestCatalog;
 import org.apache.impala.testutil.TestUtils;
 import org.apache.impala.thrift.TFunctionBinaryType;
@@ -367,7 +368,7 @@ public class CatalogTest {
         /*tblWasRemoved=*/new Reference<Boolean>(),
         /*dbWasAdded=*/new Reference<Boolean>(), NoOpEventSequence.INSTANCE);
 
-    HdfsTable table = (HdfsTable)catalog_.getOrLoadTable("functional", 
"AllTypes",
+    HdfsTable table = (HdfsTable)catalog_.getOrLoadTable("functional", 
"alltypes",
         "test", null);
     StorageStatistics opsCounts = stats.get(DFSOpsCountStatistics.NAME);
 
@@ -375,7 +376,9 @@ public class CatalogTest {
     // - one listLocatedStatus() per partition, to get the file info
     // - one listStatus() for the month=2010/ dir
     // - one listStatus() for the month=2009/ dir
-    long expectedCalls = table.getPartitionIds().size() + 2;
+    // If in minimal topic mode, the last two listStatus() are not made.
+    boolean isMinimalTopicMode = BackendConfig.INSTANCE.isMinimalTopicMode();
+    long expectedCalls = table.getPartitionIds().size() + (isMinimalTopicMode 
? 0 : 2);
     // Due to HDFS-13747, the listStatus calls are incorrectly accounted as
     // op_list_located_status. So, we'll just add up the two to make our
     // assertion resilient to this bug.
@@ -383,8 +386,10 @@ public class CatalogTest {
         opsCounts.getLong(LIST_STATUS);
     assertEquals(expectedCalls, seenCalls);
 
-    // We expect only one getFileStatus call, for the top-level directory.
-    assertEquals(1L, (long)opsCounts.getLong(GET_FILE_STATUS));
+    long expectedGetFileStatus = isMinimalTopicMode ? 0L : 1L;
+    // If not in minimal topic mode, we expect only one getFileStatus call,
+    // for the top-level directory.
+    assertEquals(expectedGetFileStatus, 
(long)opsCounts.getLong(GET_FILE_STATUS));
 
     // None of the underlying files changed so we should not do any ops for 
the files.
     assertEquals(0L, (long)opsCounts.getLong(GET_FILE_BLOCK_LOCS));
@@ -393,8 +398,9 @@ public class CatalogTest {
     stats.reset();
     catalog_.reloadTable(table, "test", NoOpEventSequence.INSTANCE);
 
-    // Again, we expect only one getFileStatus call, for the top-level 
directory.
-    assertEquals(1L, (long)opsCounts.getLong(GET_FILE_STATUS));
+    // Again, we expect only one getFileStatus call if not in minimal topic 
mode,
+    // for the top-level directory.
+    assertEquals(expectedGetFileStatus, 
(long)opsCounts.getLong(GET_FILE_STATUS));
     // REFRESH calls listStatus on each of the partitions, but doesn't re-check
     // the permissions of the partition directories themselves.
     seenCalls = opsCounts.getLong(LIST_LOCATED_STATUS) +
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test
index cde4df5df..84b9f6f65 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu-timeouts-impalad.test
@@ -11,5 +11,5 @@
 # TODO: improve error messages (here and below) when KUDU-1734 is resolved
 select * from functional_kudu.alltypes
 ---- CATCH
-Unable to initialize the Kudu scan node
+Kudu error: cannot complete before timeout
 ====
diff --git a/tests/authorization/test_ranger.py b/tests/authorization/test_ranger.py
index 171c4d232..6e9f7fc22 100644
--- a/tests/authorization/test_ranger.py
+++ b/tests/authorization/test_ranger.py
@@ -55,14 +55,16 @@ ERROR_REVOKE = "User doesn't have necessary permission to 
revoke access"
 RANGER_AUTH = ("admin", "admin")
 RANGER_HOST = "http://localhost:6080";
 REST_HEADERS = {"Content-Type": "application/json", "Accept": 
"application/json"}
-IMPALAD_ARGS = "--server-name=server1 --ranger_service_type=hive " \
-               "--ranger_app_id=impala --authorization_provider=ranger"
-CATALOGD_ARGS = "--server-name=server1 --ranger_service_type=hive " \
-                "--ranger_app_id=impala --authorization_provider=ranger"
+LEGACY_CATALOG_IMPALAD_ARGS = "--server-name=server1 
--ranger_service_type=hive " \
+    "--ranger_app_id=impala --authorization_provider=ranger " \
+    "--use_local_catalog=false"
+LEGACY_CATALOG_CATALOGD_ARGS = "--server-name=server1 
--ranger_service_type=hive " \
+    "--ranger_app_id=impala --authorization_provider=ranger " \
+    "--catalog_topic_mode=full"
 
-LOCAL_CATALOG_IMPALAD_ARGS = "--server-name=server1 --ranger_service_type=hive 
" \
+IMPALAD_ARGS = "--server-name=server1 --ranger_service_type=hive " \
     "--ranger_app_id=impala --authorization_provider=ranger 
--use_local_catalog=true"
-LOCAL_CATALOG_CATALOGD_ARGS = "--server-name=server1 
--ranger_service_type=hive " \
+CATALOGD_ARGS = "--server-name=server1 --ranger_service_type=hive " \
     "--ranger_app_id=impala --authorization_provider=ranger 
--catalog_topic_mode=minimal"
 
 LOG = logging.getLogger('impala_test_suite')
@@ -1569,8 +1571,8 @@ class TestRangerIndependent(TestRanger):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS,
-    catalogd_args=CATALOGD_ARGS + " --hms_event_polling_interval_s=5")
+    impalad_args=LEGACY_CATALOG_IMPALAD_ARGS,
+    catalogd_args=LEGACY_CATALOG_CATALOGD_ARGS + " 
--hms_event_polling_interval_s=5")
   def test_alter_owner_hms_event_sync(self, unique_name):
     """Test Impala queries that depends on database ownership changes in Hive.
        Use a longer polling interval to mimic lag in event processing."""
@@ -1645,6 +1647,8 @@ class TestRangerIndependent(TestRanger):
       # SHOW TABLES should fail since user is not the owner of this db
       self.execute_query_expect_failure(user_client, "show tables in " + 
test_db)
       change_db_owner_to_user()
+      # IMPALA-8937: In local catalog mode, table ownership info is not 
reloaded
+      # automatically, and this assert will fail.
       assert ["foo"] == self.all_table_names(user_client, test_db)
       reset_db_owner_to_admin()
     finally:
@@ -1652,18 +1656,18 @@ class TestRangerIndependent(TestRanger):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-    impalad_args="{0} {1}".format(IMPALAD_ARGS,
+    impalad_args="{0} {1}".format(LEGACY_CATALOG_IMPALAD_ARGS,
                                   
"--allow_catalog_cache_op_from_masked_users=true"),
-    catalogd_args=CATALOGD_ARGS,
+    catalogd_args=LEGACY_CATALOG_CATALOGD_ARGS,
     disable_log_buffering=True)
-  def test_allow_metadata_update(self, unique_name):
+  def test_allow_metadata_update_legacy_catalog(self, unique_name):
     self._test_allow_catalog_cache_op_from_masked_users(unique_name)
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-    impalad_args="{0} {1}".format(LOCAL_CATALOG_IMPALAD_ARGS,
+    impalad_args="{0} {1}".format(IMPALAD_ARGS,
                                   
"--allow_catalog_cache_op_from_masked_users=true"),
-    catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS,
+    catalogd_args=CATALOGD_ARGS,
     disable_log_buffering=True)
   def test_allow_metadata_update_local_catalog(self, unique_name):
     self._test_allow_catalog_cache_op_from_masked_users(unique_name)
@@ -1774,8 +1778,8 @@ class TestRangerIndependent(TestRanger):
 
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-    impalad_args=LOCAL_CATALOG_IMPALAD_ARGS,
-    catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS,
+    impalad_args=IMPALAD_ARGS,
+    catalogd_args=CATALOGD_ARGS,
     # We additionally set 'reset_ranger' to True, to reset all the policies in 
the
     # Ranger service, so even if there were roles before this test, they will 
be
     # deleted when this test runs. since the Ranger policies are reset before 
this
@@ -1787,8 +1791,8 @@ class TestRangerIndependent(TestRanger):
 
 
 @CustomClusterTestSuite.with_args(
-    impalad_args=IMPALAD_ARGS,
-    catalogd_args=CATALOGD_ARGS)
+    impalad_args=LEGACY_CATALOG_IMPALAD_ARGS,
+    catalogd_args=LEGACY_CATALOG_CATALOGD_ARGS)
 class TestRangerLegacyCatalog(TestRanger):
   """
   Tests for Apache Ranger integration with Apache Impala in legacy catalog 
mode.
@@ -1940,6 +1944,50 @@ class TestRangerLegacyCatalog(TestRanger):
       self.filesystem_client.delete_file_dir("{0}/{1}"
           .format(source_hdfs_dir, file_name))
 
+  @pytest.mark.execute_serially
+  def test_legacy_catalog_ownership(self):
+      self._test_ownership()
+
+  @pytest.mark.execute_serially
+  def test_grant_revoke_by_owner_legacy_catalog(self, unique_name):
+    self._test_grant_revoke_by_owner(unique_name)
+
+  @pytest.mark.execute_serially
+  def test_select_view_created_by_non_superuser_with_catalog_v1(self, 
unique_name):
+    self._test_select_view_created_by_non_superuser(unique_name)
+
+
+@CustomClusterTestSuite.with_args(
+    impalad_args=IMPALAD_ARGS,
+    catalogd_args=CATALOGD_ARGS)
+class TestRangerLocalCatalog(TestRanger):
+  """
+  Tests for Apache Ranger integration with Apache Impala in local catalog mode.
+  Test methods share a common cluster.
+  """
+
+  @pytest.mark.execute_serially
+  def test_grant_revoke_with_local_catalog(self, unique_name):
+    """Tests grant/revoke with catalog v2 (local catalog)."""
+    self._test_grant_revoke(unique_name, [None, "invalidate metadata",
+                                          "refresh authorization"])
+
+  @pytest.mark.execute_serially
+  def test_local_catalog_ownership(self):
+      # getTableIfCached() in LocalCatalog loads a minimal incomplete table
+      # that does not include the ownership information. Hence show tables
+      # *never* show owned tables. TODO(bharathv): Fix in a follow up commit
+      pytest.xfail("getTableIfCached() faulty behavior, known issue")
+      self._test_ownership()
+
+  @pytest.mark.execute_serially
+  def test_grant_revoke_by_owner_local_catalog(self, unique_name):
+    self._test_grant_revoke_by_owner(unique_name)
+
+  @pytest.mark.execute_serially
+  def test_select_view_created_by_non_superuser_with_local_catalog(self, 
unique_name):
+    self._test_select_view_created_by_non_superuser(unique_name)
+
   @pytest.mark.execute_serially
   def test_grant_option(self, unique_name):
     user1 = getuser()
@@ -2098,14 +2146,6 @@ class TestRangerLegacyCatalog(TestRanger):
                            .format(unique_db, user))
       admin_client.execute("drop database if exists {0} 
cascade".format(unique_db))
 
-  @pytest.mark.execute_serially
-  def test_legacy_catalog_ownership(self):
-      self._test_ownership()
-
-  @pytest.mark.execute_serially
-  def test_grant_revoke_by_owner_legacy_catalog(self, unique_name):
-    self._test_grant_revoke_by_owner(unique_name)
-
   @pytest.mark.execute_serially
   def test_unsupported_sql(self):
     """Tests unsupported SQL statements when running with Ranger."""
@@ -3072,42 +3112,6 @@ class TestRangerLegacyCatalog(TestRanger):
       for statement in cleanup_statements:
         admin_client.execute(statement)
 
-  @pytest.mark.execute_serially
-  def test_select_view_created_by_non_superuser_with_catalog_v1(self, 
unique_name):
-    self._test_select_view_created_by_non_superuser(unique_name)
-
-
-@CustomClusterTestSuite.with_args(
-    impalad_args=LOCAL_CATALOG_IMPALAD_ARGS,
-    catalogd_args=LOCAL_CATALOG_CATALOGD_ARGS)
-class TestRangerLocalCatalog(TestRanger):
-  """
-  Tests for Apache Ranger integration with Apache Impala in local catalog mode.
-  Test methods shares common cluster.
-  """
-
-  @pytest.mark.execute_serially
-  def test_grant_revoke_with_local_catalog(self, unique_name):
-    """Tests grant/revoke with catalog v2 (local catalog)."""
-    self._test_grant_revoke(unique_name, [None, "invalidate metadata",
-                                          "refresh authorization"])
-
-  @pytest.mark.execute_serially
-  def test_local_catalog_ownership(self):
-      # getTableIfCached() in LocalCatalog loads a minimal incomplete table
-      # that does not include the ownership information. Hence show tables
-      # *never* show owned tables. TODO(bharathv): Fix in a follow up commit
-      pytest.xfail("getTableIfCached() faulty behavior, known issue")
-      self._test_ownership()
-
-  @pytest.mark.execute_serially
-  def test_grant_revoke_by_owner_local_catalog(self, unique_name):
-    self._test_grant_revoke_by_owner(unique_name)
-
-  @pytest.mark.execute_serially
-  def test_select_view_created_by_non_superuser_with_local_catalog(self, 
unique_name):
-    self._test_select_view_created_by_non_superuser(unique_name)
-
 
 class TestRangerColumnMaskingTpchNested(CustomClusterTestSuite):
   """
diff --git a/tests/common/impala_connection.py b/tests/common/impala_connection.py
index a1636bb40..4547c15aa 100644
--- a/tests/common/impala_connection.py
+++ b/tests/common/impala_connection.py
@@ -1094,7 +1094,12 @@ class MinimalHS2Connection(ImpalaConnection):
     raise NotImplementedError()
 
   def get_log(self, operation_handle):
-    return self.__get_operation(operation_handle).get_log()
+    self.log_handle(operation_handle, 'getting log for operation')
+    # HS2 includes non-error log messages that we need to filter out.
+    cursor = operation_handle.get_handle()
+    lines = [line for line in cursor.get_log().split('\n')
+             if not PROGRESS_LOG_RE.match(line)]
+    return '\n'.join(lines)
 
   def set_configuration_option(self, name, value, is_log_sql=True):
     # Only set the option if it's not already set to the same value.
diff --git a/tests/custom_cluster/test_calcite_planner.py b/tests/custom_cluster/test_calcite_planner.py
index 062f75e33..4288d20dc 100644
--- a/tests/custom_cluster/test_calcite_planner.py
+++ b/tests/custom_cluster/test_calcite_planner.py
@@ -37,6 +37,10 @@ class TestCalcitePlanner(CustomClusterTestSuite):
     add_mandatory_exec_option(cls, 'use_calcite_planner', 'true')
 
   @pytest.mark.execute_serially
-  
@CustomClusterTestSuite.with_args(start_args="--env_vars=USE_CALCITE_PLANNER=true")
+  @CustomClusterTestSuite.with_args(
+      start_args="--env_vars=USE_CALCITE_PLANNER=true",
+      impalad_args="--use_local_catalog=false",
+      catalogd_args="--catalog_topic_mode=full")
   def test_calcite_frontend(self, vector, unique_database):
+    """Calcite planner does not work in local catalog mode yet."""
     self.run_test_case('QueryTest/calcite', vector, use_db=unique_database)
diff --git a/tests/custom_cluster/test_coordinators.py b/tests/custom_cluster/test_coordinators.py
index 84cc9cbd7..3e0f4d754 100644
--- a/tests/custom_cluster/test_coordinators.py
+++ b/tests/custom_cluster/test_coordinators.py
@@ -139,8 +139,10 @@ class TestCoordinators(CustomClusterTestSuite):
   def test_executor_only_lib_cache(self):
     """IMPALA-6670: checks that the lib-cache gets refreshed on executor-only 
nodes"""
 
-    self._start_impala_cluster([], cluster_size=3, num_coordinators=1,
-                               use_exclusive_coordinators=True)
+    self._start_impala_cluster(
+        ["--impalad_args=--use_local_catalog=false",
+         "--catalogd_args=--catalog_topic_mode=full"],
+        cluster_size=3, num_coordinators=1, use_exclusive_coordinators=True)
 
     db_name = 'TEST_EXEC_ONLY_CACHE'
 
@@ -236,6 +238,7 @@ class TestCoordinators(CustomClusterTestSuite):
 
       # Run the query. Expect the query fails due to mismatched libs at the
       # coordinator and one of the executors.
+      # This failure does not happen in local catalog mode.
       mismatch_query = (
           "select count(*) from functional.alltypes where "
           "`{0}`.mismatch_fn(string_col) = 'Old UDF'".format(db_name));
diff --git a/tests/custom_cluster/test_hbase_hms_column_order.py b/tests/custom_cluster/test_hbase_hms_column_order.py
index fcb62ecdb..2b2470e8e 100644
--- a/tests/custom_cluster/test_hbase_hms_column_order.py
+++ b/tests/custom_cluster/test_hbase_hms_column_order.py
@@ -37,6 +37,7 @@ class TestHBaseHmsColumnOrder(CustomClusterTestSuite):
         v.get_value('table_format').file_format == 'hbase')
 
   @CustomClusterTestSuite.with_args(
+      impalad_args="--use_hms_column_order_for_hbase_tables=true",
       catalogd_args="--use_hms_column_order_for_hbase_tables=true")
-  def test_hbase_hms_column_order(self, vector, unique_database):
-    self.run_test_case('QueryTest/hbase-hms-column-order', vector, 
unique_database)
+  def test_hbase_hms_column_order(self, vector):
+    self.run_test_case('QueryTest/hbase-hms-column-order', vector)
diff --git a/tests/custom_cluster/test_metadata_replicas.py b/tests/custom_cluster/test_metadata_replicas.py
index 7727ee1c7..e67ddb93e 100644
--- a/tests/custom_cluster/test_metadata_replicas.py
+++ b/tests/custom_cluster/test_metadata_replicas.py
@@ -24,8 +24,12 @@ from tests.util.hive_utils import HiveDbWrapper
 
 
 @SkipIfFS.hive
+@CustomClusterTestSuite.with_args(
+    impalad_args="--use_local_catalog=false",
+    catalogd_args="--catalog_topic_mode=full")
 class TestMetadataReplicas(CustomClusterTestSuite):
-  """ Validates metadata content across catalogd and impalad coordinators."""
+  """ Validates metadata content across catalogd and impalad coordinators.
+  This test is only valid in legacy catalog mode. """
 
   @classmethod
   def setup_class(cls):
diff --git a/tests/custom_cluster/test_query_live.py b/tests/custom_cluster/test_query_live.py
index 5f72337f0..1d14c9790 100644
--- a/tests/custom_cluster/test_query_live.py
+++ b/tests/custom_cluster/test_query_live.py
@@ -175,7 +175,8 @@ class TestQueryLive(CustomClusterTestSuite):
     insert_result = self.execute_query_expect_failure(self.client,
         'insert into sys.impala_query_live select * from sys.impala_query_live 
limit 1')
     assert 'UnsupportedOperationException: Cannot create data sink into table 
of type: '\
-        'org.apache.impala.catalog.SystemTable' in str(insert_result)
+        'org.apache.impala.catalog' in str(insert_result)
+    assert 'SystemTable' in str(insert_result)
 
     update_result = self.execute_query_expect_failure(self.client,
         'update sys.impala_query_live set query_id = ""')
diff --git a/tests/custom_cluster/test_services_rpc_errors.py b/tests/custom_cluster/test_services_rpc_errors.py
index 34e906e33..99d211348 100644
--- a/tests/custom_cluster/test_services_rpc_errors.py
+++ b/tests/custom_cluster/test_services_rpc_errors.py
@@ -70,33 +70,34 @@ class TestCatalogRpcErrors(CustomClusterTestSuite):
     """Validate that RPCs to the catalogd are retried by injecting a failure 
into the
     first RPC attempt for any catalogd RPC. Run a variety of queries that 
require
     catalogd interaction to ensure all RPCs are retried."""
+    log_pattern = "{0}.*Injected RPC error.*Debug Action: 
CATALOG_RPC_FIRST_ATTEMPT"
     # Validate create table queries.
     result = self.execute_query("create table {0}.tmp (col 
int)".format(unique_database))
     assert result.success
+    self.assert_impalad_log_contains("INFO", 
log_pattern.format(result.query_id), -1)
 
     # Validate insert queries.
     result = self.execute_query("insert into table {0}.tmp values (1)"
         .format(unique_database))
     assert result.success
+    self.assert_impalad_log_contains("INFO", 
log_pattern.format(result.query_id), -1)
 
     # Validate compute stats queries.
     result = self.execute_query("compute stats 
{0}.tmp".format(unique_database))
     assert result.success
+    self.assert_impalad_log_contains("INFO", 
log_pattern.format(result.query_id), -1)
 
     # Validate refresh table queries.
     result = self.execute_query("refresh {0}.tmp".format(unique_database))
     assert result.success
+    self.assert_impalad_log_contains("INFO", 
log_pattern.format(result.query_id), -1)
 
     # Validate drop table queries.
     result = self.execute_query("drop table {0}.tmp".format(unique_database))
     assert result.success
+    self.assert_impalad_log_contains("INFO", 
log_pattern.format(result.query_id), -1)
 
     # Validate select queries against pre-existing, but not-loaded tables.
     result = self.execute_query("select count(*) from 
functional_parquet.alltypes")
     assert result.success, str(result)
-
-    # The 6 queries above each should have triggered the DEBUG_ACTION, so 
assert that
-    # the DEBUG_ACTION was triggered 8 times (an extra 2 for the DROP and 
CREATE DATABASE
-    # queries needed to make the unique_database).
-    self.assert_impalad_log_contains("INFO",
-        "Injected RPC error.*Debug Action: CATALOG_RPC_FIRST_ATTEMPT", 8)
+    self.assert_impalad_log_contains("INFO", 
log_pattern.format(result.query_id), -1)
diff --git a/tests/custom_cluster/test_tuple_cache.py b/tests/custom_cluster/test_tuple_cache.py
index cd3e48ce6..b5e89f33c 100644
--- a/tests/custom_cluster/test_tuple_cache.py
+++ b/tests/custom_cluster/test_tuple_cache.py
@@ -470,7 +470,10 @@ class TestTupleCacheSingle(TestTupleCacheBase):
     assert sorted(result1.data) == sorted(result2.data)
 
 
-@CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
+@CustomClusterTestSuite.with_args(
+    start_args=CACHE_START_ARGS,
+    impalad_args="--use_local_catalog=false",
+    catalogd_args="--catalog_topic_mode=full")
 class TestTupleCacheCluster(TestTupleCacheBase):
   """Tests Impala with 3 executors and mt_dop=1."""
 
@@ -482,6 +485,8 @@ class TestTupleCacheCluster(TestTupleCacheBase):
   def test_runtime_filters(self, vector, unique_database):
     """
     This tests that adding files to a table results in different runtime 
filter keys.
+    The last assertions after 'invalidate metadata' only hold if the Impala
+    cluster is in legacy catalog mode.
     """
     self.client.set_configuration(vector.get_value('exec_option'))
     fq_table = "{0}.runtime_filters".format(unique_database)
diff --git a/tests/custom_cluster/test_web_pages.py b/tests/custom_cluster/test_web_pages.py
index b6f77e115..b25b3c84c 100644
--- a/tests/custom_cluster/test_web_pages.py
+++ b/tests/custom_cluster/test_web_pages.py
@@ -356,9 +356,12 @@ class TestWebPage(CustomClusterTestSuite):
   @CustomClusterTestSuite.with_args(
     impalad_args="--catalog_client_rpc_timeout_ms=100 "
                  "--catalog_client_rpc_retry_interval_ms=10 "
-                 "--catalog_client_connection_num_retries=2")
+                 "--catalog_client_connection_num_retries=2 "
+                 "--use_local_catalog=false ",
+    catalogd_args="--catalog_topic_mode=full")
   def test_catalog_operations_with_rpc_retry(self):
-    """Test that catalog RPC retries are all shown in the /operations page"""
+    """Test that catalog RPC retries are all shown in the /operations page.
+    Timeout values in this test are specifically tailored for legacy catalog mode."""
     # Run a DESCRIBE to ensure the table is loaded. So the first RPC attempt 
will
     # time out in its real work. This triggers a PrioritizeLoad RPC which 
usually
     # finishes in 40ms. So 100ms for catalog RPC timeout is enough.
diff --git a/tests/custom_cluster/test_workload_mgmt_sql_details.py b/tests/custom_cluster/test_workload_mgmt_sql_details.py
index fc04ea125..5d9d2a882 100644
--- a/tests/custom_cluster/test_workload_mgmt_sql_details.py
+++ b/tests/custom_cluster/test_workload_mgmt_sql_details.py
@@ -427,12 +427,15 @@ class 
TestWorkloadManagementSQLDetailsCalcite(WorkloadManagementTestSuite):
     super(TestWorkloadManagementSQLDetailsCalcite, cls).add_test_dimensions()
     cls.ImpalaTestMatrix.add_dimension(hs2_client_protocol_dimension())
 
-  @CustomClusterTestSuite.with_args(start_args="--use_calcite_planner=true",
-                                    cluster_size=1, workload_mgmt=True)
+  @CustomClusterTestSuite.with_args(
+      start_args="--use_calcite_planner=true",
+      impalad_args="--use_local_catalog=false",
+      catalogd_args="--catalog_topic_mode=full",
+      cluster_size=1, workload_mgmt=True)
   def test_tpcds_8_decimal(self, vector):
     """Runs the tpcds-decimal_v2-q8 query using the calcite planner and 
asserts the query
        completes successfully. See IMPALA-13505 for details on why this query 
in
-       particular is tested."""
+       particular is tested. Calcite planner does not work in local catalog 
mode yet."""
 
     client = self.get_client(vector.get_value("protocol"))
     assert client.execute("use tpcds").success

