[ https://issues.apache.org/jira/browse/HIVE-8457?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chao updated HIVE-8457: ----------------------- Description: Currently, on the Spark branch, each thread it is bound with a thread-local IOContext, which gets initialized when we generates an input {{HadoopRDD}}, and later used in {{MapOperator}}, {{FilterOperator}}, etc. And, given the introduction of HIVE-8118, we may have multiple downstream RDDs that share the same input {{HadoopRDD}}, and we would like to have the {{HadoopRDD}} to be cached, to avoid scanning the same table multiple times. A typical case would be like the following: {noformat} inputRDD inputRDD | | MT_11 MT_12 | | RT_1 RT_2 {noformat} Here, {{MT_11}} and {{MT_12}} are {{MapTran}} from a splitted {{MapWork}}, and {{RT_1}} and {{RT_2}} are two {{ReduceTran}}. Note that, this example is simplified, as we may also have {{ShuffleTran}} between {{MapTran}} and {{ReduceTran}}. When multiple Spark threads are running, {{MT_11} may be executed first, and it will ask for an iterator from the {{HadoopRDD}} will trigger the creation of the iterator, which in turn triggers the initialization of the {{IOContext}} associated with that particular thread. Now, before {{MT_12}} starts executing, it will also ask for an iterator from the {{HadoopRDD}}, and since the RDD is already cached, instead of creating a new iterator, it will just fetch it from the cached result. However, the problem is, this will skip the initialization of the IOContext associated with this particular thread. When {{MT_12}} starts executing, it will first initialize the {{MapOperator}}, but since the {{IOContext}} is not initialized, this will fail miserably. was: Currently, on the Spark branch, each thread it is bound with a thread-local IOContext, which gets initialized when we generates a input {{HadoopRDD}}, and later used in {{MapOperator}}, {{FilterOperator}}, etc. And, given the introduction of HIVE-8118, we may have multiple downstream RDDs that share the same input {{HadoopRDD}}, and we would like to have the {{HadoopRDD}} to be cached, to avoid scanning the same table multiple times. A typical case would be like the following: {noformat} inputRDD inputRDD | | MT_11 MT_12 | | RT_1 RT_2 {noformat} Here, {{MT_11}} and {{MT_12}} are {{MapTran}}s from a splitted {{MapWork}}, and {{RT_1}} and {{RT_2}} are two {{ReduceTran}}s. Note that, this example is simplified, as we may also have {{ShuffleTran}} between {{MapTran}} and {{ReduceTran}}. When multiple Spark threads are running, {{MT_11} may be executed first, and it will ask for an iterator from the {{HadoopRDD}} will trigger the creation of the iterator, which in turn triggers the initialization of the {{IOContext}} associated with that particular thread. Now, before {{MT_12}} starts executing, it will also ask for an iterator from the {{HadoopRDD}}, and since the RDD is already cached, instead of creating a new iterator, it will just fetch it from the cached result. However, the problem is, this will skip the initialization of the IOContext associated with this particular thread. When {{MT_12}} starts executing, it will first initialize the {{MapOperator}}, but since the {{IOContext}} is not initialized, this will fail miserably. > MapOperator initialization when multiple Spark threads is enabled. [Spark > Branch] > --------------------------------------------------------------------------------- > > Key: HIVE-8457 > URL: https://issues.apache.org/jira/browse/HIVE-8457 > Project: Hive > Issue Type: Bug > Components: Spark > Reporter: Chao > > Currently, on the Spark branch, each thread it is bound with a thread-local > IOContext, which gets initialized when we generates an input {{HadoopRDD}}, > and later used in {{MapOperator}}, {{FilterOperator}}, etc. > And, given the introduction of HIVE-8118, we may have multiple downstream > RDDs that share the same input {{HadoopRDD}}, and we would like to have the > {{HadoopRDD}} to be cached, to avoid scanning the same table multiple times. > A typical case would be like the following: > {noformat} > inputRDD inputRDD > | | > MT_11 MT_12 > | | > RT_1 RT_2 > {noformat} > Here, {{MT_11}} and {{MT_12}} are {{MapTran}} from a splitted {{MapWork}}, > and {{RT_1}} and {{RT_2}} are two {{ReduceTran}}. Note that, this example is > simplified, as we may also have {{ShuffleTran}} between {{MapTran}} and > {{ReduceTran}}. > When multiple Spark threads are running, {{MT_11} may be executed first, and > it will ask for an iterator from the {{HadoopRDD}} will trigger the creation > of the iterator, which in turn triggers the initialization of the > {{IOContext}} associated with that particular thread. > Now, before {{MT_12}} starts executing, it will also ask for an iterator from > the > {{HadoopRDD}}, and since the RDD is already cached, instead of creating a new > iterator, it will just fetch it from the cached result. However, the problem > is, this will skip the initialization of the IOContext associated with this > particular thread. When {{MT_12}} starts executing, it will first initialize > the {{MapOperator}}, but since the {{IOContext}} is not initialized, this > will fail miserably. -- This message was sent by Atlassian JIRA (v6.3.4#6332)