This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4ddacac14f03b4713aa6a9a0da2c3a45ffe03715
Author: stiga-huang <[email protected]>
AuthorDate: Tue Feb 25 15:27:55 2025 +0800

    IMPALA-11402: Add limit on files fetched by a single getPartialCatalogObject request
    
    getPartialCatalogObject is a catalogd RPC used by coordinators in local
    catalog mode to fetch metadata on demand from catalogd. For a table
    with a huge number of files (e.g. 6M), catalogd might hit an OOM error
    by exceeding the JVM array size limit when serializing the response to
    a getPartialCatalogObject request that covers all partitions (and thus
    all files).
    
    This patch adds a new flag, catalog_partial_fetch_max_files, to define
    the maximum number of file descriptors allowed in a single
    getPartialCatalogObject response. When the response would be too big,
    catalogd truncates it at the partition level, returning only a subset
    of the requested partitions. The coordinator then sends new requests to
    fetch the remaining partitions. Note that table metadata can change
    between these requests. The coordinator detects the catalog version
    change and throws an InconsistentMetadataFetchException so the planner
    replans the query; this reuses a mechanism that already exists for
    other kinds of table metadata.
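
    As a rough, self-contained illustration of this truncate-and-refetch
    protocol (a hypothetical sketch, not the patch's code; the real logic
    lives in HdfsTable.hitNumFilesLimit() and
    CatalogdMetaProvider.loadPartitionsFromCatalogd()):

      import java.util.ArrayList;
      import java.util.List;

      public class PartialFetchSketch {
        // Stand-in for the catalog_partial_fetch_max_files flag.
        static final int MAX_FILES_PER_RESPONSE = 3;

        // "Catalogd" side: return a prefix of the requested partitions,
        // stopping before the cumulative file count exceeds the limit.
        static List<Integer> fetchPartitions(List<Integer> partIds, int[] filesPerPart) {
          List<Integer> fetched = new ArrayList<>();
          int filesCollected = 0;
          for (int id : partIds) {
            if (filesCollected + filesPerPart[id] > MAX_FILES_PER_RESPONSE) break;
            filesCollected += filesPerPart[id];
            fetched.add(id);
          }
          return fetched;
        }

        public static void main(String[] args) {
          int[] filesPerPart = {2, 1, 2, 1};  // files in each partition
          List<Integer> wanted = List.of(0, 1, 2, 3);
          List<Integer> result = new ArrayList<>();
          // "Coordinator" side: keep requesting the remaining suffix of
          // the partition list until all requested partitions arrive.
          while (result.size() < wanted.size()) {
            List<Integer> chunk = fetchPartitions(
                wanted.subList(result.size(), wanted.size()), filesPerPart);
            if (chunk.isEmpty()) {
              // Mirrors the unrecoverable INTERNAL_ERROR case: a single
              // partition exceeds the limit on its own.
              throw new IllegalStateException("partition too big for the limit");
            }
            result.addAll(chunk);
          }
          System.out.println(result);  // prints [0, 1, 2, 3]
        }
      }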
    
    Here are some measurements of a single response: the number of files it
    carries, the corresponding byte array size, and the duration:
     * 1000000: 371.71MB, 1s487ms
     * 2000000: 744.51MB, 4s035ms
     * 3000000: 1.09GB, 6s643ms
     * 4000000: 1.46GB, duration not measured due to GC pauses
     * 5000000: 1.82GB, duration not measured due to GC pauses
     * 6000000: >2GB (hit OOM)
    1000000 is chosen as the default value for now; it can be tuned in the
    future.
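
    As a sanity check on these numbers (back-of-the-envelope arithmetic,
    not from the patch): 371.71MB for 1M files is roughly 370-390 bytes per
    serialized file descriptor, so the ~2GB JVM byte-array limit lands
    somewhere around 5.5M files, consistent with the OOM observed at 6M.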
    
    Tests:
     - Added custom-cluster test
     - Ran e2e tests in local-catalog mode with
       catalog_partial_fetch_max_files=1000 so the new code path is exercised.
    
    Change-Id: Ibb13fec20de5a17e7fc33613ca5cdebb9ac1a1e5
    Reviewed-on: http://gerrit.cloudera.org:8080/22559
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/catalog/catalog-server.cc                   | 15 ++++--
 be/src/util/backend-gflag-util.cc                  |  2 +
 common/thrift/BackendGflags.thrift                 |  2 +
 .../org/apache/impala/catalog/HdfsPartition.java   |  6 +++
 .../java/org/apache/impala/catalog/HdfsTable.java  | 34 ++++++++++++
 .../impala/catalog/local/CatalogdMetaProvider.java | 63 ++++++++++++++--------
 .../org/apache/impala/service/BackendConfig.java   |  4 ++
 tests/custom_cluster/test_local_catalog.py         | 53 ++++++++++++++++++
 8 files changed, 155 insertions(+), 24 deletions(-)

diff --git a/be/src/catalog/catalog-server.cc b/be/src/catalog/catalog-server.cc
index 49a11876a..26b5038e1 100644
--- a/be/src/catalog/catalog-server.cc
+++ b/be/src/catalog/catalog-server.cc
@@ -76,6 +76,12 @@ DEFINE_int64_hidden(catalog_partial_fetch_rpc_queue_timeout_s, LLONG_MAX, "Maximum time "
     "(in seconds) a partial catalog object fetch RPC spends in the queue waiting "
     "to run. Must be set to a value greater than zero.");
 
+DEFINE_int32(catalog_partial_fetch_max_files, 1000000, "Maximum number of file "
+    "descriptors allowed to return in a single getPartialCatalogObject RPC. Used to "
+    "avoid hitting the JVM array limit when catalogd serializes the thrift response. "
+    "Note that getPartialCatalogObject RPCs are only used by local catalog mode "
+    "coordinators, so this is unrelated to the legacy catalog mode.");
+
 DEFINE_int32(catalog_max_lock_skipped_topic_updates, 3, "Maximum number of topic "
     "updates skipped for a table due to lock contention in catalogd after which it must"
     "be added to the topic the update log. This limit only applies to distinct lock "
@@ -379,9 +385,12 @@ class CatalogServiceThriftIf : public CatalogServiceIf {
       status = catalog_server_->catalog()->GetPartialCatalogObject(req, &resp);
     }
     if (!status.ok()) LOG(ERROR) << status.GetDetail();
-    TStatus thrift_status;
-    status.ToThrift(&thrift_status);
-    resp.__set_status(thrift_status);
+    // Don't overwrite the non-OK status returned from catalogd
+    if (!resp.__isset.status || resp.status.status_code == TErrorCode::OK) {
+      TStatus thrift_status;
+      status.ToThrift(&thrift_status);
+      resp.__set_status(thrift_status);
+    }
     VLOG_RPC << "GetPartialCatalogObject(): response=" << 
ThriftDebugStringNoThrow(resp);
   }
 
diff --git a/be/src/util/backend-gflag-util.cc b/be/src/util/backend-gflag-util.cc
index 8b8d631fd..3826b09bd 100644
--- a/be/src/util/backend-gflag-util.cc
+++ b/be/src/util/backend-gflag-util.cc
@@ -66,6 +66,7 @@ DECLARE_int64(kudu_scanner_thread_estimated_bytes_per_column);
 DECLARE_int64(kudu_scanner_thread_max_estimated_bytes);
 DECLARE_int32(catalog_max_parallel_partial_fetch_rpc);
 DECLARE_int64(catalog_partial_fetch_rpc_queue_timeout_s);
+DECLARE_int32(catalog_partial_fetch_max_files);
 DECLARE_int64(exchg_node_buffer_size_bytes);
 DECLARE_int32(kudu_mutation_buffer_size);
 DECLARE_int32(kudu_error_buffer_size);
@@ -380,6 +381,7 @@ Status PopulateThriftBackendGflags(TBackendGflags& cfg) {
       FLAGS_catalog_max_parallel_partial_fetch_rpc);
   cfg.__set_catalog_partial_fetch_rpc_queue_timeout_s(
       FLAGS_catalog_partial_fetch_rpc_queue_timeout_s);
+  cfg.__set_catalog_partial_fetch_max_files(FLAGS_catalog_partial_fetch_max_files);
   cfg.__set_exchg_node_buffer_size_bytes(
       FLAGS_exchg_node_buffer_size_bytes);
   cfg.__set_kudu_mutation_buffer_size(FLAGS_kudu_mutation_buffer_size);
diff --git a/common/thrift/BackendGflags.thrift b/common/thrift/BackendGflags.thrift
index 065c25ec2..ffa8c2d8a 100644
--- a/common/thrift/BackendGflags.thrift
+++ b/common/thrift/BackendGflags.thrift
@@ -323,4 +323,6 @@ struct TBackendGflags {
   145: required bool catalogd_deployed
 
   146: required string catalog_config_dir
+
+  147: required i32 catalog_partial_fetch_max_files
 }
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 7cece6268..d642abb60 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -595,6 +595,12 @@ public class HdfsPartition extends CatalogObjectImpl
            encodedDeleteFileDescriptors_.size();
   }
 
+  public static int getNumFds(TPartialPartitionInfo partInfo) {
+    return partInfo.getFile_descriptorsSize() +
+        partInfo.getInsert_file_descriptorsSize() +
+        partInfo.getDelete_file_descriptorsSize();
+  }
+
   /**
   * Returns the requested partitions info and the number of files filtered out based on
    * the ACID writeIdList.
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 00994419b..f1c6852b3 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -73,6 +73,7 @@ import org.apache.impala.thrift.TAccessLevel;
 import org.apache.impala.thrift.TCatalogObject;
 import org.apache.impala.thrift.TCatalogObjectType;
 import org.apache.impala.thrift.TColumn;
+import org.apache.impala.thrift.TErrorCode;
 import org.apache.impala.thrift.TGetPartialCatalogObjectRequest;
 import org.apache.impala.thrift.TGetPartialCatalogObjectResponse;
 import org.apache.impala.thrift.THdfsPartition;
@@ -83,6 +84,7 @@ import org.apache.impala.thrift.TPartitionKeyValue;
 import org.apache.impala.thrift.TResultSet;
 import org.apache.impala.thrift.TResultSetMetadata;
 import org.apache.impala.thrift.TSqlConstraints;
+import org.apache.impala.thrift.TStatus;
 import org.apache.impala.thrift.TTable;
 import org.apache.impala.thrift.TTableDescriptor;
 import org.apache.impala.thrift.TTableType;
@@ -2288,6 +2290,7 @@ public class HdfsTable extends Table implements FeFsTable {
     Counter misses = metrics_.getCounter(FILEMETADATA_CACHE_MISS_METRIC);
     Counter hits = metrics_.getCounter(FILEMETADATA_CACHE_HIT_METRIC);
     int numFilesFiltered = 0;
+    int numFilesCollected = 0;
     if (partIds != null) {
       resp.table_info.partitions = Lists.newArrayListWithCapacity(partIds.size());
       for (long partId : partIds) {
@@ -2302,6 +2305,9 @@ public class HdfsTable extends Table implements FeFsTable {
             part.getPartialPartitionInfo(req, reqWriteIdList);
         if (partInfoStatus.second != null) {
           hits.inc();
+          int numFds = HdfsPartition.getNumFds(partInfoStatus.first);
+          if (hitNumFilesLimit(resp, partIds, part, numFds, numFilesCollected)) break;
+          numFilesCollected += numFds;
           numFilesFiltered += partInfoStatus.second;
         } else {
           misses.inc();
@@ -2309,6 +2315,7 @@ public class HdfsTable extends Table implements FeFsTable {
         }
         resp.table_info.partitions.add(partInfoStatus.first);
       }
+      if (resp.isSetStatus() && resp.status.status_code != TErrorCode.OK) return resp;
     }
     // In most of the cases, the prefix map only contains one item for the table location.
     // Here we always send it since it's small.
@@ -2338,6 +2345,33 @@ public class HdfsTable extends Table implements FeFsTable {
     return resp;
   }
 
+  private boolean hitNumFilesLimit(TGetPartialCatalogObjectResponse resp,
+      Collection<Long> partIds, HdfsPartition part, int numFds, int numFilesCollected) {
+    if (numFilesCollected + numFds >
+        BackendConfig.INSTANCE.getCatalogPartialFetchMaxFiles()) {
+      if (numFilesCollected == 0) {
+        // Even collecting the first partition will exceed the limit which means no files
+        // can be returned. Return an unrecoverable error to the coordinator.
+        String err = String.format("Too many files to collect in table %s%s: %d. " +
+            "Current limit is %d configured by startup flag " +
+            "'catalog_partial_fetch_max_files'. Consider compacting files of the table.",
+            full_name_, isPartitioned() ? " partition " + part.getPartitionName() : "",
+            numFds, BackendConfig.INSTANCE.getCatalogPartialFetchMaxFiles());
+        LOG.error(err);
+        resp.setStatus(new TStatus(TErrorCode.INTERNAL_ERROR, Lists.newArrayList(err)));
+      } else {
+        LOG.warn("Returning {} files from {}/{} requested partitions for table {}. " +
+                "Coordinator will fetch the remaining partitions in another request " +
+                "but this impacts metadata performance. Consider compacting files to " +
+                "improve it.",
+            numFilesCollected, resp.table_info.partitions.size(), partIds.size(),
+            full_name_);
+      }
+      return true;
+    }
+    return false;
+  }
+
   private double getFileMetadataCacheHitRate() {
     long hits = metrics_.getCounter(FILEMETADATA_CACHE_HIT_METRIC).getCount();
     long misses = metrics_.getCounter(FILEMETADATA_CACHE_MISS_METRIC).getCount();
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 02cb9aaa7..fa40fdddc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -466,9 +466,8 @@ public class CatalogdMetaProvider implements MetaProvider {
     }
     resp = new TGetPartialCatalogObjectResponse();
     new TDeserializer().deserialize(resp, ret);
-    if (resp.status.status_code != TErrorCode.OK) {
-      // TODO(todd) do reasonable error handling
-      throw new TException(resp.toString());
+    if (resp.isSetStatus() && resp.status.status_code != TErrorCode.OK) {
+      throw new TException(String.join("\n", resp.status.error_msgs));
     }
 
     // If we get a "not found" response, then we assume that this was a case of an
@@ -746,7 +745,7 @@ public class CatalogdMetaProvider implements MetaProvider {
     return res.values();
   }
 
-  private TGetPartialCatalogObjectRequest newReqForTable(String dbName,
+  private static TGetPartialCatalogObjectRequest newReqForTable(String dbName,
       String tableName) {
     TGetPartialCatalogObjectRequest req = new TGetPartialCatalogObjectRequest();
     req.object_desc = new TCatalogObject();
@@ -756,7 +755,7 @@ public class CatalogdMetaProvider implements MetaProvider {
     return req;
   }
 
-  private TGetPartialCatalogObjectRequest newReqForTable(TableMetaRef table) {
+  private static TGetPartialCatalogObjectRequest newReqForTable(TableMetaRef table) {
     Preconditions.checkArgument(table instanceof TableMetaRefImpl,
         "table ref %s was not created by CatalogdMetaProvider", table);
     TGetPartialCatalogObjectRequest req = newReqForTable(
@@ -766,6 +765,20 @@ public class CatalogdMetaProvider implements MetaProvider {
     return req;
   }
 
+  private static TGetPartialCatalogObjectRequest newReqForPartitions(
+      TableMetaRefImpl table, List<Long> partIds) {
+    TGetPartialCatalogObjectRequest req = newReqForTable(table);
+    req.table_info_selector.partition_ids = partIds;
+    req.table_info_selector.want_partition_metadata = true;
+    req.table_info_selector.want_partition_files = true;
+    if (BackendConfig.INSTANCE.isAutoCheckCompaction()) {
+      req.table_info_selector.valid_write_ids = table.validWriteIds_;
+    }
+    // TODO(IMPALA-7535): fetch incremental stats on-demand
+    req.table_info_selector.want_partition_stats = true;
+    return req;
+  }
+
   @Override
   public Pair<Table, TableMetaRef> getTableIfPresent(String dbName, String tblName) {
     TableCacheKey cacheKey = new TableCacheKey(dbName.toLowerCase(),
@@ -967,7 +980,7 @@ public class CatalogdMetaProvider implements MetaProvider {
       List<String> partitionColumnNames,
       ListMap<TNetworkAddress> hostIndex,
       List<PartitionRef> partitionRefs)
-      throws MetaException, TException {
+      throws CatalogException, TException {
     Preconditions.checkArgument(table instanceof TableMetaRefImpl);
     TableMetaRefImpl refImpl = (TableMetaRefImpl)table;
     Stopwatch sw = Stopwatch.createStarted();
@@ -1071,31 +1084,39 @@ public class CatalogdMetaProvider implements MetaProvider {
    */
   private Map<PartitionRef, PartitionMetadata> loadPartitionsFromCatalogd(
       TableMetaRefImpl table, ListMap<TNetworkAddress> hostIndex,
-      List<PartitionRef> partRefs) throws TException {
+      List<PartitionRef> partRefs) throws CatalogException, TException {
     List<Long> ids = Lists.newArrayListWithCapacity(partRefs.size());
     for (PartitionRef partRef: partRefs) {
       ids.add(((PartitionRefImpl)partRef).getId());
     }
 
-    TGetPartialCatalogObjectRequest req = newReqForTable(table);
-    req.table_info_selector.partition_ids = ids;
-    req.table_info_selector.want_partition_metadata = true;
-    req.table_info_selector.want_partition_files = true;
-    if (BackendConfig.INSTANCE.isAutoCheckCompaction()) {
-      req.table_info_selector.valid_write_ids = table.validWriteIds_;
-    }
-    // TODO(todd): fetch incremental stats on-demand for compute-incremental-stats.
-    req.table_info_selector.want_partition_stats = true;
+    TGetPartialCatalogObjectRequest req = newReqForPartitions(table, ids);
     TGetPartialCatalogObjectResponse resp = sendRequest(req);
     checkResponse(resp.table_info != null && resp.table_info.partitions != null,
         req, "missing partition list result");
     checkResponse(resp.table_info.network_addresses != null,
         req, "missing network addresses");
-    checkResponse(resp.table_info.partitions.size() == ids.size(),
-        req, "returned %d partitions instead of expected %d",
-        resp.table_info.partitions.size(), ids.size());
     addTableMetadatStorageLoadTimeToProfile(
         resp.table_info.storage_metadata_load_time_ns);
+    boolean logProgress = false;
+    while (resp.table_info.partitions.size() < ids.size()) {
+      logProgress = true;
+      int numFetchedParts = resp.table_info.partitions.size();
+      LOG.info("Fetched {}/{} partitions for {}. Sending new requests.",
+          numFetchedParts, ids.size(), table);
+      List<Long> remainingIds = Lists.newArrayListWithCapacity(
+          ids.size() - numFetchedParts);
+      for (int i = numFetchedParts; i < ids.size(); i++) {
+        remainingIds.add(ids.get(i));
+      }
+
+      TGetPartialCatalogObjectRequest nextReq = newReqForPartitions(table, remainingIds);
+      TGetPartialCatalogObjectResponse nextResp = sendRequest(nextReq);
+      resp.table_info.partitions.addAll(nextResp.table_info.partitions);
+    }
+    if (logProgress) {
+      LOG.info("Fetched {} partitions for {}", ids.size(), table);
+    }
     Map<PartitionRef, PartitionMetadata> ret = new HashMap<>();
     for (int i = 0; i < ids.size(); i++) {
       PartitionRef partRef = partRefs.get(i);
@@ -1142,7 +1163,7 @@ public class CatalogdMetaProvider implements MetaProvider {
 
       PartitionMetadata oldVal = ret.put(partRef, metaImpl);
       if (oldVal != null) {
-        throw new RuntimeException("catalogd returned partition " + part.id +
+        throw new CatalogException("catalogd returned partition " + part.id +
             " multiple times");
       }
     }
@@ -1254,7 +1275,7 @@ public class CatalogdMetaProvider implements MetaProvider {
       TGetPartialCatalogObjectRequest req, String msg, Object... args) throws TException {
     if (condition) return;
     throw new TException(String.format("Invalid response from catalogd for request " +
-        req.toString() + ": " + msg, args));
+        StringUtils.abbreviate(req.toString(), 1000) + ": " + msg, args));
   }
 
   @Override
diff --git a/fe/src/main/java/org/apache/impala/service/BackendConfig.java b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
index 8cb113191..50d633c55 100644
--- a/fe/src/main/java/org/apache/impala/service/BackendConfig.java
+++ b/fe/src/main/java/org/apache/impala/service/BackendConfig.java
@@ -139,6 +139,10 @@ public class BackendConfig {
     return backendCfg_.catalog_partial_fetch_rpc_queue_timeout_s;
   }
 
+  public int getCatalogPartialFetchMaxFiles() {
+    return backendCfg_.catalog_partial_fetch_max_files;
+  }
+
   public long getHMSPollingIntervalInSeconds() {
     return backendCfg_.hms_event_polling_interval_s;
   }
diff --git a/tests/custom_cluster/test_local_catalog.py b/tests/custom_cluster/test_local_catalog.py
index 83ad36c2e..3f7da98d8 100644
--- a/tests/custom_cluster/test_local_catalog.py
+++ b/tests/custom_cluster/test_local_catalog.py
@@ -682,3 +682,56 @@ class TestReusePartitionMetadata(CustomClusterTestSuite):
     match = re.search(r"CatalogFetch.Partitions.Misses: (\d+)", ret.runtime_profile)
     assert len(match.groups()) == 1
     assert match.group(1) == str(partition_misses)
+
+
+class TestAllowIncompleteData(CustomClusterTestSuite):
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--catalog_topic_mode=minimal 
--catalog_partial_fetch_max_files=1000")
+  def test_incomplete_partition_list(self):
+    """Test that coordinator can fetch the missing partitions when catalogd 
decides to
+    truncate the partition list in the response"""
+    res = self.execute_query_expect_success(
+        self.client, "show files in tpcds.store_sales")
+    assert len(res.data) == 1824
+
+    self.assert_catalogd_log_contains(
+        "WARNING", "Returning 1000 files from 1000/1824 requested partitions 
for table "
+        "tpcds.store_sales. Coordinator will fetch the remaining partitions in 
another "
+        "request but this impacts metadata performance. Consider compacting 
files to "
+        "improve it.")
+    self.assert_impalad_log_contains(
+        "INFO", r"Fetched 1000/1824 partitions for TableMetaRef 
tpcds.store_sales@\d+. "
+        "Sending new requests.")
+    self.assert_impalad_log_contains(
+        "INFO", r"Fetched 1824 partitions for TableMetaRef 
tpcds.store_sales@\d+")
+
+  @pytest.mark.execute_serially
+  @CustomClusterTestSuite.with_args(
+    impalad_args="--use_local_catalog=true",
+    catalogd_args="--catalog_topic_mode=minimal 
--catalog_partial_fetch_max_files=1")
+  def test_too_many_files(self, unique_database):
+    """Test the error reporting the limit is too small"""
+    exception = self.execute_query_expect_failure(
+        self.client, "show files in tpch_parquet.lineitem")
+    err = ("Too many files to collect in table tpch_parquet.lineitem: 3. 
Current limit "
+           "is 1 configured by startup flag 'catalog_partial_fetch_max_files'. 
Consider "
+           "compacting files of the table.")
+    assert err in str(exception)
+    self.assert_catalogd_log_contains("ERROR", err)
+
+    # Create a partitioned table with multiple files
+    tbl = unique_database + ".foo"
+    self.execute_query("create table {0} partitioned by (year, month) as "
+                       "select * from functional.alltypestiny".format(tbl))
+    self.execute_query("insert into {0} partition(year, month) select * from "
+                       "functional.alltypestiny".format(tbl))
+    exception = self.execute_query_expect_failure(
+      self.client, "show files in {0}.foo".format(unique_database))
+    err = ("Too many files to collect in table {0} partition 
year=2009/month=1: 2. "
+           "Current limit is 1 configured by startup flag "
+           "'catalog_partial_fetch_max_files'. Consider compacting files of 
the table."
+           ).format(tbl)
+    assert err in str(exception)
+    self.assert_catalogd_log_contains("ERROR", err)
