This is an automated email from the ASF dual-hosted git repository. joemcdonnell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit 7ca20b3c94b1c9c1ddd4ed1e89f0969a0df55330 Author: Riza Suminto <[email protected]> AuthorDate: Thu May 4 14:45:43 2023 -0700 Revert "IMPALA-11123: Optimize count(star) for ORC scans" This reverts commit f932d78ad0a30e322d59fc39072f710f889d2135. The commit is reverted because it causes a significant regression for non-optimized count star queries in parquet format. There are several conflicts that needed to be resolved manually: - Removed assertion against 'NumFileMetadataRead' counter that is lost with the revert. - Adjust the assertion in test_plain_count_star_optimization, test_in_predicate_push_down, and test_partitioned_insert of test_iceberg.py due to missing improvement in parquet optimized count star code path. - Keep the "override" specifier in hdfs-parquet-scanner.h to pass clang-tidy - Keep python3 style of RuntimeError instantiation in test_file_parser.py to pass check-python-syntax.sh Change-Id: Iefd8fd0838638f9db146f7b706e541fe2aaf01c1 Reviewed-on: http://gerrit.cloudera.org:8080/19843 Tested-by: Impala Public Jenkins <[email protected]> Reviewed-by: Wenzhe Zhou <[email protected]> --- be/src/exec/hdfs-columnar-scanner.cc | 72 +--- be/src/exec/hdfs-columnar-scanner.h | 37 -- be/src/exec/hdfs-scan-node-base.cc | 5 +- be/src/exec/hdfs-scan-node-base.h | 41 +- be/src/exec/hdfs-scanner.cc | 5 +- be/src/exec/orc/hdfs-orc-scanner.cc | 64 ++-- be/src/exec/orc/hdfs-orc-scanner.h | 21 +- be/src/exec/orc/orc-column-readers.h | 4 +- be/src/exec/parquet/hdfs-parquet-scanner.cc | 103 +++-- be/src/exec/parquet/hdfs-parquet-scanner.h | 23 +- be/src/exec/parquet/parquet-column-readers.h | 2 +- common/thrift/PlanNodes.thrift | 5 +- .../org/apache/impala/planner/HdfsScanNode.java | 20 +- .../org/apache/impala/planner/PlannerTest.java | 5 - .../queries/PlannerTest/orc-stats-agg.test | 426 --------------------- .../queries/PlannerTest/parquet-stats-agg.test | 31 +- .../queries/PlannerTest/resource-requirements.test | 9 +- .../iceberg-compound-predicate-push-down.test | 2 
- .../QueryTest/iceberg-in-predicate-push-down.test | 10 +- .../iceberg-is-null-predicate-push-down.test | 1 - .../QueryTest/iceberg-partitioned-insert.test | 25 +- .../iceberg-plain-count-star-optimization.test | 24 +- .../iceberg-upper-lower-bound-metrics.test | 3 - .../iceberg-v2-plain-count-star-optimization.test | 6 +- .../iceberg-v2-read-position-deletes-orc.test | 8 - .../iceberg-v2-read-position-deletes.test | 12 - .../queries/QueryTest/mixed-format.test | 13 +- .../queries/QueryTest/orc-stats-agg.test | 152 -------- .../queries/QueryTest/parquet-stats-agg.test | 59 --- .../queries/QueryTest/partition-key-scans.test | 28 -- .../queries/QueryTest/runtime_filters.test | 8 - .../queries/QueryTest/runtime_filters_mt_dop.test | 8 - .../queries/QueryTest/scanners.test | 3 - tests/custom_cluster/test_executor_groups.py | 2 +- tests/custom_cluster/test_query_retries.py | 33 +- tests/query_test/test_aggregation.py | 72 ++-- tests/query_test/test_iceberg.py | 1 - tests/query_test/test_scanners.py | 12 +- tests/util/test_file_parser.py | 18 +- 39 files changed, 269 insertions(+), 1104 deletions(-) diff --git a/be/src/exec/hdfs-columnar-scanner.cc b/be/src/exec/hdfs-columnar-scanner.cc index 28da3c40a..913142936 100644 --- a/be/src/exec/hdfs-columnar-scanner.cc +++ b/be/src/exec/hdfs-columnar-scanner.cc @@ -64,27 +64,20 @@ PROFILE_DEFINE_COUNTER(IoReadTotalBytes, DEBUG, TUnit::BYTES, "The total number of bytes read from streams."); PROFILE_DEFINE_COUNTER(IoReadSkippedBytes, DEBUG, TUnit::BYTES, "The total number of bytes skipped from streams."); -PROFILE_DEFINE_COUNTER(NumFileMetadataRead, DEBUG, TUnit::UNIT, - "The total number of file metadata reads done in place of rows or row groups / " - "stripe iteration."); const char* HdfsColumnarScanner::LLVM_CLASS_NAME = "class.impala::HdfsColumnarScanner"; -HdfsColumnarScanner::HdfsColumnarScanner(HdfsScanNodeBase* scan_node, RuntimeState* state) - : HdfsScanner(scan_node, state), 
+HdfsColumnarScanner::HdfsColumnarScanner(HdfsScanNodeBase* scan_node, + RuntimeState* state) : + HdfsScanner(scan_node, state), scratch_batch_(new ScratchTupleBatch( - *scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())), - assemble_rows_timer_(scan_node->materialize_tuple_timer()) {} + *scan_node->row_desc(), state_->batch_size(), scan_node->mem_tracker())) { +} HdfsColumnarScanner::~HdfsColumnarScanner() {} Status HdfsColumnarScanner::Open(ScannerContext* context) { RETURN_IF_ERROR(HdfsScanner::Open(context)); - // Memorize 'is_footer_scanner_' here since 'stream_' can be released early. - const io::ScanRange* range = stream_->scan_range(); - is_footer_scanner_ = - range->offset() + range->bytes_to_read() >= stream_->file_desc()->file_length; - RuntimeProfile* profile = scan_node_->runtime_profile(); num_cols_counter_ = PROFILE_NumColumns.Instantiate(profile); num_scanners_with_no_reads_counter_ = @@ -102,7 +95,6 @@ Status HdfsColumnarScanner::Open(ScannerContext* context) { io_total_request_ = PROFILE_IoReadTotalRequest.Instantiate(profile); io_total_bytes_ = PROFILE_IoReadTotalBytes.Instantiate(profile); io_skipped_bytes_ = PROFILE_IoReadSkippedBytes.Instantiate(profile); - num_file_metadata_read_ = PROFILE_NumFileMetadataRead.Instantiate(profile); return Status::OK(); } @@ -298,60 +290,6 @@ Status HdfsColumnarScanner::DivideReservationBetweenColumns( return Status::OK(); } -Status HdfsColumnarScanner::GetNextWithCountStarOptimization(RowBatch* row_batch) { - // There are no materialized slots, e.g. count(*) over the table. We can serve - // this query from just the file metadata. We don't need to read the column data. - // Only scanner of the footer split will run in this case. See the logic in - // HdfsScanner::IssueFooterRanges() and HdfsScanNodeBase::ReadsFileMetadataOnly(). 
- DCHECK(is_footer_scanner_); - int64_t tuple_buffer_size; - uint8_t* tuple_buffer; - int capacity = 1; - RETURN_IF_ERROR(row_batch->ResizeAndAllocateTupleBuffer(state_, - row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(), &capacity, - &tuple_buffer_size, &tuple_buffer)); - int64_t num_rows = GetNumberOfRowsInFile(); - COUNTER_ADD(num_file_metadata_read_, 1); - DCHECK_LE(rows_read_in_group_, num_rows); - Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buffer); - InitTuple(template_tuple_, dst_tuple); - int64_t* dst_slot = dst_tuple->GetBigIntSlot(scan_node_->count_star_slot_offset()); - *dst_slot = num_rows; - TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow()); - dst_row->SetTuple(0, dst_tuple); - row_batch->CommitLastRow(); - rows_read_in_group_ += num_rows; - eos_ = true; - return Status::OK(); -} - -Status HdfsColumnarScanner::GetNextWithTemplateTuple(RowBatch* row_batch) { - // There are no materialized slots, e.g. "select 1" over the table. We can serve - // this query from just the file metadata. We don't need to read the column data. - // Only scanner of the footer split will run in this case. See the logic in - // HdfsScanner::IssueFooterRanges() and HdfsScanNodeBase::ReadsFileMetadataOnly(). 
- // We might also get here for count(*) query against full acid table such as: - // "select count(*) from functional_orc_def.alltypes;" - DCHECK(is_footer_scanner_); - int64_t file_rows = GetNumberOfRowsInFile(); - COUNTER_ADD(num_file_metadata_read_, 1); - if (rows_read_in_group_ == file_rows) { - eos_ = true; - return Status::OK(); - } - assemble_rows_timer_.Start(); - DCHECK_LT(rows_read_in_group_, file_rows); - int64_t rows_remaining = file_rows - rows_read_in_group_; - int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining); - TupleRow* current_row = row_batch->GetRow(row_batch->AddRow()); - int num_to_commit = WriteTemplateTuples(current_row, max_tuples); - Status status = CommitRows(num_to_commit, row_batch); - assemble_rows_timer_.Stop(); - RETURN_IF_ERROR(status); - rows_read_in_group_ += max_tuples; - return Status::OK(); -} - void HdfsColumnarScanner::AddSyncReadBytesCounter(int64_t total_bytes) { io_sync_request_->Add(1); io_total_request_->Add(1); diff --git a/be/src/exec/hdfs-columnar-scanner.h b/be/src/exec/hdfs-columnar-scanner.h index af423cff8..472863bf2 100644 --- a/be/src/exec/hdfs-columnar-scanner.h +++ b/be/src/exec/hdfs-columnar-scanner.h @@ -60,25 +60,6 @@ class HdfsColumnarScanner : public HdfsScanner { /// top-level tuples. See AssembleRows() in the derived classes. boost::scoped_ptr<ScratchTupleBatch> scratch_batch_; - /// Timer for materializing rows. This ignores time getting the next buffer. - ScopedTimer<MonotonicStopWatch> assemble_rows_timer_; - - /// Index of the current row group / stripe being processed. Initialized to -1 which - /// indicates that we have not started processing the first group yet (GetNext() has - /// not yet been called). - int32_t group_idx_ = -1; - - /// Counts the number of rows processed for the current row group / stripe. - int64_t rows_read_in_group_ = 0; - - /// Indicates whether we should advance to the next row group / stripe in the next - /// GetNext(). 
Starts out as true to move to the very first row group. - bool advance_group_ = true; - - /// Indicate whether this is a footer scanner or not. - /// Assigned in HdfsColumnarScanner::Open(). - bool is_footer_scanner_ = false; - /// Scan range for the metadata. const io::ScanRange* metadata_range_ = nullptr; @@ -119,9 +100,6 @@ class HdfsColumnarScanner : public HdfsScanner { Status DivideReservationBetweenColumns(const ColumnRangeLengths& col_range_lengths, ColumnReservations& reservation_per_column); - /// Get the number of rows in file. - virtual int64_t GetNumberOfRowsInFile() const = 0; - /// Helper for DivideReservationBetweenColumns(). Implements the core algorithm for /// dividing a reservation of 'reservation_to_distribute' bytes between columns with /// scan range lengths 'col_range_lengths' given a min and max buffer size. Returns @@ -136,16 +114,6 @@ class HdfsColumnarScanner : public HdfsScanner { /// in ExecEnv. static int64_t ComputeIdealReservation(const ColumnRangeLengths& col_range_lengths); - /// Handle count(*) queries by reading the row count from the footer statistics. - /// The optimization is possible only in simpler cases e.g. when there are no conjucts. - /// Check ScanNode.java#canApplyCountStarOptimization for full detail. - Status GetNextWithCountStarOptimization(RowBatch* row_batch); - - /// Handle zero slot scan queries by reading the row count from the footer statistics. - /// Possible queries include "select 1" or "select count(*)" over full acid table that - /// does not require row validation. - Status GetNextWithTemplateTuple(RowBatch* row_batch); - /// Number of columns that need to be read. RuntimeProfile::Counter* num_cols_counter_; @@ -174,11 +142,6 @@ class HdfsColumnarScanner : public HdfsScanner { /// Total number of bytes skipped during stream reading. RuntimeProfile::Counter* io_skipped_bytes_; - /// Total file metadata reads done. 
- /// Incremented when serving query from metadata instead of iterating rows or - /// row groups / stripes. - RuntimeProfile::Counter* num_file_metadata_read_; - private: int ProcessScratchBatchCodegenOrInterpret(RowBatch* dst_batch); }; diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc index ca3fab885..96c5934f0 100644 --- a/be/src/exec/hdfs-scan-node-base.cc +++ b/be/src/exec/hdfs-scan-node-base.cc @@ -450,8 +450,9 @@ HdfsScanNodeBase::HdfsScanNodeBase(ObjectPool* pool, const HdfsScanPlanNode& pno hdfs_scan_node.skip_header_line_count : 0), tuple_id_(pnode.tuple_id_), - count_star_slot_offset_(hdfs_scan_node.__isset.count_star_slot_offset ? - hdfs_scan_node.count_star_slot_offset : + parquet_count_star_slot_offset_( + hdfs_scan_node.__isset.parquet_count_star_slot_offset ? + hdfs_scan_node.parquet_count_star_slot_offset : -1), is_partition_key_scan_(hdfs_scan_node.is_partition_key_scan), tuple_desc_(pnode.tuple_desc_), diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h index 56a8157aa..160d4d782 100644 --- a/be/src/exec/hdfs-scan-node-base.h +++ b/be/src/exec/hdfs-scan-node-base.h @@ -30,7 +30,6 @@ #include <boost/unordered_map.hpp> #include "codegen/codegen-fn-ptr.h" -#include "exec/acid-metadata-utils.h" #include "exec/file-metadata-utils.h" #include "exec/filter-context.h" #include "exec/scan-node.h" @@ -464,12 +463,10 @@ class HdfsScanNodeBase : public ScanNode { const AvroSchemaElement& avro_schema() const { return avro_schema_; } int skip_header_line_count() const { return skip_header_line_count_; } io::RequestContext* reader_context() const { return reader_context_.get(); } - bool optimize_count_star() const { - bool is_optimized = count_star_slot_offset_ != -1; - DCHECK(!hdfs_table_->IsTableFullAcid() || !is_optimized); - return is_optimized; + bool optimize_parquet_count_star() const { + return parquet_count_star_slot_offset_ != -1; } - int count_star_slot_offset() const { return 
count_star_slot_offset_; } + int parquet_count_star_slot_offset() const { return parquet_count_star_slot_offset_; } bool is_partition_key_scan() const { return is_partition_key_scan_; } typedef std::unordered_map<TupleId, std::vector<ScalarExprEvaluator*>> @@ -601,32 +598,6 @@ class HdfsScanNodeBase : public ScanNode { virtual_column_slots().empty(); } - /// Return true if scan over 'filename' require row validation. - /// Hive Streaming Ingestion allocates multiple write ids, hence create delta - /// directories like delta_5_10. Then it continuously appends new stripes (and footers) - /// to the ORC files of the delte dir. So it's possible that the file has rows that - /// Impala is not allowed to see based on its valid write id list. In such cases we need - /// to validate the write ids of the row batches. - inline bool RequireRowValidation(std::string filename) const { - if (!hdfs_table()->IsTableFullAcid()) return false; - if (ValidWriteIdList::IsCompacted(filename)) return false; - ValidWriteIdList valid_write_ids; - std::pair<int64_t, int64_t> acid_write_id_range = - valid_write_ids.GetWriteIdRange(filename); - valid_write_ids.InitFrom(hdfs_table()->ValidWriteIdList()); - ValidWriteIdList::RangeResponse rows_valid = valid_write_ids.IsWriteIdRangeValid( - acid_write_id_range.first, acid_write_id_range.second); - DCHECK_NE(rows_valid, ValidWriteIdList::NONE); - return rows_valid == ValidWriteIdList::SOME; - } - - /// Return true if scan over 'filename 'can be served only by reading the file metadata, - /// such as a count(*) over the table. - inline bool ReadsFileMetadataOnly(std::string filename) const { - return !RequireRowValidation(filename) - && (IsZeroSlotTableScan() || optimize_count_star()); - } - /// Transfers all memory from 'pool' to shared state of all scanners. void TransferToSharedStatePool(MemPool* pool); @@ -715,11 +686,11 @@ class HdfsScanNodeBase : public ScanNode { /// Tuple id of the tuple descriptor to be used. 
const int tuple_id_; - /// The byte offset of the slot for Parquet/ORC metadata if count star optimization + /// The byte offset of the slot for Parquet metadata if Parquet count star optimization /// is enabled. When set, this scan node can optimize a count(*) query by populating /// the tuple with data from the Parquet num rows statistic. See - /// applyCountStarOptimization() in ScanNode.java. - const int count_star_slot_offset_; + /// applyParquetCountStartOptimization() in HdfsScanNode.java. + const int parquet_count_star_slot_offset_; // True if this is a partition key scan that needs only to return at least one row from // each scan range. If true, the scan node and scanner implementations should attempt diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc index 49f360ed1..de3807bec 100644 --- a/be/src/exec/hdfs-scanner.cc +++ b/be/src/exec/hdfs-scanner.cc @@ -873,13 +873,12 @@ Status HdfsScanner::IssueFooterRanges(HdfsScanNodeBase* scan_node, ScanRange* split = files[i]->splits[j]; DCHECK_LE(split->offset() + split->len(), files[i]->file_length); - // If scan only reads file metadata (such as count(*) over the table), we can + // If there are no materialized slots (such as count(*) over the table), we can // get the result with the file metadata alone and don't need to read any row // groups. We only want a single node to process the file footer in this case, // which is the node with the footer split. If it's not a count(*), we create a // footer range for the split always. 
- if (!scan_node->ReadsFileMetadataOnly(files[i]->filename) - || footer_split == split) { + if (!scan_node->IsZeroSlotTableScan() || footer_split == split) { ScanRangeMetadata* split_metadata = static_cast<ScanRangeMetadata*>(split->meta_data()); // Each split is processed by first issuing a scan range for the file footer, which diff --git a/be/src/exec/orc/hdfs-orc-scanner.cc b/be/src/exec/orc/hdfs-orc-scanner.cc index a4193dca5..595f2d60a 100644 --- a/be/src/exec/orc/hdfs-orc-scanner.cc +++ b/be/src/exec/orc/hdfs-orc-scanner.cc @@ -311,7 +311,8 @@ HdfsOrcScanner::HdfsOrcScanner(HdfsScanNodeBase* scan_node, RuntimeState* state) : HdfsColumnarScanner(scan_node, state), dictionary_pool_(new MemPool(scan_node->mem_tracker())), data_batch_pool_(new MemPool(scan_node->mem_tracker())), - search_args_pool_(new MemPool(scan_node->mem_tracker())) { + search_args_pool_(new MemPool(scan_node->mem_tracker())), + assemble_rows_timer_(scan_node_->materialize_tuple_timer()) { assemble_rows_timer_.Stop(); } @@ -405,9 +406,9 @@ Status HdfsOrcScanner::Open(ScannerContext* context) { row_batches_need_validation_ = rows_valid == ValidWriteIdList::SOME; } - if (scan_node_->optimize_count_star() && !row_batches_need_validation_) { - template_tuple_ = template_tuple_map_[scan_node_->tuple_desc()]; - return Status::OK(); + if (UNLIKELY(scan_node_->optimize_parquet_count_star())) { + DCHECK(false); + return Status("Internal ERROR: ORC scanner cannot optimize count star slot."); } // Update 'row_reader_options_' based on the tuple descriptor so the ORC lib can skip @@ -777,20 +778,29 @@ Status HdfsOrcScanner::ProcessSplit() { } Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) { - if (row_batches_need_validation_) { - // In case 'row_batches_need_validation_' is true, we need to look at the row - // batches and check their validity. This might be a zero slot scan, which - // 'currentTransaction' is the only selected field from the file. 
And this should - // not be an optimized count(*) because it is disabled for full acid table. - DCHECK(!scan_node_->optimize_count_star()); - } else if (scan_node_->optimize_count_star()) { - // This is an optimized count(*) case. - // For each file, populate one slot with the footer's numberOfRows statistic. - return GetNextWithCountStarOptimization(row_batch); - } else if (scan_node_->IsZeroSlotTableScan()) { - // There are no materialized slots, e.g. "select 1" over the table. We can serve + // In case 'row_batches_need_validation_' is true, we need to look at the row + // batches and check their validity. In that case 'currentTransaction' is the only + // selected field from the file (in case of zero slot scans). + if (scan_node_->IsZeroSlotTableScan() && !row_batches_need_validation_) { + uint64_t file_rows = reader_->getNumberOfRows(); + // There are no materialized slots, e.g. count(*) over the table. We can serve // this query from just the file metadata. We don't need to read the column data. - return GetNextWithTemplateTuple(row_batch); + if (stripe_rows_read_ == file_rows) { + eos_ = true; + return Status::OK(); + } + assemble_rows_timer_.Start(); + DCHECK_LT(stripe_rows_read_, file_rows); + int64_t rows_remaining = file_rows - stripe_rows_read_; + int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining); + TupleRow* current_row = row_batch->GetRow(row_batch->AddRow()); + int num_to_commit = WriteTemplateTuples(current_row, max_tuples); + Status status = CommitRows(num_to_commit, row_batch); + assemble_rows_timer_.Stop(); + RETURN_IF_ERROR(status); + stripe_rows_read_ += max_tuples; + COUNTER_ADD(scan_node_->rows_read_counter(), num_to_commit); + return Status::OK(); } if (!scratch_batch_->AtEnd()) { @@ -822,7 +832,7 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) { // 'advance_stripe_' is updated in 'NextStripe', meaning the current stripe we advance // to can be skip. 'end_of_stripe_' marks whether current stripe is drained. 
It's only // set to true in 'AssembleRows'. - while (advance_group_ || end_of_stripe_) { + while (advance_stripe_ || end_of_stripe_) { // The next stripe will use a new dictionary blob so transfer the memory to row_batch. row_batch->tuple_data_pool()->AcquireData(dictionary_pool_.get(), false); context_->ReleaseCompletedResources(/* done */ true); @@ -830,8 +840,8 @@ Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) { RETURN_IF_ERROR(CommitRows(0, row_batch)); RETURN_IF_ERROR(NextStripe()); - DCHECK_LE(group_idx_, reader_->getNumberOfStripes()); - if (group_idx_ == reader_->getNumberOfStripes()) { + DCHECK_LE(stripe_idx_, reader_->getNumberOfStripes()); + if (stripe_idx_ == reader_->getNumberOfStripes()) { eos_ = true; DCHECK(parse_status_.ok()); return Status::OK(); @@ -873,19 +883,19 @@ Status HdfsOrcScanner::NextStripe() { int64_t split_offset = split_range->offset(); int64_t split_length = split_range->len(); - bool start_with_first_stripe = group_idx_ == -1; + bool start_with_first_stripe = stripe_idx_ == -1; bool misaligned_stripe_skipped = false; - advance_group_ = false; - rows_read_in_group_ = 0; + advance_stripe_ = false; + stripe_rows_read_ = 0; // Loop until we have found a non-empty stripe. while (true) { // Reset the parse status for the next stripe. parse_status_ = Status::OK(); - ++group_idx_; - if (group_idx_ >= reader_->getNumberOfStripes()) { + ++stripe_idx_; + if (stripe_idx_ >= reader_->getNumberOfStripes()) { if (start_with_first_stripe && misaligned_stripe_skipped) { // We started with the first stripe and skipped all the stripes because they were // misaligned. 
The execution flow won't reach this point if there is at least one @@ -894,7 +904,7 @@ Status HdfsOrcScanner::NextStripe() { } break; } - unique_ptr<orc::StripeInformation> stripe = reader_->getStripe(group_idx_); + unique_ptr<orc::StripeInformation> stripe = reader_->getStripe(stripe_idx_); // Also check 'footer_.numberOfRows' to make sure 'select count(*)' and 'select *' // behave consistently for corrupt files that have 'footer_.numberOfRows == 0' // but some data in stripe. @@ -967,7 +977,7 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) { if (row_batch->AtCapacity()) break; continue_execution &= !scan_node_->ReachedLimitShared() && !context_->cancelled(); } - rows_read_in_group_ += num_rows_read; + stripe_rows_read_ += num_rows_read; COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read); // Merge Scanner-local counter into HdfsScanNode counter and reset. COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_); diff --git a/be/src/exec/orc/hdfs-orc-scanner.h b/be/src/exec/orc/hdfs-orc-scanner.h index 7383f4942..bf52787bc 100644 --- a/be/src/exec/orc/hdfs-orc-scanner.h +++ b/be/src/exec/orc/hdfs-orc-scanner.h @@ -185,11 +185,6 @@ class HdfsOrcScanner : public HdfsColumnarScanner { return THdfsFileFormat::ORC; } - protected: - virtual int64_t GetNumberOfRowsInFile() const override { - return static_cast<int64_t>(reader_->getNumberOfRows()); - } - private: friend class OrcColumnReader; friend class OrcDateColumnReader; @@ -200,10 +195,23 @@ class HdfsOrcScanner : public HdfsColumnarScanner { friend class OrcStructReader; friend class OrcListReader; friend class OrcMapReader; + friend class HdfsOrcScannerTest; /// Memory guard of the tuple_mem_ uint8_t* tuple_mem_end_ = nullptr; + /// Index of the current stripe being processed. Stripe in ORC is equivalent to + /// RowGroup in Parquet. Initialized to -1 which indicates that we have not started + /// processing the first stripe yet (GetNext() has not yet been called). 
+ int32_t stripe_idx_ = -1; + + /// Counts the number of rows processed for the current stripe. + int64_t stripe_rows_read_ = 0; + + /// Indicates whether we should advance to the next stripe in the next GetNext(). + /// Starts out as true to move to the very first stripe. + bool advance_stripe_ = true; + /// Indicates whether we are at the end of a stripe. bool end_of_stripe_ = true; @@ -288,6 +296,9 @@ class HdfsOrcScanner : public HdfsColumnarScanner { /// offset, and there are no two overlapping range. vector<ColumnRange> columnRanges_; + /// Timer for materializing rows. This ignores time getting the next buffer. + ScopedTimer<MonotonicStopWatch> assemble_rows_timer_; + /// Number of stripes that need to be read. RuntimeProfile::Counter* num_stripes_counter_ = nullptr; diff --git a/be/src/exec/orc/orc-column-readers.h b/be/src/exec/orc/orc-column-readers.h index f67e487de..3adba8049 100644 --- a/be/src/exec/orc/orc-column-readers.h +++ b/be/src/exec/orc/orc-column-readers.h @@ -337,8 +337,8 @@ class OrcStringColumnReader : public OrcPrimitiveColumnReader<OrcStringColumnRea } DCHECK(static_cast<orc::EncodedStringVectorBatch*>(batch_) == dynamic_cast<orc::EncodedStringVectorBatch*>(orc_batch)); - if (last_stripe_idx_ != scanner_->group_idx_) { - last_stripe_idx_ = scanner_->group_idx_; + if (last_stripe_idx_ != scanner_->stripe_idx_) { + last_stripe_idx_ = scanner_->stripe_idx_; auto current_batch = static_cast<orc::EncodedStringVectorBatch*>(batch_); return InitBlob(¤t_batch->dictionary->dictionaryBlob, scanner_->dictionary_pool_.get()); diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.cc b/be/src/exec/parquet/hdfs-parquet-scanner.cc index 13242a7e9..d540a3f11 100644 --- a/be/src/exec/parquet/hdfs-parquet-scanner.cc +++ b/be/src/exec/parquet/hdfs-parquet-scanner.cc @@ -86,10 +86,14 @@ Status HdfsParquetScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, HdfsParquetScanner::HdfsParquetScanner(HdfsScanNodeBase* scan_node, RuntimeState* state) : 
HdfsColumnarScanner(scan_node, state), + row_group_idx_(-1), + row_group_rows_read_(0), + advance_row_group_(true), min_max_tuple_(nullptr), row_batches_produced_(0), dictionary_pool_(new MemPool(scan_node->mem_tracker())), stats_batch_read_pool_(new MemPool(scan_node->mem_tracker())), + assemble_rows_timer_(scan_node_->materialize_tuple_timer()), num_stats_filtered_row_groups_counter_(nullptr), num_minmax_filtered_row_groups_counter_(nullptr), num_bloom_filtered_row_groups_counter_(nullptr), @@ -379,7 +383,7 @@ static bool CheckRowGroupOverlapsSplit(const parquet::RowGroup& row_group, int HdfsParquetScanner::CountScalarColumns( const vector<ParquetColumnReader*>& column_readers) { - DCHECK(!column_readers.empty() || scan_node_->optimize_count_star()); + DCHECK(!column_readers.empty() || scan_node_->optimize_parquet_count_star()); int num_columns = 0; stack<ParquetColumnReader*> readers; for (ParquetColumnReader* r: column_readers_) readers.push(r); @@ -429,15 +433,55 @@ Status HdfsParquetScanner::ProcessSplit() { Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) { DCHECK(parse_status_.ok()) << parse_status_.GetDetail(); - if (scan_node_->optimize_count_star()) { - // This is an optimized count(*) case. + if (scan_node_->optimize_parquet_count_star()) { // Populate the single slot with the Parquet num rows statistic. - return GetNextWithCountStarOptimization(row_batch); + int64_t tuple_buf_size; + uint8_t* tuple_buf; + // We try to allocate a smaller row batch here because in most cases the number row + // groups in a file is much lower than the default row batch capacity. 
+ int capacity = min( + static_cast<int>(file_metadata_.row_groups.size()), row_batch->capacity()); + RETURN_IF_ERROR(RowBatch::ResizeAndAllocateTupleBuffer(state_, + row_batch->tuple_data_pool(), row_batch->row_desc()->GetRowSize(), + &capacity, &tuple_buf_size, &tuple_buf)); + while (!row_batch->AtCapacity()) { + RETURN_IF_ERROR(NextRowGroup()); + DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size()); + DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows); + if (row_group_idx_ == file_metadata_.row_groups.size()) break; + Tuple* dst_tuple = reinterpret_cast<Tuple*>(tuple_buf); + TupleRow* dst_row = row_batch->GetRow(row_batch->AddRow()); + InitTuple(template_tuple_, dst_tuple); + int64_t* dst_slot = + dst_tuple->GetBigIntSlot(scan_node_->parquet_count_star_slot_offset()); + *dst_slot = file_metadata_.row_groups[row_group_idx_].num_rows; + row_group_rows_read_ += *dst_slot; + dst_row->SetTuple(0, dst_tuple); + row_batch->CommitLastRow(); + tuple_buf += scan_node_->tuple_desc()->byte_size(); + } + eos_ = row_group_idx_ == file_metadata_.row_groups.size(); + return Status::OK(); } else if (scan_node_->IsZeroSlotTableScan()) { // There are no materialized slots and we are not optimizing count(*), e.g. // "select 1 from alltypes". We can serve this query from just the file metadata. // We don't need to read the column data. 
- return GetNextWithTemplateTuple(row_batch); + if (row_group_rows_read_ == file_metadata_.num_rows) { + eos_ = true; + return Status::OK(); + } + assemble_rows_timer_.Start(); + DCHECK_LE(row_group_rows_read_, file_metadata_.num_rows); + int64_t rows_remaining = file_metadata_.num_rows - row_group_rows_read_; + int max_tuples = min<int64_t>(row_batch->capacity(), rows_remaining); + TupleRow* current_row = row_batch->GetRow(row_batch->AddRow()); + int num_to_commit = WriteTemplateTuples(current_row, max_tuples); + Status status = CommitRows(row_batch, num_to_commit); + assemble_rows_timer_.Stop(); + RETURN_IF_ERROR(status); + row_group_rows_read_ += max_tuples; + COUNTER_ADD(scan_node_->rows_read_counter(), max_tuples); + return Status::OK(); } // Transfer remaining tuples from the scratch batch. @@ -449,18 +493,18 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) { if (row_batch->AtCapacity()) return Status::OK(); } - while (advance_group_ || column_readers_[0]->RowGroupAtEnd()) { + while (advance_row_group_ || column_readers_[0]->RowGroupAtEnd()) { // Transfer resources and clear streams if there is any leftover from the previous // row group. We will create new streams for the next row group. 
FlushRowGroupResources(row_batch); - if (!advance_group_) { + if (!advance_row_group_) { Status status = - ValidateEndOfRowGroup(column_readers_, group_idx_, rows_read_in_group_); + ValidateEndOfRowGroup(column_readers_, row_group_idx_, row_group_rows_read_); if (!status.ok()) RETURN_IF_ERROR(state_->LogOrReturnError(status.msg())); } RETURN_IF_ERROR(NextRowGroup()); - DCHECK_LE(group_idx_, file_metadata_.row_groups.size()); - if (group_idx_ == file_metadata_.row_groups.size()) { + DCHECK_LE(row_group_idx_, file_metadata_.row_groups.size()); + if (row_group_idx_ == file_metadata_.row_groups.size()) { eos_ = true; DCHECK(parse_status_.ok()); return Status::OK(); @@ -481,9 +525,9 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch) { assemble_rows_timer_.Start(); Status status; if (filter_pages_) { - status = AssembleRows<true>(row_batch, &advance_group_); + status = AssembleRows<true>(row_batch, &advance_row_group_); } else { - status = AssembleRows<false>(row_batch, &advance_group_); + status = AssembleRows<false>(row_batch, &advance_row_group_); } assemble_rows_timer_.Stop(); RETURN_IF_ERROR(status); @@ -819,11 +863,11 @@ Status HdfsParquetScanner::NextRowGroup() { const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(context_->partition_descriptor()->id(), filename()); - bool start_with_first_row_group = group_idx_ == -1; + bool start_with_first_row_group = row_group_idx_ == -1; bool misaligned_row_group_skipped = false; - advance_group_ = false; - rows_read_in_group_ = 0; + advance_row_group_ = false; + row_group_rows_read_ = 0; // Loop until we have found a non-empty row group, and successfully initialized and // seeded the column readers. Return a non-OK status from within loop only if the error @@ -835,8 +879,8 @@ Status HdfsParquetScanner::NextRowGroup() { // or previous row groups. 
DCHECK_EQ(0, context_->NumStreams()); - ++group_idx_; - if (group_idx_ >= file_metadata_.row_groups.size()) { + ++row_group_idx_; + if (row_group_idx_ >= file_metadata_.row_groups.size()) { if (start_with_first_row_group && misaligned_row_group_skipped) { // We started with the first row group and skipped all the row groups because // they were misaligned. The execution flow won't reach this point if there is at @@ -845,7 +889,7 @@ Status HdfsParquetScanner::NextRowGroup() { } break; } - const parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_]; + const parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; // Also check 'file_metadata_.num_rows' to make sure 'select count(*)' and 'select *' // behave consistently for corrupt files that have 'file_metadata_.num_rows == 0' // but some data in row groups. @@ -854,7 +898,7 @@ Status HdfsParquetScanner::NextRowGroup() { // Let's find the index of the first row in this row group. It's needed to track the // file position of each row. int64_t row_group_first_row = 0; - for (int i = 0; i < group_idx_; ++i) { + for (int i = 0; i < row_group_idx_; ++i) { const parquet::RowGroup& row_group = file_metadata_.row_groups[i]; row_group_first_row += row_group.num_rows; } @@ -1280,7 +1324,7 @@ Status HdfsParquetScanner::ProcessPageIndex() { MonotonicStopWatch single_process_page_index_timer; single_process_page_index_timer.Start(); ResetPageFiltering(); - RETURN_IF_ERROR(page_index_.ReadAll(group_idx_)); + RETURN_IF_ERROR(page_index_.ReadAll(row_group_idx_)); if (page_index_.IsEmpty()) return Status::OK(); // We can release the raw page index buffer when we exit this function. 
const auto scope_exit = MakeScopeExitTrigger([this](){page_index_.Release();}); @@ -1404,7 +1448,7 @@ Status HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters( } min_max_tuple_->Init(min_max_tuple_desc->byte_size()); - parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_]; + parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; int filtered_pages = 0; @@ -1507,7 +1551,7 @@ Status HdfsParquetScanner::FindSkipRangesForPagesWithMinMaxFilters( } Status HdfsParquetScanner::EvaluatePageIndex() { - parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_]; + parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; vector<RowRange> skip_ranges; for (int i = 0; i < stats_conjunct_evals_.size(); ++i) { @@ -1597,7 +1641,7 @@ Status HdfsParquetScanner::EvaluatePageIndex() { Status HdfsParquetScanner::ComputeCandidatePagesForColumns() { if (candidate_ranges_.empty()) return Status::OK(); - parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_]; + parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; for (BaseScalarColumnReader* scalar_reader : scalar_readers_) { const auto& page_locations = scalar_reader->offset_index_.page_locations; if (!ComputeCandidatePages(page_locations, candidate_ranges_, row_group.num_rows, @@ -2186,8 +2230,7 @@ Status HdfsParquetScanner::ProcessBloomFilter(const parquet::RowGroup& if (!bloom_filter.Find(hash)) { *skip_row_group = true; VLOG(3) << Substitute("Row group with idx $0 filtered by Parquet Bloom filter on " - "column with idx $1 in file $2.", - group_idx_, col_idx, filename()); + "column with idx $1 in file $2.", row_group_idx_, col_idx, filename()); return Status::OK(); } } @@ -2261,7 +2304,7 @@ Status HdfsParquetScanner::AssembleRowsWithoutLateMaterialization( RETURN_IF_ERROR(CommitRows(row_batch, num_row_to_commit)); if (row_batch->AtCapacity()) break; } - rows_read_in_group_ += num_rows_read; + row_group_rows_read_ += num_rows_read; 
COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read); // Merge Scanner-local counter into HdfsScanNode counter and reset. COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_); @@ -2388,7 +2431,7 @@ Status HdfsParquetScanner::AssembleRows(RowBatch* row_batch, bool* skip_row_grou break; } } - rows_read_in_group_ += num_rows_read; + row_group_rows_read_ += num_rows_read; COUNTER_ADD(scan_node_->rows_read_counter(), num_rows_read); // Merge Scanner-local counter into HdfsScanNode counter and reset. COUNTER_ADD(scan_node_->collection_items_read_counter(), coll_items_read_counter_); @@ -2788,7 +2831,7 @@ Status HdfsParquetScanner::CreateColumnReaders(const TupleDescriptor& tuple_desc DCHECK(column_readers != nullptr); DCHECK(column_readers->empty()); - if (scan_node_->optimize_count_star()) { + if (scan_node_->optimize_parquet_count_star()) { // Column readers are not needed because we are not reading from any columns if this // optimization is enabled. return Status::OK(); @@ -2955,7 +2998,7 @@ Status HdfsParquetScanner::InitScalarColumns(int64_t row_group_first_row) { int64_t partition_id = context_->partition_descriptor()->id(); const HdfsFileDesc* file_desc = scan_node_->GetFileDesc(partition_id, filename()); DCHECK(file_desc != nullptr); - parquet::RowGroup& row_group = file_metadata_.row_groups[group_idx_]; + parquet::RowGroup& row_group = file_metadata_.row_groups[row_group_idx_]; // Used to validate that the number of values in each reader in column_readers_ at the // same SchemaElement is the same. 
@@ -2975,7 +3018,7 @@ Status HdfsParquetScanner::InitScalarColumns(int64_t row_group_first_row) { return Status(TErrorCode::PARQUET_NUM_COL_VALS_ERROR, scalar_reader->col_idx(), col_chunk.meta_data.num_values, num_values, filename()); } - RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, group_idx_, + RETURN_IF_ERROR(scalar_reader->Reset(*file_desc, col_chunk, row_group_idx_, row_group_first_row)); } diff --git a/be/src/exec/parquet/hdfs-parquet-scanner.h b/be/src/exec/parquet/hdfs-parquet-scanner.h index 5a0474b5a..5f5c038da 100644 --- a/be/src/exec/parquet/hdfs-parquet-scanner.h +++ b/be/src/exec/parquet/hdfs-parquet-scanner.h @@ -50,7 +50,7 @@ class ParquetColumnReader; class ParquetPageReader; template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> class ScalarColumnReader; - +class BoolColumnReader; /// This scanner parses Parquet files located in HDFS, and writes the content as tuples in /// the Impala in-memory representation of data, e.g. (tuples, rows, row batches). @@ -394,22 +394,30 @@ class HdfsParquetScanner : public HdfsColumnarScanner { "You can increase PARQUET_FOOTER_SIZE if you want, " "just don't forget to increase READ_SIZE_MIN_VALUE as well."); - protected: - virtual int64_t GetNumberOfRowsInFile() const override { - return file_metadata_.num_rows; - } - private: friend class ParquetColumnReader; friend class CollectionColumnReader; friend class BaseScalarColumnReader; template<typename InternalType, parquet::Type::type PARQUET_TYPE, bool MATERIALIZED> friend class ScalarColumnReader; + friend class BoolColumnReader; friend class HdfsParquetScannerTest; friend class ParquetPageIndex; friend class ParquetColumnChunkReader; friend class ParquetPageReader; + /// Index of the current row group being processed. Initialized to -1 which indicates + /// that we have not started processing the first row group yet (GetNext() has not yet + /// been called). 
+ int32_t row_group_idx_; + + /// Counts the number of rows processed for the current row group. + int64_t row_group_rows_read_; + + /// Indicates whether we should advance to the next row group in the next GetNext(). + /// Starts out as true to move to the very first row group. + bool advance_row_group_; + boost::scoped_ptr<ParquetSchemaResolver> schema_resolver_; /// Tuple to hold values when reading parquet::Statistics. Owned by perm_pool_. @@ -492,6 +500,9 @@ class HdfsParquetScanner : public HdfsColumnarScanner { /// perm_pool_. std::unordered_map<const TupleDescriptor*, Tuple*> dict_filter_tuple_map_; + /// Timer for materializing rows. This ignores time getting the next buffer. + ScopedTimer<MonotonicStopWatch> assemble_rows_timer_; + /// Average and min/max time spent processing the page index for each row group. RuntimeProfile::SummaryStatsCounter* process_page_index_stats_; diff --git a/be/src/exec/parquet/parquet-column-readers.h b/be/src/exec/parquet/parquet-column-readers.h index d461c94e7..76e0825b3 100644 --- a/be/src/exec/parquet/parquet-column-readers.h +++ b/be/src/exec/parquet/parquet-column-readers.h @@ -600,7 +600,7 @@ class BaseScalarColumnReader : public ParquetColumnReader { int64_t LastRowIdxInCurrentPage() const { DCHECK(!candidate_data_pages_.empty()); int64_t num_rows = - parent_->file_metadata_.row_groups[parent_->group_idx_].num_rows; + parent_->file_metadata_.row_groups[parent_->row_group_idx_].num_rows; // Find the next valid page. int page_idx = candidate_data_pages_[candidate_page_idx_] + 1; while (page_idx < offset_index_.page_locations.size()) { diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index 6e8e370be..6e5d8886c 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -323,8 +323,9 @@ struct THdfsScanNode { // The conjuncts that are eligible for dictionary filtering. 
9: optional map<Types.TSlotId, list<i32>> dictionary_filter_conjuncts - // The byte offset of the slot for counter if count star optimization is enabled. - 10: optional i32 count_star_slot_offset + // The byte offset of the slot for Parquet metadata if Parquet count star optimization + // is enabled. + 10: optional i32 parquet_count_star_slot_offset // If true, the backend only needs to return one row per partition. 11: optional bool is_partition_key_scan diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java index 71eab659c..ea48a3bc9 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsScanNode.java @@ -89,7 +89,6 @@ import org.apache.impala.thrift.TScanRangeLocationList; import org.apache.impala.thrift.TScanRangeSpec; import org.apache.impala.thrift.TSortingOrder; import org.apache.impala.thrift.TTableStats; -import org.apache.impala.util.AcidUtils; import org.apache.impala.util.BitUtil; import org.apache.impala.util.ExecutorMembershipSnapshot; import org.apache.impala.util.MathUtil; @@ -335,8 +334,6 @@ public class HdfsScanNode extends ScanNode { // Used only to display EXPLAIN information. 
private final List<Expr> partitionConjuncts_; - private boolean isFullAcidTable_ = false; - /** * Construct a node to scan given data files into tuples described by 'desc', * with 'conjuncts' being the unevaluated conjuncts bound by the tuple and @@ -357,8 +354,6 @@ public class HdfsScanNode extends ScanNode { tableNumRowsHint_ = hdfsTblRef.getTableNumRowsHint(); FeFsTable hdfsTable = (FeFsTable)hdfsTblRef.getTable(); Preconditions.checkState(tbl_ == hdfsTable); - isFullAcidTable_ = - AcidUtils.isFullAcidTable(hdfsTable.getMetaStoreTable().getParameters()); StringBuilder error = new StringBuilder(); aggInfo_ = aggInfo; skipHeaderLineCount_ = tbl_.parseSkipHeaderLineCount(error); @@ -403,14 +398,13 @@ public class HdfsScanNode extends ScanNode { } /** - * Returns true if the count(*) optimization can be applied to the query block + * Returns true if the Parquet count(*) optimization can be applied to the query block * of this scan node. */ protected boolean canApplyCountStarOptimization(Analyzer analyzer, Set<HdfsFileFormat> fileFormats) { if (fileFormats.size() != 1) return false; - if (isFullAcidTable_) return false; - if (!hasParquet(fileFormats) && !hasOrc(fileFormats)) return false; + if (!hasParquet(fileFormats)) return false; return canApplyCountStarOptimization(analyzer); } @@ -1563,13 +1557,6 @@ public class HdfsScanNode extends ScanNode { numRangesAdjusted : Math.min(inputCardinality_, numRangesAdjusted); } - - if (countStarSlot_ != null) { - // We are doing optimized count star. Override cardinality with total num files. 
- long totalFiles = sumValues(totalFilesPerFs_); - inputCardinality_ = totalFiles; - cardinality_ = totalFiles; - } if (LOG.isTraceEnabled()) { LOG.trace("HdfsScan: cardinality_=" + Long.toString(cardinality_)); } @@ -1867,7 +1854,8 @@ public class HdfsScanNode extends ScanNode { msg.hdfs_scan_node.setUse_mt_scan_node(useMtScanNode_); Preconditions.checkState((optimizedAggSmap_ == null) == (countStarSlot_ == null)); if (countStarSlot_ != null) { - msg.hdfs_scan_node.setCount_star_slot_offset(countStarSlot_.getByteOffset()); + msg.hdfs_scan_node.setParquet_count_star_slot_offset( + countStarSlot_.getByteOffset()); } if (!statsConjuncts_.isEmpty()) { for (Expr e: statsConjuncts_) { diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java index a3b91eaf5..c60cefc3c 100644 --- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java +++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java @@ -1343,11 +1343,6 @@ public class PlannerTest extends PlannerTestBase { runPlannerTestFile("tpcds-dist-method", "tpcds"); } - @Test - public void testOrcStatsAgg() { - runPlannerTestFile("orc-stats-agg"); - } - /** * Test new hint of 'TABLE_NUM_ROWS' */ diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/orc-stats-agg.test b/testdata/workloads/functional-planner/queries/PlannerTest/orc-stats-agg.test deleted file mode 100644 index 00d4caca8..000000000 --- a/testdata/workloads/functional-planner/queries/PlannerTest/orc-stats-agg.test +++ /dev/null @@ -1,426 +0,0 @@ -# Verify that that the ORC count(*) optimization is applied in all count(*) or -# count(<literal>) cases when scanning a ORC table. In the last case, we are scanning -# a text table, so the optimization is not applied. The optimization is observed when -# the cardinality of the ORC scan (24) is the same as # the # of files (24). 
-select count(*) from functional_orc_def.uncomp_src_alltypes -union all -select count(1) from functional_orc_def.uncomp_src_alltypes -union all -select count(123) from functional_orc_def.uncomp_src_alltypes -union all -select count(*) from functional.alltypes ----- PLAN -PLAN-ROOT SINK -| -00:UNION -| pass-through-operands: all -| row-size=8B cardinality=4 -| -|--08:AGGREGATE [FINALIZE] -| | output: count(*) -| | row-size=8B cardinality=1 -| | -| 07:SCAN HDFS [functional.alltypes] -| HDFS partitions=24/24 files=24 size=478.45KB -| row-size=0B cardinality=7.30K -| -|--06:AGGREGATE [FINALIZE] -| | output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) -| | row-size=8B cardinality=1 -| | -| 05:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] -| HDFS partitions=24/24 files=24 size=205.47KB -| row-size=4B cardinality=24 -| -|--04:AGGREGATE [FINALIZE] -| | output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) -| | row-size=8B cardinality=1 -| | -| 03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] -| HDFS partitions=24/24 files=24 size=205.47KB -| row-size=4B cardinality=24 -| -02:AGGREGATE [FINALIZE] -| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) -| row-size=8B cardinality=1 -| -01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - HDFS partitions=24/24 files=24 size=205.47KB - row-size=4B cardinality=24 ----- DISTRIBUTEDPLAN -PLAN-ROOT SINK -| -00:UNION -| pass-through-operands: all -| row-size=8B cardinality=4 -| -|--16:AGGREGATE [FINALIZE] -| | output: count:merge(*) -| | row-size=8B cardinality=1 -| | -| 15:EXCHANGE [UNPARTITIONED] -| | -| 08:AGGREGATE -| | output: count(*) -| | row-size=8B cardinality=1 -| | -| 07:SCAN HDFS [functional.alltypes] -| HDFS partitions=24/24 files=24 size=478.45KB -| row-size=0B cardinality=7.30K -| -|--14:AGGREGATE [FINALIZE] -| | output: count:merge(*) -| | row-size=8B cardinality=1 -| | -| 13:EXCHANGE [UNPARTITIONED] -| | -| 06:AGGREGATE -| | 
output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) -| | row-size=8B cardinality=1 -| | -| 05:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] -| HDFS partitions=24/24 files=24 size=205.47KB -| row-size=4B cardinality=24 -| -|--12:AGGREGATE [FINALIZE] -| | output: count:merge(*) -| | row-size=8B cardinality=1 -| | -| 11:EXCHANGE [UNPARTITIONED] -| | -| 04:AGGREGATE -| | output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) -| | row-size=8B cardinality=1 -| | -| 03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] -| HDFS partitions=24/24 files=24 size=205.47KB -| row-size=4B cardinality=24 -| -10:AGGREGATE [FINALIZE] -| output: count:merge(*) -| row-size=8B cardinality=1 -| -09:EXCHANGE [UNPARTITIONED] -| -02:AGGREGATE -| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) -| row-size=8B cardinality=1 -| -01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - HDFS partitions=24/24 files=24 size=205.47KB - row-size=4B cardinality=24 -==== -# Verify that the ORC count(*) optimization is applied even if there is more than -# one item in the select list. -select count(*), count(1), count(123) from functional_orc_def.uncomp_src_alltypes ----- PLAN -PLAN-ROOT SINK -| -01:AGGREGATE [FINALIZE] -| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) -| row-size=8B cardinality=1 -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - HDFS partitions=24/24 files=24 size=205.47KB - row-size=4B cardinality=24 -==== -# Select count(<partition col>) - the optimization is disabled because it's not a -# count(<literal>) or count(*) aggregate function. 
-select count(year) from functional_orc_def.uncomp_src_alltypes ----- PLAN -PLAN-ROOT SINK -| -01:AGGREGATE [FINALIZE] -| output: count(`year`) -| row-size=8B cardinality=1 -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - HDFS partitions=24/24 files=24 size=205.47KB - row-size=4B cardinality=13.07K -==== -# Group by partition columns. -select month, count(*) from functional_orc_def.uncomp_src_alltypes group by month, year ----- PLAN -PLAN-ROOT SINK -| -01:AGGREGATE [FINALIZE] -| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) -| group by: `month`, `year` -| row-size=16B cardinality=24 -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - HDFS partitions=24/24 files=24 size=205.47KB - row-size=16B cardinality=24 -==== -# The optimization is disabled because tinyint_col is not a partition col. -select tinyint_col, count(*) from functional_orc_def.uncomp_src_alltypes group by tinyint_col, year ----- PLAN -PLAN-ROOT SINK -| -01:AGGREGATE [FINALIZE] -| output: count(*) -| group by: tinyint_col, `year` -| row-size=13B cardinality=13.07K -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - HDFS partitions=24/24 files=24 size=205.47KB - row-size=5B cardinality=13.07K -==== -# The optimization is disabled because it can not be applied to the 1st aggregate -# function. -select avg(year), count(*) from functional_orc_def.uncomp_src_alltypes ----- PLAN -PLAN-ROOT SINK -| -01:AGGREGATE [FINALIZE] -| output: avg(`year`), count(*) -| row-size=16B cardinality=1 -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - HDFS partitions=24/24 files=24 size=205.47KB - row-size=4B cardinality=13.07K -==== -# Optimization is not applied because the inner count(*) is not materialized. The outer -# count(*) does not reference a base table. 
-select count(*) from (select count(*) from functional_orc_def.uncomp_src_alltypes) t ----- PLAN -PLAN-ROOT SINK -| -02:AGGREGATE [FINALIZE] -| output: count(*) -| row-size=8B cardinality=1 -| -01:AGGREGATE [FINALIZE] -| row-size=0B cardinality=1 -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - HDFS partitions=24/24 files=24 size=205.47KB - partition key scan - row-size=0B cardinality=24 -==== -# The optimization is applied if count(*) is in the having clause. -select 1 from functional_orc_def.uncomp_src_alltypes having count(*) > 1 ----- PLAN -PLAN-ROOT SINK -| -01:AGGREGATE [FINALIZE] -| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) -| having: count(*) > 1 -| row-size=8B cardinality=0 -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - HDFS partitions=24/24 files=24 size=205.47KB - row-size=4B cardinality=24 -==== -# The count(*) optimization is applied in the inline view. -select count(*), count(a) from (select count(1) as a from functional_orc_def.uncomp_src_alltypes) t ----- PLAN -PLAN-ROOT SINK -| -02:AGGREGATE [FINALIZE] -| output: count(*), count(count(*)) -| row-size=16B cardinality=1 -| -01:AGGREGATE [FINALIZE] -| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) -| row-size=8B cardinality=1 -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - HDFS partitions=24/24 files=24 size=205.47KB - row-size=4B cardinality=24 -==== -# The count(*) optimization is applied to the inline view even if there is a join. 
-select * -from functional.alltypes x inner join ( - select count(1) as a from functional_orc_def.uncomp_src_alltypes group by year -) t on x.id = t.a; ----- PLAN -PLAN-ROOT SINK -| -03:HASH JOIN [INNER JOIN] -| hash predicates: x.id = count(*) -| runtime filters: RF000 <- count(*) -| row-size=101B cardinality=2 -| -|--02:AGGREGATE [FINALIZE] -| | output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) -| | group by: `year` -| | row-size=12B cardinality=2 -| | -| 01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] -| HDFS partitions=24/24 files=24 size=205.47KB -| row-size=12B cardinality=24 -| -00:SCAN HDFS [functional.alltypes x] - HDFS partitions=24/24 files=24 size=478.45KB - runtime filters: RF000 -> x.id - row-size=89B cardinality=7.30K -==== -# The count(*) optimization is not applied if there is more than 1 table ref. -select count(*) from functional_orc_def.uncomp_src_alltypes a, functional_orc_def.uncomp_src_alltypes b ----- PLAN -PLAN-ROOT SINK -| -03:AGGREGATE [FINALIZE] -| output: count(*) -| row-size=8B cardinality=1 -| -02:NESTED LOOP JOIN [CROSS JOIN] -| row-size=0B cardinality=170.85M -| -|--01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes b] -| HDFS partitions=24/24 files=24 size=205.47KB -| row-size=0B cardinality=13.07K -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes a] - HDFS partitions=24/24 files=24 size=205.47KB - row-size=0B cardinality=13.07K -==== -# The count(*) optimization is applied if all predicates are on partition columns only. 
-select count(1) from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8; ----- PLAN -PLAN-ROOT SINK -| -01:AGGREGATE [FINALIZE] -| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) -| row-size=8B cardinality=1 -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - partition predicates: `year` < 2010, `month` > 8 - HDFS partitions=4/24 files=4 size=33.53KB - row-size=8B cardinality=4 -==== -# tinyint_col is not a partition column so the optimization is disabled. -select count(1) from functional_orc_def.uncomp_src_alltypes where year < 2010 and tinyint_col > 8; ----- PLAN -PLAN-ROOT SINK -| -01:AGGREGATE [FINALIZE] -| output: count(*) -| row-size=8B cardinality=1 -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - partition predicates: `year` < 2010 - HDFS partitions=12/24 files=12 size=102.74KB - predicates: tinyint_col > 8 - row-size=1B cardinality=654 -==== -# Optimization is applied after constant folding. -select count(1 + 2 + 3) from functional_orc_def.uncomp_src_alltypes ----- PLAN -PLAN-ROOT SINK -| -01:AGGREGATE [FINALIZE] -| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) -| row-size=8B cardinality=1 -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - HDFS partitions=24/24 files=24 size=205.47KB - row-size=4B cardinality=24 -==== -# Optimization is not applied to count(null). 
-select count(1 + null + 3) from functional_orc_def.uncomp_src_alltypes -union all -select count(null) from functional_orc_def.uncomp_src_alltypes ----- PLAN -PLAN-ROOT SINK -| -00:UNION -| pass-through-operands: all -| row-size=8B cardinality=2 -| -|--04:AGGREGATE [FINALIZE] -| | output: count(NULL) -| | row-size=8B cardinality=1 -| | -| 03:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] -| HDFS partitions=24/24 files=24 size=205.47KB -| row-size=0B cardinality=13.07K -| -02:AGGREGATE [FINALIZE] -| output: count(NULL + 3) -| row-size=8B cardinality=1 -| -01:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - HDFS partitions=24/24 files=24 size=205.47KB - row-size=0B cardinality=13.07K -==== -# Optimization is not applied when selecting from an empty table. -select count(*) from functional_orc_def.emptytable ----- PLAN -PLAN-ROOT SINK -| -01:AGGREGATE [FINALIZE] -| output: count(*) -| row-size=8B cardinality=0 -| -00:SCAN HDFS [functional_orc_def.emptytable] - partitions=0/0 files=0 size=0B - row-size=0B cardinality=0 -==== -# Optimization is not applied when all partitions are pruned. -select count(1) from functional_orc_def.uncomp_src_alltypes where year = -1 ----- PLAN -PLAN-ROOT SINK -| -01:AGGREGATE [FINALIZE] -| output: count(*) -| row-size=8B cardinality=0 -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - partition predicates: `year` = -1 - partitions=0/24 files=0 size=0B - row-size=0B cardinality=0 -==== -# Optimization is not applied across query blocks, even though it would be correct here. -select count(*) from (select int_col from functional_orc_def.uncomp_src_alltypes) t ----- PLAN -PLAN-ROOT SINK -| -01:AGGREGATE [FINALIZE] -| output: count(*) -| row-size=8B cardinality=1 -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - HDFS partitions=24/24 files=24 size=205.47KB - row-size=0B cardinality=13.07K -==== -# In general, optimization is not applied when there is a distinct agg. 
-select count(*), count(distinct 1) from functional_orc_def.uncomp_src_alltypes ----- PLAN -PLAN-ROOT SINK -| -02:AGGREGATE [FINALIZE] -| output: count(1), count:merge(*) -| row-size=16B cardinality=1 -| -01:AGGREGATE -| output: count(*) -| group by: 1 -| row-size=9B cardinality=1 -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - HDFS partitions=24/24 files=24 size=205.47KB - row-size=0B cardinality=13.07K -==== -# The optimization is applied here because only the count(*) and a partition column are -# materialized. Non-materialized agg exprs are ignored. -select year, cnt from ( - select year, count(bigint_col), count(*) cnt, avg(int_col) - from functional_orc_def.uncomp_src_alltypes - where month=1 - group by year -) t ----- PLAN -PLAN-ROOT SINK -| -01:AGGREGATE [FINALIZE] -| output: sum_init_zero(functional_orc_def.uncomp_src_alltypes.stats: num_rows) -| group by: `year` -| row-size=12B cardinality=2 -| -00:SCAN HDFS [functional_orc_def.uncomp_src_alltypes] - partition predicates: `month` = 1 - HDFS partitions=2/24 files=2 size=17.07KB - row-size=12B cardinality=2 -==== diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test index 7ec11f482..41a10602f 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-stats-agg.test @@ -1,7 +1,6 @@ # Verify that that the parquet count(*) optimization is applied in all count(*) or # count(<literal>) cases when scanning a Parquet table. In the last case, we are scanning -# a text table, so the optimization is not applied. The optimization is observed when -# the cardinality of the Parquet scan (24) is the same as # the # of files (24). +# a text table, so the optimization is not applied. 
select count(*) from functional_parquet.alltypes union all select count(1) from functional_parquet.alltypes @@ -30,7 +29,7 @@ PLAN-ROOT SINK | | | 05:SCAN HDFS [functional_parquet.alltypes] | HDFS partitions=24/24 files=24 size=200.45KB -| row-size=8B cardinality=24 +| row-size=8B cardinality=12.75K | |--04:AGGREGATE [FINALIZE] | | output: sum_init_zero(functional_parquet.alltypes.stats: num_rows) @@ -38,7 +37,7 @@ PLAN-ROOT SINK | | | 03:SCAN HDFS [functional_parquet.alltypes] | HDFS partitions=24/24 files=24 size=200.45KB -| row-size=8B cardinality=24 +| row-size=8B cardinality=12.75K | 02:AGGREGATE [FINALIZE] | output: sum_init_zero(functional_parquet.alltypes.stats: num_rows) @@ -46,7 +45,7 @@ PLAN-ROOT SINK | 01:SCAN HDFS [functional_parquet.alltypes] HDFS partitions=24/24 files=24 size=200.45KB - row-size=8B cardinality=24 + row-size=8B cardinality=12.75K ---- DISTRIBUTEDPLAN PLAN-ROOT SINK | @@ -80,7 +79,7 @@ PLAN-ROOT SINK | | | 05:SCAN HDFS [functional_parquet.alltypes] | HDFS partitions=24/24 files=24 size=200.45KB -| row-size=8B cardinality=24 +| row-size=8B cardinality=12.75K | |--12:AGGREGATE [FINALIZE] | | output: count:merge(*) @@ -94,7 +93,7 @@ PLAN-ROOT SINK | | | 03:SCAN HDFS [functional_parquet.alltypes] | HDFS partitions=24/24 files=24 size=200.45KB -| row-size=8B cardinality=24 +| row-size=8B cardinality=12.75K | 10:AGGREGATE [FINALIZE] | output: count:merge(*) @@ -108,7 +107,7 @@ PLAN-ROOT SINK | 01:SCAN HDFS [functional_parquet.alltypes] HDFS partitions=24/24 files=24 size=200.45KB - row-size=8B cardinality=24 + row-size=8B cardinality=12.75K ==== # Verify that the parquet count(*) optimization is applied even if there is more than # one item in the select list. 
@@ -122,7 +121,7 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.alltypes] HDFS partitions=24/24 files=24 size=200.45KB - row-size=8B cardinality=24 + row-size=8B cardinality=12.75K ==== # Select count(<partition col>) - the optimization should be disabled because it's not a # count(<literal>) or count(*) aggregate function. @@ -150,7 +149,7 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.alltypes] HDFS partitions=24/24 files=24 size=200.45KB - row-size=16B cardinality=24 + row-size=16B cardinality=12.75K ==== # The optimization is disabled because tinyint_col is not a partition col. select tinyint_col, count(*) from functional_parquet.alltypes group by tinyint_col, year @@ -209,7 +208,7 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.alltypes] HDFS partitions=24/24 files=24 size=200.45KB - row-size=8B cardinality=24 + row-size=8B cardinality=12.75K ==== # The count(*) optimization is applied in the inline view. select count(*), count(a) from (select count(1) as a from functional_parquet.alltypes) t @@ -226,7 +225,7 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.alltypes] HDFS partitions=24/24 files=24 size=200.45KB - row-size=8B cardinality=24 + row-size=8B cardinality=12.75K ==== # The count(*) optimization is applied to the inline view even if there is a join. select * @@ -248,7 +247,7 @@ PLAN-ROOT SINK | | | 01:SCAN HDFS [functional_parquet.alltypes] | HDFS partitions=24/24 files=24 size=200.45KB -| row-size=12B cardinality=24 +| row-size=12B cardinality=12.75K | 00:SCAN HDFS [functional.alltypes x] HDFS partitions=24/24 files=24 size=478.45KB @@ -287,7 +286,7 @@ PLAN-ROOT SINK 00:SCAN HDFS [functional_parquet.alltypes] partition predicates: `year` < 2010, `month` > 8 HDFS partitions=4/24 files=4 size=33.53KB - row-size=8B cardinality=4 + row-size=8B cardinality=2.13K ==== # tinyint_col is not a partition column so the optimization is disabled. 
select count(1) from functional_parquet.alltypes where year < 2010 and tinyint_col > 8; @@ -315,7 +314,7 @@ PLAN-ROOT SINK | 00:SCAN HDFS [functional_parquet.alltypes] HDFS partitions=24/24 files=24 size=200.45KB - row-size=8B cardinality=24 + row-size=8B cardinality=12.75K ==== # Optimization is not applied to count(null). select count(1 + null + 3) from functional_parquet.alltypes @@ -421,5 +420,5 @@ PLAN-ROOT SINK 00:SCAN HDFS [functional_parquet.alltypes] partition predicates: `month` = 1 HDFS partitions=2/24 files=2 size=17.07KB - row-size=12B cardinality=2 + row-size=12B cardinality=1.09K ==== diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test index cb58de723..b7abe2566 100644 --- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test +++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test @@ -1869,7 +1869,6 @@ select count(*) from tpch_parquet.lineitem ---- PLAN Max Per-Host Resource Reservation: Memory=128.00KB Threads=2 Per-Host Resource Estimates: Memory=10MB -Codegen disabled by planner Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 @@ -1891,12 +1890,11 @@ PLAN-ROOT SINK columns: all extrapolated-rows=disabled max-scan-range-rows=2.14M mem-estimate=1.00MB mem-reservation=128.00KB thread-reservation=1 - tuple-ids=0 row-size=8B cardinality=3 + tuple-ids=0 row-size=8B cardinality=6.00M in pipelines: 00(GETNEXT) ---- DISTRIBUTEDPLAN Max Per-Host Resource Reservation: Memory=128.00KB Threads=3 Per-Host Resource Estimates: Memory=10MB -Codegen disabled by planner Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 @@ -1931,12 +1929,11 @@ Per-Host Resources: mem-estimate=1.02MB mem-reservation=128.00KB thread-reservat columns: all 
extrapolated-rows=disabled max-scan-range-rows=2.14M mem-estimate=1.00MB mem-reservation=128.00KB thread-reservation=1 - tuple-ids=0 row-size=8B cardinality=3 + tuple-ids=0 row-size=8B cardinality=6.00M in pipelines: 00(GETNEXT) ---- PARALLELPLANS Max Per-Host Resource Reservation: Memory=128.00KB Threads=2 Per-Host Resource Estimates: Memory=80MB -Codegen disabled by planner Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1 @@ -1971,7 +1968,7 @@ Per-Instance Resources: mem-estimate=80.02MB mem-reservation=128.00KB thread-res columns: all extrapolated-rows=disabled max-scan-range-rows=2.14M mem-estimate=80.00MB mem-reservation=128.00KB thread-reservation=0 - tuple-ids=0 row-size=8B cardinality=3 + tuple-ids=0 row-size=8B cardinality=6.00M in pipelines: 00(GETNEXT) ==== # Sort diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test index e5ef490f8..c7feddf64 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-compound-predicate-push-down.test @@ -39,7 +39,6 @@ select count(1) from ice_compound_pred_pd; BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY show files in ice_compound_pred_pd; @@ -424,7 +423,6 @@ select count(1) from ice_compound_pred_pd1; BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY show files in ice_compound_pred_pd1; diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test index fcec4cb69..9aadc2711 100644 --- 
a/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-in-predicate-push-down.test @@ -33,7 +33,6 @@ select count(1) from ice_pred_pd1; BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY show files in ice_pred_pd1; @@ -63,6 +62,7 @@ aggregation(SUM, NumRowGroups): 3 ---- QUERY # Filtering only on a partition column is done by Iceberg and in Impala we can get the results # simply using file metadata. +# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 to 3. select count(1) from @@ -72,8 +72,7 @@ where ---- RESULTS 9 ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 3 +aggregation(SUM, NumRowGroups): 3 ==== ---- QUERY # The IN predicate matches two row groups @@ -447,6 +446,7 @@ aggregation(SUM, NumRowGroups): 1 ==== ---- QUERY # NOT_IN could be answered using file metadata if only partition cols are included +# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 to 1. 
select count(1) from @@ -456,8 +456,7 @@ where ---- RESULTS 3 ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 1 +aggregation(SUM, NumRowGroups): 1 ==== ---- QUERY # NOT_IN does not work because col_dt is not the partition column @@ -548,7 +547,6 @@ select count(1) from ice_pred_pd2; BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY show files in ice_pred_pd2; diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test index 6b7ba819d..e6207e791 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-is-null-predicate-push-down.test @@ -35,7 +35,6 @@ select count(1) from ice_is_null_pred_pd; BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY show files in ice_is_null_pred_pd; diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test index a1b955a94..9ce8545cb 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-partitioned-insert.test @@ -170,18 +170,17 @@ select count(*) from ice_bigints; BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY # When filtered only by partition column Iceberg can do the filtering and no need to read data in Impala. +# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 to 1. 
select count(*) from ice_bigints where i = 0 and j = 0; ---- RESULTS 1217 ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 +aggregation(SUM, NumRowGroups): 1 aggregation(SUM, RowsRead): 0 -aggregation(SUM, NumFileMetadataRead): 1 ==== ---- QUERY # When not just partition columns are involved in the filtering then Impala has to read data to answer the query. @@ -281,6 +280,7 @@ select count(*) from alltypes_part; Expression 'timestamp_col' (type: TIMESTAMP) would need to be cast to STRING for column 'string_col' ==== ---- QUERY +# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 to 4. select count(*) from alltypes_part where bool_col = true; ---- RESULTS @@ -288,10 +288,10 @@ where bool_col = true; ---- TYPES BIGINT ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 4 +aggregation(SUM, NumRowGroups): 4 ==== ---- QUERY +# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 to 4. select count(*) from alltypes_part where float_col = 0; ---- RESULTS @@ -299,10 +299,10 @@ where float_col = 0; ---- TYPES BIGINT ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 4 +aggregation(SUM, NumRowGroups): 4 ==== ---- QUERY +# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 to 4. select count(*) from alltypes_part where double_col = 0; ---- RESULTS @@ -310,10 +310,10 @@ where double_col = 0; ---- TYPES BIGINT ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 4 +aggregation(SUM, NumRowGroups): 4 ==== ---- QUERY +# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 to 2. 
select count(*) from alltypes_part where date_col = '2009-01-01'; ---- RESULTS @@ -321,10 +321,10 @@ where date_col = '2009-01-01'; ---- TYPES BIGINT ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 2 +aggregation(SUM, NumRowGroups): 2 ==== ---- QUERY +# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 to 4. select count(*) from alltypes_part where string_col = '0'; ---- RESULTS @@ -332,8 +332,7 @@ where string_col = '0'; ---- TYPES BIGINT ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 4 +aggregation(SUM, NumRowGroups): 4 ==== ---- QUERY # 'timestamp_col' is not a partitioning column, so min/max stats will not be used to diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test index da56563eb..9310878c5 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-plain-count-star-optimization.test @@ -13,7 +13,6 @@ select count(*) from ice_tbl; 0 ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY insert into @@ -34,7 +33,6 @@ select count(*) from ice_tbl; 3 ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY create table ice_tbl_u1 stored as iceberg as select * from ice_tbl; @@ -47,7 +45,6 @@ select count(*) from ice_tbl_u1; 3 ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY insert into @@ -69,7 +66,6 @@ select count(*) from ice_tbl; 6 ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY create table ice_tbl_u2 stored as iceberg as select 
* from ice_tbl; @@ -82,7 +78,6 @@ select count(*) from ice_tbl_u2; 6 ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY insert into @@ -105,7 +100,6 @@ select count(*) from ice_tbl; 8 ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY select count(*) from ice_tbl for system_time as of now(); @@ -113,7 +107,6 @@ select count(*) from ice_tbl for system_time as of now(); 8 ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY set explain_level=3; @@ -145,6 +138,7 @@ explain select 123, count(*), 321 from ice_tbl; ==== ---- QUERY # Filtering by a partition column results in Iceberg doing the filtering instead of Impala. +# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 to 2. select count(*) from @@ -154,8 +148,7 @@ where ---- RESULTS 4 ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 2 +aggregation(SUM, NumRowGroups): 2 ==== ---- QUERY select @@ -167,7 +160,6 @@ having ---- RESULTS ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 4 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY select @@ -181,7 +173,6 @@ group by 4 ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 4 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY select @@ -192,7 +183,6 @@ from 6 ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 4 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY truncate ice_tbl; @@ -205,7 +195,6 @@ select count(*) from ice_tbl; 0 ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY create table parq_tbl(col_i INT, col_s STRING) PARTITIONED BY(x INT) STORED AS PARQUET; @@ -213,6 +202,7 @@ create table parq_tbl(col_i INT, col_s STRING) PARTITIONED BY(x INT) STORED AS P 'Table has been created.' 
==== ---- QUERY +# IMPALA-11123: Behavior changes after a revert: NumRowGroups changed from 0 to 3. insert into parq_tbl PARTITION(x = 12340) values (0, "a"); insert into parq_tbl PARTITION(x = 12341) values (1, "b"); insert into parq_tbl PARTITION(x = 12342) values (2, "c"); @@ -220,8 +210,7 @@ select count(*) from parq_tbl; ---- RESULTS 3 ---- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 3 +aggregation(SUM, NumRowGroups): 3 ==== ---- QUERY select count(*) as c from ice_tbl_u1 union all (select count(*) c from ice_tbl_u2) order by c; @@ -232,7 +221,6 @@ select count(*) as c from ice_tbl_u1 union all (select count(*) c from ice_tbl_u BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY with u1 as (select count(*) from ice_tbl_u1), u2 as (select count(*) from ice_tbl_u2) select * from u1, u2; @@ -242,7 +230,6 @@ with u1 as (select count(*) from ice_tbl_u1), u2 as (select count(*) from ice_tb BIGINT,BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY with u1 as (select count(*) from ice_tbl_u1), @@ -254,5 +241,4 @@ u2 as (select count(*) from ice_tbl_u1 union all (select count(*) from ice_tbl_u BIGINT,BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 -==== \ No newline at end of file +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test index c16399812..a1728253a 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-upper-lower-bound-metrics.test @@ -20,7 +20,6 @@ select count(*) from ice_types1; BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 
-aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY show files in ice_types1; @@ -252,7 +251,6 @@ select count(*) from ice_types2; BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY show files in ice_types2; @@ -370,7 +368,6 @@ select count(*) from ice_types3; BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY show files in ice_types3; diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-plain-count-star-optimization.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-plain-count-star-optimization.test index 21ac4a1fe..0edf877d4 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-plain-count-star-optimization.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-plain-count-star-optimization.test @@ -20,7 +20,6 @@ BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 4 aggregation(SUM, NumOrcStripes): 4 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY with u1 as (select count(*) as c from functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files), @@ -35,7 +34,6 @@ BIGINT,BIGINT,TINYINT,BIGINT,BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 4 aggregation(SUM, NumOrcStripes): 4 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY select count(*) as c from iceberg_v2_positional_not_all_data_files_have_delete_files for system_version as of 752781918366351945 @@ -58,7 +56,6 @@ BIGINT ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 2 aggregation(SUM, NumOrcStripes): 2 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY with u1 as (select count(*) as c from functional_parquet.iceberg_v2_positional_not_all_data_files_have_delete_files for system_version as of 752781918366351945), @@ -73,5 +70,4 @@ BIGINT,BIGINT,TINYINT,BIGINT,BIGINT ---- RUNTIME_PROFILE aggregation(SUM, 
NumRowGroups): 2 aggregation(SUM, NumOrcStripes): 2 -aggregation(SUM, NumFileMetadataRead): 0 -==== \ No newline at end of file +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test index a2af39918..5cc292ee3 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes-orc.test @@ -39,7 +39,6 @@ SELECT count(*) from iceberg_v2_no_deletes_orc bigint ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY COMPUTE STATS iceberg_v2_positional_delete_all_rows_orc @@ -81,7 +80,6 @@ SELECT count(*) from iceberg_v2_positional_delete_all_rows_orc for system_versio bigint ---- RUNTIME_PROFILE aggregation(SUM, NumOrcStripes): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY SELECT count(*) from iceberg_v2_positional_delete_all_rows_orc @@ -91,7 +89,6 @@ SELECT count(*) from iceberg_v2_positional_delete_all_rows_orc bigint ---- RUNTIME_PROFILE aggregation(SUM, NumOrcStripes): 2 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY COMPUTE STATS iceberg_v2_positional_not_all_data_files_have_delete_files_orc @@ -145,7 +142,6 @@ SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files_ bigint ---- RUNTIME_PROFILE aggregation(SUM, NumOrcStripes): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files_orc for system_version as of 5003445199566617082 @@ -155,7 +151,6 @@ SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files_ bigint ---- RUNTIME_PROFILE aggregation(SUM, NumOrcStripes): 2 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY SELECT count(*) from 
iceberg_v2_positional_not_all_data_files_have_delete_files_orc @@ -165,7 +160,6 @@ SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files_ bigint ---- RUNTIME_PROFILE aggregation(SUM, NumOrcStripes): 4 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY COMPUTE STATS iceberg_v2_partitioned_position_deletes_orc @@ -207,7 +201,6 @@ SELECT count(*) from iceberg_v2_partitioned_position_deletes_orc for system_vers bigint ---- RUNTIME_PROFILE aggregation(SUM, NumOrcStripes): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY SELECT count(*) from iceberg_v2_partitioned_position_deletes_orc @@ -217,7 +210,6 @@ SELECT count(*) from iceberg_v2_partitioned_position_deletes_orc bigint ---- RUNTIME_PROFILE aggregation(SUM, NumOrcStripes): 6 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY SELECT count(*) from iceberg_v2_no_deletes_orc where i = 2; diff --git a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test index e9c7b985f..0b9675a5f 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test +++ b/testdata/workloads/functional-query/queries/QueryTest/iceberg-v2-read-position-deletes.test @@ -39,7 +39,6 @@ SELECT count(*) from iceberg_v2_no_deletes bigint ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY COMPUTE STATS iceberg_v2_delete_positional @@ -81,7 +80,6 @@ SELECT count(*) from iceberg_v2_delete_positional for system_version as of 68169 bigint ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY SELECT count(*) from iceberg_v2_delete_positional; @@ -91,7 +89,6 @@ SELECT count(*) from iceberg_v2_delete_positional; bigint ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 2 -aggregation(SUM, NumFileMetadataRead): 0 ==== 
---- QUERY COMPUTE STATS iceberg_v2_positional_delete_all_rows @@ -133,7 +130,6 @@ SELECT count(*) from iceberg_v2_positional_delete_all_rows for system_version as bigint ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY SELECT count(*) from iceberg_v2_positional_delete_all_rows @@ -143,7 +139,6 @@ SELECT count(*) from iceberg_v2_positional_delete_all_rows bigint ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 2 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY COMPUTE STATS iceberg_v2_positional_not_all_data_files_have_delete_files @@ -197,7 +192,6 @@ SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files bigint ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files for system_version as of 752781918366351945 @@ -207,7 +201,6 @@ SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files bigint ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 2 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files @@ -217,7 +210,6 @@ SELECT count(*) from iceberg_v2_positional_not_all_data_files_have_delete_files bigint ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 4 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY COMPUTE STATS iceberg_v2_positional_update_all_rows @@ -268,7 +260,6 @@ SELECT count(*) from iceberg_v2_positional_update_all_rows for system_version as bigint ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 2 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY SELECT count(*) from iceberg_v2_positional_update_all_rows @@ -278,7 +269,6 @@ SELECT count(*) from iceberg_v2_positional_update_all_rows bigint ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 2 -aggregation(SUM, NumFileMetadataRead): 0 ==== 
---- QUERY COMPUTE STATS iceberg_v2_partitioned_position_deletes @@ -320,7 +310,6 @@ SELECT count(*) from iceberg_v2_partitioned_position_deletes for system_version bigint ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY SELECT count(*) from iceberg_v2_partitioned_position_deletes @@ -330,7 +319,6 @@ SELECT count(*) from iceberg_v2_partitioned_position_deletes bigint ---- RUNTIME_PROFILE aggregation(SUM, NumRowGroups): 6 -aggregation(SUM, NumFileMetadataRead): 0 ==== ---- QUERY SELECT count(*) from iceberg_v2_no_deletes where i = 2; diff --git a/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test b/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test index f4b7730a4..2d5bf9e80 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test +++ b/testdata/workloads/functional-query/queries/QueryTest/mixed-format.test @@ -25,17 +25,16 @@ bigint, bigint 280,1260 ==== ---- QUERY -# IMPALA-11123: IMPALA-5861 add this test to verify that 'RowRead' counter is not double -# counted for zero slot scan. IMPALA-11123 remove incerement of 'RowRead' counter -# in case of optimized count(star) and zero slot scan query. This cause reduction of -# 'RowsRead' value from 1200 to 900 since the other 300 are served through -# zero slot scan. We do not verify 'NumFileMetadataRead' since it does not stay the same -# over different test vector permutation. +# IMPALA-5861: RowsRead counter should be accurate for table scan that returns +# zero slots. This test is run with various batch_size values, which helps +# reproduce the bug. Scanning multiple file formats triggers the bug because +# the Parquet count(*) rewrite is disabled when non-Parquet file formats are +# present. 
select count(*) from functional.alltypesmixedformat ---- TYPES bigint ---- RESULTS 1200 ---- RUNTIME_PROFILE -aggregation(SUM, RowsRead): 900 +aggregation(SUM, RowsRead): 1200 ==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/orc-stats-agg.test b/testdata/workloads/functional-query/queries/QueryTest/orc-stats-agg.test deleted file mode 100644 index 4c9bdfb50..000000000 --- a/testdata/workloads/functional-query/queries/QueryTest/orc-stats-agg.test +++ /dev/null @@ -1,152 +0,0 @@ -==== ----- QUERY -# Tests the correctness of the ORC count(*) optimization. -select count(1) -from functional_orc_def.uncomp_src_alltypes ----- RESULTS -7300 ----- TYPES -bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumOrcStripes): 0 -aggregation(SUM, NumFileMetadataRead): 24 -aggregation(SUM, RowsRead): 0 -===== ----- QUERY -# Tests the correctness of zero slot scan over ORC. -# Does not verify 'NumFileMetadataRead' here since codegen vs non-codegen yield -# different number. -select 1 from functional_orc_def.alltypestiny ----- RESULTS -1 -1 -1 -1 -1 -1 -1 -1 ----- TYPES -tinyint ----- RUNTIME_PROFILE -aggregation(SUM, NumOrcStripes): 0 -aggregation(SUM, RowsRead): 0 -===== ----- QUERY -# ORC count(*) optimization with predicates on the partition columns. -select count(1) -from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8 ----- RESULTS -1220 ----- TYPES -bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumOrcStripes): 0 -aggregation(SUM, NumFileMetadataRead): 4 -aggregation(SUM, RowsRead): 0 -===== ----- QUERY -# ORC count(*) optimization with group by partition columns. 
-select year, month, count(1) -from functional_orc_def.uncomp_src_alltypes group by year, month ----- RESULTS -2009,1,310 -2009,2,280 -2009,3,310 -2009,4,300 -2009,5,310 -2009,6,300 -2009,7,310 -2009,8,310 -2009,9,300 -2009,10,310 -2009,11,300 -2009,12,310 -2010,1,310 -2010,2,280 -2010,3,310 -2010,4,300 -2010,5,310 -2010,6,300 -2010,7,310 -2010,8,310 -2010,9,300 -2010,10,310 -2010,11,300 -2010,12,310 ----- TYPES -int, int, bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumOrcStripes): 0 -aggregation(SUM, NumFileMetadataRead): 24 -aggregation(SUM, RowsRead): 0 -===== ----- QUERY -# ORC count(*) optimization with both group by and predicates on partition columns. -select count(1) -from functional_orc_def.uncomp_src_alltypes where year < 2010 and month > 8 -group by month ----- RESULTS -310 -300 -310 -300 ----- TYPES -bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumOrcStripes): 0 -aggregation(SUM, NumFileMetadataRead): 4 -aggregation(SUM, RowsRead): 0 -===== ----- QUERY -# ORC count(*) optimization with the result going into a join. -select x.bigint_col from functional_orc_def.uncomp_src_alltypes x - inner join ( - select count(1) as a from functional_orc_def.uncomp_src_alltypes group by year - ) t on x.id = t.a; ----- RESULTS -0 -0 ----- TYPES -bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumOrcStripes): 24 -aggregation(SUM, NumFileMetadataRead): 24 -aggregation(SUM, RowsRead): 7300 -===== ----- QUERY -# ORC count(*) optimization with the agg function in the having clause. -select 1 from functional_orc_def.uncomp_src_alltypes having count(*) > 1 ----- RESULTS -1 ----- TYPES -tinyint ----- RUNTIME_PROFILE -aggregation(SUM, NumOrcStripes): 0 -aggregation(SUM, NumFileMetadataRead): 24 -aggregation(SUM, RowsRead): 0 -==== ----- QUERY -# Verify that 0 is returned for count(*) on an empty table. 
-select count(1) from functional_orc_def.emptytable ----- RESULTS -0 ----- TYPES -bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumOrcStripes): 0 -aggregation(SUM, NumFileMetadataRead): 0 -aggregation(SUM, RowsRead): 0 -===== ----- QUERY -# Verify that 0 is returned when all partitions are pruned. -select count(1) from functional_orc_def.uncomp_src_alltypes where year = -1 ----- RESULTS -0 ----- TYPES -bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumOrcStripes): 0 -aggregation(SUM, NumFileMetadataRead): 0 -aggregation(SUM, RowsRead): 0 -===== diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test b/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test index 43959d7b5..620c50bef 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test +++ b/testdata/workloads/functional-query/queries/QueryTest/parquet-stats-agg.test @@ -7,30 +7,6 @@ from functional_parquet.alltypes 7300 ---- TYPES bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 24 -aggregation(SUM, RowsRead): 0 -===== ----- QUERY -# Tests the correctness of zero slot scan over Parquet. -# Not checking 'NumFileMetadataRead' here since codegen vs non-codegen yield -# different number. -select 1 from functional_orc_def.alltypestiny ----- RESULTS -1 -1 -1 -1 -1 -1 -1 -1 ----- TYPES -tinyint ----- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, RowsRead): 0 ===== ---- QUERY # Parquet count(*) optimization with predicates on the partition columns. @@ -40,10 +16,6 @@ from functional_parquet.alltypes where year < 2010 and month > 8 1220 ---- TYPES bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 4 -aggregation(SUM, RowsRead): 0 ===== ---- QUERY # Parquet count(*) optimization with group by partition columns. 
@@ -76,10 +48,6 @@ from functional_parquet.alltypes group by year, month 2010,12,310 ---- TYPES int, int, bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 24 -aggregation(SUM, RowsRead): 0 ===== ---- QUERY # Parquet count(*) optimization with both group by and predicates on partition columns. @@ -93,10 +61,6 @@ group by month 300 ---- TYPES bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 4 -aggregation(SUM, RowsRead): 0 ===== ---- QUERY # Parquet count(*) optimization with the result going into a join. @@ -109,10 +73,6 @@ select x.bigint_col from functional.alltypes x 0 ---- TYPES bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 24 -aggregation(SUM, RowsRead): 7300 ===== ---- QUERY # Parquet count(*) optimization with the agg function in the having clause. @@ -121,10 +81,6 @@ select 1 from functional_parquet.alltypes having count(*) > 1 1 ---- TYPES tinyint ----- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 24 -aggregation(SUM, RowsRead): 0 ==== ---- QUERY # Verify that 0 is returned for count(*) on an empty table. @@ -133,10 +89,6 @@ select count(1) from functional_parquet.emptytable 0 ---- TYPES bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 -aggregation(SUM, RowsRead): 0 ===== ---- QUERY # Verify that 0 is returned when all partitions are pruned. @@ -145,10 +97,6 @@ select count(1) from functional_parquet.alltypes where year = -1 0 ---- TYPES bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 0 -aggregation(SUM, RowsRead): 0 ===== ---- QUERY # Test different row group size combinations. 
@@ -166,9 +114,6 @@ select count(*) from tpch_parquet.lineitem 6001215 ---- TYPES bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, RowsRead): 0 ===== ---- QUERY # IMPALA-5679: Count(*) with group by on a string partition column. @@ -191,8 +136,4 @@ select string_col, count(*) from $DATABASE.string_partitioned_table group by str '9',730 ---- TYPES string, bigint ----- RUNTIME_PROFILE -aggregation(SUM, NumRowGroups): 0 -aggregation(SUM, NumFileMetadataRead): 10 -aggregation(SUM, RowsRead): 0 ===== diff --git a/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test b/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test index 3938ff3ec..0d9e17330 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test +++ b/testdata/workloads/functional-query/queries/QueryTest/partition-key-scans.test @@ -11,10 +11,6 @@ INT ---- RUNTIME_PROFILE # Confirm that only one row per file is read. aggregation(SUM, RowsRead): 24 ----- RUNTIME_PROFILE: table_format=parquet,orc -# Confirm that only one metadata per file is read. -aggregation(SUM, RowsRead): 0 -aggregation(SUM, NumFileMetadataRead): 24 ==== ---- QUERY # Test with more complex multiple distinct aggregation. @@ -27,10 +23,6 @@ BIGINT,BIGINT ---- RUNTIME_PROFILE # Confirm that only one row per file is read. aggregation(SUM, RowsRead): 24 ----- RUNTIME_PROFILE: table_format=parquet,orc -# Confirm that only one metadata per file is read. -aggregation(SUM, RowsRead): 0 -aggregation(SUM, NumFileMetadataRead): 24 ==== ---- QUERY # Distinct aggregation with multiple columns. @@ -66,10 +58,6 @@ INT,INT ---- RUNTIME_PROFILE # Confirm that only one row per file is read. aggregation(SUM, RowsRead): 24 ----- RUNTIME_PROFILE: table_format=parquet,orc -# Confirm that only one metadata per file is read. 
-aggregation(SUM, RowsRead): 0 -aggregation(SUM, NumFileMetadataRead): 24 ==== ---- QUERY # Partition key scan combined with analytic function. @@ -83,10 +71,6 @@ INT,BIGINT ---- RUNTIME_PROFILE # Confirm that only one row per file is read. aggregation(SUM, RowsRead): 24 ----- RUNTIME_PROFILE: table_format=parquet,orc -# Confirm that only one metadata per file is read. -aggregation(SUM, RowsRead): 0 -aggregation(SUM, NumFileMetadataRead): 24 ==== ---- QUERY # Partition scan combined with sort. @@ -123,10 +107,6 @@ INT,INT ---- RUNTIME_PROFILE # Confirm that only one row per file is read. aggregation(SUM, RowsRead): 24 ----- RUNTIME_PROFILE: table_format=parquet,orc -# Confirm that only one metadata per file is read. -aggregation(SUM, RowsRead): 0 -aggregation(SUM, NumFileMetadataRead): 24 ==== ---- QUERY # Partition key scan combined with predicate on partition columns @@ -141,10 +121,6 @@ INT,INT ---- RUNTIME_PROFILE # Confirm that only one row per file is read. aggregation(SUM, RowsRead): 2 ----- RUNTIME_PROFILE: table_format=parquet,orc -# Confirm that only one metadata per file is read. -aggregation(SUM, RowsRead): 0 -aggregation(SUM, NumFileMetadataRead): 2 ==== ---- QUERY # Partition key scan combined with having predicate. @@ -160,10 +136,6 @@ INT,INT ---- RUNTIME_PROFILE # Confirm that only one row per file is read. aggregation(SUM, RowsRead): 24 ----- RUNTIME_PROFILE: table_format=parquet,orc -# Confirm that only one metadata per file is read. 
-aggregation(SUM, RowsRead): 0 -aggregation(SUM, NumFileMetadataRead): 24 ==== ---- QUERY # Empty table should not return any rows diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test index 698e66da7..13057c0d2 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test +++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters.test @@ -15,10 +15,6 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1" 620 ---- RUNTIME_PROFILE row_regex: .*RowsRead: 2.43K .* ----- RUNTIME_PROFILE: table_format=parquet,orc -row_regex: .*RowsReturned: 2.43K .* -aggregation(SUM, RowsRead): 2 -aggregation(SUM, NumFileMetadataRead): 48 ==== ---- QUERY # Now turn on local filtering: we expect to see a reduction in scan volume. @@ -53,10 +49,6 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1" 620 ---- RUNTIME_PROFILE row_regex: .*RowsRead: 2.43K .* ----- RUNTIME_PROFILE: table_format=parquet,orc -row_regex: .*RowsReturned: 2.43K .* -aggregation(SUM, RowsRead): 2 -aggregation(SUM, NumFileMetadataRead): 48 ==== ---- QUERY # Shuffle join, global mode. Expect filters to be propagated. 
diff --git a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test index 1a6c30cdd..b6fab637d 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test +++ b/testdata/workloads/functional-query/queries/QueryTest/runtime_filters_mt_dop.test @@ -15,10 +15,6 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1" 620 ---- RUNTIME_PROFILE row_regex: .*RowsRead: 2.43K .* ----- RUNTIME_PROFILE: table_format=parquet,orc -row_regex: .*RowsReturned: 2.43K .* -aggregation(SUM, RowsRead): 2 -aggregation(SUM, NumFileMetadataRead): 48 ==== ---- QUERY # Now turn on local filtering: we expect to see a reduction in scan volume. @@ -53,10 +49,6 @@ on p.month = b.int_col and b.month = 1 and b.string_col = "1" 620 ---- RUNTIME_PROFILE row_regex: .*RowsRead: 2.43K .* ----- RUNTIME_PROFILE: table_format=parquet,orc -row_regex: .*RowsReturned: 2.43K .* -aggregation(SUM, RowsRead): 2 -aggregation(SUM, NumFileMetadataRead): 48 ==== ---- QUERY # Shuffle join, global mode. Expect filters to be propagated. 
diff --git a/testdata/workloads/functional-query/queries/QueryTest/scanners.test b/testdata/workloads/functional-query/queries/QueryTest/scanners.test index 8dd741f67..eff43d53d 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/scanners.test +++ b/testdata/workloads/functional-query/queries/QueryTest/scanners.test @@ -238,9 +238,6 @@ tinyint 1 ---- RUNTIME_PROFILE aggregation(SUM, RowsRead): 100 ----- RUNTIME_PROFILE: table_format=parquet,orc -aggregation(SUM, RowsRead): 0 -aggregation(SUM, RowsReturned): 200 ==== ---- QUERY select year, count(*) from alltypes group by year diff --git a/tests/custom_cluster/test_executor_groups.py b/tests/custom_cluster/test_executor_groups.py index 74af1051d..22da7969f 100644 --- a/tests/custom_cluster/test_executor_groups.py +++ b/tests/custom_cluster/test_executor_groups.py @@ -771,7 +771,7 @@ class TestExecutorGroups(CustomClusterTestSuite): different number of executors and memory limit in each.""" # A small query with estimated memory per host of 10MB that can run on the small # executor group - SMALL_QUERY = "select count(*) from tpcds_parquet.date_dim where d_year=2022;" + SMALL_QUERY = "select count(*) from tpcds_parquet.date_dim;" # A large query with estimated memory per host of 132MB that can only run on # the large executor group. LARGE_QUERY = "select * from tpcds_parquet.store_sales where ss_item_sk = 1 limit 50;" diff --git a/tests/custom_cluster/test_query_retries.py b/tests/custom_cluster/test_query_retries.py index 56e852970..7d039708a 100644 --- a/tests/custom_cluster/test_query_retries.py +++ b/tests/custom_cluster/test_query_retries.py @@ -79,11 +79,6 @@ class TestQueryRetries(CustomClusterTestSuite): union all select count(*) from functional.alltypes where bool_col = sleep(50)""" - # A simple count query with predicate. The predicate is needed so that the planner does - # not create the optimized count(star) query plan. 
- _count_query = "select count(*) from tpch_parquet.lineitem where l_orderkey < 50" - _count_query_result = "55" - @classmethod def get_workload(cls): return 'functional-query' @@ -257,7 +252,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. killed_impalad = self.__kill_random_impalad() - query = self._count_query + query = "select count(*) from tpch_parquet.lineitem" handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true'}) self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60) @@ -269,7 +264,7 @@ class TestQueryRetries(CustomClusterTestSuite): results = self.client.fetch(query, handle) assert results.success assert len(results.data) == 1 - assert self._count_query_result in results.data[0] + assert "6001215" in results.data[0] # The runtime profile of the retried query. retried_runtime_profile = self.client.get_runtime_profile(handle) @@ -317,7 +312,7 @@ class TestQueryRetries(CustomClusterTestSuite): # and the query should be retried. Add delay before admission so that the 2nd node # is removed from the blacklist before scheduler makes schedule for the retried # query. - query = self._count_query + query = "select count(*) from tpch_parquet.lineitem" handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true', 'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'}) @@ -330,7 +325,7 @@ class TestQueryRetries(CustomClusterTestSuite): results = self.client.fetch(query, handle) assert results.success assert len(results.data) == 1 - assert self._count_query_result in results.data[0] + assert "6001215" in results.data[0] # The runtime profile of the retried query. 
retried_runtime_profile = self.client.get_runtime_profile(handle) @@ -380,7 +375,7 @@ class TestQueryRetries(CustomClusterTestSuite): rpc_not_accessible_impalad = self.cluster.impalads[1] assert rpc_not_accessible_impalad.service.krpc_port == FAILED_KRPC_PORT - query = self._count_query + query = "select count(*) from tpch_parquet.lineitem" handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true', 'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'}) @@ -703,7 +698,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. self.cluster.impalads[1].kill() - query = self._count_query + query = "select count(*) from tpch_parquet.lineitem" handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true'}) self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60) @@ -742,7 +737,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. self.cluster.impalads[1].kill() - query = self._count_query + query = "select count(*) from tpch_parquet.lineitem" handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true'}) self.__wait_until_retry_state(handle, 'RETRYING') @@ -772,7 +767,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. self.cluster.impalads[1].kill() - query = self._count_query + query = "select count(*) from tpch_parquet.lineitem" handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true'}) @@ -796,7 +791,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. 
self.cluster.impalads[1].kill() - query = self._count_query + query = "select count(*) from tpch_parquet.lineitem" self.hs2_client.set_configuration({'retry_failed_queries': 'true'}) self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024') self.hs2_client.execute_async(query) @@ -823,7 +818,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. self.cluster.impalads[1].kill() - query = self._count_query + query = "select count(*) from tpch_parquet.lineitem" self.execute_query_async(query, query_options={'retry_failed_queries': 'true'}) # The number of in-flight queries is 0 at the beginning, then 1 when the original # query is submitted. It's 2 when the retried query is registered. Although the retry @@ -853,7 +848,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. self.cluster.impalads[1].kill() - query = self._count_query + query = "select count(*) from tpch_parquet.lineitem" handle = self.execute_query_async(query, query_options={'retry_failed_queries': 'true', 'query_timeout_s': '1'}) self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60) @@ -892,7 +887,7 @@ class TestQueryRetries(CustomClusterTestSuite): # Kill an impalad, and run a query. The query should be retried. self.cluster.impalads[1].kill() - query = self._count_query + query = "select count(*) from tpch_parquet.lineitem" client = self.cluster.get_first_impalad().service.create_beeswax_client() client.set_configuration({'retry_failed_queries': 'true'}) handle = client.execute_async(query) @@ -922,7 +917,7 @@ class TestQueryRetries(CustomClusterTestSuite): """Test query retries with the HS2 protocol. 
Enable the results set cache as well and test that query retries work with the results cache.""" self.cluster.impalads[1].kill() - query = self._count_query + query = "select count(*) from tpch_parquet.lineitem" self.hs2_client.set_configuration({'retry_failed_queries': 'true'}) self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024') handle = self.hs2_client.execute_async(query) @@ -931,7 +926,7 @@ class TestQueryRetries(CustomClusterTestSuite): results = self.hs2_client.fetch(query, handle) assert results.success assert len(results.data) == 1 - assert results.data[0] == self._count_query_result + assert int(results.data[0]) == 6001215 # Validate the live exec summary. retried_query_id = \ diff --git a/tests/query_test/test_aggregation.py b/tests/query_test/test_aggregation.py index 1cf2942ff..0df1767a7 100644 --- a/tests/query_test/test_aggregation.py +++ b/tests/query_test/test_aggregation.py @@ -260,6 +260,24 @@ class TestAggregationQueries(ImpalaTestSuite): # Verify codegen was enabled for all four stages of the aggregation. 
assert_codegen_enabled(result.runtime_profile, [1, 2, 4, 6]) + def test_parquet_count_star_optimization(self, vector, unique_database): + if (vector.get_value('table_format').file_format != 'text' or + vector.get_value('table_format').compression_codec != 'none'): + # No need to run this test on all file formats + pytest.skip() + self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database) + vector.get_value('exec_option')['batch_size'] = 1 + self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database) + + def test_kudu_count_star_optimization(self, vector, unique_database): + if (vector.get_value('table_format').file_format != 'text' or + vector.get_value('table_format').compression_codec != 'none'): + # No need to run this test on all file formats + pytest.skip() + self.run_test_case('QueryTest/kudu-stats-agg', vector, unique_database) + vector.get_value('exec_option')['batch_size'] = 1 + self.run_test_case('QueryTest/kudu-stats-agg', vector, unique_database) + def test_ndv(self): """Test the version of NDV() that accepts a scale value argument against different column data types. The scale argument is an integer in range @@ -303,55 +321,17 @@ class TestAggregationQueries(ImpalaTestSuite): for j in range(0, 11): assert(ndv_results[i - 1][j] == int(ndv_vals[j])) - def test_grouping_sets(self, vector): - """Tests for ROLLUP, CUBE and GROUPING SETS.""" - if vector.get_value('table_format').file_format == 'hbase': - pytest.xfail(reason="IMPALA-283 - HBase null handling is inconsistent") - self.run_test_case('QueryTest/grouping-sets', vector) - - -class TestAggregationQueriesRunOnce(ImpalaTestSuite): - """Run the aggregation test suite similarly as TestAggregationQueries, but with stricter - constraint. Each test in this class only run once by setting uncompressed text dimension - for all exploration strategy. However, they may not necessarily target uncompressed text - table format. 
This also run with codegen enabled and disabled to exercise our - non-codegen code""" - @classmethod - def get_workload(self): - return 'functional-query' - - @classmethod - def add_test_dimensions(cls): - super(TestAggregationQueriesRunOnce, cls).add_test_dimensions() - - cls.ImpalaTestMatrix.add_dimension( - create_exec_option_dimension(disable_codegen_options=[False, True])) - - cls.ImpalaTestMatrix.add_dimension( - create_uncompressed_text_dimension(cls.get_workload())) - - def test_parquet_count_star_optimization(self, vector, unique_database): - self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database) - vector.get_value('exec_option')['batch_size'] = 1 - self.run_test_case('QueryTest/parquet-stats-agg', vector, unique_database) - - def test_kudu_count_star_optimization(self, vector): - self.run_test_case('QueryTest/kudu-stats-agg', vector) - vector.get_value('exec_option')['batch_size'] = 1 - self.run_test_case('QueryTest/kudu-stats-agg', vector) - - def test_orc_count_star_optimization(self, vector): - self.run_test_case('QueryTest/orc-stats-agg', vector) - vector.get_value('exec_option')['batch_size'] = 1 - self.run_test_case('QueryTest/orc-stats-agg', vector) - - def test_sampled_ndv(self, vector): + def test_sampled_ndv(self, vector, unique_database): """The SAMPLED_NDV() function is inherently non-deterministic and cannot be reasonably made deterministic with existing options so we test it separately. The goal of this test is to ensure that SAMPLED_NDV() works on all data types and returns approximately sensible estimates. It is not the goal of this test to ensure tight error bounds on the NDV estimates. 
SAMPLED_NDV() is expected be inaccurate on small data sets like the ones we use in this test.""" + if (vector.get_value('table_format').file_format != 'text' or + vector.get_value('table_format').compression_codec != 'none'): + # No need to run this test on all file formats + pytest.skip() # NDV() is used a baseline to compare SAMPLED_NDV(). Both NDV() and SAMPLED_NDV() # are based on HyperLogLog so NDV() is roughly the best that SAMPLED_NDV() can do. @@ -405,6 +385,12 @@ class TestAggregationQueriesRunOnce(ImpalaTestSuite): for i in range(14, 16): self.appx_equals(int(sampled_ndv_vals[i]) * sample_perc, int(ndv_vals[i]), 2.0) + def test_grouping_sets(self, vector): + """Tests for ROLLUP, CUBE and GROUPING SETS.""" + if vector.get_value('table_format').file_format == 'hbase': + pytest.xfail(reason="IMPALA-283 - HBase null handling is inconsistent") + self.run_test_case('QueryTest/grouping-sets', vector) + class TestDistinctAggregation(ImpalaTestSuite): """Run the distinct aggregation test suite, with codegen and shuffle_distinct_exprs diff --git a/tests/query_test/test_iceberg.py b/tests/query_test/test_iceberg.py index 37ad24e32..4d4db1e37 100644 --- a/tests/query_test/test_iceberg.py +++ b/tests/query_test/test_iceberg.py @@ -453,7 +453,6 @@ class TestIcebergTable(IcebergTestSuite): assert len(data.data) == 1 assert expected in data.data assert "NumRowGroups" not in data.runtime_profile - assert "NumFileMetadataRead" not in data.runtime_profile def expect_results_t(ts, expected_results, expected_cols): expect_results( diff --git a/tests/query_test/test_scanners.py b/tests/query_test/test_scanners.py index 1813483fa..45072e47a 100644 --- a/tests/query_test/test_scanners.py +++ b/tests/query_test/test_scanners.py @@ -1623,8 +1623,7 @@ class TestOrc(ImpalaTestSuite): def _misaligned_orc_stripes_helper( self, table_name, rows_in_table, num_scanners_with_no_reads=0): """Checks if 'num_scanners_with_no_reads' indicates the expected number of scanners - that don't read 
anything because the underlying file is poorly formatted. - Additionally, test that select count(star) match with expected number of rows. + that don't read anything because the underlying file is poorly formatted """ query = 'select * from %s' % table_name result = self.client.execute(query) @@ -1645,11 +1644,6 @@ class TestOrc(ImpalaTestSuite): total += int(n) assert total == num_scanners_with_no_reads - # Test that select count(star) match with expected number of rows. - query = 'select count(*) from %s' % table_name - result = self.client.execute(query) - assert int(result.data[0]) == rows_in_table - # Skip this test on non-HDFS filesystems, because orc-type-check.test contains Hive # queries that hang in some cases (IMPALA-9345). It would be possible to separate # the tests that use Hive and run most tests on S3, but I think that running these on @@ -1783,13 +1777,13 @@ class TestOrc(ImpalaTestSuite): "CREATE TABLE {db}.{tbl} (id BIGINT) STORED AS ORC", unique_database, test_name, test_files) err = self.execute_query_expect_failure(self.client, - "select count(id) from {0}.{1}".format(unique_database, test_name)) + "select count(*) from {0}.{1}".format(unique_database, test_name)) assert expected_error in str(err) def test_invalid_schema(self, vector, unique_database): """Test scanning of ORC file with malformed schema.""" self._run_invalid_schema_test(unique_database, "corrupt_schema", - "Encountered parse error in tail of ORC file") + "Encountered parse error during schema selection") self._run_invalid_schema_test(unique_database, "corrupt_root_type", "Root of the selected type returned by the ORC lib is not STRUCT: boolean.") diff --git a/tests/util/test_file_parser.py b/tests/util/test_file_parser.py index f1746e45c..256bc5277 100644 --- a/tests/util/test_file_parser.py +++ b/tests/util/test_file_parser.py @@ -264,19 +264,15 @@ def parse_test_file_text(text, valid_section_names, skip_unknown_sections=True): # evaluated for all formats that don't have a 
commented section for this query. if subsection_name == 'RUNTIME_PROFILE': if subsection_comment is not None and subsection_comment is not "": - allowed_formats = ['kudu', 'parquet', 'orc'] + allowed_formats = ['kudu'] if not subsection_comment.startswith("table_format="): raise RuntimeError('RUNTIME_PROFILE comment (%s) must be of the form ' - '"table_format=FORMAT[,FORMAT2,...]"' % subsection_comment) - parsed_formats = subsection_comment[13:].split(',') - for table_format in parsed_formats: - if table_format not in allowed_formats: - raise RuntimeError('RUNTIME_PROFILE table format (%s) must be in: %s' % - (table_format, allowed_formats)) - else: - subsection_name_for_format = 'RUNTIME_PROFILE_%s' % table_format - parsed_sections[subsection_name_for_format] = subsection_str - continue + '"table_format=FORMAT"' % subsection_comment) + table_format = subsection_comment[13:] + if table_format not in allowed_formats: + raise RuntimeError('RUNTIME_PROFILE table format (%s) must be in: %s' % + (table_format, allowed_formats)) + subsection_name = 'RUNTIME_PROFILE_%s' % table_format parsed_sections[subsection_name] = subsection_str
