[ 
https://issues.apache.org/jira/browse/TEZ-2001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14315301#comment-14315301
 ] 

Bikas Saha commented on TEZ-2001:
---------------------------------

Summarizing an offline email thread on this jira in comment format.

=========================================================================================================
From: Bikas

It wasn’t clear in which scenarios it would be useful. 
 
That is because it looked like it would be useful only when there is a single wave of 
running mappers and all the reducers are also running. Such jobs would very 
likely be small (due to small data) and the mappers would not really be 
spilling in that case, so there would be no pipelining of spills. Other cases 
are likely taken care of by slow start.
 
But looking at it in the light of skew provides a different perspective. 
Let's say all mappers but a small number N have finished, and all reducers have 
been slow-started and have fetched the results of the other completed mappers. 
These N skewed mappers somehow have lots of data and so are slow and also 
spilling. This situation now looks like a single wave of mappers with all 
reducers running. Thus pipelining of shuffles would be useful in this case to 
overlap the intermediate data pull with the mappers' execution time. The 
effectiveness of the gains would be determined by the time taken by the mapper 
to run vs the time taken to fetch.
 
Is the above understanding correct? If so, should we plan to implement the 
pipelining scheme in a manner that fits it better? That would mean not 
pipelining all mappers, but having the shuffle vertex manager monitor stats on 
the mappers. The shuffle vertex manager could then figure out which mappers are 
stragglers due to data (and spilling) and send them an event to 
initiate pipelining. The shuffle output would work as usual until it receives 
such a signal. Upon receiving this signal it would send a DM event for the 
already spilled fragments and continue to send more events for future spills 
until done. This way we will not pay the cost of pipelining for every mapper 
whilst being able to benefit from it for stragglers.
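 
A minimal sketch of that straggler-targeted triggering, under the assumption 
that the vertex manager can see per-task output-size stats; all names 
(StragglerDetector, TaskStats) and thresholds are illustrative, not actual Tez APIs:

import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the ShuffleVertexManager-side straggler detection
// described above. Names and thresholds are illustrative, not Tez APIs.
public class StragglerDetector {

  /** Minimal per-task statistics the vertex manager is assumed to see. */
  public static class TaskStats {
    final int taskIndex;
    final long outputBytesSoFar;
    final boolean completed;

    TaskStats(int taskIndex, long outputBytesSoFar, boolean completed) {
      this.taskIndex = taskIndex;
      this.outputBytesSoFar = outputBytesSoFar;
      this.completed = completed;
    }
  }

  private final double completedFraction;   // e.g. 0.95: most mappers are done
  private final long spillThresholdBytes;   // output size that implies spilling

  public StragglerDetector(double completedFraction, long spillThresholdBytes) {
    this.completedFraction = completedFraction;
    this.spillThresholdBytes = spillThresholdBytes;
  }

  /**
   * Returns the task indices that should be told to start pipelining:
   * only when almost all mappers have finished and the remaining ones
   * are producing enough data to be spilling.
   */
  public List<Integer> selectStragglers(List<TaskStats> stats) {
    long done = stats.stream().filter(s -> s.completed).count();
    List<Integer> stragglers = new ArrayList<>();
    if (done < completedFraction * stats.size()) {
      return stragglers; // not in the tail of the wave yet
    }
    for (TaskStats s : stats) {
      if (!s.completed && s.outputBytesSoFar > spillThresholdBytes) {
        stragglers.add(s.taskIndex); // would receive a "start pipelining" event
      }
    }
    return stragglers;
  }
}

The point is only that the trigger is a runtime decision per mapper, not a 
job-wide config.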
 
Btw, the change also improves shuffle output by not doing a final merge and letting 
the reducers do the merge (which they are doing anyway across mappers). Given 
the ability of the shuffle service to pipeline multiple chunks in bulk on the 
same connection, this would be a net win. This win is going to benefit 
all mappers, so perhaps we can make that change separate from the pipelining 
change so that it's independently tested and production ready.

=========================================================================================================
From: Rajesh

Right, in the patch posted for the ordered case, not all mappers would end up in 
pipelined mode.  It works pretty much in the mode you have described.
1.      User sets tez.runtime.pipelined-shuffle.enable (true/false).
2.      All mappers behave in the normal way, until a spill (including the final 
spill) is done in PipelinedSorter.
   2.1.   This is similar to what you mentioned as "This way we will not pay the 
cost of pipelining for every mapper whilst being able to benefit from it for 
stragglers."
3.      When a spill happens in specific mappers (stragglers):
   3.1.   A DM event is sent out with spill details.
   3.2.   The final merge is completely avoided on the mapper side for pipelined 
transfers. Less IO.
   3.3.   The fetcher pulls the data in an incremental fashion.
   3.4.   It is quite possible that the final event could be fetched a lot faster 
than the other spill events. Hence a spill_id is maintained in DM events for 
pipelined cases and appropriate checks are done on the Shuffle side (see the 
sketch below).
4.      However, certain changes need to be done on the consumer/fetcher side.
   4.1.   What should we do when a source goes down after parts of its spills 
have been downloaded? These spills could have been merged already on the consumer 
side (as the merger thread runs in parallel).
If we make changes to ShuffleVertexManager (i.e. an event getting triggered in 
the VM to do pipelining), #4 mentioned above still applies.  Please let me know 
if I am missing something here.
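 
As a concrete illustration of the spill_id bookkeeping mentioned in 3.4 above, 
here is a minimal consumer-side sketch; the class and field names (SpillTracker, 
SpillInfo, lastSpill) are illustrative assumptions, not the actual TEZ-2001 payload:

import java.util.BitSet;

// Illustrative sketch of the spill_id checks discussed above.
// Field and class names are assumptions, not the actual TEZ-2001 event payload.
public class SpillTracker {

  /** What each pipelined DM event is assumed to carry for one spill. */
  public static class SpillInfo {
    final int spillId;       // 0, 1, 2, ... in spill order
    final boolean lastSpill; // true on the final spill of the attempt
    SpillInfo(int spillId, boolean lastSpill) {
      this.spillId = spillId;
      this.lastSpill = lastSpill;
    }
  }

  private final BitSet fetched = new BitSet();
  private int expectedSpills = -1; // unknown until the last-spill event arrives

  /** Record a fetched spill; events may arrive out of order. */
  public void onSpillFetched(SpillInfo info) {
    fetched.set(info.spillId);
    if (info.lastSpill) {
      expectedSpills = info.spillId + 1;
    }
  }

  /** The attempt is complete only once every spill up to the last one is in. */
  public boolean attemptComplete() {
    return expectedSpills > 0 && fetched.cardinality() == expectedSpills;
  }
}

The point is that the final event can arrive before earlier spill events, so 
completion has to be decided from the spill ids rather than from arrival order.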
 
The shuffle vertex manager monitoring stats on the mappers can be extremely 
beneficial for prioritizing the scheduling of fetchers (i.e. identifying the 
mapper which is churning out lots of data and using that to download its data 
much earlier).

=========================================================================================================
From: Bikas

I see what you are saying. However, if there is a large (ETL) job with lots of 
data per mapper, then it's likely they will all spill to some extent, thus 
triggering extra DM events for pipelining without being able to benefit from 
it. In the worst case, we may not be able to turn this awesome feature on by 
default due to false positives.
 
My suggestion to turn this on via events to specific mappers, instead of for all 
mappers via a config option, would allow us to keep it always on (after testing) 
so that it does its magic when needed without affecting the rest of the jobs.
 
Case 2) below. Does this mean that final merge avoidance in the mapper requires 
the use of pipelined DM events and cannot be done without them? Can that be 
changed by introducing a new encoding in the fetch URL that gives the same 
information as the different spills do? Even without overloading the URL, the 
payload of the DM event for shuffle could be enhanced to give all the info 
about spills in one final event so as to avoid the final merge in the mapper. 
Essentially, the fetch mechanism becomes entirely spill based, with the 
difference between pipelining and non-pipelining being sending all spill info 
in multiple events vs one event.
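 
To make the "multiple events vs one event" distinction concrete, here is a small 
hypothetical sketch; SpillDescriptor and ShuffleEventPayload are made-up names, 
not the actual Tez event classes:

import java.util.List;

// Hypothetical illustration of the two delivery modes discussed above.
public class ShuffleEventPayload {

  public static class SpillDescriptor {
    final int spillId;
    final String fetchPath; // where the fetcher can pull this chunk from
    SpillDescriptor(int spillId, String fetchPath) {
      this.spillId = spillId;
      this.fetchPath = fetchPath;
    }
  }

  final List<SpillDescriptor> spills; // one entry per spill
  final boolean finalEvent;           // no more events will follow

  ShuffleEventPayload(List<SpillDescriptor> spills, boolean finalEvent) {
    this.spills = spills;
    this.finalEvent = finalEvent;
  }

  // Pipelined mode: one payload per spill, sent as soon as the spill completes.
  static ShuffleEventPayload perSpill(SpillDescriptor d, boolean last) {
    return new ShuffleEventPayload(List.of(d), last);
  }

  // Non-pipelined mode: one final payload listing every spill, sent at task end,
  // which still lets the mapper skip its final merge.
  static ShuffleEventPayload allSpills(List<SpillDescriptor> all) {
    return new ShuffleEventPayload(all, true);
  }
}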
 
Case 4) below. Failures can only be handled in our re-execution based fault 
tolerance by making the compute deterministic. So the question is whether our 
spill is deterministic. If the same mapper runs on the same data and produces 
the same output, would we produce the same spills? If yes, we are good, 
because the DM event spill encoding and reducer-side checks would ensure that 
we don't fetch any spills that have already been fetched. This would be similar 
to the existing logic that does not fetch a rerun mapper's output if that output 
has already been fetched from the previous mapper version. 
Essentially, the shuffle fetch side logic becomes mapper+spills dependent vs 
being just mapper dependent.
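 
A minimal sketch of that mapper+spills bookkeeping on the fetch side, assuming 
deterministic spills across reruns; the class names are hypothetical:

import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: fetch-side dedup keyed on (source task, spill id)
// rather than just the source task. Assumes deterministic spills, so a
// rerun attempt produces the same contents for the same spill id.
public class FetchedSpillRegistry {

  /** Identity of one spill, independent of which attempt produced it. */
  public static class SpillKey {
    final int srcTaskIndex;
    final int spillId;
    SpillKey(int srcTaskIndex, int spillId) {
      this.srcTaskIndex = srcTaskIndex;
      this.spillId = spillId;
    }
    @Override public boolean equals(Object o) {
      if (!(o instanceof SpillKey)) return false;
      SpillKey k = (SpillKey) o;
      return k.srcTaskIndex == srcTaskIndex && k.spillId == spillId;
    }
    @Override public int hashCode() { return 31 * srcTaskIndex + spillId; }
  }

  private final Set<SpillKey> alreadyFetched = new HashSet<>();

  /** Returns true if this spill still needs fetching (e.g. from a rerun). */
  public boolean shouldFetch(int srcTaskIndex, int spillId) {
    return !alreadyFetched.contains(new SpillKey(srcTaskIndex, spillId));
  }

  public void markFetched(int srcTaskIndex, int spillId) {
    alreadyFetched.add(new SpillKey(srcTaskIndex, spillId));
  }
}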
 
On the other hand, if spills are not deterministic then the solution would be 
to find out how to make them so. I would expect our spills to be deterministic 
because they are based on fixed sizes, and for deterministic output records the 
size based calculations should trigger identical spills. Right? Alternatively, 
if we move to the mapper+spills model then the reducer would know if it's 
pulling all spills or partial spills based on the metadata in the DM event. 
Partial spills could be fetched and merged separately from complete spills. In 
any case, the final KVs to the reducer are a view over multiple merged files, so 
we can afford to have multiple merge files. The partial spill merge file can be 
dropped based on retries of mappers. However, the better solution would be to 
make the spill deterministic.
 
This is in fact needed, because if pipelining is targeting the skew/straggler 
scenario, then it is very likely that a speculative instance will be launched 
for the straggling mapper. Thus we cannot limit the versions of such mappers to 
1, and the reducer also needs to be able to fetch the results from the 
speculated mapper if that finishes first (while it is also sending pipelined 
events). Thus not being able to handle multiple retries of the mapper would 
make the feature a non-starter for its primary use case.

=========================================================================================================
From: Rajesh

Case 2)
- For pipelining, we definitely need additional DM events in order to 
communicate to fetchers that some partial data/spill is available.  If we 
encode this information in the URL, fetchers can begin pulling the data only after 
the source has finished. This would, in a way, defeat the purpose of pipelining.
- But I see your point wherein we can avoid the final merge even without 
pipelining, plus some additional changes on the fetcher side:
  - Let PipelinedSorter spill (assume 4 spills).
  - Spills are stored in different directories locally.
  - Skip the final merge.
  - Add the number of spills to the fetcher URL.
  - A final DM event (single event) is sent out with URL details.
    - Need to handle the empty-partitions case, where we don't set a URL in the DM event.
  - The fetcher parses the URL and spill details (as > 1).
  - The fetcher behaves as if it got 4 different events and enqueues 4 items to its 
fetcher queue (see the sketch after this list).
  - When all 4 parts are received, it sets the SUCCESS flag for that attempt.
 
- If we have to enable pipelining (i.e. to let fetchers pull spills early), we 
probably need to add more info to the URL to indicate that more events will be 
coming through.
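 
A small sketch of the fetcher-side expansion described in the list above; the 
URL encoding and names are assumptions for illustration, not the actual Tez 
fetcher code:

import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: the URL parameter and queue handling are
// assumptions, not the actual Tez fetcher implementation.
public class SpillAwareFetchPlanner {

  /** One pending download, one per spill of the source attempt. */
  public static class FetchWorkItem {
    final String url;
    final int spillId;
    FetchWorkItem(String url, int spillId) { this.url = url; this.spillId = spillId; }
  }

  /** Expand a single final event into one work item per spill. */
  public List<FetchWorkItem> plan(String baseUrl, int numSpills) {
    List<FetchWorkItem> items = new ArrayList<>();
    for (int spill = 0; spill < numSpills; spill++) {
      // hypothetical encoding: the spill index is appended as a query parameter
      items.add(new FetchWorkItem(baseUrl + "&spill=" + spill, spill));
    }
    return items;
  }

  /** The source attempt counts as SUCCEEDED only once every part is in. */
  public boolean isAttemptComplete(int partsReceived, int numSpills) {
    return partsReceived == numSpills;
  }
}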
 
Case 4)
- I am not 100% sure that spills can be deterministic.  For the majority of use 
cases they would be.  However, there can be other weird scenarios in Hive where 
this is not deterministic (maybe the sampling use case or similar).  Even if a 
mapper fails and re-execution is triggered for the same data on another machine, 
based on the random/sample function it might end up generating different spill 
boundaries.  It might end up having different results in case of failures, but 
that might be acceptable for whatever reasons. Gopal is the best person for 
providing more corner cases. :) 
- The idea was to start supporting partial fetch failures in a follow-up JIRA. At a 
high level, we can mark the partially fetched outputs so that they are not 
qualified for merging in parallel.  So, even when the source goes down, we can end 
up fetching the entire set again without issues (though some amount of network 
bandwidth was wasted in the previous fetch).
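 
A rough sketch of the "mark partially fetched outputs so they are not merged" 
idea, with hypothetical names and states:

// Hypothetical sketch of keeping a partially fetched source out of the
// parallel merge until all of its spills have arrived, so that a source
// failure only costs a re-fetch, not a corrupted merge.
public class SourceFetchState {

  public enum State { IN_PROGRESS, COMPLETE, FAILED }

  private final int numSpills;
  private int spillsFetched = 0;
  private State state = State.IN_PROGRESS;

  public SourceFetchState(int numSpills) { this.numSpills = numSpills; }

  public void onSpillFetched() {
    spillsFetched++;
    if (spillsFetched == numSpills) {
      state = State.COMPLETE;
    }
  }

  /** Only fully fetched sources are handed to the parallel merger. */
  public boolean eligibleForMerge() {
    return state == State.COMPLETE;
  }

  /** On source failure, discard partial data and fetch the whole set again. */
  public void onSourceFailed() {
    spillsFetched = 0;
    state = State.FAILED; // caller re-enqueues all spills for this source
  }
}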
 
If we prioritize the fixes (i.e. first fixing "avoid partially downloaded spills 
from getting merged"), then skipping the final merge for mappers would be possible 
and production ready in PipelinedSorter. :)

=========================================================================================================
From: Bikas

If the task is non-deterministic then Tez cannot guarantee deterministic 
results under failures. So if task non-determinism is making spills 
non-deterministic then IMO that's a won't fix/unsupported. If spills are 
deterministic for deterministic tasks then we can make use of that assumption to 
build a much better solution than setting aside partial merges and re-fetching 
the same data again from the re-run.

=========================================================================================================
From: Gopal

Tasks are logically deterministic within any partition/reducer output.

That's an axiom, but there is no forcing function to ensure they are 
byte-for-byte deterministic: the spill indexes needn't match up, since 
the shuffle handler does the mapping from index -> offsets. 

The current Hive model triggers 10% off-loads from the in-memory aggregations, 
to get a higher hit-rate for reductions instead of collecting all keys as they are 
encountered; which 10% gets flushed depends entirely on the traversal order of the 
buckets in an in-memory hash table.
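 
As a toy illustration (standalone Java, not Hive code) of why that makes spills 
non-byte-deterministic: flushing a fixed fraction of an in-memory aggregation 
picks whichever keys the hash table happens to traverse first, so two logically 
identical attempts can spill different keys.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy demo (not Hive code): flushing a fixed fraction of an in-memory
// aggregation depends entirely on bucket traversal order, so logically
// identical attempts can spill different keys first.
public class FlushOrderDemo {

  static List<String> flushTenPercent(Map<String, Long> aggregates) {
    int toFlush = Math.max(1, aggregates.size() / 10);
    List<String> flushed = new ArrayList<>();
    Iterator<String> it = aggregates.keySet().iterator();
    while (it.hasNext() && flushed.size() < toFlush) {
      flushed.add(it.next());
      it.remove();
    }
    return flushed;
  }

  public static void main(String[] args) {
    // Same aggregates, different insertion (and hence traversal) order.
    Map<String, Long> attempt1 = new LinkedHashMap<>();
    Map<String, Long> attempt2 = new LinkedHashMap<>();
    String[] order1 = {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"};
    String[] order2 = {"j", "i", "h", "g", "f", "e", "d", "c", "b", "a"};
    for (String k : order1) attempt1.put(k, 1L);
    for (String k : order2) attempt2.put(k, 1L);

    // Different keys get spilled first, so spill files differ byte-for-byte.
    System.out.println("attempt1 flushes: " + flushTenPercent(attempt1));
    System.out.println("attempt2 flushes: " + flushTenPercent(attempt2));
  }
}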

We can revert to the slow path of MRv2 and turn off all map-side in-memory 
reductions in Hive to get predictable spilling, but that's not the way we want 
to go. 

FYI, this is a non-issue right now if you merely test out the code base, but 
the hashtable traversal is not deterministically forced on JDK8 due to the 
removal of jdk.map.althashing.threshold.

=========================================================================================================
From: Siddharth

I'm not sure this should be unsupported. If tasks generate non-deterministic 
output - either byte for byte, or in the final output across different attempts 
(which is possible with some Hive non-deterministic operations like random) - 
then the number of attempts can just be set to 1. Once we support vertex-level 
configuration, this will no longer apply to the entire DAG. 

There can be different strategies for handling failure - avoiding merge, or 
keeping track of merged spills and not re-fetching them. The same applies to 
the unordered case as well - and can be configured per vertex.

Using the VertexManager is nice - but it seems like a follow-on item, along with 
the failure handling semantics.


> Support pipelined data transfer for ordered output
> --------------------------------------------------
>
>                 Key: TEZ-2001
>                 URL: https://issues.apache.org/jira/browse/TEZ-2001
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-2001.1.patch, TEZ-2001.2.patch
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
