[ https://issues.apache.org/jira/browse/HIVE-18301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16330190#comment-16330190 ]

liyunzhang commented on HIVE-18301:
-----------------------------------

[~lirui]:
{quote}My understanding is if the HadoopRDD is cached, the records are not 
produced by record reader and IOContext is not populated. Therefore the 
information in IOContext will be unavailable, e.g. the input path. This may 
cause problem because some operators need to take certain actions when input 
file changes – {{Operator::cleanUpInputFileChanged}}.
 So basically my point is we have to figure out the scenarios where IOContext 
is necessary. Then decide whether we should disable caching in such cases.
{quote}
Yes, if the HadoopRDD is cached, it will not call
{code:java}
CombineHiveRecordReader#init
->HiveContextAwareRecordReader.initIOContext
->IOContext.setInputPath
{code}
Instead it uses the cached result to call MapOperator#process(Writable value), so an NPE is thrown because at that point IOContext.getInputPath returns null. For now I have modified the code of MapOperator#process(Writable value) as in this 
[commit|https://github.com/kellyzly/hive/commit/e81b7df572e2c543095f55dd160b428c355da2fb].
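
For reference, roughly what that change looks like (only a sketch; the actual diff is in the commit above, and treating a null input path as "this record comes from the cache" is my own assumption, not an existing Hive convention):
{code:java}
  // Sketch of the guard in MapOperator#process(Writable value): only react to
  // an input-file change when the record reader actually populated IOContext.
  // A null input path is assumed to mean the row was replayed from the RDD cache.
  public void process(Writable value) throws HiveException {
    ExecMapperContext context = getExecContext();
    if (context != null
        && context.getIoCxt().getInputPath() != null
        && context.inputFileChanged()) {
      // child operators clean up only when a real input file change happened
      cleanUpInputFileChanged();
    }
    // ... existing row forwarding logic stays unchanged ...
  }
{code}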

Here are my questions:
 1. When {{context.getIoCxt().getInputPath() == null}}, I think the record comes from the cache rather than from CombineHiveRecordReader. In that case we need not call MapOperator#cleanUpInputFileChanged, because MapOperator#cleanUpInputFileChanged is only designed for one Mapper scanning multiple files (like CombineFileInputFormat) or multiple partitions; the input path changes in those situations and {{cleanUpInputFileChanged}} must be called to reinitialize [some variables|https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java#L532], but we need not consider reinitialization for a cached record. Is my understanding right? If so, is there any other way to tell whether a record is cached, other than checking {{context.getIoCxt().getInputPath() == null}}? (A hypothetical alternative is sketched further down.)
 2. How do we initialize IOContext#getInputPath in the cache situation? We need this variable to reinitialize MapOperator::currentCtxs in MapOperator#initializeContexts:
{code:java}
  public void initializeContexts() {
    Path fpath = getExecContext().getCurrentInputPath();
    String nominalPath = getNominalPath(fpath);
    Map<Operator<?>, MapOpCtx> contexts = opCtxMap.get(nominalPath);
    currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]);
  }
{code}
In this code, we store a MapOpCtx for every MapOperator in opCtxMap (Map<String, Map<Operator<?>, MapOpCtx>>). For a table with partitions there will be multiple elements in opCtxMap (opCtxMap.keySet() is a set containing the partition names). Currently I have only tested on a table without partitions, where I can directly use opCtxMap.values().iterator().next() to initialize the 
[context|https://github.com/kellyzly/hive/blob/HIVE-17486.4/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java#L713], and it runs successfully in yarn mode. But I guess this is not right for a partitioned table.
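
For the non-partitioned case, the fallback I am testing looks roughly like this (a sketch of the change behind the MapOperator link above; the helper name initializeContextsFromCache is mine, and the single-entry assumption is exactly what breaks for a partitioned table):
{code:java}
  // Sketch: initialize currentCtxs when no input path is available (cached input).
  // opCtxMap is Map<String, Map<Operator<?>, MapOpCtx>> keyed by nominal path,
  // so picking the only entry is valid solely for a table without partitions.
  private void initializeContextsFromCache() {
    Map<Operator<?>, MapOpCtx> contexts = opCtxMap.values().iterator().next();
    currentCtxs = contexts.values().toArray(new MapOpCtx[contexts.size()]);
  }
{code}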
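Coming back to question 1: one alternative to overloading the null input path would be an explicit flag on IOContext. This is purely hypothetical, neither the field nor the accessors exist in Hive today:
{code:java}
  // Hypothetical only: IOContext has no such flag today. It would be set by
  // whatever replays rows from the RDD cache and checked in MapOperator#process
  // instead of getInputPath() == null.
  private boolean fromCache = false;

  public boolean isFromCache() {
    return fromCache;
  }

  public void setFromCache(boolean fromCache) {
    this.fromCache = fromCache;
  }
{code}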

> Investigate to enable MapInput cache in Hive on Spark
> -----------------------------------------------------
>
>                 Key: HIVE-18301
>                 URL: https://issues.apache.org/jira/browse/HIVE-18301
>             Project: Hive
>          Issue Type: Bug
>            Reporter: liyunzhang
>            Assignee: liyunzhang
>            Priority: Major
>
> An IOContext problem was found in MapTran when the Spark RDD cache was enabled 
> in HIVE-8920, so we disabled the RDD cache in MapTran at 
> [SparkPlanGenerator|https://github.com/kellyzly/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java#L202].
> The problem is that IOContext does not seem to be initialized correctly in the 
> spark yarn client/cluster mode, which causes an exception like 
> {code}
> Job aborted due to stage failure: Task 93 in stage 0.0 failed 4 times, most 
> recent failure: Lost task 93.3 in stage 0.0 (TID 616, bdpe48): 
> java.lang.RuntimeException: Error processing row: 
> java.lang.NullPointerException
>       at 
> org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:165)
>       at 
> org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:48)
>       at 
> org.apache.hadoop.hive.ql.exec.spark.HiveMapFunctionResultList.processNextRecord(HiveMapFunctionResultList.java:27)
>       at 
> org.apache.hadoop.hive.ql.exec.spark.HiveBaseFunctionResultList.hasNext(HiveBaseFunctionResultList.java:85)
>       at 
> scala.collection.convert.Wrappers$JIteratorWrapper.hasNext(Wrappers.scala:42)
>       at 
> org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
>       at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
>       at org.apache.spark.scheduler.Task.run(Task.scala:85)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>       at 
> org.apache.hadoop.hive.ql.exec.AbstractMapOperator.getNominalPath(AbstractMapOperator.java:101)
>       at 
> org.apache.hadoop.hive.ql.exec.MapOperator.cleanUpInputFileChangedOp(MapOperator.java:516)
>       at 
> org.apache.hadoop.hive.ql.exec.Operator.cleanUpInputFileChanged(Operator.java:1187)
>       at 
> org.apache.hadoop.hive.ql.exec.MapOperator.process(MapOperator.java:546)
>       at 
> org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler.processRow(SparkMapRecordHandler.java:152)
>       ... 12 more
> Driver stacktrace:
> {code}
> In yarn client/cluster mode, sometimes 
> [ExecMapperContext#currentInputPath|https://github.com/kellyzly/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java#L109]
>  is null when the rdd cache is enabled.


