[
https://issues.apache.org/jira/browse/HIVE-8457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chao updated HIVE-8457:
-----------------------
Description:
Currently, on the Spark branch, each thread is bound to a thread-local
{{IOContext}}, which gets initialized when we generate an input {{HadoopRDD}} and
is later used in {{MapOperator}}, {{FilterOperator}}, etc.
And, given the introduction of HIVE-8118, we may have multiple downstream RDDs
that share the same input {{HadoopRDD}}, and we would like the
{{HadoopRDD}} to be cached, to avoid scanning the same table multiple times. A
typical case would be like the following:
{noformat}
inputRDD     inputRDD
    |            |
  MT_11        MT_12
    |            |
  RT_1         RT_2
{noformat}
Here, {{MT_11}} and {{MT_12}} are {{MapTran}}s from a split {{MapWork}},
and {{RT_1}} and {{RT_2}} are two {{ReduceTran}}s. Note that this example is
simplified, as we may also have a {{ShuffleTran}} between {{MapTran}} and
{{ReduceTran}}.
When multiple Spark threads are running, {{MT_11}} may be executed first, and
it will ask for an iterator from the {{HadoopRDD}}. This will trigger the
creation of the iterator, which in turn triggers the initialization of the
{{IOContext}} associated with that particular thread.
*Now, the problem is*: before {{MT_12}} starts executing, it will also ask for
an iterator from the {{HadoopRDD}}, and since the RDD is already cached,
instead of creating a new iterator, it will just fetch it from the cached
result. However, *this will skip the initialization of the IOContext
associated with this particular thread*. And, when {{MT_12}} starts executing,
it will try to initialize the {{MapOperator}}, but since the {{IOContext}} is
not initialized, this will fail miserably.
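The failure mode described above can be reproduced in isolation. The following is only a minimal sketch in plain Java, not Hive or Spark code: {{IOContextCacheSketch}}, {{CachedInput}}, {{runMapTask}}, and the input path are hypothetical stand-ins, and the two tasks run one after another on separate threads so that the outcome is deterministic. It assumes, as in the description, that the thread-local context is initialized only as a side effect of actually computing the input.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class IOContextCacheSketch {

    /** Hypothetical stand-in for a per-thread IOContext. */
    static final class IOContext {
        String inputPath; // null means "not initialized"
    }

    /** Each thread gets its own, initially uninitialized, context. */
    static final ThreadLocal<IOContext> CONTEXT =
            ThreadLocal.withInitial(IOContext::new);

    /** Hypothetical stand-in for a cached input RDD. */
    static final class CachedInput {
        private List<String> cachedRows; // null until the first computation

        synchronized List<String> rows() {
            if (cachedRows == null) {
                // First caller: computing the input has the side effect of
                // initializing the context of the *calling* thread only.
                CONTEXT.get().inputPath = "/hypothetical/table/path";
                cachedRows = Arrays.asList("row1", "row2");
            }
            // Later callers are served from the cache, so the
            // initialization above never runs on their threads.
            return cachedRows;
        }
    }

    /**
     * Runs one simulated map task on its own thread and reports whether
     * that thread's context was initialized after reading the input.
     */
    static boolean runMapTask(CachedInput input) {
        AtomicBoolean initialized = new AtomicBoolean(false);
        Thread task = new Thread(() -> {
            input.rows();
            initialized.set(CONTEXT.get().inputPath != null);
        });
        task.start();
        try {
            task.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return initialized.get();
    }

    public static void main(String[] args) {
        CachedInput input = new CachedInput();
        // The first task computes the input and initializes its context.
        System.out.println("MT_11 context initialized: " + runMapTask(input)); // true
        // The second task hits the cache and keeps an uninitialized context.
        System.out.println("MT_12 context initialized: " + runMapTask(input)); // false
    }
}
```

In this sketch the second task gets its rows but finds its own context empty, which corresponds to {{MT_12}} trying to initialize {{MapOperator}} without an initialized {{IOContext}}.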
was:
Currently, on the Spark branch, each thread is bound to a thread-local
{{IOContext}}, which gets initialized when we generate an input {{HadoopRDD}} and
is later used in {{MapOperator}}, {{FilterOperator}}, etc.
And, given the introduction of HIVE-8118, we may have multiple downstream RDDs
that share the same input {{HadoopRDD}}, and we would like the
{{HadoopRDD}} to be cached, to avoid scanning the same table multiple times. A
typical case would be like the following:
{noformat}
inputRDD     inputRDD
    |            |
  MT_11        MT_12
    |            |
  RT_1         RT_2
{noformat}
Here, {{MT_11}} and {{MT_12}} are {{MapTran}}s from a split {{MapWork}},
and {{RT_1}} and {{RT_2}} are two {{ReduceTran}}s. Note that this example is
simplified, as we may also have a {{ShuffleTran}} between {{MapTran}} and
{{ReduceTran}}.
When multiple Spark threads are running, {{MT_11}} may be executed first, and
it will ask for an iterator from the {{HadoopRDD}}. This will trigger the
creation of the iterator, which in turn triggers the initialization of the
{{IOContext}} associated with that particular thread.
*Now, the problem is*: before {{MT_12}} starts executing, it will also ask for
an iterator from the {{HadoopRDD}}, and since the RDD is already cached,
instead of creating a new iterator, it will just fetch it from the cached
result. However, this will skip the initialization of the IOContext
associated with this particular thread. And, when {{MT_12}} starts executing,
it will try to initialize the {{MapOperator}}, but since the {{IOContext}} is
not initialized, this will fail miserably.
> MapOperator initialization when multiple Spark threads is enabled. [Spark
> Branch]
> ---------------------------------------------------------------------------------
>
> Key: HIVE-8457
> URL: https://issues.apache.org/jira/browse/HIVE-8457
> Project: Hive
> Issue Type: Bug
> Components: Spark
> Reporter: Chao