[ 
https://issues.apache.org/jira/browse/TEZ-1157?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14110166#comment-14110166
 ] 

Siddharth Seth commented on TEZ-1157:
-------------------------------------

Comments on the patch. Since this WIP, I'm guessing some of this will anyway 
end up getting fixed in the final patch.

- isDebugEnabled = LOG.isDebugEnabled() || true; | Needs to go
- Creating the list of localDisks in each Fetcher - this can be done once in 
ShuffleManager and passed in. Similarly for the localFS, reading 
sharedFetchEnabled from the Configuration etc. Also, should this list be 
sorted, in case YARN provides the list in a random order ?
- SharedInputAttemptIdentifier in ShuffleInputEventHandlerImpl - not needed, 
and never used
- this.useSharedInputs = (inputContext.getTaskAttemptNumber() == 0); | This 
will end up triggering the code path for Partitioned Output as well. Looks like 
we need Edge information in *Context.
- final boolean shareInput = this.useSharedInputs; | Not needed
- ShuffleManager.pendingLocks - not needed. outputPaths not currently used, but 
can be passed to individual Fetchers
- if (created == false && !lockFile.exists()) { | Would be useful to log a 
message here. This is effectively an error right ?

The important stuff
- The partial fetch use case seems fairly important, given that the fetching 
task may start before all the sources are complete. The list of inputs seen by 
different tasks (for a single host) can be very different.
- In getLock - I'm not sure what the purpose of obtaining a shared lock is. 
Shouldn't this just return, so that the fetcher thread is freed up - and could 
potentially be allocated to another host. If there aren't any other hosts, 
control would eventually come back to the same loop. Also, once a shared lock 
is obtained - it's highly unlikely that anyone else will be able to obtain an 
exclusive lock - so subsequent fetches will be http based.
- In doSharedFetch - if (lock == null) { <- This should never happen, at least 
if a shared lock is being obtained.
- In doSharedFetch - if an exclusive lock is obtained, and some inputs exist 
(some other task got the exclusive lock at some point) - this falls back to an 
http fetch. If the attempt list to a task is different (when starting at 
different times, or limited by the amount of work given to each fetcher), this 
will end up falling back to http fetch.
- if(inputs != srcAttempts.size()) { (and other similar checks) <- Is this 
check sufficient to use localDiskFetch ? The number of files found may match, 
but the actual files on disk may belong to different source identifiers. In 
this case, the local fetch will likely end up doing a partial fetch, and 
reporting the rest as pending (not sure if it reports failures at this point)
- Also, should setupLocalDiskFetch be called from within an exclusive lock ? 
That would cause each of the tasks to read the data serially.

- One more bit, which I think may be a problem is that the same attempt can end 
up being fetched multiple times in case of failure. (This may affect direct 
local fetch as well). The file being created may end up overwriting another 
file. Since the source tasks may have run on different nodes, there's a 
possibility of hitting this (event after we end up moving to assigning a single 
host to a single fetcher, instead of the current behaviour of a host going to 
whichever fetcher is available).

> Optimize broadcast :- Tasks pertaining to same job in same machine should not 
> download multiple copies of broadcast data
> ------------------------------------------------------------------------------------------------------------------------
>
>                 Key: TEZ-1157
>                 URL: https://issues.apache.org/jira/browse/TEZ-1157
>             Project: Apache Tez
>          Issue Type: Sub-task
>            Reporter: Rajesh Balamohan
>            Assignee: Gopal V
>              Labels: performance
>         Attachments: TEZ-1152.WIP.patch, TEZ-1157.3.WIP.patch, 
> TEZ-broadcast-shuffle+vertex-parallelism.patch
>
>
> Currently tasks (belonging to same job) running in the same machine download 
> its own copy of broadcast data.  Optimization could be to  download one copy 
> in the machine, and the rest of the tasks can refer to this downloaded copy.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to