IMPALA-5197: Erroneous corrupted Parquet file message

The Parquet file column reader may fail in the middle of producing a
scratch tuple batch for various reasons, such as exceeding the memory
limit or cancellation. In that case, the scratch tuple batch may not
have materialized all the rows in a row group. We shouldn't erroneously
report that the file is corrupted in this case, as the column reader
didn't completely read the entire row group.
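The crux of the fix is the single condition in
HdfsParquetScanner::AssembleRows() shown in the diff below: a tuple-count
mismatch only indicates corruption if the reader was still executing cleanly
when the row group ended. A minimal standalone sketch of that predicate
(ScratchBatchState and ShouldReportCorruptFile are hypothetical stand-ins,
not Impala code):

#include <iostream>

// Hypothetical stand-ins for scanner state in AssembleRows(); the real
// members live in HdfsParquetScanner (see the diff below).
struct ScratchBatchState {
  bool num_tuples_mismatch;  // a column produced fewer values than expected
  bool continue_execution;   // false once an error or cancellation was hit
};

// Sketch of the fixed predicate: a short batch only implies a corrupt file
// if the reader ran to completion; an aborted read legitimately stops early.
bool ShouldReportCorruptFile(const ScratchBatchState& s) {
  return s.num_tuples_mismatch && s.continue_execution;
}

int main() {
  // Aborted mid-row-group (e.g. memory limit exceeded): stay quiet.
  std::cout << ShouldReportCorruptFile({true, false}) << "\n";  // prints 0
  // Mismatch on a clean run: genuine corruption, report it.
  std::cout << ShouldReportCorruptFile({true, true}) << "\n";   // prints 1
}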
A new test case is added to verify that this error message no longer
appears. A new failpoint phase, GETNEXT_SCANNER, is also added to
differentiate the failpoint in the scanner from the GETNEXT failpoint
in the scan node itself.

Change-Id: I9138039ec60fbe9deff250b8772036e40e42e1f6
Reviewed-on: http://gerrit.cloudera.org:8080/6787
Reviewed-by: Michael Ho <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/249632b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/249632b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/249632b3

Branch: refs/heads/master
Commit: 249632b3081e94b18361a43bd635586d6b1e0ed0
Parents: 368115c
Author: Michael Ho <[email protected]>
Authored: Wed Apr 19 17:27:24 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Tue May 9 09:27:39 2017 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc   |  2 +-
 be/src/exec/hdfs-scan-node-base.cc    |  6 +++---
 be/src/exec/hdfs-scan-node-base.h     |  6 +++---
 be/src/exec/hdfs-scanner.h            |  4 +++-
 be/src/exec/parquet-column-readers.cc | 27 ++++++++++++++++++---------
 be/src/exec/parquet-column-readers.h  | 11 +++++++----
 common/thrift/PlanNodes.thrift        |  1 +
 tests/common/impala_service.py        |  1 -
 tests/failure/test_failpoints.py      |  6 ++++--
 9 files changed, 40 insertions(+), 24 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index 046ec46..a6831b7 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -958,7 +958,7 @@ Status HdfsParquetScanner::AssembleRows(
     scratch_batch_->num_tuples = 0;
     DCHECK(scratch_batch_->AtEnd());
     *skip_row_group = true;
-    if (num_tuples_mismatch) {
+    if (num_tuples_mismatch && continue_execution) {
       parse_status_.MergeStatus(Substitute("Corrupt Parquet file '$0': column '$1' "
           "had $2 remaining values but expected $3", filename(),
           col_reader->schema_element().name, last_num_tuples,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index e4ba44d..b6366a9 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -680,7 +680,7 @@ Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition
         partition->file_format()));
   }
   DCHECK(scanner->get() != NULL);
-  Status status = ExecDebugAction(TExecNodePhase::PREPARE_SCANNER, runtime_state_);
+  Status status = ScanNodeDebugAction(TExecNodePhase::PREPARE_SCANNER);
   if (status.ok()) {
     status = scanner->get()->Open(context);
     if (!status.ok()) {
@@ -904,6 +904,6 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
   }
 }

-Status HdfsScanNodeBase::TriggerDebugAction() {
-  return ExecDebugAction(TExecNodePhase::GETNEXT, runtime_state_);
+Status HdfsScanNodeBase::ScanNodeDebugAction(TExecNodePhase::type phase) {
+  return ExecDebugAction(phase, runtime_state_);
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 3453945..94e8952 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -494,9 +494,9 @@ class HdfsScanNodeBase : public ScanNode {
   /// Scan nodes with a RowBatch queue may have to synchronize calls to this function.
   void StopAndFinalizeCounters();

-  /// Calls ExecDebugAction(). Returns the status based on the debug action specified
-  /// for the query.
-  Status TriggerDebugAction();
+  /// Calls ExecNode::ExecDebugAction() with 'phase'. Returns the status based on the
+  /// debug action specified for the query.
+  Status ScanNodeDebugAction(TExecNodePhase::type phase);
 };

 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 417ade7..24a3c4f 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -403,7 +403,9 @@ class HdfsScanner {
   /// Triggers debug action of the scan node. This is currently used by parquet column
   /// readers to exercise various failure paths in parquet scanner. Returns the status
   /// returned by the scan node's TriggerDebugAction().
-  Status TriggerDebugAction() { return scan_node_->TriggerDebugAction(); }
+  Status ScannerDebugAction() {
+    return scan_node_->ScanNodeDebugAction(TExecNodePhase::GETNEXT_SCANNER);
+  }

   /// Utility function to append an error message for an invalid row.
   void LogRowParseError();

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index 53848e2..f1ac031 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -53,11 +53,15 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
 // Also, this limit is in place to prevent impala from reading corrupt parquet files.
 DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes");

-// Trigger debug action on every 128 tuples produced. This is useful in exercising the
-// failure or cancellation path.
+// Trigger debug action on every other call of Read*ValueBatch() once at least 128
+// tuples have been produced to simulate failure such as exceeding memory limit.
+// Triggering it every other call so as not to always fail on the first column reader
+// when materializing multiple columns. Failing on non-empty row batch tests proper
+// resources freeing by the Parquet scanner.
 #ifndef NDEBUG
-#define DEBUG_ACTION_TRIGGER (127)
-#define SHOULD_TRIGGER_DEBUG_ACTION(x) ((x & DEBUG_ACTION_TRIGGER) == 0)
+static int debug_count = 0;
+#define SHOULD_TRIGGER_DEBUG_ACTION(num_tuples) \
+  ((debug_count++ % 2) == 1 && num_tuples >= 128)
 #else
 #define SHOULD_TRIGGER_DEBUG_ACTION(x) (false)
 #endif
@@ -361,7 +365,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
       val_count += ret_val_count;
       num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
       if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
-        continue_execution &= TriggerDebugAction();
+        continue_execution &= ColReaderDebugAction(&val_count);
       }
     }
     *num_values = val_count;
@@ -765,12 +769,17 @@ class BoolColumnReader : public BaseScalarColumnReader {
   BitReader bool_values_;
 };

-bool ParquetColumnReader::TriggerDebugAction() {
-  Status status = parent_->TriggerDebugAction();
+// Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error handling
+// path doesn't falsely report that the file is corrupted.
+bool ParquetColumnReader::ColReaderDebugAction(int* val_count) {
+#ifndef NDEBUG
+  Status status = parent_->ScannerDebugAction();
   if (!status.ok()) {
     if (!status.IsCancelled()) parent_->parse_status_.MergeStatus(status);
+    *val_count = 0;
     return false;
   }
+#endif
   return true;
 }

@@ -790,7 +799,7 @@ bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
     continue_execution = ReadValue(pool, tuple);
     ++val_count;
     if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
-      continue_execution &= TriggerDebugAction();
+      continue_execution &= ColReaderDebugAction(&val_count);
     }
   }
   *num_values = val_count;
@@ -806,7 +815,7 @@ bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
     continue_execution = ReadNonRepeatedValue(pool, tuple);
     ++val_count;
     if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
-      continue_execution &= TriggerDebugAction();
+      continue_execution &= ColReaderDebugAction(&val_count);
     }
   }
   *num_values = val_count;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 9de4277..66d8815 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -288,10 +288,13 @@ class ParquetColumnReader {
     if (max_rep_level() == 0) rep_level_ = 0;
   }

-  /// Trigger debug action. Returns false if the debug action deems that the
-  /// parquet column reader should halt execution. In which case, 'parse_status_'
-  /// is also updated.
-  bool TriggerDebugAction();
+  /// Called in the middle of creating a scratch tuple batch to simulate failures
+  /// such as exceeding memory limit or cancellation. Returns false if the debug
+  /// action deems that the parquet column reader should halt execution. 'val_count'
+  /// is the counter which the column reader uses to track the number of tuples
+  /// produced so far. If the column reader should halt execution, 'parse_status_'
+  /// is updated with the error status and 'val_count' is set to 0.
+  bool ColReaderDebugAction(int* val_count);
 };

 /// Reader for a single column from the parquet file.
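For reference, the revised SHOULD_TRIGGER_DEBUG_ACTION cadence above can be
modeled in isolation: fire on every other batch-read call, and only once at
least 128 tuples have been produced. A small standalone sketch (ShouldTrigger
is a hypothetical wrapper; the counter and threshold mirror the macro in the
diff):

#include <iostream>

// Mirrors the revised macro: trigger on every other Read*ValueBatch() call,
// and only once the current batch holds at least 128 tuples. The static
// counter plays the role of 'debug_count' in the diff.
static int debug_count = 0;

bool ShouldTrigger(int num_tuples) {
  // The left operand of && always runs, so the counter advances even when
  // 'num_tuples' is below the threshold.
  return (debug_count++ % 2) == 1 && num_tuples >= 128;
}

int main() {
  std::cout << ShouldTrigger(200) << "\n";  // count 0 (even) -> 0
  std::cout << ShouldTrigger(200) << "\n";  // count 1 (odd), 200 >= 128 -> 1
  std::cout << ShouldTrigger(200) << "\n";  // count 2 (even) -> 0
  std::cout << ShouldTrigger(64) << "\n";   // count 3 (odd), but 64 < 128 -> 0
}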
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/common/thrift/PlanNodes.thrift
----------------------------------------------------------------------
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index 1baffc0..e869afb 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -56,6 +56,7 @@ enum TExecNodePhase {
   PREPARE_SCANNER,
   OPEN,
   GETNEXT,
+  GETNEXT_SCANNER,
   CLOSE,
   INVALID
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/tests/common/impala_service.py
----------------------------------------------------------------------
diff --git a/tests/common/impala_service.py b/tests/common/impala_service.py
index 265b1b6..3fb73bc 100644
--- a/tests/common/impala_service.py
+++ b/tests/common/impala_service.py
@@ -126,7 +126,6 @@ class ImpaladService(BaseImpalaService):
           (num_in_flight_queries, expected_val))
     return False

-
   def wait_for_num_known_live_backends(self, expected_value, timeout=30, interval=1):
     start_time = time()
     while (time() - start_time < timeout):

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/249632b3/tests/failure/test_failpoints.py
----------------------------------------------------------------------
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index 301762b..f67afe4 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -31,7 +31,7 @@ from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension

 FAILPOINT_ACTIONS = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED']
-FAILPOINT_LOCATIONS = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'CLOSE']
+FAILPOINT_LOCATIONS = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'GETNEXT_SCANNER', 'CLOSE']
 # Map debug actions to their corresponding query options' values.
 FAILPOINT_ACTION_MAP = {'FAIL': 'FAIL', 'CANCEL': 'WAIT',
                         'MEM_LIMIT_EXCEEDED': 'MEM_LIMIT_EXCEEDED'}
@@ -39,7 +39,7 @@ MT_DOP_VALUES = [0, 4]

 # Queries should cover all exec nodes.
 QUERIES = [
-  "select * from alltypessmall",
+  "select * from alltypes",
   "select count(*) from alltypessmall",
   "select count(int_col) from alltypessmall group by id",
   "select 1 from alltypessmall a join alltypessmall b on a.id = b.id",
@@ -137,6 +137,8 @@ class TestFailpoints(ImpalaTestSuite):
       assert 'Expected Failure'
     except ImpalaBeeswaxException as e:
       LOG.debug(e)
+      # IMPALA-5197: None of the debug actions should trigger corrupted file message
+      assert 'Corrupt Parquet file' not in str(e)

   def __execute_cancel_action(self, query, vector):
     LOG.info('Starting async query execution')
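Why a separate GETNEXT_SCANNER phase? The scan node already checks a GETNEXT
failpoint in its own GetNext() path, while the parquet column readers now
check one mid-batch from scanner threads; sharing a single phase would make a
scanner-targeted failpoint fire in the scan node as well. A toy model of that
phase-matched dispatch (names are illustrative stand-ins; the real enum is
TExecNodePhase in PlanNodes.thrift, exercised via FAILPOINT_LOCATIONS in the
test above):

#include <iostream>

// Toy model of phase-matched failpoint dispatch, not Impala's real
// ExecNode::ExecDebugAction().
enum class Phase { GETNEXT, GETNEXT_SCANNER };

// Stand-in for a debug-action check: returns false ("injected failure")
// only when the configured failpoint phase matches the executing phase.
bool PassesDebugAction(Phase configured, Phase current) {
  return configured != current;
}

int main() {
  Phase failpoint = Phase::GETNEXT_SCANNER;  // configured for the query
  // The scan node's own GetNext() check is untouched by a scanner failpoint...
  std::cout << PassesDebugAction(failpoint, Phase::GETNEXT) << "\n";          // 1
  // ...while the column reader's mid-batch check trips it.
  std::cout << PassesDebugAction(failpoint, Phase::GETNEXT_SCANNER) << "\n";  // 0
}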
