This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit d7068ace15b5c7affe0812155f037789905ef74d Author: stiga-huang <[email protected]> AuthorDate: Mon Sep 27 10:12:29 2021 +0800 IMPALA-10894: Pushing down predicates in reading "original files" of ACID tables ACID tables can have "original files" that don't have the full ACID schema. For instance, if we upgrade a non-ACID table to full ACID, the original files won't be changed so they don't have ACID columns, i.e. operation, originalTransaction, bucket, rowid, and currentTransaction. Apart from rowid, the other 4 columns can be calculated based on the file path. We calculate the rowid as the row index inside the file. This is done by setting a first row id for the split; the OrcStructReader then fills the rowid slot with values auto-incremented by one. However, if we push down predicates into the ORC reader, some rows may be skipped. The ORC lib guarantees that rows in a returned batch are consecutive. But rows may be skipped between consecutive batches. So we can't simply auto-increment the first row id by 1 to calculate the row index. Instead, we should use orc::RowReader::getRowNumber() to update the first row index of the returned batch. This patch changes the row index initialization logic to use orc::RowReader::getRowNumber(), and removes the branch that skips pushing down predicates in such cases. 
Tests: - Ran test_full_acid_original_files Change-Id: I5bfdb624fcaf62ffa22f53025761b9dee3fe58a2 Reviewed-on: http://gerrit.cloudera.org:8080/17870 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/hdfs-orc-scanner.cc | 22 +++++----------------- 1 file changed, 5 insertions(+), 17 deletions(-) diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc index e6647cd..99a8fb9 100644 --- a/be/src/exec/hdfs-orc-scanner.cc +++ b/be/src/exec/hdfs-orc-scanner.cc @@ -708,9 +708,6 @@ Status HdfsOrcScanner::NextStripe() { advance_stripe_ = false; stripe_rows_read_ = 0; - bool first_invocation = stripe_idx_ == -1; - int64_t skipped_rows = 0; - // Loop until we have found a non-empty stripe. while (true) { // Reset the parse status for the next stripe. @@ -740,17 +737,11 @@ Status HdfsOrcScanner::NextStripe() { stripe_mid_pos < split_offset + split_length)) { // Middle pos not in split, this stripe will be handled by a different scanner. // Mark if the stripe overlaps with the split. - if (first_invocation) skipped_rows += stripe->getNumberOfRows(); misaligned_stripe_skipped |= CheckStripeOverlapsSplit(stripe_offset, stripe_offset + stripe_len, split_offset, split_offset + split_length); continue; } - // Set the file row index in 'orc_root_reader_' accordingly. - if (first_invocation && acid_synthetic_rowid_ != nullptr) { - orc_root_reader_->SetFileRowIndex(skipped_rows); - } - COUNTER_ADD(num_stripes_counter_, 1); row_reader_options_.range(stripe->getOffset(), stripe_len); try { @@ -791,6 +782,11 @@ Status HdfsOrcScanner::AssembleRows(RowBatch* row_batch) { try { end_of_stripe_ |= !row_reader_->next(*orc_root_batch_); RETURN_IF_ERROR(orc_root_reader_->UpdateInputBatch(orc_root_batch_.get())); + if (acid_synthetic_rowid_ != nullptr) { + // Set the first row index of the batch. The ORC reader guarantees that rows + // are consecutive in the returned batch. 
+ orc_root_reader_->SetFileRowIndex(row_reader_->getRowNumber()); + } if (end_of_stripe_) break; // no more data to process } catch (ResourceError& e) { parse_status_ = e.GetStatus(); @@ -1053,14 +1049,6 @@ Status HdfsOrcScanner::PrepareSearchArguments() { const TupleDescriptor* min_max_tuple_desc = scan_node_->min_max_tuple_desc(); if (!min_max_tuple_desc) return Status::OK(); - // TODO(IMPALA-10894): pushing down predicates into the ORC reader will mess up the - // synthetic(fake) row id, because the row index in the returned batch might differ - // from the index in file (due to some rows are skipped). - if (acid_synthetic_rowid_ != nullptr) { - VLOG_FILE << "Skip pushing down predicates on non-ACID ORC files under an ACID " - "table: " << filename(); - return Status::OK(); - } // Clone the min/max statistics conjuncts. RETURN_IF_ERROR(ScalarExprEvaluator::Clone(&obj_pool_, state_,
