IMPALA-5186: Handle failed CreateAndOpenScanner() in MT scan.

The bug was that a failed CreateAndOpenScanner() could cause a scanner
to be closed twice, leading to freed memory being accessed.

The fix is straightforward.
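
For illustration only, a self-contained sketch with hypothetical names (not
the Impala classes; the real change is in the diff below) of the double-close
hazard and the guard the fix relies on: close a scanner at most once, and
reset the owning pointer when Open() fails so the caller cannot close it again.

    // Sketch only: simplified stand-ins for HdfsScanner and CreateAndOpenScanner().
    #include <cassert>
    #include <memory>

    struct Scanner {
      bool is_closed_ = false;
      bool Open() { return false; }  // pretend Open() always fails
      void Close() {
        assert(!is_closed_);         // analogous to DCHECK(!is_closed_)
        is_closed_ = true;           // release resources exactly once
      }
    };

    bool CreateAndOpenScanner(std::unique_ptr<Scanner>* scanner) {
      scanner->reset(new Scanner());
      if (!(*scanner)->Open()) {
        (*scanner)->Close();
        scanner->reset();            // don't hand a closed scanner back to the caller
        return false;
      }
      return true;
    }

    int main() {
      std::unique_ptr<Scanner> scanner;
      if (!CreateAndOpenScanner(&scanner)) {
        assert(scanner == nullptr);  // caller never sees, and never re-closes, it
      }
      return 0;
    }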

Testing:
- I cleaned up test_failpoints.py and added an MT_DOP test dimension
  to cover this bug.
- Core tests passed.

Change-Id: I777c9b8ef2eb5b556c9b145d231c543b3b8ae270
Reviewed-on: http://gerrit.cloudera.org:8080/6618
Reviewed-by: Alex Behm <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/491154c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/491154c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/491154c8

Branch: refs/heads/master
Commit: 491154c8ef7a9b6b1be2430c1bf119e21ce171c3
Parents: fcefe47
Author: Alex Behm <[email protected]>
Authored: Mon Apr 10 23:31:08 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Fri Apr 14 01:50:30 2017 +0000

----------------------------------------------------------------------
 be/src/exec/base-sequence-scanner.cc |   1 +
 be/src/exec/hdfs-parquet-scanner.cc  |   1 +
 be/src/exec/hdfs-scan-node-base.cc   |   2 +
 be/src/exec/hdfs-scan-node-base.h    |   3 +-
 be/src/exec/hdfs-scan-node-mt.cc     |   8 ++-
 be/src/exec/hdfs-scanner.cc          |   9 ++-
 be/src/exec/hdfs-scanner.h           |   4 ++
 be/src/exec/hdfs-text-scanner.cc     |   1 +
 tests/failure/test_failpoints.py     | 112 +++++++++++++-----------------
 9 files changed, 73 insertions(+), 68 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/base-sequence-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/base-sequence-scanner.cc b/be/src/exec/base-sequence-scanner.cc
index 432dac8..411d87c 100644
--- a/be/src/exec/base-sequence-scanner.cc
+++ b/be/src/exec/base-sequence-scanner.cc
@@ -97,6 +97,7 @@ Status BaseSequenceScanner::Open(ScannerContext* context) {
 }

 void BaseSequenceScanner::Close(RowBatch* row_batch) {
+  DCHECK(!is_closed_);
   VLOG_FILE << "Bytes read past scan range: " << -stream_->bytes_left();
   VLOG_FILE << "Average block size: "
             << (num_syncs_ > 1 ? total_block_size_ / (num_syncs_ - 1) : 0);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index aba2bef..046ec46 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -267,6 +267,7 @@ Status HdfsParquetScanner::Open(ScannerContext* context) {
 }

 void HdfsParquetScanner::Close(RowBatch* row_batch) {
+  DCHECK(!is_closed_);
   if (row_batch != nullptr) {
     FlushRowGroupResources(row_batch);
     row_batch->tuple_data_pool()->AcquireData(template_tuple_pool_.get(), false);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 238155f..24802e3 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -684,9 +684,11 @@ Status HdfsScanNodeBase::CreateAndOpenScanner(HdfsPartitionDescriptor* partition
     if (!status.ok()) {
       RowBatch* batch = (HasRowBatchQueue()) ? scanner->get()->batch() : NULL;
       scanner->get()->Close(batch);
+      scanner->reset();
     }
   } else {
     context->ClearStreams();
+    scanner->reset();
   }
   return status;
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/hdfs-scan-node-base.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.h b/be/src/exec/hdfs-scan-node-base.h
index 7777d4d..1711bb5 100644
--- a/be/src/exec/hdfs-scan-node-base.h
+++ b/be/src/exec/hdfs-scan-node-base.h
@@ -452,8 +452,7 @@ class HdfsScanNodeBase : public ScanNode {
   Status IssueInitialScanRanges(RuntimeState* state);

   /// Create and open new scanner for this partition type.
-  /// If the scanner is successfully created, it is returned in 'scanner'.
-  /// Passes 'add_batches_to_queue' to the scanner constructor.
+  /// If the scanner is successfully created and opened, it is returned in 'scanner'.
   Status CreateAndOpenScanner(HdfsPartitionDescriptor* partition,
       ScannerContext* context, boost::scoped_ptr<HdfsScanner>* scanner);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 9fad46e..53aa026 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -89,7 +89,13 @@ Status HdfsScanNodeMt::GetNext(RuntimeState* state, RowBatch* row_batch, bool* e
     HdfsPartitionDescriptor* partition = hdfs_table_->GetPartition(partition_id);
     scanner_ctx_.reset(new ScannerContext(
         runtime_state_, this, partition, scan_range_, filter_ctxs()));
-    RETURN_IF_ERROR(CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_));
+    Status status = CreateAndOpenScanner(partition, scanner_ctx_.get(), &scanner_);
+    if (!status.ok()) {
+      DCHECK(scanner_ == NULL);
+      // Avoid leaking unread buffers in the scan range.
+      scan_range_->Cancel(status);
+      return status;
+    }
   }

   Status status = scanner_->GetNext(row_batch);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/hdfs-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.cc b/be/src/exec/hdfs-scanner.cc
index d613b15..a529668 100644
--- a/be/src/exec/hdfs-scanner.cc
+++ b/be/src/exec/hdfs-scanner.cc
@@ -63,6 +63,7 @@ HdfsScanner::HdfsScanner(HdfsScanNodeBase* scan_node, RuntimeState* state)
     context_(NULL),
     stream_(NULL),
     eos_(false),
+    is_closed_(false),
     scanner_conjunct_ctxs_(NULL),
     template_tuple_pool_(new MemPool(scan_node->mem_tracker())),
     template_tuple_(NULL),
@@ -83,6 +84,7 @@ HdfsScanner::HdfsScanner()
     context_(NULL),
     stream_(NULL),
     eos_(false),
+    is_closed_(false),
     scanner_conjunct_ctxs_(NULL),
     template_tuple_pool_(NULL),
     template_tuple_(NULL),
@@ -132,12 +134,17 @@ Status HdfsScanner::Open(ScannerContext* context) {
 }

 void HdfsScanner::Close(RowBatch* row_batch) {
-  if (decompressor_.get() != NULL) decompressor_->Close();
+  DCHECK(!is_closed_);
+  if (decompressor_.get() != NULL) {
+    decompressor_->Close();
+    decompressor_.reset();
+  }
   for (const auto& entry: scanner_conjuncts_map_) Expr::Close(entry.second, state_);
   for (const auto& entry: scanner_dict_filter_map_) Expr::Close(entry.second, state_);
   obj_pool_.Clear();
   stream_ = NULL;
   context_->ClearStreams();
+  is_closed_ = true;
 }

 Status HdfsScanner::InitializeWriteTuplesFn(HdfsPartitionDescriptor* partition,

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index f61c5fc..417ade7 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -142,6 +142,7 @@ class HdfsScanner {
   /// and memory in mem pools to the given row batch. If the row batch is NULL,
   /// those resources are released instead. In any case, releases all other resources
   /// that are not backing returned rows (e.g. temporary decompression buffers).
+  /// This function is not idempotent and must only be called once.
   virtual void Close(RowBatch* row_batch);

   /// Only valid to call if the parent scan node is single-threaded.
@@ -202,6 +203,9 @@ class HdfsScanner {
   /// Only relevant when calling the GetNext() interface.
   bool eos_;

+  /// Starts as false and is set to true in Close().
+  bool is_closed_;
+
   /// Clones of the conjuncts ExprContexts in scan_node_->conjuncts_map(). Each scanner
   /// has its own ExprContexts so the conjuncts can be safely evaluated in parallel.
   HdfsScanNodeBase::ConjunctsMap scanner_conjuncts_map_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/be/src/exec/hdfs-text-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-text-scanner.cc b/be/src/exec/hdfs-text-scanner.cc
index ced7ab1..0a66460 100644
--- a/be/src/exec/hdfs-text-scanner.cc
+++ b/be/src/exec/hdfs-text-scanner.cc
@@ -169,6 +169,7 @@ Status HdfsTextScanner::ProcessSplit() {
 }

 void HdfsTextScanner::Close(RowBatch* row_batch) {
+  DCHECK(!is_closed_);
   // Need to close the decompressor before transferring the remaining resources to
   // 'row_batch' because in some cases there is memory allocated in the decompressor_'s
   // temp_memory_pool_.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/491154c8/tests/failure/test_failpoints.py
----------------------------------------------------------------------
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index b51950e..301762b 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -30,32 +30,26 @@ from tests.common.skip import SkipIf, SkipIfS3, SkipIfIsilon, SkipIfLocal
 from tests.common.test_dimensions import create_exec_option_dimension
 from tests.common.test_vector import ImpalaTestDimension

-FAILPOINT_ACTION = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED']
-FAILPOINT_LOCATION = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'CLOSE']
+FAILPOINT_ACTIONS = ['FAIL', 'CANCEL', 'MEM_LIMIT_EXCEEDED']
+FAILPOINT_LOCATIONS = ['PREPARE', 'PREPARE_SCANNER', 'OPEN', 'GETNEXT', 'CLOSE']
 # Map debug actions to their corresponding query options' values.
 FAILPOINT_ACTION_MAP = {'FAIL': 'FAIL', 'CANCEL': 'WAIT',
                         'MEM_LIMIT_EXCEEDED': 'MEM_LIMIT_EXCEEDED'}
-
-# The goal of this query is to use all of the node types.
-# TODO: This query could be simplified a bit...
-QUERY = """
-select a.int_col, count(b.int_col) int_sum, count(c.c_name)
-from functional_hbase.alltypesagg a, tpch_parquet.customer c
-join
-  (select * from alltypes
-   where year=2009 and month=1 order by int_col limit 2500
-   union all
-   select * from alltypes
-   where year=2009 and month=2 limit 3000) b
-on (a.int_col = b.int_col) and (a.int_col = c.c_custkey)
-where c.c_mktsegment = 'BUILDING'
-group by a.int_col
-order by int_sum
-"""
-
-# TODO: Update to include INSERT when we support failpoints in the HDFS/Hbase sinks using
-# a similar pattern as test_cancellation.py
-QUERY_TYPE = ["SELECT"]
+MT_DOP_VALUES = [0, 4]
+
+# Queries should cover all exec nodes.
+QUERIES = [
+  "select * from alltypessmall",
+  "select count(*) from alltypessmall",
+  "select count(int_col) from alltypessmall group by id",
+  "select 1 from alltypessmall a join alltypessmall b on a.id = b.id",
+  "select 1 from alltypessmall a join alltypessmall b on a.id != b.id",
+  "select 1 from alltypessmall order by id",
+  "select 1 from alltypessmall order by id limit 100",
+  "select * from alltypessmall union all select * from alltypessmall",
+  "select row_number() over (partition by int_col order by id) from alltypessmall",
+  "select c from (select id c from alltypessmall order by id limit 10) v where c = 1"
+]

 @SkipIf.skip_hbase # -skip_hbase argument specified
 @SkipIfS3.hbase # S3: missing coverage: failures
@@ -67,69 +61,46 @@ class TestFailpoints(ImpalaTestSuite):
     return 'functional-query'

   @classmethod
-  def parse_plan_nodes_from_explain_output(cls, query, use_db="default"):
-    """Parses the EXPLAIN <query> output and returns a map of node_name->list(node_id)"""
-    client = cls.create_impala_client()
-    client.execute("use %s" % use_db)
-    explain_result = client.execute("explain " + QUERY)
-    # Maps plan node names to their respective node ids. Expects format of <ID>:<NAME>
-    node_id_map = defaultdict(list)
-    for row in explain_result.data:
-      match = re.search(r'\s*(?P<node_id>\d+)\:(?P<node_type>\S+\s*\S+)', row)
-      if match is not None:
-        node_id_map[match.group('node_type')].append(int(match.group('node_id')))
-    return node_id_map
-
-  @classmethod
   def add_test_dimensions(cls):
     super(TestFailpoints, cls).add_test_dimensions()
-    # Executing an explain on the the test query will fail in an enviornment where hbase
-    # tables don't exist (s3). Since this happens before the tests are run, the skipif
-    # marker won't catch it. If 's3' is detected as a file system, return immedietely.
-    if os.getenv("TARGET_FILESYSTEM") in ["s3", "isilon", "local"]: return
-    node_id_map = TestFailpoints.parse_plan_nodes_from_explain_output(QUERY, "functional")
-    assert node_id_map
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension('location', *FAILPOINT_LOCATION))
+        ImpalaTestDimension('query', *QUERIES))
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension('target_node', *(node_id_map.items())))
+        ImpalaTestDimension('action', *FAILPOINT_ACTIONS))
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension('action', *FAILPOINT_ACTION))
+        ImpalaTestDimension('location', *FAILPOINT_LOCATIONS))
     cls.ImpalaTestMatrix.add_dimension(
-        ImpalaTestDimension('query_type', *QUERY_TYPE))
+        ImpalaTestDimension('mt_dop', *MT_DOP_VALUES))
     cls.ImpalaTestMatrix.add_dimension(
         create_exec_option_dimension([0], [False], [0]))

-    # These are invalid test cases.
-    # For more info see IMPALA-55 and IMPALA-56.
-    cls.ImpalaTestMatrix.add_constraint(lambda v: not (
-        v.get_value('action') == 'FAIL' and
-        v.get_value('location') in ['CLOSE'] and
-        v.get_value('target_node')[0] in ['AGGREGATE', 'HASH JOIN']) and
-        not (v.get_value('location') in ['PREPARE'] and
-        v.get_value('action') == 'CANCEL'))
-
     # Don't create CLOSE:WAIT debug actions to avoid leaking plan fragments (there's no
     # way to cancel a plan fragment once Close() has been called)
     cls.ImpalaTestMatrix.add_constraint(
         lambda v: not (v.get_value('action') == 'CANCEL' and
                        v.get_value('location') == 'CLOSE'))

-    # No need to test error in scanner preparation for non-scan nodes.
-    cls.ImpalaTestMatrix.add_constraint(
-        lambda v: (v.get_value('location') != 'PREPARE_SCANNER' or
-                   v.get_value('target_node')[0] == 'SCAN HDFS'))
-
   # Run serially because this can create enough memory pressure to invoke the Linux OOM
   # killer on machines with 30GB RAM. This makes the test run in 4 minutes instead of 1-2.
   @pytest.mark.execute_serially
   def test_failpoints(self, vector):
-    query = QUERY
-    node_type, node_ids = vector.get_value('target_node')
+    query = vector.get_value('query')
     action = vector.get_value('action')
     location = vector.get_value('location')
+    vector.get_value('exec_option')['mt_dop'] = vector.get_value('mt_dop')

-    for node_id in node_ids:
+    if action == "CANCEL" and location == "PREPARE":
+      pytest.xfail(reason="IMPALA-5202 leads to a hang.")
+
+    try:
+      plan_node_ids = self.__parse_plan_nodes_from_explain(query, vector)
+    except ImpalaBeeswaxException as e:
+      if "MT_DOP not supported" in str(e):
+        pytest.xfail(reason="MT_DOP not supported.")
+      else:
+        raise e
+
+    for node_id in plan_node_ids:
       debug_action = '%d:%s:%s' % (node_id, location, FAILPOINT_ACTION_MAP[action])
       LOG.info('Current debug action: SET DEBUG_ACTION=%s' % debug_action)
       vector.get_value('exec_option')['debug_action'] = debug_action
@@ -146,6 +117,19 @@ class TestFailpoints(ImpalaTestSuite):
     del vector.get_value('exec_option')['debug_action']
     self.execute_query(query, vector.get_value('exec_option'))

+  def __parse_plan_nodes_from_explain(self, query, vector):
+    """Parses the EXPLAIN <query> output and returns a list of node ids.
+    Expects format of <ID>:<NAME>"""
+    explain_result =\
+      self.execute_query("explain " + query, vector.get_value('exec_option'),
+          table_format=vector.get_value('table_format'))
+    node_ids = []
+    for row in explain_result.data:
+      match = re.search(r'\s*(?P<node_id>\d+)\:(?P<node_type>\S+\s*\S+)', row)
+      if match is not None:
+        node_ids.append(int(match.group('node_id')))
+    return node_ids
+
   def __execute_fail_action(self, query, vector):
     try:
       self.execute_query(query, vector.get_value('exec_option'),
