This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 7ca20b3c94b1c9c1ddd4ed1e89f0969a0df55330
Author: Riza Suminto <[email protected]>
AuthorDate: Thu May 4 14:45:43 2023 -0700

    Revert "IMPALA-11123: Optimize count(star) for ORC scans"
    
    This reverts commit f932d78ad0a30e322d59fc39072f710f889d2135.
    
    The commit is reverted because it cause significant regression for
    non-optimized counts star query in parquet format.
    
    There are several conflicts that need to be resolved manually:
    - Removed assertion against 'NumFileMetadataRead' counter that is lost
      with the revert.
    - Adjust the assertion in test_plain_count_star_optimization,
      test_in_predicate_push_down, and test_partitioned_insert of
      test_iceberg.py due to missing improvement in parquet optimized count
      star code path.
    - Keep the "override" specifier in hdfs-parquet-scanner.h to pass
      clang-tidy
    - Keep python3 style of RuntimeError instantiation in
      test_file_parser.py to pass check-python-syntax.sh
    
    Change-Id: Iefd8fd0838638f9db146f7b706e541fe2aaf01c1
    Reviewed-on: http://gerrit.cloudera.org:8080/19843
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Wenzhe Zhou <[email protected]>
---
 be/src/exec/hdfs-columnar-scanner.cc               |  72 +---
 be/src/exec/hdfs-columnar-scanner.h                |  37 --
 be/src/exec/hdfs-scan-node-base.cc                 |   5 +-
 be/src/exec/hdfs-scan-node-base.h                  |  41 +-
 be/src/exec/hdfs-scanner.cc                        |   5 +-
 be/src/exec/orc/hdfs-orc-scanner.cc                |  64 ++--
 be/src/exec/orc/hdfs-orc-scanner.h                 |  21 +-
 be/src/exec/orc/orc-column-readers.h               |   4 +-
 be/src/exec/parquet/hdfs-parquet-scanner.cc        | 103 +++--
 be/src/exec/parquet/hdfs-parquet-scanner.h         |  23 +-
 be/src/exec/parquet/parquet-column-readers.h       |   2 +-
 common/thrift/PlanNodes.thrift                     |   5 +-
 .../org/apache/impala/planner/HdfsScanNode.java    |  20 +-
 .../org/apache/impala/planner/PlannerTest.java     |   5 -
 .../queries/PlannerTest/orc-stats-agg.test         | 426 ---------------------
 .../queries/PlannerTest/parquet-stats-agg.test     |  31 +-
 .../queries/PlannerTest/resource-requirements.test |   9 +-
 .../iceberg-compound-predicate-push-down.test      |   2 -
 .../QueryTest/iceberg-in-predicate-push-down.test  |  10 +-
 .../iceberg-is-null-predicate-push-down.test       |   1 -
 .../QueryTest/iceberg-partitioned-insert.test      |  25 +-
 .../iceberg-plain-count-star-optimization.test     |  24 +-
 .../iceberg-upper-lower-bound-metrics.test         |   3 -
 .../iceberg-v2-plain-count-star-optimization.test  |   6 +-
 .../iceberg-v2-read-position-deletes-orc.test      |   8 -
 .../iceberg-v2-read-position-deletes.test          |  12 -
 .../queries/QueryTest/mixed-format.test            |  13 +-
 .../queries/QueryTest/orc-stats-agg.test           | 152 --------
 .../queries/QueryTest/parquet-stats-agg.test       |  59 ---
 .../queries/QueryTest/partition-key-scans.test     |  28 --
 .../queries/QueryTest/runtime_filters.test         |   8 -
 .../queries/QueryTest/runtime_filters_mt_dop.test  |   8 -
 .../queries/QueryTest/scanners.test                |   3 -
 tests/custom_cluster/test_executor_groups.py       |   2 +-
 tests/custom_cluster/test_query_retries.py         |  33 +-
 tests/query_test/test_aggregation.py               |  72 ++--
 tests/query_test/test_iceberg.py                   |   1 -
 tests/query_test/test_scanners.py                  |  12 +-
 tests/util/test_file_parser.py                     |  18 +-
 39 files changed, 269 insertions(+), 1104 deletions(-)

diff --git a/be/src/exec/hdfs-columnar-scanner.cc 
b/be/src/exec/hdfs-columnar-scanner.cc
index 28da3c40a..913142936 100644
--- a/be/src/exec/hdfs-columnar-scanner.cc
+++ b/be/src/exec/hdfs-columnar-scanner.cc
@@ -64,27 +64,20 @@ PROFILE_DEFINE_COUNTER(IoReadTotalBytes, DEBUG, 
TUnit::BYTES,
     "The total number of bytes read from streams.");
 PROFILE_DEFINE_COUNTER(IoReadSkippedBytes, DEBUG, TUnit::BYTES,
     "The total number of bytes skipped from streams.");
-PROFILE_DEFINE_COUNTER(NumFileMetadataRead, DEBUG, TUnit::UNIT,
-    "The total number of file metadata reads done in place of rows or row 
groups / "
-    "stripe iteration.");
 
 const char* HdfsColumnarScanner::LLVM_CLASS_NAME = 
"class.impala::HdfsColumnarScanner";
 
-HdfsColumnarScanner::HdfsColumnarScanner(HdfsScanNodeBase* scan_node, 
RuntimeState* state)
-  : HdfsScanner(scan_node, state),
+HdfsColumnarScanner::HdfsColumnarScanner(HdfsScanNodeBase* scan_node,
+    RuntimeState* state) :
+    HdfsScanner(scan_node, state),
     scratch_batch_(new ScratchTupleBatch(
-        *scan_node->row_desc(), state_->batch_size(), 
scan_node->mem_tracker())),
-    assemble_rows_timer_(scan_node->materialize_tuple_timer()) {}
+        *scan_node->row_desc(), state_->batch_size(), 
scan_node->mem_tracker())) {
+}
 
 HdfsColumnarScanner::~HdfsColumnarScanner() {}
 
 Status HdfsColumnarScanner::Open(ScannerContext* context) {
   RETURN_IF_ERROR(HdfsScanner::Open(context));
-  // Memorize 'is_footer_scanner_' here since 'stream_' can be released early.
-  const io::ScanRange* range = stream_->scan_range();
-  is_footer_scanner_ =
-      range->offset() + range->bytes_to_read() >= 
stream_->file_desc()->file_length;
-
   RuntimeProfile* profile = scan_node_->runtime_profile();
   num_cols_counter_ = PROFILE_NumColumns.Instantiate(profile);
   num_scanners_with_no_reads_counter_ =
@@ -102,7 +95,6 @@ Status HdfsColumnarScanner::Open(ScannerContext* context) {
   io_total_request_ = PROFILE_IoReadTotalRequest.Instantiate(profile);
   io_total_bytes_ = PROFILE_IoReadTotalBytes.Instantiate(profile);
   io_skipped_bytes_ = PROFILE_IoReadSkippedBytes.Instantiate(profile);
-  num_file_metadata_read_ = PROFILE_NumFileMetadataRead.Instantiate(profile);
   return Status::OK();
 }
 
@@ -298,60 +290,6 @@ Status 
HdfsColumnarScanner::DivideReservationBetweenColumns(
   return Status::OK();
 }
 
-Status HdfsColumnarScanner::GetNextWithCountStarOptimization(RowBatch* 
row_batch) {
-  // There are no materialized slots, e.g. count(*) over the table.  We can 
serve
-  // this query from just the file metadata.  We don't need to read the column 
data.
-  // Only scanner of the footer split will run in this case. See the logic in
-  // HdfsScanner::IssueFooterRanges() and 
HdfsScanNodeBase::ReadsFileMetadataOnly().
-  DCHECK(is_footer_scanner_);
-  int64_t tuple_buffer_size;
-  uint8_t* tuple_buffer;
-  int capacity = 1;
-  RETURN_IF_ERROR(row_batch->ResizeAndAllocateTupleBuffer(state_,
-      row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(), 
&capacity,
-      &tuple_buffer_size, &tuple_buffer));
-  int64_t num_rows = GetNumberOfRowsInFile();
-  COUNTER_ADD(num_file_metadata_read_, 1);
-  DCHECK_LE(rows_read_in_group_, num_rows);
-  Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buffer);
-  InitTuple(template_tuple_, dst_tuple);
-  int64_t* dst_slot = 
dst_tuple->GetBigIntSlot(scan_node_->count_star_slot_offset());
-  *dst_slot = num_rows;
-  TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
-  dst_row->SetTuple(0, dst_tuple);
-  row_batch->CommitLastRow();
-  rows_read_in_group_ += num_rows;
-  eos_ = true;
-  return Status::OK();
-}
-
-Status HdfsColumnarScanner::GetNextWithTemplateTuple(RowBatch* row_batch) {
-  // There are no materialized slots, e.g. "select 1" over the table.  We can 
serve
-  // this query from just the file metadata.  We don't need to read the column 
data.
-  // Only scanner of the footer split will run in this case. See the logic in
-  // HdfsScanner::IssueFooterRanges() and 
HdfsScanNodeBase::ReadsFileMetadataOnly().
-  // We might also get here for count(*) query against full acid table such as:
-  // "select count(*) from functional_orc_def.alltypes;"
-  DCHECK(is_footer_scanner_);
-  int64_t file_rows = GetNumberOfRowsInFile();
-  COUNTER_ADD(num_file_metadata_read_, 1);
-  if (rows_read_in_group_ == file_rows) {
-    eos_ = true;
-    return Status::OK();
-  }
-  assemble_rows_timer_.Start();
-  DCHECK_LT(rows_read_in_group_, file_rows);
-  int64_t rows_remaining = file_rows - rows_read_in_group_;
-  int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
-  TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
-  int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
-  Status status = CommitRows(num_to_commit, row_batch);
-  assemble_rows_timer_.Stop();
-  RETURN_IF_ERROR(status);
-  rows_read_in_group_ += max_tuples;
-  return Status::OK();
-}
-
 void HdfsColumnarScanner::AddSyncReadBytesCounter(int64_t total_bytes) {
   io_sync_request_->Add(1);
   io_total_request_->Add(1);
diff --git a/be/src/exec/hdfs-columnar-scanner.h 
b/be/src/exec/hdfs-columnar-scanner.h
index af423cff8..472863bf2 100644
--- a/be/src/exec/hdfs-columnar-scanner.h
+++ b/be/src/exec/hdfs-columnar-scanner.h
@@ -60,25 +60,6 @@ class HdfsColumnarScanner : public HdfsScanner {
   /// top-level tuples. See AssembleRows() in the derived classes.
   boost::scoped_ptr<ScratchTupleBatch> scratch_batch_;
 
-  /// Timer for materializing rows.  This ignores time getting the next buffer.
-  ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
-
-  /// Index of the current row group / stripe being processed. Initialized to 
-1 which
-  /// indicates that we have not started processing the first group yet 
(GetNext() has
-  /// not yet been called).
-  int32_t group_idx_ = -1;
-
-  /// Counts the number of rows processed for the current row group / stripe.
-  int64_t rows_read_in_group_ = 0;
-
-  /// Indicates whether we should advance to the next row group / stripe in 
the next
-  /// GetNext(). Starts out as true to move to the very first row group.
-  bool advance_group_ = true;
-
-  /// Indicate whether this is a footer scanner or not.
-  /// Assigned in HdfsColumnarScanner::Open().
-  bool is_footer_scanner_ = false;
-
   /// Scan range for the metadata.
   const io::ScanRange* metadata_range_ = nullptr;
 
@@ -119,9 +100,6 @@ class HdfsColumnarScanner : public HdfsScanner {
   Status DivideReservationBetweenColumns(const ColumnRangeLengths& 
col_range_lengths,
       ColumnReservations& reservation_per_column);
 
-  /// Get the number of rows in file.
-  virtual int64_t GetNumberOfRowsInFile() const = 0;
-
   /// Helper for DivideReservationBetweenColumns(). Implements the core 
algorithm for
   /// dividing a reservation of 'reservation_to_distribute' bytes between 
columns with
   /// scan range lengths 'col_range_lengths' given a min and max buffer size. 
Returns
@@ -136,16 +114,6 @@ class HdfsColumnarScanner : public HdfsScanner {
   /// in ExecEnv.
   static int64_t ComputeIdealReservation(const ColumnRangeLengths& 
col_range_lengths);
 
-  /// Handle count(*) queries by reading the row count from the footer 
statistics.
-  /// The optimization is possible only in simpler cases e.g. when there are 
no conjucts.
-  /// Check ScanNode.java#canApplyCountStarOptimization for full detail.
-  Status GetNextWithCountStarOptimization(RowBatch* row_batch);
-
-  /// Handle zero slot scan queries by reading the row count from the footer 
statistics.
-  /// Possible queries include "select 1" or "select count(*)" over full acid 
table that
-  /// does not require row validation.
-  Status GetNextWithTemplateTuple(RowBatch* row_batch);
-
   /// Number of columns that need to be read.
   RuntimeProfile::Counter* num_cols_counter_;
 
@@ -174,11 +142,6 @@ class HdfsColumnarScanner : public HdfsScanner {
   /// Total number of bytes skipped during stream reading.
   RuntimeProfile::Counter* io_skipped_bytes_;
 
-  /// Total file metadata reads done.
-  /// Incremented when serving query from metadata instead of iterating rows or
-  /// row groups / stripes.
-  RuntimeProfile::Counter* num_file_metadata_read_;
-
  private:
   int ProcessScratchBatchCodegenOrInterpret(RowBatch* dst_batch);
 };
diff --git a/be/src/exec/hdfs-scan-node-base.cc 
b/be/src/exec/hdfs-scan-node-base.cc
index ca3fab885..96c5934f0 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -450,8 +450,9 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const 
HdfsScanPlanNode& pno
             hdfs_scan_node.skip_header_line_count :
             0),
     tuple_id_(pnode.tuple_id_),
-    count_star_slot_offset_(hdfs_scan_node.__isset.count_star_slot_offset ?
-            hdfs_scan_node.count_star_slot_offset :
+    parquet_count_star_slot_offset_(
+        hdfs_scan_node.__isset.parquet_count_star_slot_offset ?
+            hdfs_scan_node.parquet_count_star_slot_offset :
             -1),
     is_partition_key_scan_(hdfs_scan_node.is_partition_key_scan),
     tuple_desc_(pnode.tuple_desc_),
diff --git a/be/src/exec/hdfs-scan-node-base.h 
b/be/src/exec/hdfs-scan-node-base.h
index 56a8157aa..160d4d782 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -30,7 +30,6 @@
 #include <boost/unordered_map.hpp>
 
 #include "codegen/codegen-fn-ptr.h"
-#include "exec/acid-metadata-utils.h"
 #include "exec/file-metadata-utils.h"
 #include "exec/filter-context.h"
 #include "exec/scan-node.h"
@@ -464,12 +463,10 @@ class HdfsScanNodeBase : public ScanNode {
   const AvroSchemaElement& avro_schema() const { return avro_schema_; }
   int skip_header_line_count() const { return skip_header_line_count_; }
   io::RequestContext* reader_context() const { return reader_context_.get(); }
-  bool optimize_count_star() const {
-    bool is_optimized = count_star_slot_offset_ != -1;
-    DCHECK(!hdfs_table_->IsTableFullAcid() || !is_optimized);
-    return is_optimized;
+  bool optimize_parquet_count_star() const {
+    return parquet_count_star_slot_offset_ != -1;
   }
-  int count_star_slot_offset() const { return count_star_slot_offset_; }
+  int parquet_count_star_slot_offset() const { return 
parquet_count_star_slot_offset_; }
   bool is_partition_key_scan() const { return is_partition_key_scan_; }
 
   typedef std::unordered_map<TupleId, std::vector<ScalarExprEvaluator*>>
@@ -601,32 +598,6 @@ class HdfsScanNodeBase : public ScanNode {
         virtual_column_slots().empty();
   }
 
-  /// Return true if scan over 'filename' require row validation.
-  /// Hive Streaming Ingestion allocates multiple write ids, hence create delta
-  /// directories like delta_5_10. Then it continuously appends new stripes 
(and footers)
-  /// to the ORC files of the delte dir. So it's possible that the file has 
rows that
-  /// Impala is not allowed to see based on its valid write id list. In such 
cases we need
-  /// to validate the write ids of the row batches.
-  inline bool RequireRowValidation(std::string filename) const {
-    if (!hdfs_table()->IsTableFullAcid()) return false;
-    if (ValidWriteIdList::IsCompacted(filename)) return false;
-    ValidWriteIdList valid_write_ids;
-    std::pair<int64_t, int64_t> acid_write_id_range =
-        valid_write_ids.GetWriteIdRange(filename);
-    valid_write_ids.InitFrom(hdfs_table()->ValidWriteIdList());
-    ValidWriteIdList::RangeResponse rows_valid = 
valid_write_ids.IsWriteIdRangeValid(
-        acid_write_id_range.first, acid_write_id_range.second);
-    DCHECK_NE(rows_valid, ValidWriteIdList::NONE);
-    return rows_valid == ValidWriteIdList::SOME;
-  }
-
-  /// Return true if scan over 'filename 'can be served only by reading the 
file metadata,
-  /// such as a count(*) over the table.
-  inline bool ReadsFileMetadataOnly(std::string filename) const {
-    return !RequireRowValidation(filename)
-        && (IsZeroSlotTableScan() || optimize_count_star());
-  }
-
   /// Transfers all memory from 'pool' to shared state of all scanners.
   void TransferToSharedStatePool(MemPool* pool);
 
@@ -715,11 +686,11 @@ class HdfsScanNodeBase : public ScanNode {
   /// Tuple id of the tuple descriptor to be used.
   const int tuple_id_;
 
-  /// The byte offset of the slot for Parquet/ORC metadata if count star 
optimization
+  /// The byte offset of the slot for Parquet metadata if Parquet count star 
optimization
   /// is enabled. When set, this scan node can optimize a count(*) query by 
populating
   /// the tuple with data from the Parquet num rows statistic. See
-  /// applyCountStarOptimization() in ScanNode.java.
-  const int count_star_slot_offset_;
+  /// applyParquetCountStartOptimization() in HdfsScanNode.java.
+  const int parquet_count_star_slot_offset_;
 
   // True if this is a partition key scan that needs only to return at least 
one row from
   // each scan range. If true, the scan node and scanner implementations 
should attempt
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index 49f360ed1..de3807bec 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -873,13 +873,12 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* 
scan_node,
       ScanRange* split = files[i]->splits[j];
 
       DCHECK_LE(split->offset() + split->len(), files[i]->file_length);
-      // If scan only reads file metadata (such as count(*) over the table), 
we can
+      // If there are no materialized slots (such as count(*) over the table), 
we can
       // get the result with the file metadata alone and don't need to read 
any row
       // groups. We only want a single node to process the file footer in this 
case,
       // which is the node with the footer split.  If it's not a count(*), we 
create a
       // footer range for the split always.
-      if (!scan_node->ReadsFileMetadataOnly(files[i]->filename)
-          || footer_split == split) {
+      if (!scan_node->IsZeroSlotTableScan() || footer_split == split) {
         ScanRangeMetadata* split_metadata =
             static_cast<ScanRangeMetadata*>(split->meta_data());
         // Each split is processed by first issuing a scan range for the file 
footer, which
diff --git a/be/src/exec/orc/hdfs-orc-scanner.cc 
b/be/src/exec/orc/hdfs-orc-scanner.cc
index a4193dca5..595f2d60a 100644
--- a/be/src/exec/orc/hdfs-orc-scanner.cc
+++ b/be/src/exec/orc/hdfs-orc-scanner.cc
@@ -311,7 +311,8 @@ HdfsOrcScanner::HdfsOrcScanner(HdfsScanNodeBase* scan_node, 
RuntimeState* state)
   : HdfsColumnarScanner(scan_node, state),
     dictionary_pool_(new MemPool(scan_node->mem_tracker())),
     data_batch_pool_(new MemPool(scan_node->mem_tracker())),
-    search_args_pool_(new MemPool(scan_node->mem_tracker())) {
+    search_args_pool_(new MemPool(scan_node->mem_tracker())),
+    assemble_rows_timer_(scan_node_->materialize_tuple_timer()) {
   assemble_rows_timer_.Stop();
 }
 
@@ -405,9 +406,9 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
     row_batches_need_validation_ = rows_valid == ValidWriteIdList::SOME;
   }
 
-  if (scan_node_->optimize_count_star() && !row_batches_need_validation_) {
-    template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()];
-    return Status::OK();
+  if (UNLIKELY(scan_node_->optimize_parquet_count_star())) {
+    DCHECK(false);
+    return Status("Internal ERROR: ORC scanner cannot optimize count star 
slot.");
   }
 
   // Update 'row_reader_options_' based on the tuple descriptor so the ORC lib 
can skip
@@ -777,20 +778,29 @@ Status HdfsOrcScanner::ProcessSplit() {
 }
 
 Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
-  if (row_batches_need_validation_) {
-    // In case 'row_batches_need_validation_' is true, we need to look at the 
row
-    // batches and check their validity. This might be a zero slot scan, which
-    // 'currentTransaction' is the only selected field from the file. And this 
should
-    // not be an optimized count(*) because it is disabled for full acid table.
-    DCHECK(!scan_node_->optimize_count_star());
-  } else if (scan_node_->optimize_count_star()) {
-    // This is an optimized count(*) case.
-    // For each file, populate one slot with the footer's numberOfRows 
statistic.
-    return GetNextWithCountStarOptimization(row_batch);
-  } else if (scan_node_->IsZeroSlotTableScan()) {
-    // There are no materialized slots, e.g. "select 1" over the table.  We 
can serve
+  // In case 'row_batches_need_validation_' is true, we need to look at the row
+  // batches and check their validity. In that case 'currentTransaction' is 
the only
+  // selected field from the file (in case of zero slot scans).
+  if (scan_node_->IsZeroSlotTableScan() && !row_batches_need_validation_) {
+    uint64_t file_rows = reader_->getNumberOfRows();
+    // There are no materialized slots, e.g. count(*) over the table.  We can 
serve
     // this query from just the file metadata.  We don't need to read the 
column data.
-    return GetNextWithTemplateTuple(row_batch);
+    if (stripe_rows_read_ == file_rows) {
+      eos_ = true;
+      return Status::OK();
+    }
+    assemble_rows_timer_.Start();
+    DCHECK_LT(stripe_rows_read_, file_rows);
+    int64_t rows_remaining = file_rows - stripe_rows_read_;
+    int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
+    TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
+    int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
+    Status status = CommitRows(num_to_commit, row_batch);
+    assemble_rows_timer_.Stop();
+    RETURN_IF_ERROR(status);
+    stripe_rows_read_ += max_tuples;
+    COUNTER_ADD(scan_node_->rows_read_counter(), num_to_commit);
+    return Status::OK();
   }
 
   if (!scratch_batch_->AtEnd()) {
@@ -822,7 +832,7 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) 
{
   // 'advance_stripe_' is updated in 'NextStripe', meaning the current stripe 
we advance
   // to can be skip. 'end_of_stripe_' marks whether current stripe is drained. 
It's only
   // set to true in 'AssembleRows'.
-  while (advance_group_ || end_of_stripe_) {
+  while (advance_stripe_ || end_of_stripe_) {
     // The next stripe will use a new dictionary blob so transfer the memory 
to row_batch.
     row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false);
     context_->ReleaseCompletedResources(/* done */ true);
@@ -830,8 +840,8 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) 
{
     RETURN_IF_ERROR(CommitRows(0, row_batch));
 
     RETURN_IF_ERROR(NextStripe());
-    DCHECK_LE(group_idx_, reader_->getNumberOfStripes());
-    if (group_idx_ == reader_->getNumberOfStripes()) {
+    DCHECK_LE(stripe_idx_, reader_->getNumberOfStripes());
+    if (stripe_idx_ == reader_->getNumberOfStripes()) {
       eos_ = true;
       DCHECK(parse_status_.ok());
       return Status::OK();
@@ -873,19 +883,19 @@ Status HdfsOrcScanner::NextStripe() {
   int64_t split_offset = split_range->offset();
   int64_t split_length = split_range->len();
 
-  bool start_with_first_stripe = group_idx_ == -1;
+  bool start_with_first_stripe = stripe_idx_ == -1;
   bool misaligned_stripe_skipped = false;
 
-  advance_group_ = false;
-  rows_read_in_group_ = 0;
+  advance_stripe_ = false;
+  stripe_rows_read_ = 0;
 
   // Loop until we have found a non-empty stripe.
   while (true) {
     // Reset the parse status for the next stripe.
     parse_status_ = Status::OK();
 
-    ++group_idx_;
-    if (group_idx_ >= reader_->getNumberOfStripes()) {
+    ++stripe_idx_;
+    if (stripe_idx_ >= reader_->getNumberOfStripes()) {
       if (start_with_first_stripe && misaligned_stripe_skipped) {
         // We started with the first stripe and skipped all the stripes 
because they were
         // misaligned. The execution flow won't reach this point if there is 
at least one
@@ -894,7 +904,7 @@ Status HdfsOrcScanner::NextStripe() {
       }
       break;
     }
-    unique_ptr<orc::StripeInformation> stripe = reader_->getStripe(group_idx_);
+    unique_ptr<orc::StripeInformation> stripe = 
reader_->getStripe(stripe_idx_);
     // Also check 'footer_.numberOfRows' to make sure 'select count(*)' and 
'select *'
     // behave consistently for corrupt files that have 'footer_.numberOfRows 
== 0'
     // but some data in stripe.
@@ -967,7 +977,7 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) {
     if (row_batch->AtCapacity()) break;
     continue_execution &= !scan_node_->ReachedLimitShared() && 
!context_->cancelled();
   }
-  rows_read_in_group_ += num_rows_read;
+  stripe_rows_read_ += num_rows_read;
   COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
   // Merge Scanner-local counter into HdfsScanNode counter and reset.
   COUNTER_ADD(scan_node_->collection_items_read_counter(), 
coll_items_read_counter_);
diff --git a/be/src/exec/orc/hdfs-orc-scanner.h 
b/be/src/exec/orc/hdfs-orc-scanner.h
index 7383f4942..bf52787bc 100644
--- a/be/src/exec/orc/hdfs-orc-scanner.h
+++ b/be/src/exec/orc/hdfs-orc-scanner.h
@@ -185,11 +185,6 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
     return THdfsFileFormat::ORC;
   }
 
- protected:
-  virtual int64_t GetNumberOfRowsInFile() const override {
-    return static_cast<int64_t>(reader_->getNumberOfRows());
-  }
-
  private:
   friend class OrcColumnReader;
   friend class OrcDateColumnReader;
@@ -200,10 +195,23 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   friend class OrcStructReader;
   friend class OrcListReader;
   friend class OrcMapReader;
+  friend class HdfsOrcScannerTest;
 
   /// Memory guard of the tuple_mem_
   uint8_t* tuple_mem_end_ = nullptr;
 
+  /// Index of the current stripe being processed. Stripe in ORC is equivalent 
to
+  /// RowGroup in Parquet. Initialized to -1 which indicates that we have not 
started
+  /// processing the first stripe yet (GetNext() has not yet been called).
+  int32_t stripe_idx_ = -1;
+
+  /// Counts the number of rows processed for the current stripe.
+  int64_t stripe_rows_read_ = 0;
+
+  /// Indicates whether we should advance to the next stripe in the next 
GetNext().
+  /// Starts out as true to move to the very first stripe.
+  bool advance_stripe_ = true;
+
   /// Indicates whether we are at the end of a stripe.
   bool end_of_stripe_ = true;
 
@@ -288,6 +296,9 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
   /// offset, and there are no two overlapping range.
   vector<ColumnRange> columnRanges_;
 
+  /// Timer for materializing rows. This ignores time getting the next buffer.
+  ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
+
   /// Number of stripes that need to be read.
   RuntimeProfile::Counter* num_stripes_counter_ = nullptr;
 
diff --git a/be/src/exec/orc/orc-column-readers.h 
b/be/src/exec/orc/orc-column-readers.h
index f67e487de..3adba8049 100644
--- a/be/src/exec/orc/orc-column-readers.h
+++ b/be/src/exec/orc/orc-column-readers.h
@@ -337,8 +337,8 @@ class OrcStringColumnReader : public 
OrcPrimitiveColumnReader<OrcStringColumnRea
     }
     DCHECK(static_cast<orc::EncodedStringVectorBatch*>(batch_) ==
         dynamic_cast<orc::EncodedStringVectorBatch*>(orc_batch));
-    if (last_stripe_idx_ != scanner_->group_idx_) {
-      last_stripe_idx_ = scanner_->group_idx_;
+    if (last_stripe_idx_ != scanner_->stripe_idx_) {
+      last_stripe_idx_ = scanner_->stripe_idx_;
       auto current_batch = static_cast<orc::EncodedStringVectorBatch*>(batch_);
       return InitBlob(&current_batch->dictionary->dictionaryBlob,
           scanner_->dictionary_pool_.get());
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc 
b/be/src/exec/parquet/hdfs-parquet-scanner.cc
index 13242a7e9..d540a3f11 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.cc
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc
@@ -86,10 +86,14 @@ Status 
HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
 
 HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, 
RuntimeState* state)
   : HdfsColumnarScanner(scan_node, state),
+    row_group_idx_(-1),
+    row_group_rows_read_(0),
+    advance_row_group_(true),
     min_max_tuple_(nullptr),
     row_batches_produced_(0),
     dictionary_pool_(new MemPool(scan_node->mem_tracker())),
     stats_batch_read_pool_(new MemPool(scan_node->mem_tracker())),
+    assemble_rows_timer_(scan_node_->materialize_tuple_timer()),
     num_stats_filtered_row_groups_counter_(nullptr),
     num_minmax_filtered_row_groups_counter_(nullptr),
     num_bloom_filtered_row_groups_counter_(nullptr),
@@ -379,7 +383,7 @@ static bool CheckRowGroupOverlapsSplit(const 
parquet::RowGroup& row_group,
 
 int HdfsParquetScanner::CountScalarColumns(
     const vector<ParquetColumnReader*>& column_readers) {
-  DCHECK(!column_readers.empty() || scan_node_->optimize_count_star());
+  DCHECK(!column_readers.empty() || scan_node_->optimize_parquet_count_star());
   int num_columns = 0;
   stack<ParquetColumnReader*> readers;
   for (ParquetColumnReader* r: column_readers_) readers.push(r);
@@ -429,15 +433,55 @@ Status HdfsParquetScanner::ProcessSplit() {
 
 Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) {
   DCHECK(parse_status_.ok()) << parse_status_.GetDetail();
-  if (scan_node_->optimize_count_star()) {
-    // This is an optimized count(*) case.
+  if (scan_node_->optimize_parquet_count_star()) {
     // Populate the single slot with the Parquet num rows statistic.
-    return GetNextWithCountStarOptimization(row_batch);
+    int64_t tuple_buf_size;
+    uint8_t* tuple_buf;
+    // We try to allocate a smaller row batch here because in most cases the 
number row
+    // groups in a file is much lower than the default row batch capacity.
+    int capacity = min(
+        static_cast<int>(file_metadata_.row_groups.size()), 
row_batch->capacity());
+    RETURN_IF_ERROR(RowBatch::ResizeAndAllocateTupleBuffer(state_,
+        row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(),
+        &capacity, &tuple_buf_size, &tuple_buf));
+    while (!row_batch->AtCapacity()) {
+      RETURN_IF_ERROR(NextRowGroup());
+      DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
+      DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
+      if (row_group_idx_ == file_metadata_.row_groups.size()) break;
+      Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf);
+      TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow());
+      InitTuple(template_tuple_, dst_tuple);
+      int64_t* dst_slot =
+          
dst_tuple->GetBigIntSlot(scan_node_->parquet_count_star_slot_offset());
+      *dst_slot = file_metadata_.row_groups[row_group_idx_].num_rows;
+      row_group_rows_read_ += *dst_slot;
+      dst_row->SetTuple(0, dst_tuple);
+      row_batch->CommitLastRow();
+      tuple_buf += scan_node_->tuple_desc()->byte_size();
+    }
+    eos_ = row_group_idx_ == file_metadata_.row_groups.size();
+    return Status::OK();
   } else if (scan_node_->IsZeroSlotTableScan()) {
     // There are no materialized slots and we are not optimizing count(*), e.g.
     // "select 1 from alltypes". We can serve this query from just the file 
metadata.
     // We don't need to read the column data.
-    return GetNextWithTemplateTuple(row_batch);
+    if (row_group_rows_read_ == file_metadata_.num_rows) {
+      eos_ = true;
+      return Status::OK();
+    }
+    assemble_rows_timer_.Start();
+    DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows);
+    int64_t rows_remaining = file_metadata_.num_rows - row_group_rows_read_;
+    int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining);
+    TupleRow* current_row = row_batch->GetRow(row_batch->AddRow());
+    int num_to_commit = WriteTemplateTuples(current_row, max_tuples);
+    Status status = CommitRows(row_batch, num_to_commit);
+    assemble_rows_timer_.Stop();
+    RETURN_IF_ERROR(status);
+    row_group_rows_read_ += max_tuples;
+    COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples);
+    return Status::OK();
   }
 
   // Transfer remaining tuples from the scratch batch.
@@ -449,18 +493,18 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* 
row_batch) {
     if (row_batch->AtCapacity()) return Status::OK();
   }
 
-  while (advance_group_ || column_readers_[0]->RowGroupAtEnd()) {
+  while (advance_row_group_ || column_readers_[0]->RowGroupAtEnd()) {
     // Transfer resources and clear streams if there is any leftover from the 
previous
     // row group. We will create new streams for the next row group.
     FlushRowGroupResources(row_batch);
-    if (!advance_group_) {
+    if (!advance_row_group_) {
       Status status =
-          ValidateEndOfRowGroup(column_readers_, group_idx_, 
rows_read_in_group_);
+          ValidateEndOfRowGroup(column_readers_, row_group_idx_, 
row_group_rows_read_);
       if (!status.ok()) 
RETURN_IF_ERROR(state_->LogOrReturnError(status.msg()));
     }
     RETURN_IF_ERROR(NextRowGroup());
-    DCHECK_LE(group_idx_, file_metadata_.row_groups.size());
-    if (group_idx_ == file_metadata_.row_groups.size()) {
+    DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size());
+    if (row_group_idx_ == file_metadata_.row_groups.size()) {
       eos_ = true;
       DCHECK(parse_status_.ok());
       return Status::OK();
@@ -481,9 +525,9 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* 
row_batch) {
   assemble_rows_timer_.Start();
   Status status;
   if (filter_pages_) {
-    status = AssembleRows<true>(row_batch, &advance_group_);
+    status = AssembleRows<true>(row_batch, &advance_row_group_);
   } else {
-    status = AssembleRows<false>(row_batch, &advance_group_);
+    status = AssembleRows<false>(row_batch, &advance_row_group_);
   }
   assemble_rows_timer_.Stop();
   RETURN_IF_ERROR(status);
@@ -819,11 +863,11 @@ Status HdfsParquetScanner::NextRowGroup() {
   const HdfsFileDesc* file_desc =
       scan_node_->GetFileDesc(context_->partition_descriptor()->id(), 
filename());
 
-  bool start_with_first_row_group = group_idx_ == -1;
+  bool start_with_first_row_group = row_group_idx_ == -1;
   bool misaligned_row_group_skipped = false;
 
-  advance_group_ = false;
-  rows_read_in_group_ = 0;
+  advance_row_group_ = false;
+  row_group_rows_read_ = 0;
 
   // Loop until we have found a non-empty row group, and successfully 
initialized and
   // seeded the column readers. Return a non-OK status from within loop only 
if the error
@@ -835,8 +879,8 @@ Status HdfsParquetScanner::NextRowGroup() {
     // or previous row groups.
     DCHECK_EQ(0, context_->NumStreams());
 
-    ++group_idx_;
-    if (group_idx_ >= file_metadata_.row_groups.size()) {
+    ++row_group_idx_;
+    if (row_group_idx_ >= file_metadata_.row_groups.size()) {
       if (start_with_first_row_group && misaligned_row_group_skipped) {
         // We started with the first row group and skipped all the row groups 
because
         // they were misaligned. The execution flow won't reach this point if 
there is at
@@ -845,7 +889,7 @@ Status HdfsParquetScanner::NextRowGroup() {
       }
       break;
     }
-    const parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_];
+    const parquet::RowGroup& row_group = 
file_metadata_.row_groups[row_group_idx_];
     // Also check 'file_metadata_.num_rows' to make sure 'select count(*)' and 
'select *'
     // behave consistently for corrupt files that have 
'file_metadata_.num_rows == 0'
     // but some data in row groups.
@@ -854,7 +898,7 @@ Status HdfsParquetScanner::NextRowGroup() {
     // Let's find the index of the first row in this row group. It's needed to 
track the
     // file position of each row.
     int64_t row_group_first_row = 0;
-    for (int i = 0; i < group_idx_; ++i) {
+    for (int i = 0; i < row_group_idx_; ++i) {
       const parquet::RowGroup& row_group = file_metadata_.row_groups[i];
       row_group_first_row += row_group.num_rows;
     }
@@ -1280,7 +1324,7 @@ Status HdfsParquetScanner::ProcessPageIndex() {
   MonotonicStopWatch single_process_page_index_timer;
   single_process_page_index_timer.Start();
   ResetPageFiltering();
-  RETURN_IF_ERROR(page_index_.ReadAll(group_idx_));
+  RETURN_IF_ERROR(page_index_.ReadAll(row_group_idx_));
   if (page_index_.IsEmpty()) return Status::OK();
   // We can release the raw page index buffer when we exit this function.
   const auto scope_exit = 
MakeScopeExitTrigger([this](){page_index_.Release();});
@@ -1404,7 +1448,7 @@ Status 
HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters(
   }
 
   min_max_tuple_->Init(min_max_tuple_desc->byte_size());
-  parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_];
+  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
 
   int filtered_pages = 0;
 
@@ -1507,7 +1551,7 @@ Status 
HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters(
 }
 
 Status HdfsParquetScanner::EvaluatePageIndex() {
-  parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_];
+  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
   vector<RowRange> skip_ranges;
 
   for (int i = 0; i < stats_conjunct_evals_.size(); ++i) {
@@ -1597,7 +1641,7 @@ Status HdfsParquetScanner::EvaluatePageIndex() {
 Status HdfsParquetScanner::ComputeCandidatePagesForColumns() {
   if (candidate_ranges_.empty()) return Status::OK();
 
-  parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_];
+  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
   for (BaseScalarColumnReader* scalar_reader : scalar_readers_) {
     const auto& page_locations = scalar_reader->offset_index_.page_locations;
     if (!ComputeCandidatePages(page_locations, candidate_ranges_, 
row_group.num_rows,
@@ -2186,8 +2230,7 @@ Status HdfsParquetScanner::ProcessBloomFilter(const 
parquet::RowGroup&
       if (!bloom_filter.Find(hash)) {
         *skip_row_group = true;
         VLOG(3) << Substitute("Row group with idx $0 filtered by Parquet Bloom 
filter on "
-                              "column with idx $1 in file $2.",
-            group_idx_, col_idx, filename());
+            "column with idx $1 in file $2.", row_group_idx_, col_idx, 
filename());
         return Status::OK();
       }
     }
@@ -2261,7 +2304,7 @@ Status 
HdfsParquetScanner::AssembleRowsWithoutLateMaterialization(
     RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit));
     if (row_batch->AtCapacity()) break;
   }
-  rows_read_in_group_ += num_rows_read;
+  row_group_rows_read_ += num_rows_read;
   COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
   // Merge Scanner-local counter into HdfsScanNode counter and reset.
   COUNTER_ADD(scan_node_->collection_items_read_counter(), 
coll_items_read_counter_);
@@ -2388,7 +2431,7 @@ Status HdfsParquetScanner::AssembleRows(RowBatch* 
row_batch, bool* skip_row_grou
       break;
     }
   }
-  rows_read_in_group_ += num_rows_read;
+  row_group_rows_read_ += num_rows_read;
   COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read);
   // Merge Scanner-local counter into HdfsScanNode counter and reset.
   COUNTER_ADD(scan_node_->collection_items_read_counter(), 
coll_items_read_counter_);
@@ -2788,7 +2831,7 @@ Status HdfsParquetScanner::CreateColumnReaders(const 
TupleDescriptor& tuple_desc
   DCHECK(column_readers != nullptr);
   DCHECK(column_readers->empty());
 
-  if (scan_node_->optimize_count_star()) {
+  if (scan_node_->optimize_parquet_count_star()) {
     // Column readers are not needed because we are not reading from any 
columns if this
     // optimization is enabled.
     return Status::OK();
@@ -2955,7 +2998,7 @@ Status HdfsParquetScanner::InitScalarColumns(int64_t 
row_group_first_row) {
   int64_t partition_id = context_->partition_descriptor()->id();
   const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, 
filename());
   DCHECK(file_desc != nullptr);
-  parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_];
+  parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_];
 
   // Used to validate that the number of values in each reader in 
column_readers_ at the
   // same SchemaElement is the same.
@@ -2975,7 +3018,7 @@ Status HdfsParquetScanner::InitScalarColumns(int64_t 
row_group_first_row) {
       return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, 
scalar_reader->col_idx(),
           col_chunk.meta_data.num_values, num_values, filename());
     }
-    RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, group_idx_,
+    RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_,
         row_group_first_row));
   }
 
diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h 
b/be/src/exec/parquet/hdfs-parquet-scanner.h
index 5a0474b5a..5f5c038da 100644
--- a/be/src/exec/parquet/hdfs-parquet-scanner.h
+++ b/be/src/exec/parquet/hdfs-parquet-scanner.h
@@ -50,7 +50,7 @@ class ParquetColumnReader;
 class ParquetPageReader;
 template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool 
MATERIALIZED>
 class ScalarColumnReader;
-
+class BoolColumnReader;
 
 /// This scanner parses Parquet files located in HDFS, and writes the content 
as tuples in
 /// the Impala in-memory representation of data, e.g.  (tuples, rows, row 
batches).
@@ -394,22 +394,30 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
       "You can increase PARQUET_FOOTER_SIZE if you want, "
       "just don't forget to increase READ_SIZE_MIN_VALUE as well.");
 
- protected:
-  virtual int64_t GetNumberOfRowsInFile() const override {
-    return file_metadata_.num_rows;
-  }
-
  private:
   friend class ParquetColumnReader;
   friend class CollectionColumnReader;
   friend class BaseScalarColumnReader;
   template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool 
MATERIALIZED>
   friend class ScalarColumnReader;
+  friend class BoolColumnReader;
   friend class HdfsParquetScannerTest;
   friend class ParquetPageIndex;
   friend class ParquetColumnChunkReader;
   friend class ParquetPageReader;
 
+  /// Index of the current row group being processed. Initialized to -1 which 
indicates
+  /// that we have not started processing the first row group yet (GetNext() 
has not yet
+  /// been called).
+  int32_t row_group_idx_;
+
+  /// Counts the number of rows processed for the current row group.
+  int64_t row_group_rows_read_;
+
+  /// Indicates whether we should advance to the next row group in the next 
GetNext().
+  /// Starts out as true to move to the very first row group.
+  bool advance_row_group_;
+
   boost::scoped_ptr<ParquetSchemaResolver> schema_resolver_;
 
   /// Tuple to hold values when reading parquet::Statistics. Owned by 
perm_pool_.
@@ -492,6 +500,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner {
   /// perm_pool_.
   std::unordered_map<const TupleDescriptor*, Tuple*> dict_filter_tuple_map_;
 
+  /// Timer for materializing rows.  This ignores time getting the next buffer.
+  ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
+
   /// Average and min/max time spent processing the page index for each row 
group.
   RuntimeProfile::SummaryStatsCounter* process_page_index_stats_;
 
diff --git a/be/src/exec/parquet/parquet-column-readers.h 
b/be/src/exec/parquet/parquet-column-readers.h
index d461c94e7..76e0825b3 100644
--- a/be/src/exec/parquet/parquet-column-readers.h
+++ b/be/src/exec/parquet/parquet-column-readers.h
@@ -600,7 +600,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   int64_t LastRowIdxInCurrentPage() const {
     DCHECK(!candidate_data_pages_.empty());
     int64_t num_rows =
-        parent_->file_metadata_.row_groups[parent_->group_idx_].num_rows;
+        parent_->file_metadata_.row_groups[parent_->row_group_idx_].num_rows;
     // Find the next valid page.
     int page_idx = candidate_data_pages_[candidate_page_idx_] + 1;
     while (page_idx < offset_index_.page_locations.size()) {
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 6e8e370be..6e5d8886c 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -323,8 +323,9 @@ struct THdfsScanNode {
   // The conjuncts that are eligible for dictionary filtering.
   9: optional map<Types.TSlotId, list<i32>> dictionary_filter_conjuncts
 
-  // The byte offset of the slot for counter if count star optimization is 
enabled.
-  10: optional i32 count_star_slot_offset
+  // The byte offset of the slot for Parquet metadata if Parquet count star 
optimization
+  // is enabled.
+  10: optional i32 parquet_count_star_slot_offset
 
   // If true, the backend only needs to return one row per partition.
   11: optional bool is_partition_key_scan
diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java 
b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
index 71eab659c..ea48a3bc9 100644
--- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
+++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java
@@ -89,7 +89,6 @@ import org.apache.impala.thrift.TScanRangeLocationList;
 import org.apache.impala.thrift.TScanRangeSpec;
 import org.apache.impala.thrift.TSortingOrder;
 import org.apache.impala.thrift.TTableStats;
-import org.apache.impala.util.AcidUtils;
 import org.apache.impala.util.BitUtil;
 import org.apache.impala.util.ExecutorMembershipSnapshot;
 import org.apache.impala.util.MathUtil;
@@ -335,8 +334,6 @@ public class HdfsScanNode extends ScanNode {
   // Used only to display EXPLAIN information.
   private final List<Expr> partitionConjuncts_;
 
-  private boolean isFullAcidTable_ = false;
-
   /**
    * Construct a node to scan given data files into tuples described by 'desc',
    * with 'conjuncts' being the unevaluated conjuncts bound by the tuple and
@@ -357,8 +354,6 @@ public class HdfsScanNode extends ScanNode {
     tableNumRowsHint_ = hdfsTblRef.getTableNumRowsHint();
     FeFsTable hdfsTable = (FeFsTable)hdfsTblRef.getTable();
     Preconditions.checkState(tbl_ == hdfsTable);
-    isFullAcidTable_ =
-        
AcidUtils.isFullAcidTable(hdfsTable.getMetaStoreTable().getParameters());
     StringBuilder error = new StringBuilder();
     aggInfo_ = aggInfo;
     skipHeaderLineCount_ = tbl_.parseSkipHeaderLineCount(error);
@@ -403,14 +398,13 @@ public class HdfsScanNode extends ScanNode {
   }
 
   /**
-   * Returns true if the count(*) optimization can be applied to the query 
block
+   * Returns true if the Parquet count(*) optimization can be applied to the 
query block
    * of this scan node.
    */
   protected boolean canApplyCountStarOptimization(Analyzer analyzer,
       Set<HdfsFileFormat> fileFormats) {
     if (fileFormats.size() != 1) return false;
-    if (isFullAcidTable_) return false;
-    if (!hasParquet(fileFormats) && !hasOrc(fileFormats)) return false;
+    if (!hasParquet(fileFormats)) return false;
     return canApplyCountStarOptimization(analyzer);
   }
 
@@ -1563,13 +1557,6 @@ public class HdfsScanNode extends ScanNode {
           numRangesAdjusted :
           Math.min(inputCardinality_, numRangesAdjusted);
     }
-
-    if (countStarSlot_ != null) {
-      // We are doing optimized count star. Override cardinality with total 
num files.
-      long totalFiles = sumValues(totalFilesPerFs_);
-      inputCardinality_ = totalFiles;
-      cardinality_ = totalFiles;
-    }
     if (LOG.isTraceEnabled()) {
       LOG.trace("HdfsScan: cardinality_=" + Long.toString(cardinality_));
     }
@@ -1867,7 +1854,8 @@ public class HdfsScanNode extends ScanNode {
     msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_);
     Preconditions.checkState((optimizedAggSmap_ == null) == (countStarSlot_ == 
null));
     if (countStarSlot_ != null) {
-      
msg.hdfs_scan_node.setCount_star_slot_offset(countStarSlot_.getByteOffset());
+      msg.hdfs_scan_node.setParquet_count_star_slot_offset(
+          countStarSlot_.getByteOffset());
     }
     if (!statsConjuncts_.isEmpty()) {
       for (Expr e: statsConjuncts_) {
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java 
b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index a3b91eaf5..c60cefc3c 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -1343,11 +1343,6 @@ public class PlannerTest extends PlannerTestBase {
     runPlannerTestFile("tpcds-dist-method", "tpcds");
   }
 
-  @Test
-  public void testOrcStatsAgg() {
-    runPlannerTestFile("orc-stats-agg");
-  }
-
   /**
    * Test new hint of 'TABLE_NUM_ROWS'
    */
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/orc-stats-agg.test 
b/testdata/workloads/functional-planner/queries/PlannerTest/orc-stats-agg.test
deleted file mode 100644
index 00d4caca8..000000000
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/orc-stats-agg.test
+++ /dev/null
@@ -1,426 +0,0 @@
-# Verify that that the ORC count(*) optimization is applied in all count(*) or
-# count(<literal>) cases when scanning a ORC table. In the last case, we are 
scanning
-# a text table, so the optimization is not applied. The optimization is 
observed when
-# the cardinality of the ORC scan (24) is the same as # the # of files (24).
-select count(*) from functional_orc_def.uncomp_src_alltypes
-union all
-select count(1) from functional_orc_def.uncomp_src_alltypes
-union all
-select count(123) from functional_orc_def.uncomp_src_alltypes
-union all
-select count(*) from functional.alltypes
----- PLAN
-PLAN-ROOT SINK
-|
-00:UNION
-|  pass-through-operands: all
-|  row-size=8B cardinality=4
-|
-|--08:AGGREGATE [FINALIZE]
-|  |  output: count(*)
-|  |  row-size=8B cardinality=1
-|  |
-|  07:SCAN HDFS [functional.alltypes]
-|     HDFS partitions=24/24 files=24 size=478.45KB
-|     row-size=0B cardinality=7.30K
-|
-|--06:AGGREGATE [FINALIZE]
-|  |  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
-|  |  row-size=8B cardinality=1
-|  |
-|  05:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-|     HDFS partitions=24/24 files=24 size=205.47KB
-|     row-size=4B cardinality=24
-|
-|--04:AGGREGATE [FINALIZE]
-|  |  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
-|  |  row-size=8B cardinality=1
-|  |
-|  03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-|     HDFS partitions=24/24 files=24 size=205.47KB
-|     row-size=4B cardinality=24
-|
-02:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
-|  row-size=8B cardinality=1
-|
-01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   HDFS partitions=24/24 files=24 size=205.47KB
-   row-size=4B cardinality=24
----- DISTRIBUTEDPLAN
-PLAN-ROOT SINK
-|
-00:UNION
-|  pass-through-operands: all
-|  row-size=8B cardinality=4
-|
-|--16:AGGREGATE [FINALIZE]
-|  |  output: count:merge(*)
-|  |  row-size=8B cardinality=1
-|  |
-|  15:EXCHANGE [UNPARTITIONED]
-|  |
-|  08:AGGREGATE
-|  |  output: count(*)
-|  |  row-size=8B cardinality=1
-|  |
-|  07:SCAN HDFS [functional.alltypes]
-|     HDFS partitions=24/24 files=24 size=478.45KB
-|     row-size=0B cardinality=7.30K
-|
-|--14:AGGREGATE [FINALIZE]
-|  |  output: count:merge(*)
-|  |  row-size=8B cardinality=1
-|  |
-|  13:EXCHANGE [UNPARTITIONED]
-|  |
-|  06:AGGREGATE
-|  |  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
-|  |  row-size=8B cardinality=1
-|  |
-|  05:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-|     HDFS partitions=24/24 files=24 size=205.47KB
-|     row-size=4B cardinality=24
-|
-|--12:AGGREGATE [FINALIZE]
-|  |  output: count:merge(*)
-|  |  row-size=8B cardinality=1
-|  |
-|  11:EXCHANGE [UNPARTITIONED]
-|  |
-|  04:AGGREGATE
-|  |  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
-|  |  row-size=8B cardinality=1
-|  |
-|  03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-|     HDFS partitions=24/24 files=24 size=205.47KB
-|     row-size=4B cardinality=24
-|
-10:AGGREGATE [FINALIZE]
-|  output: count:merge(*)
-|  row-size=8B cardinality=1
-|
-09:EXCHANGE [UNPARTITIONED]
-|
-02:AGGREGATE
-|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
-|  row-size=8B cardinality=1
-|
-01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   HDFS partitions=24/24 files=24 size=205.47KB
-   row-size=4B cardinality=24
-====
-# Verify that the ORC count(*) optimization is applied even if there is more 
than
-# one item in the select list.
-select count(*), count(1), count(123) from 
functional_orc_def.uncomp_src_alltypes
----- PLAN
-PLAN-ROOT SINK
-|
-01:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
-|  row-size=8B cardinality=1
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   HDFS partitions=24/24 files=24 size=205.47KB
-   row-size=4B cardinality=24
-====
-# Select count(<partition col>) - the optimization is disabled because it's 
not a
-# count(<literal>) or count(*) aggregate function.
-select count(year) from functional_orc_def.uncomp_src_alltypes
----- PLAN
-PLAN-ROOT SINK
-|
-01:AGGREGATE [FINALIZE]
-|  output: count(`year`)
-|  row-size=8B cardinality=1
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   HDFS partitions=24/24 files=24 size=205.47KB
-   row-size=4B cardinality=13.07K
-====
-# Group by partition columns.
-select month, count(*) from functional_orc_def.uncomp_src_alltypes group by 
month, year
----- PLAN
-PLAN-ROOT SINK
-|
-01:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
-|  group by: `month`, `year`
-|  row-size=16B cardinality=24
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   HDFS partitions=24/24 files=24 size=205.47KB
-   row-size=16B cardinality=24
-====
-# The optimization is disabled because tinyint_col is not a partition col.
-select tinyint_col, count(*) from functional_orc_def.uncomp_src_alltypes group 
by tinyint_col, year
----- PLAN
-PLAN-ROOT SINK
-|
-01:AGGREGATE [FINALIZE]
-|  output: count(*)
-|  group by: tinyint_col, `year`
-|  row-size=13B cardinality=13.07K
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   HDFS partitions=24/24 files=24 size=205.47KB
-   row-size=5B cardinality=13.07K
-====
-# The optimization is disabled because it can not be applied to the 1st 
aggregate
-# function.
-select avg(year), count(*) from functional_orc_def.uncomp_src_alltypes
----- PLAN
-PLAN-ROOT SINK
-|
-01:AGGREGATE [FINALIZE]
-|  output: avg(`year`), count(*)
-|  row-size=16B cardinality=1
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   HDFS partitions=24/24 files=24 size=205.47KB
-   row-size=4B cardinality=13.07K
-====
-# Optimization is not applied because the inner count(*) is not materialized. 
The outer
-# count(*) does not reference a base table.
-select count(*) from (select count(*) from 
functional_orc_def.uncomp_src_alltypes) t
----- PLAN
-PLAN-ROOT SINK
-|
-02:AGGREGATE [FINALIZE]
-|  output: count(*)
-|  row-size=8B cardinality=1
-|
-01:AGGREGATE [FINALIZE]
-|  row-size=0B cardinality=1
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   HDFS partitions=24/24 files=24 size=205.47KB
-   partition key scan
-   row-size=0B cardinality=24
-====
-# The optimization is applied if count(*) is in the having clause.
-select 1 from functional_orc_def.uncomp_src_alltypes having count(*) > 1
----- PLAN
-PLAN-ROOT SINK
-|
-01:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
-|  having: count(*) > 1
-|  row-size=8B cardinality=0
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   HDFS partitions=24/24 files=24 size=205.47KB
-   row-size=4B cardinality=24
-====
-# The count(*) optimization is applied in the inline view.
-select count(*), count(a) from (select count(1) as a from 
functional_orc_def.uncomp_src_alltypes) t
----- PLAN
-PLAN-ROOT SINK
-|
-02:AGGREGATE [FINALIZE]
-|  output: count(*), count(count(*))
-|  row-size=16B cardinality=1
-|
-01:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
-|  row-size=8B cardinality=1
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   HDFS partitions=24/24 files=24 size=205.47KB
-   row-size=4B cardinality=24
-====
-# The count(*) optimization is applied to the inline view even if there is a 
join.
-select *
-from functional.alltypes x inner join (
-  select count(1) as a from functional_orc_def.uncomp_src_alltypes group by 
year
-) t on x.id = t.a;
----- PLAN
-PLAN-ROOT SINK
-|
-03:HASH JOIN [INNER JOIN]
-|  hash predicates: x.id = count(*)
-|  runtime filters: RF000 <- count(*)
-|  row-size=101B cardinality=2
-|
-|--02:AGGREGATE [FINALIZE]
-|  |  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
-|  |  group by: `year`
-|  |  row-size=12B cardinality=2
-|  |
-|  01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-|     HDFS partitions=24/24 files=24 size=205.47KB
-|     row-size=12B cardinality=24
-|
-00:SCAN HDFS [functional.alltypes x]
-   HDFS partitions=24/24 files=24 size=478.45KB
-   runtime filters: RF000 -> x.id
-   row-size=89B cardinality=7.30K
-====
-# The count(*) optimization is not applied if there is more than 1 table ref.
-select count(*) from functional_orc_def.uncomp_src_alltypes a, 
functional_orc_def.uncomp_src_alltypes b
----- PLAN
-PLAN-ROOT SINK
-|
-03:AGGREGATE [FINALIZE]
-|  output: count(*)
-|  row-size=8B cardinality=1
-|
-02:NESTED LOOP JOIN [CROSS JOIN]
-|  row-size=0B cardinality=170.85M
-|
-|--01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes b]
-|     HDFS partitions=24/24 files=24 size=205.47KB
-|     row-size=0B cardinality=13.07K
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes a]
-   HDFS partitions=24/24 files=24 size=205.47KB
-   row-size=0B cardinality=13.07K
-====
-# The count(*) optimization is applied if all predicates are on partition 
columns only.
-select count(1) from functional_orc_def.uncomp_src_alltypes where year < 2010 
and month > 8;
----- PLAN
-PLAN-ROOT SINK
-|
-01:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
-|  row-size=8B cardinality=1
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   partition predicates: `year` < 2010, `month` > 8
-   HDFS partitions=4/24 files=4 size=33.53KB
-   row-size=8B cardinality=4
-====
-# tinyint_col is not a partition column so the optimization is disabled.
-select count(1) from functional_orc_def.uncomp_src_alltypes where year < 2010 
and tinyint_col > 8;
----- PLAN
-PLAN-ROOT SINK
-|
-01:AGGREGATE [FINALIZE]
-|  output: count(*)
-|  row-size=8B cardinality=1
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   partition predicates: `year` < 2010
-   HDFS partitions=12/24 files=12 size=102.74KB
-   predicates: tinyint_col > 8
-   row-size=1B cardinality=654
-====
-# Optimization is applied after constant folding.
-select count(1 + 2 + 3) from functional_orc_def.uncomp_src_alltypes
----- PLAN
-PLAN-ROOT SINK
-|
-01:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
-|  row-size=8B cardinality=1
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   HDFS partitions=24/24 files=24 size=205.47KB
-   row-size=4B cardinality=24
-====
-# Optimization is not applied to count(null).
-select count(1 + null + 3) from functional_orc_def.uncomp_src_alltypes
-union all
-select count(null) from functional_orc_def.uncomp_src_alltypes
----- PLAN
-PLAN-ROOT SINK
-|
-00:UNION
-|  pass-through-operands: all
-|  row-size=8B cardinality=2
-|
-|--04:AGGREGATE [FINALIZE]
-|  |  output: count(NULL)
-|  |  row-size=8B cardinality=1
-|  |
-|  03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-|     HDFS partitions=24/24 files=24 size=205.47KB
-|     row-size=0B cardinality=13.07K
-|
-02:AGGREGATE [FINALIZE]
-|  output: count(NULL + 3)
-|  row-size=8B cardinality=1
-|
-01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   HDFS partitions=24/24 files=24 size=205.47KB
-   row-size=0B cardinality=13.07K
-====
-# Optimization is not applied when selecting from an empty table.
-select count(*) from functional_orc_def.emptytable
----- PLAN
-PLAN-ROOT SINK
-|
-01:AGGREGATE [FINALIZE]
-|  output: count(*)
-|  row-size=8B cardinality=0
-|
-00:SCAN HDFS [functional_orc_def.emptytable]
-   partitions=0/0 files=0 size=0B
-   row-size=0B cardinality=0
-====
-# Optimization is not applied when all partitions are pruned.
-select count(1) from functional_orc_def.uncomp_src_alltypes where year = -1
----- PLAN
-PLAN-ROOT SINK
-|
-01:AGGREGATE [FINALIZE]
-|  output: count(*)
-|  row-size=8B cardinality=0
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   partition predicates: `year` = -1
-   partitions=0/24 files=0 size=0B
-   row-size=0B cardinality=0
-====
-# Optimization is not applied across query blocks, even though it would be 
correct here.
-select count(*) from (select int_col from 
functional_orc_def.uncomp_src_alltypes) t
----- PLAN
-PLAN-ROOT SINK
-|
-01:AGGREGATE [FINALIZE]
-|  output: count(*)
-|  row-size=8B cardinality=1
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   HDFS partitions=24/24 files=24 size=205.47KB
-   row-size=0B cardinality=13.07K
-====
-# In general, optimization is not applied when there is a distinct agg.
-select count(*), count(distinct 1) from functional_orc_def.uncomp_src_alltypes
----- PLAN
-PLAN-ROOT SINK
-|
-02:AGGREGATE [FINALIZE]
-|  output: count(1), count:merge(*)
-|  row-size=16B cardinality=1
-|
-01:AGGREGATE
-|  output: count(*)
-|  group by: 1
-|  row-size=9B cardinality=1
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   HDFS partitions=24/24 files=24 size=205.47KB
-   row-size=0B cardinality=13.07K
-====
-# The optimization is applied here because only the count(*) and a partition 
column are
-# materialized. Non-materialized agg exprs are ignored.
-select year, cnt from (
-  select year, count(bigint_col), count(*) cnt, avg(int_col)
-  from functional_orc_def.uncomp_src_alltypes
-  where month=1
-  group by year
-) t
----- PLAN
-PLAN-ROOT SINK
-|
-01:AGGREGATE [FINALIZE]
-|  output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: 
num_rows)
-|  group by: `year`
-|  row-size=12B cardinality=2
-|
-00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes]
-   partition predicates: `month` = 1
-   HDFS partitions=2/24 files=2 size=17.07KB
-   row-size=12B cardinality=2
-====
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
index 7ec11f482..41a10602f 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test
@@ -1,7 +1,6 @@
 # Verify that that the parquet count(*) optimization is applied in all 
count(*) or
 # count(<literal>) cases when scanning a Parquet table. In the last case, we 
are scanning
-# a text table, so the optimization is not applied. The optimization is 
observed when
-# the cardinality of the Parquet scan (24) is the same as # the # of files 
(24).
+# a text table, so the optimization is not applied.
 select count(*) from functional_parquet.alltypes
 union all
 select count(1) from functional_parquet.alltypes
@@ -30,7 +29,7 @@ PLAN-ROOT SINK
 |  |
 |  05:SCAN HDFS [functional_parquet.alltypes]
 |     HDFS partitions=24/24 files=24 size=200.45KB
-|     row-size=8B cardinality=24
+|     row-size=8B cardinality=12.75K
 |
 |--04:AGGREGATE [FINALIZE]
 |  |  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
@@ -38,7 +37,7 @@ PLAN-ROOT SINK
 |  |
 |  03:SCAN HDFS [functional_parquet.alltypes]
 |     HDFS partitions=24/24 files=24 size=200.45KB
-|     row-size=8B cardinality=24
+|     row-size=8B cardinality=12.75K
 |
 02:AGGREGATE [FINALIZE]
 |  output: sum_init_zero(functional_parquet.alltypes.stats: num_rows)
@@ -46,7 +45,7 @@ PLAN-ROOT SINK
 |
 01:SCAN HDFS [functional_parquet.alltypes]
    HDFS partitions=24/24 files=24 size=200.45KB
-   row-size=8B cardinality=24
+   row-size=8B cardinality=12.75K
 ---- DISTRIBUTEDPLAN
 PLAN-ROOT SINK
 |
@@ -80,7 +79,7 @@ PLAN-ROOT SINK
 |  |
 |  05:SCAN HDFS [functional_parquet.alltypes]
 |     HDFS partitions=24/24 files=24 size=200.45KB
-|     row-size=8B cardinality=24
+|     row-size=8B cardinality=12.75K
 |
 |--12:AGGREGATE [FINALIZE]
 |  |  output: count:merge(*)
@@ -94,7 +93,7 @@ PLAN-ROOT SINK
 |  |
 |  03:SCAN HDFS [functional_parquet.alltypes]
 |     HDFS partitions=24/24 files=24 size=200.45KB
-|     row-size=8B cardinality=24
+|     row-size=8B cardinality=12.75K
 |
 10:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
@@ -108,7 +107,7 @@ PLAN-ROOT SINK
 |
 01:SCAN HDFS [functional_parquet.alltypes]
    HDFS partitions=24/24 files=24 size=200.45KB
-   row-size=8B cardinality=24
+   row-size=8B cardinality=12.75K
 ====
 # Verify that the parquet count(*) optimization is applied even if there is 
more than
 # one item in the select list.
@@ -122,7 +121,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.alltypes]
    HDFS partitions=24/24 files=24 size=200.45KB
-   row-size=8B cardinality=24
+   row-size=8B cardinality=12.75K
 ====
 # Select count(<partition col>) - the optimization should be disabled because 
it's not a
 # count(<literal>) or count(*) aggregate function.
@@ -150,7 +149,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.alltypes]
    HDFS partitions=24/24 files=24 size=200.45KB
-   row-size=16B cardinality=24
+   row-size=16B cardinality=12.75K
 ====
 # The optimization is disabled because tinyint_col is not a partition col.
 select tinyint_col, count(*) from functional_parquet.alltypes group by 
tinyint_col, year
@@ -209,7 +208,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.alltypes]
    HDFS partitions=24/24 files=24 size=200.45KB
-   row-size=8B cardinality=24
+   row-size=8B cardinality=12.75K
 ====
 # The count(*) optimization is applied in the inline view.
 select count(*), count(a) from (select count(1) as a from 
functional_parquet.alltypes) t
@@ -226,7 +225,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.alltypes]
    HDFS partitions=24/24 files=24 size=200.45KB
-   row-size=8B cardinality=24
+   row-size=8B cardinality=12.75K
 ====
 # The count(*) optimization is applied to the inline view even if there is a 
join.
 select *
@@ -248,7 +247,7 @@ PLAN-ROOT SINK
 |  |
 |  01:SCAN HDFS [functional_parquet.alltypes]
 |     HDFS partitions=24/24 files=24 size=200.45KB
-|     row-size=12B cardinality=24
+|     row-size=12B cardinality=12.75K
 |
 00:SCAN HDFS [functional.alltypes x]
    HDFS partitions=24/24 files=24 size=478.45KB
@@ -287,7 +286,7 @@ PLAN-ROOT SINK
 00:SCAN HDFS [functional_parquet.alltypes]
    partition predicates: `year` < 2010, `month` > 8
    HDFS partitions=4/24 files=4 size=33.53KB
-   row-size=8B cardinality=4
+   row-size=8B cardinality=2.13K
 ====
 # tinyint_col is not a partition column so the optimization is disabled.
 select count(1) from functional_parquet.alltypes where year < 2010 and 
tinyint_col > 8;
@@ -315,7 +314,7 @@ PLAN-ROOT SINK
 |
 00:SCAN HDFS [functional_parquet.alltypes]
    HDFS partitions=24/24 files=24 size=200.45KB
-   row-size=8B cardinality=24
+   row-size=8B cardinality=12.75K
 ====
 # Optimization is not applied to count(null).
 select count(1 + null + 3) from functional_parquet.alltypes
@@ -421,5 +420,5 @@ PLAN-ROOT SINK
 00:SCAN HDFS [functional_parquet.alltypes]
    partition predicates: `month` = 1
    HDFS partitions=2/24 files=2 size=17.07KB
-   row-size=12B cardinality=2
+   row-size=12B cardinality=1.09K
 ====
diff --git 
a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
 
b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index cb58de723..b7abe2566 100644
--- 
a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ 
b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -1869,7 +1869,6 @@ select count(*) from tpch_parquet.lineitem
 ---- PLAN
 Max Per-Host Resource Reservation: Memory=128.00KB Threads=2
 Per-Host Resource Estimates: Memory=10MB
-Codegen disabled by planner
 Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1891,12 +1890,11 @@ PLAN-ROOT SINK
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=2.14M
    mem-estimate=1.00MB mem-reservation=128.00KB thread-reservation=1
-   tuple-ids=0 row-size=8B cardinality=3
+   tuple-ids=0 row-size=8B cardinality=6.00M
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
 Max Per-Host Resource Reservation: Memory=128.00KB Threads=3
 Per-Host Resource Estimates: Memory=10MB
-Codegen disabled by planner
 Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1931,12 +1929,11 @@ Per-Host Resources: mem-estimate=1.02MB 
mem-reservation=128.00KB thread-reservat
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=2.14M
    mem-estimate=1.00MB mem-reservation=128.00KB thread-reservation=1
-   tuple-ids=0 row-size=8B cardinality=3
+   tuple-ids=0 row-size=8B cardinality=6.00M
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
 Max Per-Host Resource Reservation: Memory=128.00KB Threads=2
 Per-Host Resource Estimates: Memory=80MB
-Codegen disabled by planner
 Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
@@ -1971,7 +1968,7 @@ Per-Instance Resources: mem-estimate=80.02MB 
mem-reservation=128.00KB thread-res
      columns: all
    extrapolated-rows=disabled max-scan-range-rows=2.14M
    mem-estimate=80.00MB mem-reservation=128.00KB thread-reservation=0
-   tuple-ids=0 row-size=8B cardinality=3
+   tuple-ids=0 row-size=8B cardinality=6.00M
    in pipelines: 00(GETNEXT)
 ====
 # Sort
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test
index e5ef490f8..c7feddf64 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test
@@ -39,7 +39,6 @@ select count(1) from ice_compound_pred_pd;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_compound_pred_pd;
@@ -424,7 +423,6 @@ select count(1) from ice_compound_pred_pd1;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_compound_pred_pd1;
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
index fcec4cb69..9aadc2711 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test
@@ -33,7 +33,6 @@ select count(1) from ice_pred_pd1;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_pred_pd1;
@@ -63,6 +62,7 @@ aggregation(SUM, NumRowGroups): 3
 ---- QUERY
 # Filtering only on a partition column is done by Iceberg and in Impala we can 
get the results
 # simply using file metadata.
+# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 
to 3.
 select
   count(1)
 from
@@ -72,8 +72,7 @@ where
 ---- RESULTS
 9
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 3
+aggregation(SUM, NumRowGroups): 3
 ====
 ---- QUERY
 # The IN predicate matches two row groups
@@ -447,6 +446,7 @@ aggregation(SUM, NumRowGroups): 1
 ====
 ---- QUERY
 # NOT_IN could be answered using file metadata if only partition cols are 
included
+# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 
to 1.
 select
   count(1)
 from
@@ -456,8 +456,7 @@ where
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 1
+aggregation(SUM, NumRowGroups): 1
 ====
 ---- QUERY
 # NOT_IN does not work because col_dt is not the partition column
@@ -548,7 +547,6 @@ select count(1) from ice_pred_pd2;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_pred_pd2;
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test
index 6b7ba819d..e6207e791 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test
@@ -35,7 +35,6 @@ select count(1) from ice_is_null_pred_pd;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_is_null_pred_pd;
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
index a1b955a94..9ce8545cb 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test
@@ -170,18 +170,17 @@ select count(*) from ice_bigints;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 # When filtered only by partition column Iceberg can do the filtering and no 
need to read data in Impala.
+# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 
to 1.
 select count(*) from ice_bigints
 where i = 0 and j = 0;
 ---- RESULTS
 1217
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
+aggregation(SUM, NumRowGroups): 1
 aggregation(SUM, RowsRead): 0
-aggregation(SUM, NumFileMetadataRead): 1
 ====
 ---- QUERY
 # When not just partition columns are involved in the filtering then Impala 
has to read data to answer the query.
@@ -281,6 +280,7 @@ select count(*) from alltypes_part;
 Expression 'timestamp_col' (type: TIMESTAMP) would need to be cast to STRING 
for column 'string_col'
 ====
 ---- QUERY
+# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 
to 4.
 select count(*) from alltypes_part
 where bool_col = true;
 ---- RESULTS
@@ -288,10 +288,10 @@ where bool_col = true;
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 4
+aggregation(SUM, NumRowGroups): 4
 ====
 ---- QUERY
+# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 
to 4.
 select count(*) from alltypes_part
 where float_col = 0;
 ---- RESULTS
@@ -299,10 +299,10 @@ where float_col = 0;
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 4
+aggregation(SUM, NumRowGroups): 4
 ====
 ---- QUERY
+# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 
to 4.
 select count(*) from alltypes_part
 where double_col = 0;
 ---- RESULTS
@@ -310,10 +310,10 @@ where double_col = 0;
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 4
+aggregation(SUM, NumRowGroups): 4
 ====
 ---- QUERY
+# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 
to 2.
 select count(*) from alltypes_part
 where date_col = '2009-01-01';
 ---- RESULTS
@@ -321,10 +321,10 @@ where date_col = '2009-01-01';
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 2
+aggregation(SUM, NumRowGroups): 2
 ====
 ---- QUERY
+# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 
to 4.
 select count(*) from alltypes_part
 where string_col = '0';
 ---- RESULTS
@@ -332,8 +332,7 @@ where string_col = '0';
 ---- TYPES
 BIGINT
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 4
+aggregation(SUM, NumRowGroups): 4
 ====
 ---- QUERY
 # 'timestamp_col' is not a partitioning column, so min/max stats will not be 
used to
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test
index da56563eb..9310878c5 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test
@@ -13,7 +13,6 @@ select count(*) from ice_tbl;
 0
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 insert into
@@ -34,7 +33,6 @@ select count(*) from ice_tbl;
 3
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 create table ice_tbl_u1 stored as iceberg as select * from ice_tbl;
@@ -47,7 +45,6 @@ select count(*) from ice_tbl_u1;
 3
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 insert into
@@ -69,7 +66,6 @@ select count(*) from ice_tbl;
 6
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 create table ice_tbl_u2 stored as iceberg as select * from ice_tbl;
@@ -82,7 +78,6 @@ select count(*) from ice_tbl_u2;
 6
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 insert into
@@ -105,7 +100,6 @@ select count(*) from ice_tbl;
 8
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 select count(*) from ice_tbl for system_time as of now();
@@ -113,7 +107,6 @@ select count(*) from ice_tbl for system_time as of now();
 8
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 set explain_level=3;
@@ -145,6 +138,7 @@ explain select 123, count(*), 321 from ice_tbl;
 ====
 ---- QUERY
 # Filtering by a partition column results in Iceberg doing the filtering 
instead of Impala.
+# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 
to 2.
 select
   count(*)
 from
@@ -154,8 +148,7 @@ where
 ---- RESULTS
 4
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 2
+aggregation(SUM, NumRowGroups): 2
 ====
 ---- QUERY
 select
@@ -167,7 +160,6 @@ having
 ---- RESULTS
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 4
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 select
@@ -181,7 +173,6 @@ group by
 4
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 4
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 select
@@ -192,7 +183,6 @@ from
 6
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 4
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 truncate ice_tbl;
@@ -205,7 +195,6 @@ select count(*) from ice_tbl;
 0
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 create table parq_tbl(col_i INT, col_s STRING) PARTITIONED BY(x INT) STORED AS 
PARQUET;
@@ -213,6 +202,7 @@ create table parq_tbl(col_i INT, col_s STRING) PARTITIONED 
BY(x INT) STORED AS P
 'Table has been created.'
 ====
 ---- QUERY
+# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 
to 3.
 insert into parq_tbl PARTITION(x = 12340) values (0, "a");
 insert into parq_tbl PARTITION(x = 12341) values (1, "b");
 insert into parq_tbl PARTITION(x = 12342) values (2, "c");
@@ -220,8 +210,7 @@ select count(*) from parq_tbl;
 ---- RESULTS
 3
 ---- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 3
+aggregation(SUM, NumRowGroups): 3
 ====
 ---- QUERY
 select count(*) as c from ice_tbl_u1 union all (select count(*) c from 
ice_tbl_u2) order by c;
@@ -232,7 +221,6 @@ select count(*) as c from ice_tbl_u1 union all (select 
count(*) c from ice_tbl_u
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 with u1 as (select count(*) from ice_tbl_u1), u2 as (select count(*) from 
ice_tbl_u2) select * from u1, u2;
@@ -242,7 +230,6 @@ with u1 as (select count(*) from ice_tbl_u1), u2 as (select 
count(*) from ice_tb
 BIGINT,BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 with u1 as (select count(*) from ice_tbl_u1),
@@ -254,5 +241,4 @@ u2 as (select count(*) from ice_tbl_u1 union all (select 
count(*) from ice_tbl_u
 BIGINT,BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
-====
\ No newline at end of file
+====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
index c16399812..a1728253a 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test
@@ -20,7 +20,6 @@ select count(*) from ice_types1;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_types1;
@@ -252,7 +251,6 @@ select count(*) from ice_types2;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_types2;
@@ -370,7 +368,6 @@ select count(*) from ice_types3;
 BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 show files in ice_types3;
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-plain-count-star-optimization.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-plain-count-star-optimization.test
index 21ac4a1fe..0edf877d4 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-plain-count-star-optimization.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-plain-count-star-optimization.test
@@ -20,7 +20,6 @@ BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 4
 aggregation(SUM, NumOrcStripes): 4
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 with u1 as (select count(*) as c from 
functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files),
@@ -35,7 +34,6 @@ BIGINT,BIGINT,TINYINT,BIGINT,BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 4
 aggregation(SUM, NumOrcStripes): 4
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 select count(*) as c from 
iceberg_v2_positional_not_all_data_files_have_delete_files for system_version 
as of 752781918366351945
@@ -58,7 +56,6 @@ BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 2
 aggregation(SUM, NumOrcStripes): 2
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 with u1 as (select count(*) as c from 
functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files 
for system_version as of 752781918366351945),
@@ -73,5 +70,4 @@ BIGINT,BIGINT,TINYINT,BIGINT,BIGINT
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 2
 aggregation(SUM, NumOrcStripes): 2
-aggregation(SUM, NumFileMetadataRead): 0
-====
\ No newline at end of file
+====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test
index a2af39918..5cc292ee3 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test
@@ -39,7 +39,6 @@ SELECT count(*) from iceberg_v2_no_deletes_orc
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 COMPUTE STATS iceberg_v2_positional_delete_all_rows_orc
@@ -81,7 +80,6 @@ SELECT count(*) from 
iceberg_v2_positional_delete_all_rows_orc for system_versio
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumOrcStripes): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 SELECT count(*) from iceberg_v2_positional_delete_all_rows_orc
@@ -91,7 +89,6 @@ SELECT count(*) from iceberg_v2_positional_delete_all_rows_orc
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumOrcStripes): 2
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 COMPUTE STATS iceberg_v2_positional_not_all_data_files_have_delete_files_orc
@@ -145,7 +142,6 @@ SELECT count(*) from 
iceberg_v2_positional_not_all_data_files_have_delete_files_
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumOrcStripes): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 SELECT count(*) from 
iceberg_v2_positional_not_all_data_files_have_delete_files_orc for 
system_version as of 5003445199566617082
@@ -155,7 +151,6 @@ SELECT count(*) from 
iceberg_v2_positional_not_all_data_files_have_delete_files_
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumOrcStripes): 2
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 SELECT count(*) from 
iceberg_v2_positional_not_all_data_files_have_delete_files_orc
@@ -165,7 +160,6 @@ SELECT count(*) from 
iceberg_v2_positional_not_all_data_files_have_delete_files_
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumOrcStripes): 4
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 COMPUTE STATS iceberg_v2_partitioned_position_deletes_orc
@@ -207,7 +201,6 @@ SELECT count(*) from 
iceberg_v2_partitioned_position_deletes_orc for system_vers
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumOrcStripes): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 SELECT count(*) from iceberg_v2_partitioned_position_deletes_orc
@@ -217,7 +210,6 @@ SELECT count(*) from 
iceberg_v2_partitioned_position_deletes_orc
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumOrcStripes): 6
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 SELECT count(*) from iceberg_v2_no_deletes_orc where i = 2;
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
index e9c7b985f..0b9675a5f 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test
@@ -39,7 +39,6 @@ SELECT count(*) from iceberg_v2_no_deletes
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 COMPUTE STATS iceberg_v2_delete_positional
@@ -81,7 +80,6 @@ SELECT count(*) from iceberg_v2_delete_positional for 
system_version as of 68169
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 SELECT count(*) from iceberg_v2_delete_positional;
@@ -91,7 +89,6 @@ SELECT count(*) from iceberg_v2_delete_positional;
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 2
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 COMPUTE STATS iceberg_v2_positional_delete_all_rows
@@ -133,7 +130,6 @@ SELECT count(*) from iceberg_v2_positional_delete_all_rows 
for system_version as
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 SELECT count(*) from iceberg_v2_positional_delete_all_rows
@@ -143,7 +139,6 @@ SELECT count(*) from iceberg_v2_positional_delete_all_rows
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 2
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 COMPUTE STATS iceberg_v2_positional_not_all_data_files_have_delete_files
@@ -197,7 +192,6 @@ SELECT count(*) from 
iceberg_v2_positional_not_all_data_files_have_delete_files
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 SELECT count(*) from 
iceberg_v2_positional_not_all_data_files_have_delete_files for system_version 
as of 752781918366351945
@@ -207,7 +201,6 @@ SELECT count(*) from 
iceberg_v2_positional_not_all_data_files_have_delete_files
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 2
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files
@@ -217,7 +210,6 @@ SELECT count(*) from 
iceberg_v2_positional_not_all_data_files_have_delete_files
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 4
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 COMPUTE STATS iceberg_v2_positional_update_all_rows
@@ -268,7 +260,6 @@ SELECT count(*) from iceberg_v2_positional_update_all_rows 
for system_version as
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 2
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 SELECT count(*) from iceberg_v2_positional_update_all_rows
@@ -278,7 +269,6 @@ SELECT count(*) from iceberg_v2_positional_update_all_rows
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 2
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 COMPUTE STATS iceberg_v2_partitioned_position_deletes
@@ -320,7 +310,6 @@ SELECT count(*) from 
iceberg_v2_partitioned_position_deletes for system_version
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 SELECT count(*) from iceberg_v2_partitioned_position_deletes
@@ -330,7 +319,6 @@ SELECT count(*) from iceberg_v2_partitioned_position_deletes
 bigint
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumRowGroups): 6
-aggregation(SUM, NumFileMetadataRead): 0
 ====
 ---- QUERY
 SELECT count(*) from iceberg_v2_no_deletes where i = 2;
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test 
b/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test
index f4b7730a4..2d5bf9e80 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test
@@ -25,17 +25,16 @@ bigint, bigint
 280,1260
 ====
 ---- QUERY
-# IMPALA-11123: IMPALA-5861 add this test to verify that 'RowRead' counter is 
not double
-# counted for zero slot scan. IMPALA-11123 remove incerement of 'RowRead' 
counter
-# in case of optimized count(star) and zero slot scan query. This cause 
reduction of
-# 'RowsRead' value from 1200 to 900 since the other 300 are served through
-# zero slot scan. We do not verify 'NumFileMetadataRead' since it does not 
stay the same
-# over different test vector permutation.
+# IMPALA-5861: RowsRead counter should be accurate for table scan that returns
+# zero slots. This test is run with various batch_size values, which helps
+# reproduce the bug. Scanning multiple file formats triggers the bug because
+# the Parquet count(*) rewrite is disabled when non-Parquet file formats are
+# present.
 select count(*) from functional.alltypesmixedformat
 ---- TYPES
 bigint
 ---- RESULTS
 1200
 ---- RUNTIME_PROFILE
-aggregation(SUM, RowsRead): 900
+aggregation(SUM, RowsRead): 1200
 ====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/orc-stats-agg.test 
b/testdata/workloads/functional-query/queries/QueryTest/orc-stats-agg.test
deleted file mode 100644
index 4c9bdfb50..000000000
--- a/testdata/workloads/functional-query/queries/QueryTest/orc-stats-agg.test
+++ /dev/null
@@ -1,152 +0,0 @@
-====
----- QUERY
-# Tests the correctness of the ORC count(*) optimization.
-select count(1)
-from functional_orc_def.uncomp_src_alltypes
----- RESULTS
-7300
----- TYPES
-bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumOrcStripes): 0
-aggregation(SUM, NumFileMetadataRead): 24
-aggregation(SUM, RowsRead): 0
-=====
----- QUERY
-# Tests the correctness of zero slot scan over ORC.
-# Does not verify 'NumFileMetadataRead' here since codegen vs non-codegen yield
-# different number.
-select 1 from functional_orc_def.alltypestiny
----- RESULTS
-1
-1
-1
-1
-1
-1
-1
-1
----- TYPES
-tinyint
----- RUNTIME_PROFILE
-aggregation(SUM, NumOrcStripes): 0
-aggregation(SUM, RowsRead): 0
-=====
----- QUERY
-# ORC count(*) optimization with predicates on the partition columns.
-select count(1)
-from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8
----- RESULTS
-1220
----- TYPES
-bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumOrcStripes): 0
-aggregation(SUM, NumFileMetadataRead): 4
-aggregation(SUM, RowsRead): 0
-=====
----- QUERY
-# ORC count(*) optimization with group by partition columns.
-select year, month, count(1)
-from functional_orc_def.uncomp_src_alltypes group by year, month
----- RESULTS
-2009,1,310
-2009,2,280
-2009,3,310
-2009,4,300
-2009,5,310
-2009,6,300
-2009,7,310
-2009,8,310
-2009,9,300
-2009,10,310
-2009,11,300
-2009,12,310
-2010,1,310
-2010,2,280
-2010,3,310
-2010,4,300
-2010,5,310
-2010,6,300
-2010,7,310
-2010,8,310
-2010,9,300
-2010,10,310
-2010,11,300
-2010,12,310
----- TYPES
-int, int, bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumOrcStripes): 0
-aggregation(SUM, NumFileMetadataRead): 24
-aggregation(SUM, RowsRead): 0
-=====
----- QUERY
-# ORC count(*) optimization with both group by and predicates on partition 
columns.
-select count(1)
-from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8
-group by month
----- RESULTS
-310
-300
-310
-300
----- TYPES
-bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumOrcStripes): 0
-aggregation(SUM, NumFileMetadataRead): 4
-aggregation(SUM, RowsRead): 0
-=====
----- QUERY
-# ORC count(*) optimization with the result going into a join.
-select x.bigint_col from functional_orc_def.uncomp_src_alltypes x
-  inner join (
-    select count(1) as a from functional_orc_def.uncomp_src_alltypes group by 
year
-  ) t on x.id = t.a;
----- RESULTS
-0
-0
----- TYPES
-bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumOrcStripes): 24
-aggregation(SUM, NumFileMetadataRead): 24
-aggregation(SUM, RowsRead): 7300
-=====
----- QUERY
-# ORC count(*) optimization with the agg function in the having clause.
-select 1 from functional_orc_def.uncomp_src_alltypes having count(*) > 1
----- RESULTS
-1
----- TYPES
-tinyint
----- RUNTIME_PROFILE
-aggregation(SUM, NumOrcStripes): 0
-aggregation(SUM, NumFileMetadataRead): 24
-aggregation(SUM, RowsRead): 0
-====
----- QUERY
-# Verify that 0 is returned for count(*) on an empty table.
-select count(1) from functional_orc_def.emptytable
----- RESULTS
-0
----- TYPES
-bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumOrcStripes): 0
-aggregation(SUM, NumFileMetadataRead): 0
-aggregation(SUM, RowsRead): 0
-=====
----- QUERY
-# Verify that 0 is returned when all partitions are pruned.
-select count(1) from functional_orc_def.uncomp_src_alltypes where year = -1
----- RESULTS
-0
----- TYPES
-bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumOrcStripes): 0
-aggregation(SUM, NumFileMetadataRead): 0
-aggregation(SUM, RowsRead): 0
-=====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test 
b/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test
index 43959d7b5..620c50bef 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test
@@ -7,30 +7,6 @@ from functional_parquet.alltypes
 7300
 ---- TYPES
 bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 24
-aggregation(SUM, RowsRead): 0
-=====
----- QUERY
-# Tests the correctness of zero slot scan over Parquet.
-# Not checking 'NumFileMetadataRead' here since codegen vs non-codegen yield
-# different number.
-select 1 from functional_orc_def.alltypestiny
----- RESULTS
-1
-1
-1
-1
-1
-1
-1
-1
----- TYPES
-tinyint
----- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, RowsRead): 0
 =====
 ---- QUERY
 # Parquet count(*) optimization with predicates on the partition columns.
@@ -40,10 +16,6 @@ from functional_parquet.alltypes where year < 2010 and month 
> 8
 1220
 ---- TYPES
 bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 4
-aggregation(SUM, RowsRead): 0
 =====
 ---- QUERY
 # Parquet count(*) optimization with group by partition columns.
@@ -76,10 +48,6 @@ from functional_parquet.alltypes group by year, month
 2010,12,310
 ---- TYPES
 int, int, bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 24
-aggregation(SUM, RowsRead): 0
 =====
 ---- QUERY
 # Parquet count(*) optimization with both group by and predicates on partition 
columns.
@@ -93,10 +61,6 @@ group by month
 300
 ---- TYPES
 bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 4
-aggregation(SUM, RowsRead): 0
 =====
 ---- QUERY
 # Parquet count(*) optimization with the result going into a join.
@@ -109,10 +73,6 @@ select x.bigint_col from functional.alltypes x
 0
 ---- TYPES
 bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 24
-aggregation(SUM, RowsRead): 7300
 =====
 ---- QUERY
 # Parquet count(*) optimization with the agg function in the having clause.
@@ -121,10 +81,6 @@ select 1 from functional_parquet.alltypes having count(*) > 
1
 1
 ---- TYPES
 tinyint
----- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 24
-aggregation(SUM, RowsRead): 0
 ====
 ---- QUERY
 # Verify that 0 is returned for count(*) on an empty table.
@@ -133,10 +89,6 @@ select count(1) from functional_parquet.emptytable
 0
 ---- TYPES
 bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
-aggregation(SUM, RowsRead): 0
 =====
 ---- QUERY
 # Verify that 0 is returned when all partitions are pruned.
@@ -145,10 +97,6 @@ select count(1) from functional_parquet.alltypes where year 
= -1
 0
 ---- TYPES
 bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 0
-aggregation(SUM, RowsRead): 0
 =====
 ---- QUERY
 # Test different row group size combinations.
@@ -166,9 +114,6 @@ select count(*) from tpch_parquet.lineitem
 6001215
 ---- TYPES
 bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, RowsRead): 0
 =====
 ---- QUERY
 # IMPALA-5679: Count(*) with group by on a string partition column.
@@ -191,8 +136,4 @@ select string_col, count(*) from 
$DATABASE.string_partitioned_table group by str
 '9',730
 ---- TYPES
 string, bigint
----- RUNTIME_PROFILE
-aggregation(SUM, NumRowGroups): 0
-aggregation(SUM, NumFileMetadataRead): 10
-aggregation(SUM, RowsRead): 0
 =====
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test
 
b/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test
index 3938ff3ec..0d9e17330 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test
@@ -11,10 +11,6 @@ INT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
----- RUNTIME_PROFILE: table_format=parquet,orc
-# Confirm that only one metadata per file is read.
-aggregation(SUM, RowsRead): 0
-aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Test with more complex multiple distinct aggregation.
@@ -27,10 +23,6 @@ BIGINT,BIGINT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
----- RUNTIME_PROFILE: table_format=parquet,orc
-# Confirm that only one metadata per file is read.
-aggregation(SUM, RowsRead): 0
-aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Distinct aggregation with multiple columns.
@@ -66,10 +58,6 @@ INT,INT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
----- RUNTIME_PROFILE: table_format=parquet,orc
-# Confirm that only one metadata per file is read.
-aggregation(SUM, RowsRead): 0
-aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Partition key scan combined with analytic function.
@@ -83,10 +71,6 @@ INT,BIGINT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
----- RUNTIME_PROFILE: table_format=parquet,orc
-# Confirm that only one metadata per file is read.
-aggregation(SUM, RowsRead): 0
-aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Partition scan combined with sort.
@@ -123,10 +107,6 @@ INT,INT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
----- RUNTIME_PROFILE: table_format=parquet,orc
-# Confirm that only one metadata per file is read.
-aggregation(SUM, RowsRead): 0
-aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Partition key scan combined with predicate on partition columns
@@ -141,10 +121,6 @@ INT,INT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 2
----- RUNTIME_PROFILE: table_format=parquet,orc
-# Confirm that only one metadata per file is read.
-aggregation(SUM, RowsRead): 0
-aggregation(SUM, NumFileMetadataRead): 2
 ====
 ---- QUERY
 # Partition key scan combined with having predicate.
@@ -160,10 +136,6 @@ INT,INT
 ---- RUNTIME_PROFILE
 # Confirm that only one row per file is read.
 aggregation(SUM, RowsRead): 24
----- RUNTIME_PROFILE: table_format=parquet,orc
-# Confirm that only one metadata per file is read.
-aggregation(SUM, RowsRead): 0
-aggregation(SUM, NumFileMetadataRead): 24
 ====
 ---- QUERY
 # Empty table should not return any rows
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test 
b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
index 698e66da7..13057c0d2 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test
@@ -15,10 +15,6 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 620
 ---- RUNTIME_PROFILE
 row_regex: .*RowsRead: 2.43K .*
----- RUNTIME_PROFILE: table_format=parquet,orc
-row_regex: .*RowsReturned: 2.43K .*
-aggregation(SUM, RowsRead): 2
-aggregation(SUM, NumFileMetadataRead): 48
 ====
 ---- QUERY
 # Now turn on local filtering: we expect to see a reduction in scan volume.
@@ -53,10 +49,6 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 620
 ---- RUNTIME_PROFILE
 row_regex: .*RowsRead: 2.43K .*
----- RUNTIME_PROFILE: table_format=parquet,orc
-row_regex: .*RowsReturned: 2.43K .*
-aggregation(SUM, RowsRead): 2
-aggregation(SUM, NumFileMetadataRead): 48
 ====
 ---- QUERY
 # Shuffle join, global mode. Expect filters to be propagated.
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test
 
b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test
index 1a6c30cdd..b6fab637d 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test
@@ -15,10 +15,6 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 620
 ---- RUNTIME_PROFILE
 row_regex: .*RowsRead: 2.43K .*
----- RUNTIME_PROFILE: table_format=parquet,orc
-row_regex: .*RowsReturned: 2.43K .*
-aggregation(SUM, RowsRead): 2
-aggregation(SUM, NumFileMetadataRead): 48
 ====
 ---- QUERY
 # Now turn on local filtering: we expect to see a reduction in scan volume.
@@ -53,10 +49,6 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1"
 620
 ---- RUNTIME_PROFILE
 row_regex: .*RowsRead: 2.43K .*
----- RUNTIME_PROFILE: table_format=parquet,orc
-row_regex: .*RowsReturned: 2.43K .*
-aggregation(SUM, RowsRead): 2
-aggregation(SUM, NumFileMetadataRead): 48
 ====
 ---- QUERY
 # Shuffle join, global mode. Expect filters to be propagated.
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/scanners.test 
b/testdata/workloads/functional-query/queries/QueryTest/scanners.test
index 8dd741f67..eff43d53d 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/scanners.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/scanners.test
@@ -238,9 +238,6 @@ tinyint
 1
 ---- RUNTIME_PROFILE
 aggregation(SUM, RowsRead): 100
----- RUNTIME_PROFILE: table_format=parquet,orc
-aggregation(SUM, RowsRead): 0
-aggregation(SUM, RowsReturned): 200
 ====
 ---- QUERY
 select year, count(*) from alltypes group by year
diff --git a/tests/custom_cluster/test_executor_groups.py 
b/tests/custom_cluster/test_executor_groups.py
index 74af1051d..22da7969f 100644
--- a/tests/custom_cluster/test_executor_groups.py
+++ b/tests/custom_cluster/test_executor_groups.py
@@ -771,7 +771,7 @@ class TestExecutorGroups(CustomClusterTestSuite):
     different number of executors and memory limit in each."""
     # A small query with estimated memory per host of 10MB that can run on the 
small
     # executor group
-    SMALL_QUERY = "select count(*) from tpcds_parquet.date_dim where 
d_year=2022;"
+    SMALL_QUERY = "select count(*) from tpcds_parquet.date_dim;"
     # A large query with estimated memory per host of 132MB that can only run 
on
     # the large executor group.
     LARGE_QUERY = "select * from tpcds_parquet.store_sales where ss_item_sk = 
1 limit 50;"
diff --git a/tests/custom_cluster/test_query_retries.py 
b/tests/custom_cluster/test_query_retries.py
index 56e852970..7d039708a 100644
--- a/tests/custom_cluster/test_query_retries.py
+++ b/tests/custom_cluster/test_query_retries.py
@@ -79,11 +79,6 @@ class TestQueryRetries(CustomClusterTestSuite):
         union all
         select count(*) from functional.alltypes where bool_col = sleep(50)"""
 
-  # A simple count query with predicate. The predicate is needed so that the 
planner does
-  # not create the optimized count(star) query plan.
-  _count_query = "select count(*) from tpch_parquet.lineitem where l_orderkey 
< 50"
-  _count_query_result = "55"
-
   @classmethod
   def get_workload(cls):
     return 'functional-query'
@@ -257,7 +252,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     killed_impalad = self.__kill_random_impalad()
-    query = self._count_query
+    query = "select count(*) from tpch_parquet.lineitem"
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
     self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
@@ -269,7 +264,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     results = self.client.fetch(query, handle)
     assert results.success
     assert len(results.data) == 1
-    assert self._count_query_result in results.data[0]
+    assert "6001215" in results.data[0]
 
     # The runtime profile of the retried query.
     retried_runtime_profile = self.client.get_runtime_profile(handle)
@@ -317,7 +312,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     # and the query should be retried. Add delay before admission so that the 
2nd node
     # is removed from the blacklist before scheduler makes schedule for the 
retried
     # query.
-    query = self._count_query
+    query = "select count(*) from tpch_parquet.lineitem"
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true',
                        'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
@@ -330,7 +325,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     results = self.client.fetch(query, handle)
     assert results.success
     assert len(results.data) == 1
-    assert self._count_query_result in results.data[0]
+    assert "6001215" in results.data[0]
 
     # The runtime profile of the retried query.
     retried_runtime_profile = self.client.get_runtime_profile(handle)
@@ -380,7 +375,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     rpc_not_accessible_impalad = self.cluster.impalads[1]
     assert rpc_not_accessible_impalad.service.krpc_port == FAILED_KRPC_PORT
 
-    query = self._count_query
+    query = "select count(*) from tpch_parquet.lineitem"
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true',
                        'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
@@ -703,7 +698,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = self._count_query
+    query = "select count(*) from tpch_parquet.lineitem"
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
     self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
@@ -742,7 +737,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = self._count_query
+    query = "select count(*) from tpch_parquet.lineitem"
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
     self.__wait_until_retry_state(handle, 'RETRYING')
@@ -772,7 +767,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = self._count_query
+    query = "select count(*) from tpch_parquet.lineitem"
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true'})
 
@@ -796,7 +791,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = self._count_query
+    query = "select count(*) from tpch_parquet.lineitem"
     self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
     self.hs2_client.set_configuration_option('impala.resultset.cache.size', 
'1024')
     self.hs2_client.execute_async(query)
@@ -823,7 +818,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = self._count_query
+    query = "select count(*) from tpch_parquet.lineitem"
     self.execute_query_async(query, query_options={'retry_failed_queries': 
'true'})
     # The number of in-flight queries is 0 at the beginning, then 1 when the 
original
     # query is submitted. It's 2 when the retried query is registered. 
Although the retry
@@ -853,7 +848,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = self._count_query
+    query = "select count(*) from tpch_parquet.lineitem"
     handle = self.execute_query_async(query,
         query_options={'retry_failed_queries': 'true', 'query_timeout_s': '1'})
     self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)
@@ -892,7 +887,7 @@ class TestQueryRetries(CustomClusterTestSuite):
 
     # Kill an impalad, and run a query. The query should be retried.
     self.cluster.impalads[1].kill()
-    query = self._count_query
+    query = "select count(*) from tpch_parquet.lineitem"
     client = self.cluster.get_first_impalad().service.create_beeswax_client()
     client.set_configuration({'retry_failed_queries': 'true'})
     handle = client.execute_async(query)
@@ -922,7 +917,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     """Test query retries with the HS2 protocol. Enable the results set cache 
as well and
     test that query retries work with the results cache."""
     self.cluster.impalads[1].kill()
-    query = self._count_query
+    query = "select count(*) from tpch_parquet.lineitem"
     self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
     self.hs2_client.set_configuration_option('impala.resultset.cache.size', 
'1024')
     handle = self.hs2_client.execute_async(query)
@@ -931,7 +926,7 @@ class TestQueryRetries(CustomClusterTestSuite):
     results = self.hs2_client.fetch(query, handle)
     assert results.success
     assert len(results.data) == 1
-    assert results.data[0] == self._count_query_result
+    assert int(results.data[0]) == 6001215
 
     # Validate the live exec summary.
     retried_query_id = \
diff --git a/tests/query_test/test_aggregation.py 
b/tests/query_test/test_aggregation.py
index 1cf2942ff..0df1767a7 100644
--- a/tests/query_test/test_aggregation.py
+++ b/tests/query_test/test_aggregation.py
@@ -260,6 +260,24 @@ class TestAggregationQueries(ImpalaTestSuite):
       # Verify codegen was enabled for all four stages of the aggregation.
       assert_codegen_enabled(result.runtime_profile, [1, 2, 4, 6])
 
+  def test_parquet_count_star_optimization(self, vector, unique_database):
+    if (vector.get_value('table_format').file_format != 'text' or
+        vector.get_value('table_format').compression_codec != 'none'):
+      # No need to run this test on all file formats
+      pytest.skip()
+    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
+    vector.get_value('exec_option')['batch_size'] = 1
+    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
+
+  def test_kudu_count_star_optimization(self, vector, unique_database):
+    if (vector.get_value('table_format').file_format != 'text' or
+       vector.get_value('table_format').compression_codec != 'none'):
+      # No need to run this test on all file formats
+      pytest.skip()
+    self.run_test_case('QueryTest/kudu-stats-agg', vector, unique_database)
+    vector.get_value('exec_option')['batch_size'] = 1
+    self.run_test_case('QueryTest/kudu-stats-agg', vector, unique_database)
+
   def test_ndv(self):
     """Test the version of NDV() that accepts a scale value argument against
     different column data types. The scale argument is an integer in range
@@ -303,55 +321,17 @@ class TestAggregationQueries(ImpalaTestSuite):
       for j in range(0, 11):
         assert(ndv_results[i - 1][j] == int(ndv_vals[j]))
 
-  def test_grouping_sets(self, vector):
-    """Tests for ROLLUP, CUBE and GROUPING SETS."""
-    if vector.get_value('table_format').file_format == 'hbase':
-      pytest.xfail(reason="IMPALA-283 - HBase null handling is inconsistent")
-    self.run_test_case('QueryTest/grouping-sets', vector)
-
-
-class TestAggregationQueriesRunOnce(ImpalaTestSuite):
-  """Run the aggregation test suite similarly as TestAggregationQueries, but 
with stricter
-  constraint. Each test in this class only run once by setting uncompressed 
text dimension
-  for all exploration strategy. However, they may not necessarily target 
uncompressed text
-  table format. This also run with codegen enabled and disabled to exercise our
-  non-codegen code"""
-  @classmethod
-  def get_workload(self):
-    return 'functional-query'
-
-  @classmethod
-  def add_test_dimensions(cls):
-    super(TestAggregationQueriesRunOnce, cls).add_test_dimensions()
-
-    cls.ImpalaTestMatrix.add_dimension(
-      create_exec_option_dimension(disable_codegen_options=[False, True]))
-
-    cls.ImpalaTestMatrix.add_dimension(
-        create_uncompressed_text_dimension(cls.get_workload()))
-
-  def test_parquet_count_star_optimization(self, vector, unique_database):
-    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
-    vector.get_value('exec_option')['batch_size'] = 1
-    self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database)
-
-  def test_kudu_count_star_optimization(self, vector):
-    self.run_test_case('QueryTest/kudu-stats-agg', vector)
-    vector.get_value('exec_option')['batch_size'] = 1
-    self.run_test_case('QueryTest/kudu-stats-agg', vector)
-
-  def test_orc_count_star_optimization(self, vector):
-    self.run_test_case('QueryTest/orc-stats-agg', vector)
-    vector.get_value('exec_option')['batch_size'] = 1
-    self.run_test_case('QueryTest/orc-stats-agg', vector)
-
-  def test_sampled_ndv(self, vector):
+  def test_sampled_ndv(self, vector, unique_database):
     """The SAMPLED_NDV() function is inherently non-deterministic and cannot be
     reasonably made deterministic with existing options so we test it 
separately.
     The goal of this test is to ensure that SAMPLED_NDV() works on all data 
types
     and returns approximately sensible estimates. It is not the goal of this 
test
     to ensure tight error bounds on the NDV estimates. SAMPLED_NDV() is 
expected
     be inaccurate on small data sets like the ones we use in this test."""
+    if (vector.get_value('table_format').file_format != 'text' or
+        vector.get_value('table_format').compression_codec != 'none'):
+      # No need to run this test on all file formats
+      pytest.skip()
 
     # NDV() is used a baseline to compare SAMPLED_NDV(). Both NDV() and 
SAMPLED_NDV()
     # are based on HyperLogLog so NDV() is roughly the best that SAMPLED_NDV() 
can do.
@@ -405,6 +385,12 @@ class TestAggregationQueriesRunOnce(ImpalaTestSuite):
       for i in range(14, 16):
         self.appx_equals(int(sampled_ndv_vals[i]) * sample_perc, 
int(ndv_vals[i]), 2.0)
 
+  def test_grouping_sets(self, vector):
+    """Tests for ROLLUP, CUBE and GROUPING SETS."""
+    if vector.get_value('table_format').file_format == 'hbase':
+      pytest.xfail(reason="IMPALA-283 - HBase null handling is inconsistent")
+    self.run_test_case('QueryTest/grouping-sets', vector)
+
 
 class TestDistinctAggregation(ImpalaTestSuite):
   """Run the distinct aggregation test suite, with codegen and 
shuffle_distinct_exprs
diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py
index 37ad24e32..4d4db1e37 100644
--- a/tests/query_test/test_iceberg.py
+++ b/tests/query_test/test_iceberg.py
@@ -453,7 +453,6 @@ class TestIcebergTable(IcebergTestSuite):
       assert len(data.data) == 1
       assert expected in data.data
       assert "NumRowGroups" not in data.runtime_profile
-      assert "NumFileMetadataRead" not in data.runtime_profile
 
     def expect_results_t(ts, expected_results, expected_cols):
       expect_results(
diff --git a/tests/query_test/test_scanners.py 
b/tests/query_test/test_scanners.py
index 1813483fa..45072e47a 100644
--- a/tests/query_test/test_scanners.py
+++ b/tests/query_test/test_scanners.py
@@ -1623,8 +1623,7 @@ class TestOrc(ImpalaTestSuite):
   def _misaligned_orc_stripes_helper(
           self, table_name, rows_in_table, num_scanners_with_no_reads=0):
     """Checks if 'num_scanners_with_no_reads' indicates the expected number of 
scanners
-    that don't read anything because the underlying file is poorly formatted.
-    Additionally, test that select count(star) match with expected number of 
rows.
+    that don't read anything because the underlying file is poorly formatted
     """
     query = 'select * from %s' % table_name
     result = self.client.execute(query)
@@ -1645,11 +1644,6 @@ class TestOrc(ImpalaTestSuite):
       total += int(n)
     assert total == num_scanners_with_no_reads
 
-    # Test that select count(star) match with expected number of rows.
-    query = 'select count(*) from %s' % table_name
-    result = self.client.execute(query)
-    assert int(result.data[0]) == rows_in_table
-
   # Skip this test on non-HDFS filesystems, because orc-type-check.test 
contains Hive
   # queries that hang in some cases (IMPALA-9345). It would be possible to 
separate
   # the tests that use Hive and run most tests on S3, but I think that running 
these on
@@ -1783,13 +1777,13 @@ class TestOrc(ImpalaTestSuite):
         "CREATE TABLE {db}.{tbl} (id BIGINT) STORED AS ORC",
         unique_database, test_name, test_files)
     err = self.execute_query_expect_failure(self.client,
-        "select count(id) from {0}.{1}".format(unique_database, test_name))
+        "select count(*) from {0}.{1}".format(unique_database, test_name))
     assert expected_error in str(err)
 
   def test_invalid_schema(self, vector, unique_database):
     """Test scanning of ORC file with malformed schema."""
     self._run_invalid_schema_test(unique_database, "corrupt_schema",
-        "Encountered parse error in tail of ORC file")
+        "Encountered parse error during schema selection")
     self._run_invalid_schema_test(unique_database, "corrupt_root_type",
         "Root of the selected type returned by the ORC lib is not STRUCT: 
boolean.")
 
diff --git a/tests/util/test_file_parser.py b/tests/util/test_file_parser.py
index f1746e45c..256bc5277 100644
--- a/tests/util/test_file_parser.py
+++ b/tests/util/test_file_parser.py
@@ -264,19 +264,15 @@ def parse_test_file_text(text, valid_section_names, 
skip_unknown_sections=True):
       # evaluated for all formats that don't have a commented section for this 
query.
       if subsection_name == 'RUNTIME_PROFILE':
         if subsection_comment is not None and subsection_comment is not "":
-          allowed_formats = ['kudu', 'parquet', 'orc']
+          allowed_formats = ['kudu']
           if not subsection_comment.startswith("table_format="):
             raise RuntimeError('RUNTIME_PROFILE comment (%s) must be of the 
form '
-              '"table_format=FORMAT[,FORMAT2,...]"' % subsection_comment)
-          parsed_formats = subsection_comment[13:].split(',')
-          for table_format in parsed_formats:
-            if table_format not in allowed_formats:
-              raise RuntimeError('RUNTIME_PROFILE table format (%s) must be 
in: %s' %
-                  (table_format, allowed_formats))
-            else:
-              subsection_name_for_format = 'RUNTIME_PROFILE_%s' % table_format
-              parsed_sections[subsection_name_for_format] = subsection_str
-          continue
+              '"table_format=FORMAT"' % subsection_comment)
+          table_format = subsection_comment[13:]
+          if table_format not in allowed_formats:
+            raise RuntimeError('RUNTIME_PROFILE table format (%s) must be in: 
%s' %
+                (table_format, allowed_formats))
+          subsection_name = 'RUNTIME_PROFILE_%s' % table_format
 
       parsed_sections[subsection_name] = subsection_str
 

Reply via email to