[
https://issues.apache.org/jira/browse/IMPALA-7335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589578#comment-16589578
]
Pooja Nilangekar edited comment on IMPALA-7335 at 8/23/18 6:14 PM:
-------------------------------------------------------------------
From my understanding of the HdfsScanNode, here is the set of
transitions/dependencies among its state variables:
# HdfsScanNode::thread_state_::batch_queue_::shutdown_ is modified only once
(false -> true), in HdfsScanNode::SetDoneInternal(). By the time its value
changes, done_ is guaranteed to be true and reader_context_ is cancelled. It is
read in the GetBatch() and AddMaterializedRowBatch() functions. *It acts as a
point of synchronization for GetNext(), which invokes GetNextInternal(), which
in turn invokes GetBatch(). If the batch returned is NULL, status_ is returned
to the FragmentInstanceState.*
# HdfsScanNodeBase::progress_ is initialized in HdfsScanNode::Open() and
decremented each time HdfsScanner::Close() is called; it is only ever
decremented after initialization. *When ScannerThread() or GetNext() observes
its value reach 0, it invokes SetDone(), which transitions the done_ flag,
cancels all ranges in the reader_context_, and sets the shutdown_ flag to
true.* It is also used in ThreadTokenAvailableCb() to avoid spawning excessive
threads; however, that case cannot lead to any non-trivial failure, since an
extra spawned thread simply terminates.
# HdfsScanNode::status_ transitions in two places: ThreadTokenAvailableCb()
[when thread creation fails] and ScannerThread() [when ProcessSplit() returns
an error]. *In both cases, the modification of status_ is followed by a call to
SetDoneInternal().*
# HdfsScanNode::done_ transitions only via SetDoneInternal(). However, it is
used by multiple threads to signal completion of the scan (which can be due to
completing all ranges, encountering a non-recoverable error, or reaching the
scan node's limit).
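The single false -> true transition described in item 1 can be sketched as follows. This is a minimal illustration with hypothetical names mirroring those in the comment, not Impala's actual code: done_ is set (and the reader context would be cancelled) before the queue's shutdown_ flag, so a thread that observes shutdown_ == true may assume done_ == true.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical sketch of the SetDoneInternal() ordering; member names are
// illustrative stand-ins for HdfsScanNode's actual state variables.
struct ScanNodeSketch {
  std::mutex lock_;
  bool done_ = false;            // mirrors HdfsScanNode::done_
  bool queue_shutdown_ = false;  // mirrors batch_queue_::shutdown_

  void SetDoneInternal() {
    std::lock_guard<std::mutex> l(lock_);
    if (done_) return;           // idempotent: only one false -> true transition
    done_ = true;                // 1) mark the scan done
    // 2) the real code cancels reader_context_ here
    queue_shutdown_ = true;      // 3) shut down the row batch queue last
  }
};
```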
Here are the dependencies among the transitions:
progress_ --> done_ --> shutdown_ & status_ --> done_ --> shutdown_
Additionally, progress_ --> status_ is a dependency that is not
deterministically followed. One approach to fixing the race would be to update
status_ and invoke SetDoneInternal() in HdfsScanNode::ProcessSplit() before
invoking scanner->Close() in case of failures (a similar flow was part of the
code before IMPALA-2667). This would ensure that the first non-OK status always
updates status_ before the done_ and shutdown_ flags are set.
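The proposed ordering could look roughly like the sketch below. All names are hypothetical (Impala's Status type is replaced by a plain string for illustration); the point is only that on a ProcessSplit() failure, the first non-OK status is recorded and SetDoneInternal() runs *before* the scanner is closed, so the error can never be lost behind the done_/shutdown_ transition.

```cpp
#include <cassert>
#include <mutex>
#include <string>

// Hypothetical sketch of the proposed ProcessSplit() error path.
struct ScanNodeFixSketch {
  std::mutex lock_;
  std::string status_ = "OK";  // stand-in for HdfsScanNode::status_
  bool done_ = false;

  void SetDoneInternal() {
    std::lock_guard<std::mutex> l(lock_);
    done_ = true;
  }

  void OnProcessSplitError(const std::string& err) {
    {
      std::lock_guard<std::mutex> l(lock_);
      if (status_ == "OK") status_ = err;  // only the first error is kept
    }
    SetDoneInternal();  // done_ (and shutdown_) follow the status update
    // scanner->Close() would run here, after status_ is already recorded
  }
};
```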
> Assertion Failure - test_corrupt_files
> --------------------------------------
>
> Key: IMPALA-7335
> URL: https://issues.apache.org/jira/browse/IMPALA-7335
> Project: IMPALA
> Issue Type: Bug
> Affects Versions: Impala 3.1.0
> Reporter: nithya
> Assignee: Pooja Nilangekar
> Priority: Blocker
> Labels: broken-build
>
> test_corrupt_files fails
>
> query_test.test_scanners.TestParquet.test_corrupt_files[exec_option:
> \\{'batch_size': 0, 'num_nodes': 0, 'disable_codegen_rows_threshold': 0,
> 'disable_codegen': False, 'abort_on_error': 1, 'debug_action': None,
> 'exec_single_node_rows_threshold': 0} | table_format: parquet/none] (from
> pytest)
>
> {code:java}
> Error Message
> query_test/test_scanners.py:300: in test_corrupt_files
>     self.run_test_case('QueryTest/parquet-abort-on-error', vector)
> common/impala_test_suite.py:420: in run_test_case
>     assert False, "Expected exception: %s" % expected_str
> E   AssertionError: Expected exception: Column metadata states there are 11
> values, but read 10 values from column id.
> STACKTRACE
> query_test/test_scanners.py:300: in test_corrupt_files
> self.run_test_case('QueryTest/parquet-abort-on-error', vector)
> common/impala_test_suite.py:420: in run_test_case
> assert False, "Expected exception: %s" % expected_str
> E AssertionError: Expected exception: Column metadata states there are 11
> values, but read 10 values from column id.
> Standard Error
> -- executing against localhost:21000
> use functional_parquet;
> SET batch_size=0;
> SET num_nodes=0;
> SET disable_codegen_rows_threshold=0;
> SET disable_codegen=False;
> SET abort_on_error=0;
> SET exec_single_node_rows_threshold=0;
> -- executing against localhost:21000
> set num_nodes=1;
> -- executing against localhost:21000
> set num_scanner_threads=1;
> -- executing against localhost:21000
> select id, cnt from bad_column_metadata t, (select count(*) cnt from
> t.int_array) v;
> -- executing against localhost:21000
> SET NUM_NODES="0";
> -- executing against localhost:21000
> SET NUM_SCANNER_THREADS="0";
> -- executing against localhost:21000
> set num_nodes=1;
> -- executing against localhost:21000
> set num_scanner_threads=1;
> -- executing against localhost:21000
> select id from bad_column_metadata;
> -- executing against localhost:21000
> SET NUM_NODES="0";
> -- executing against localhost:21000
> SET NUM_SCANNER_THREADS="0";
> -- executing against localhost:21000
> SELECT * from bad_parquet_strings_negative_len;
> -- executing against localhost:21000
> SELECT * from bad_parquet_strings_out_of_bounds;
> -- executing against localhost:21000
> use functional_parquet;
> SET batch_size=0;
> SET num_nodes=0;
> SET disable_codegen_rows_threshold=0;
> SET disable_codegen=False;
> SET abort_on_error=1;
> SET exec_single_node_rows_threshold=0;
> -- executing against localhost:21000
> set num_nodes=1;
> -- executing against localhost:21000
> set num_scanner_threads=1;
> -- executing against localhost:21000
> select id, cnt from bad_column_metadata t, (select count(*) cnt from
> t.int_array) v;
> -- executing against localhost:21000
> SET NUM_NODES="0";
> -- executing against localhost:21000
> SET NUM_SCANNER_THREADS="0";
> -- executing against localhost:21000
> set num_nodes=1;
> -- executing against localhost:21000
> set num_scanner_threads=1;
> -- executing against localhost:21000
> select id from bad_column_metadata;
> -- executing against localhost:21000
> SET NUM_NODES="0";
> -- executing against localhost:21000
> SET NUM_SCANNER_THREADS="0";
> {code}
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]