IMPALA-3962: Clean up scratch tuple batch on scan failures

The parquet scanner doesn't clean up 'scratch_batch_' properly, which causes it to process a partially filled scratch_batch_ if any of the column readers fails. This change cleans up the scratch batch if any of the parquet column readers fails. The cleanup frees the mem_pool of scratch_batch_ and resets the number of tuples in scratch_batch_ to 0.
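For illustration only, the cleanup pattern described above boils down to the following minimal, self-contained C++ sketch. The types here (Pool, ScratchBatch, ColumnReader) are simplified stand-ins for Impala's MemPool, ScratchTupleBatch and parquet column readers, not the real classes; the actual change is in HdfsParquetScanner::AssembleRows() in the diff below.

#include <memory>
#include <vector>

// Stand-in for MemPool: owns memory chunks and can release them all at once.
struct Pool {
  std::vector<std::unique_ptr<char[]>> chunks;
  void FreeAll() { chunks.clear(); }
};

// Stand-in for ScratchTupleBatch: memory plus a count of materialized tuples.
struct ScratchBatch {
  Pool pool;
  int num_tuples = 0;
};

// Stand-in for a parquet column reader. Returns false on failure
// (parse error, exceeded memory limit, cancellation, ...).
struct ColumnReader {
  bool ReadValueBatch(ScratchBatch* /* scratch */) { return true; }
};

// Fills 'scratch' from every column reader. On any failure the partially
// filled scratch batch is cleaned up so the caller never consumes it.
bool AssembleRows(std::vector<ColumnReader>& readers, ScratchBatch* scratch,
                  bool* skip_row_group) {
  for (ColumnReader& reader : readers) {
    if (!reader.ReadValueBatch(scratch)) {
      // The fix: free the scratch batch's memory and reset its tuple count
      // instead of leaving a partially filled batch behind.
      scratch->pool.FreeAll();
      scratch->num_tuples = 0;
      *skip_row_group = true;
      return false;
    }
  }
  return true;
}

int main() {
  std::vector<ColumnReader> readers(3);
  ScratchBatch scratch;
  bool skip_row_group = false;
  AssembleRows(readers, &scratch, &skip_row_group);
  return 0;
}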
This change also extends debug action to emulate the behavior of exceeding the query's memory limit. Change-Id: If1e27a1517d09ccaabdae1492b7e1fbf661ae3e5 Reviewed-on: http://gerrit.cloudera.org:8080/3991 Reviewed-by: Michael Ho <[email protected]> Tested-by: Internal Jenkins Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/2aa86309 Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/2aa86309 Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/2aa86309 Branch: refs/heads/master Commit: 2aa86309d15cba8a965c83d81308e32df899ec8a Parents: f613dcd Author: Michael Ho <[email protected]> Authored: Mon Aug 15 00:19:54 2016 -0700 Committer: Internal Jenkins <[email protected]> Committed: Fri Aug 19 22:37:28 2016 +0000 ---------------------------------------------------------------------- be/src/exec/exec-node.cc | 3 +++ be/src/exec/hdfs-parquet-scanner.cc | 28 +++++++++++++++------------- be/src/exec/hdfs-scan-node.cc | 4 ++++ be/src/exec/hdfs-scan-node.h | 5 +++++ be/src/exec/hdfs-scanner.cc | 1 - be/src/exec/hdfs-scanner.h | 5 +++++ be/src/exec/parquet-column-readers.cc | 27 +++++++++++++++++++++++++++ be/src/exec/parquet-column-readers.h | 5 +++++ common/thrift/PlanNodes.thrift | 3 ++- tests/failure/test_failpoints.py | 17 ++++++++++------- 10 files changed, 76 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/exec-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index aeceb6c..937c6f2 100644 --- a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -437,6 +437,9 @@ Status ExecNode::ExecDebugAction(TExecNodePhase::type phase, RuntimeState* state ErrorMsg(TErrorCode::INTERNAL_ERROR, "Debug Action: INJECT_ERROR_LOG")); return Status::OK(); } + if (debug_action_ == TDebugAction::MEM_LIMIT_EXCEEDED) { + mem_tracker()->MemLimitExceeded(state, "Debug Action: MEM_LIMIT_EXCEEDED"); + } return Status::OK(); } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/hdfs-parquet-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc index 613761b..ee5f4d9 100644 --- a/be/src/exec/hdfs-parquet-scanner.cc +++ b/be/src/exec/hdfs-parquet-scanner.cc @@ -343,11 +343,11 @@ Status HdfsParquetScanner::GetNextInternal(RowBatch* row_batch, bool* eos) { } while (advance_row_group_ || column_readers_[0]->RowGroupAtEnd()) { + // Transfer resources and clear streams if there is any leftover from the previous + // row group. We will create new streams for the next row group. + FlushRowGroupResources(row_batch); + context_->ClearStreams(); if (!advance_row_group_) { - // End of the previous row group. Transfer resources and clear streams because - // we will create new streams for the next row group. 
- FlushRowGroupResources(row_batch); - context_->ClearStreams(); Status status = ValidateEndOfRowGroup(column_readers_, row_group_idx_, row_group_rows_read_); if (!status.ok()) RETURN_IF_ERROR(state_->LogOrReturnError(status.msg())); @@ -501,17 +501,19 @@ Status HdfsParquetScanner::AssembleRows( scratch_batch_->mem_pool(), scratch_capacity, tuple_byte_size_, scratch_batch_->tuple_mem, &scratch_batch_->num_tuples); } - if (UNLIKELY(!continue_execution)) { - *skip_row_group = true; - return Status::OK(); - } // Check that all column readers populated the same number of values. - if (c != 0 && UNLIKELY(last_num_tuples != scratch_batch_->num_tuples)) { - parse_status_.MergeStatus(Substitute("Corrupt Parquet file '$0': column '$1' " - "had $2 remaining values but expected $3", filename(), - col_reader->schema_element().name, last_num_tuples, - scratch_batch_->num_tuples)); + bool num_tuples_mismatch = c != 0 && last_num_tuples != scratch_batch_->num_tuples; + if (UNLIKELY(!continue_execution || num_tuples_mismatch)) { + // Skipping this row group. Free up all the resources with this row group. + scratch_batch_->mem_pool()->FreeAll(); + scratch_batch_->num_tuples = 0; *skip_row_group = true; + if (num_tuples_mismatch) { + parse_status_.MergeStatus(Substitute("Corrupt Parquet file '$0': column '$1' " + "had $2 remaining values but expected $3", filename(), + col_reader->schema_element().name, last_num_tuples, + scratch_batch_->num_tuples)); + } return Status::OK(); } last_num_tuples = scratch_batch_->num_tuples; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/hdfs-scan-node.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index 1421c0b..6077f99 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -451,6 +451,10 @@ void HdfsScanNode::TransferToScanNodePool(MemPool* pool) { scan_node_pool_->AcquireData(pool, false); } +Status HdfsScanNode::TriggerDebugAction() { + return ExecDebugAction(TExecNodePhase::GETNEXT, runtime_state_); +} + Status HdfsScanNode::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); runtime_state_ = state; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/hdfs-scan-node.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h index 05ee50e..ae38856 100644 --- a/be/src/exec/hdfs-scan-node.h +++ b/be/src/exec/hdfs-scan-node.h @@ -317,6 +317,7 @@ class HdfsScanNode : public ScanNode { protected: friend class ScannerContext; + friend class HdfsScanner; RuntimeState* runtime_state_; @@ -571,6 +572,10 @@ class HdfsScanNode : public ScanNode { /// true if all filters arrived within the time limit (as measured from the time of /// RuntimeFilterBank::RegisterFilter()), false otherwise. bool WaitForRuntimeFilters(int32_t time_ms); + + /// Calls ExecDebugAction(). Returns the status based on the debug action specified + /// for the query. 
+ Status TriggerDebugAction(); }; } http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/hdfs-scanner.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc index 6b4373e..81cd1f0 100644 --- a/be/src/exec/hdfs-scanner.cc +++ b/be/src/exec/hdfs-scanner.cc @@ -669,4 +669,3 @@ void HdfsScanner::ReportColumnParseError(const SlotDescriptor* desc, if (state_->abort_on_error() && parse_status_.ok()) parse_status_ = Status(ss.str()); } } - http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/hdfs-scanner.h ---------------------------------------------------------------------- diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h index a5e3b51..53711ab 100644 --- a/be/src/exec/hdfs-scanner.h +++ b/be/src/exec/hdfs-scanner.h @@ -378,6 +378,11 @@ class HdfsScanner { /// This is called from WriteAlignedTuples. bool ReportTupleParseError(FieldLocation* fields, uint8_t* errors, int row_idx); + /// Triggers debug action of the scan node. This is currently used by parquet column + /// readers to exercise various failure paths in parquet scanner. Returns the status + /// returned by the scan node's TriggerDebugAction(). + Status TriggerDebugAction() { return scan_node_->TriggerDebugAction(); } + /// Utility function to append an error message for an invalid row. This is called /// from ReportTupleParseError() /// row_idx is the index of the row in the current batch. Subclasses should override http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/parquet-column-readers.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc index c7e3e17..48acb27 100644 --- a/be/src/exec/parquet-column-readers.cc +++ b/be/src/exec/parquet-column-readers.cc @@ -54,6 +54,15 @@ DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false, // Also, this limit is in place to prevent impala from reading corrupt parquet files. DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size in bytes"); +// Trigger debug action on every 128 tuples produced. This is useful in exercising the +// failure or cancellation path. 
+#ifndef NDEBUG +#define DEBUG_ACTION_TRIGGER (127) +#define SHOULD_TRIGGER_DEBUG_ACTION(x) ((x & DEBUG_ACTION_TRIGGER) == 0) +#else +#define SHOULD_TRIGGER_DEBUG_ACTION(x) (false) +#endif + namespace impala { const string PARQUET_MEM_LIMIT_EXCEEDED = "HdfsParquetScanner::$0() failed to allocate " @@ -330,6 +339,9 @@ class ScalarColumnReader : public BaseScalarColumnReader { } val_count += ret_val_count; num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx); + if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) { + continue_execution &= TriggerDebugAction(); + } } *num_values = val_count; return continue_execution; @@ -630,6 +642,15 @@ class BoolColumnReader : public BaseScalarColumnReader { BitReader bool_values_; }; +bool ParquetColumnReader::TriggerDebugAction() { + Status status = parent_->TriggerDebugAction(); + if (!status.ok()) { + if (!status.IsCancelled()) parent_->parse_status_.MergeStatus(status); + return false; + } + return true; +} + bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) { int val_count = 0; @@ -645,6 +666,9 @@ bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values, if (pos_slot_desc_ != NULL) ReadPosition(tuple); continue_execution = ReadValue(pool, tuple); ++val_count; + if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) { + continue_execution &= TriggerDebugAction(); + } } *num_values = val_count; return continue_execution; @@ -658,6 +682,9 @@ bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool, Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size); continue_execution = ReadNonRepeatedValue(pool, tuple); ++val_count; + if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) { + continue_execution &= TriggerDebugAction(); + } } *num_values = val_count; return continue_execution; http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/be/src/exec/parquet-column-readers.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h index 8435e71..e1a3061 100644 --- a/be/src/exec/parquet-column-readers.h +++ b/be/src/exec/parquet-column-readers.h @@ -287,6 +287,11 @@ class ParquetColumnReader { // rep_level_ is always valid and equal to 0 if col not in collection. if (max_rep_level() == 0) rep_level_ = 0; } + + /// Trigger debug action. Returns false if the debug action deems that the + /// parquet column reader should halt execution. In which case, 'parse_status_' + /// is also updated. + bool TriggerDebugAction(); }; /// Reader for a single column from the parquet file. 
It's associated with a http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/common/thrift/PlanNodes.thrift ---------------------------------------------------------------------- diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift index 43b64d2..3e31120 100644 --- a/common/thrift/PlanNodes.thrift +++ b/common/thrift/PlanNodes.thrift @@ -64,7 +64,8 @@ enum TExecNodePhase { enum TDebugAction { WAIT, FAIL, - INJECT_ERROR_LOG + INJECT_ERROR_LOG, + MEM_LIMIT_EXCEEDED, } // Preference for replica selection http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/2aa86309/tests/failure/test_failpoints.py ---------------------------------------------------------------------- diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py index d43bfd7..6eac663 100644 --- a/tests/failure/test_failpoints.py +++ b/tests/failure/test_failpoints.py @@ -29,20 +29,25 @@ from tests.common.skip import SkipIf, SkipIfS3, SkipIfIsilon, SkipIfLocal from tests.common.test_dimensions import create_exec_option_dimension from tests.common.test_vector import TestDimension -FAILPOINT_ACTION = ['FAIL', 'CANCEL'] +FAILPOINT_ACTION = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED'] FAILPOINT_LOCATION = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'CLOSE'] +# Map debug actions to their corresponding query options' values. +FAILPOINT_ACTION_MAP = {'FAIL': 'FAIL', 'CANCEL': 'WAIT', + 'MEM_LIMIT_EXCEEDED': 'MEM_LIMIT_EXCEEDED'} # The goal of this query is to use all of the node types. # TODO: This query could be simplified a bit... QUERY = """ -select a.int_col, count(b.int_col) int_sum from functional_hbase.alltypesagg a +select a.int_col, count(b.int_col) int_sum, count(l.l_shipdate) +from functional_hbase.alltypesagg a, tpch_nested_parquet.customer c, c.c_orders.o_lineitems l join (select * from alltypes where year=2009 and month=1 order by int_col limit 2500 union all select * from alltypes where year=2009 and month=2 limit 3000) b -on (a.int_col = b.int_col) +on (a.int_col = b.int_col) and (a.int_col = c.c_custkey) +where c.c_mktsegment = 'BUILDING' group by a.int_col order by int_sum """ @@ -109,7 +114,6 @@ class TestFailpoints(ImpalaTestSuite): lambda v: (v.get_value('location') != 'PREPARE_SCANNER' or v.get_value('target_node')[0] == 'SCAN HDFS')) - def test_failpoints(self, vector): query = QUERY node_type, node_ids = vector.get_value('target_node') @@ -117,14 +121,13 @@ class TestFailpoints(ImpalaTestSuite): location = vector.get_value('location') for node_id in node_ids: - debug_action = '%d:%s:%s' % (node_id, location, - 'WAIT' if action == 'CANCEL' else 'FAIL') + debug_action = '%d:%s:%s' % (node_id, location, FAILPOINT_ACTION_MAP[action]) LOG.info('Current dubug action: SET DEBUG_ACTION=%s' % debug_action) vector.get_value('exec_option')['debug_action'] = debug_action if action == 'CANCEL': self.__execute_cancel_action(query, vector) - elif action == 'FAIL': + elif action == 'FAIL' or action == 'MEM_LIMIT_EXCEEDED': self.__execute_fail_action(query, vector) else: assert 0, 'Unknown action: %s' % action
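The debug-action hook added to the column readers reduces to the pattern sketched below. This is a minimal, self-contained approximation, not the Impala API: DebugAction and TriggerDebugAction() are stand-ins for the TDebugAction Thrift enum and the scan node's TriggerDebugAction() in the patch, and the 128-value trigger interval mirrors DEBUG_ACTION_TRIGGER above.

#include <cstdio>

// Stand-in for the TDebugAction values relevant here.
enum class DebugAction { NONE, FAIL, MEM_LIMIT_EXCEEDED };

#ifndef NDEBUG
// Fire on every 128th value produced (cheap power-of-two mask, as in the patch).
#define SHOULD_TRIGGER_DEBUG_ACTION(count) (((count) & 127) == 0)
#else
#define SHOULD_TRIGGER_DEBUG_ACTION(count) (false)
#endif

// Stand-in for the scan node hook: returns false when the configured debug
// action simulates a failure and execution should stop.
static bool TriggerDebugAction(DebugAction action) {
  return action == DebugAction::NONE;
}

// Simplified batched read loop: keeps decoding values until the batch is full
// or the injected debug action halts execution.
static int ReadValueBatch(DebugAction action, int max_values) {
  bool continue_execution = true;
  int val_count = 0;
  while (val_count < max_values && continue_execution) {
    // ... decode one value into the scratch batch here ...
    ++val_count;
    if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
      continue_execution &= TriggerDebugAction(action);
    }
  }
  return val_count;
}

int main() {
  // With MEM_LIMIT_EXCEEDED configured, the loop stops at the first trigger
  // point (128 values) instead of filling the whole batch.
  std::printf("values read: %d\n",
              ReadValueBatch(DebugAction::MEM_LIMIT_EXCEEDED, 1024));
  return 0;
}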
