[ https://issues.apache.org/jira/browse/TEZ-2587?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14625544#comment-14625544 ]

Siddharth Seth commented on TEZ-2587:
-------------------------------------

The InputFormat interface provides the Configuration (with which the IF is
initialized) as the only means of sharing information. We've been hit by this
several times in the past, e.g. Hive checking whether a file has changed after
each record, because there is no concept of callbacks.
Tez doesn't define its own InputFormat that could expose the Context to the
user. As a workaround, an Input can populate some information into the
Configuration instance used by the InputFormat. IIRC, MRInput itself does not
do this. Some MR layer may be setting a task attempt id in the Configuration,
but I don't think that's very usable.
One possible change is to have MRInput copy certain fields from the
InputContext into the Configuration. The fields Hitesh mentioned would be good
candidates.
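A minimal sketch of that proposal follows, under stated assumptions: a plain `Map<String, String>` stands in for Hadoop's `Configuration` so the sketch runs without Hadoop, and a hypothetical `TaskIdentity` interface stands in for the identity accessors on Tez's `InputContext`. The `ConfPopulator` helper and the `example.tez.*` key names are illustrative only; Tez does not define them, and the field list is not necessarily the set Hitesh proposed.

```java
import java.util.Map;

// Hypothetical stand-in for the task identity an InputContext exposes.
// Method names are illustrative, not the actual Tez interface.
interface TaskIdentity {
    String dagName();
    String vertexName();
    int taskIndex();
    int attemptNumber();
}

// Hypothetical helper an Input such as MRInput could call before handing
// the Configuration (here a Map) to the wrapped InputFormat.
final class ConfPopulator {
    // Illustrative keys only; Tez does not define these names.
    static final String DAG_NAME = "example.tez.dag.name";
    static final String VERTEX_NAME = "example.tez.vertex.name";
    static final String TASK_INDEX = "example.tez.task.index";
    static final String ATTEMPT_NUMBER = "example.tez.task.attempt.number";

    private ConfPopulator() {}

    // Input side: copy identity fields from the context into the conf.
    static void populate(TaskIdentity ctx, Map<String, String> conf) {
        conf.put(DAG_NAME, ctx.dagName());
        conf.put(VERTEX_NAME, ctx.vertexName());
        conf.put(TASK_INDEX, Integer.toString(ctx.taskIndex()));
        conf.put(ATTEMPT_NUMBER, Integer.toString(ctx.attemptNumber()));
    }

    // InputFormat side (e.g. Hive code): rebuild a key identifying this
    // task attempt using only the Configuration it was given.
    static String attemptKey(Map<String, String> conf) {
        return conf.get(DAG_NAME) + "/" + conf.get(VERTEX_NAME) + "/"
            + conf.get(TASK_INDEX) + "_" + conf.get(ATTEMPT_NUMBER);
    }
}
```

The Input would call `populate` before constructing the record reader, so InputFormat code can recover the attempt identity from the only object it receives.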

> Tez should provide attemptId (or some other ways of linking multiple threads 
> for the same task)
> -----------------------------------------------------------------------------------------------
>
>                 Key: TEZ-2587
>                 URL: https://issues.apache.org/jira/browse/TEZ-2587
>             Project: Apache Tez
>          Issue Type: Bug
>            Reporter: Sergey Shelukhin
>            Assignee: Siddharth Seth
>
> There are at least 2 threads calling Hive code for every task; thread #1
> {noformat}
>       at org.apache.hadoop.hive.ql.io.HiveInputFormat.getRecordReader(HiveInputFormat.java:303)
>       at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.initNextRecordReader(TezGroupedSplitsInputFormat.java:189)
>       at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat$TezGroupedSplitsRecordReader.<init>(TezGroupedSplitsInputFormat.java:131)
>       at org.apache.hadoop.mapred.split.TezGroupedSplitsInputFormat.getRecordReader(TezGroupedSplitsInputFormat.java:97)
>       at org.apache.tez.mapreduce.lib.MRReaderMapred.setupOldRecordReader(MRReaderMapred.java:152)
>       at org.apache.tez.mapreduce.lib.MRReaderMapred.<init>(MRReaderMapred.java:73)
>       at org.apache.tez.mapreduce.input.MultiMRInput.initFromEvent(MultiMRInput.java:177)
>       at org.apache.tez.mapreduce.input.MultiMRInput.handleEvents(MultiMRInput.java:146)
>       at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.handleEvent(LogicalIOProcessorRuntimeTask.java:650)
>       at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.access$600(LogicalIOProcessorRuntimeTask.java:103)
>       at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask$1.runInternal(LogicalIOProcessorRuntimeTask.java:720)
>       at org.apache.tez.common.RunnableWithNdc.run(RunnableWithNdc.java:35)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> Thread #2
> {noformat}
>       at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:139)
>       at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:349)
>       at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:71)
>       at org.apache.tez.runtime.task.TaskRunner2Callable$1.run(TaskRunner2Callable.java:60)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at javax.security.auth.Subject.doAs(Subject.java:422)
>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
>       at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:60)
>       at org.apache.tez.runtime.task.TaskRunner2Callable.callInternal(TaskRunner2Callable.java:35)
>       at org.apache.tez.common.CallableWithNdc.call(CallableWithNdc.java:36)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
> {noformat}
> Right now, there's no way for these threads to communicate with each other or
> share data.
> While the processor code has access to some context objects, the input
> thread doesn't have access to anything.
> Hive used globals to work around that; however, this is both ugly and no
> longer works if multiple tasks run in the same process.
> There should be some way for the threads to talk: either the IO thread should
> have access to the ProcessorContext somehow, or both should have the attemptId
> added to the supplied conf. Then perhaps it's possible to add a global method
> to get the ProcessorContext by attemptId; if not, we can arrange our own ugly
> globals keyed by attemptId.
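The "globals keyed by attemptId" fallback could look roughly like this minimal sketch. `AttemptRegistry` is a hypothetical class, not part of Tez or Hive; it assumes the same attempt ID string reaches both threads (for example via the Configuration), and it uses a `ConcurrentHashMap` because the processor thread writes while the input/event thread reads concurrently.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical process-wide registry keyed by task attempt ID. Keying per
// attempt, instead of holding one static field, keeps tasks that share a
// process from overwriting each other's state.
final class AttemptRegistry<T> {
    private final ConcurrentHashMap<String, T> byAttempt = new ConcurrentHashMap<>();

    // Processor thread: publish this attempt's state (e.g. at start of run()).
    void register(String attemptId, T state) {
        T previous = byAttempt.putIfAbsent(attemptId, state);
        if (previous != null) {
            throw new IllegalStateException("attempt already registered: " + attemptId);
        }
    }

    // Input/event thread: look up state using only the attempt ID it read
    // from its Configuration. Empty if the processor has not registered yet.
    Optional<T> lookup(String attemptId) {
        return Optional.ofNullable(byAttempt.get(attemptId));
    }

    // Remove the entry when the attempt finishes so a long-lived process
    // does not accumulate stale state.
    void unregister(String attemptId) {
        byAttempt.remove(attemptId);
    }

    int size() {
        return byAttempt.size();
    }
}
```

The explicit `unregister` matters because Tez can reuse a container for several tasks; without it, entries from finished attempts would stay in the map for the life of the process.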



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
