[
https://issues.apache.org/jira/browse/TEZ-2001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14315301#comment-14315301
]
Bikas Saha commented on TEZ-2001:
---------------------------------
Summarizing an offline email thread on this jira in comment format.
=========================================================================================================
From: Bikas
It wasn’t clear in which scenarios it would be useful. It looked like it would
help only when there is a single wave of running mappers and all the reducers
are also running. Such jobs would very likely be small (due to small data) and
the mappers would not really be spilling in that case, so there would be no
pipelining of spills. Other cases are likely taken care of by slow start.
But looking at it in the light of skew provides a different perspective.
Let's say all mappers but a small number N have finished, and all reducers have
been slow-started and have fetched the results of the other completed mappers.
These N skewed mappers somehow had lots of data and so are slow and also
spilling. This situation now looks like a single wave of mappers with all
reducers running. Pipelining of shuffles would thus be useful in this case to
overlap the intermediate data pull with the mappers' execution time. The
effectiveness of the gains would be determined by the time taken by the mapper
to run vs the time taken to fetch.
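As a rough back-of-envelope (numbers purely illustrative, not measurements):
with T_map the remaining runtime of a straggler and T_fetch the time to pull
its already-spilled output, the most we can hide is

$\text{saving} \approx \min(T_{\text{fetch}}, T_{\text{map}})$

e.g. a straggler that runs 10 more minutes while its spills take 4 minutes to
fetch can hide roughly those 4 minutes behind map execution, whereas a mapper
whose output fetches in 10 seconds gains almost nothing.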
Is the above understanding correct? If so, then should we plan to implement the
pipelining scheme in a manner that fits this case better? That would mean not
pipelining all mappers, but having the shuffle vertex manager monitor stats on
the mappers. The shuffle vertex manager could then figure out which mappers are
stragglers due to data (and spilling) and send them an event to initiate
pipelining. The shuffle output would work as usual until it receives such a
signal. Upon receiving this signal it would send a DM event for the already
spilled fragments and continue to send more events for future spills until
done. This way we will not pay the cost of pipelining for every mapper whilst
being able to benefit from it for stragglers.
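A minimal sketch of that signal path, assuming a vertex-manager-style hook over
producer stats (all class and method names here are illustrative, not the
actual Tez VertexManagerPlugin API):

{code:java}
// Illustrative sketch only -- not the real Tez VertexManagerPlugin API.
// Idea: watch per-producer output stats and tell only the straggling
// producers to switch into pipelined mode.
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class StragglerPipeliningManager {
  /** Stand-in for sending a custom event to a running producer task. */
  interface SignalSender {
    void sendStartPipeliningSignal(int producerTaskId);
  }

  private final Map<Integer, Long> bytesSpilled = new HashMap<>();
  private final Set<Integer> signalled = new HashSet<>();
  private final SignalSender sender;
  private final long spillThresholdBytes; // e.g. derived from sort buffer size

  StragglerPipeliningManager(SignalSender sender, long spillThresholdBytes) {
    this.sender = sender;
    this.spillThresholdBytes = spillThresholdBytes;
  }

  /** Hypothetical hook, called as producer task stats arrive. */
  void onProducerStats(int taskId, long spilledBytes, boolean stillRunning) {
    bytesSpilled.put(taskId, spilledBytes);
    // A task that is still running but already spilling heavily is a
    // straggler candidate: tell it (once) to start emitting per-spill
    // DM events instead of waiting for its final output.
    if (stillRunning && spilledBytes > spillThresholdBytes
        && signalled.add(taskId)) {
      sender.sendStartPipeliningSignal(taskId);
    }
  }
}
{code}

Until the signal arrives the output behaves exactly as today, so non-straggler
mappers pay nothing.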
Btw, the change also improves shuffle output by not doing a final merge and
instead letting the reducers do the merge (which they are doing anyway across
mappers). Given the ability of the shuffle service to pipeline multiple chunks
in bulk on the same connection, this would be a net win, and the win applies to
all mappers. So perhaps we can make that change separately from the pipelining
change so that it's independently tested and production ready.
=========================================================================================================
From: Rajesh
Right, in the patch posted for the ordered case, not all mappers end up in
pipelined mode. It works pretty much in the mode you have described.
1. The user sets tez.runtime.pipelined-shuffle.enable (true/false).
2. All mappers behave in the normal way until a spill (including the final
   spill) is done in PipelinedSorter.
   1. This is similar to what you mentioned: "This way we will not pay the
      cost of pipelining for every mapper whilst being able to benefit from it
      for stragglers."
3. When a spill happens in specific mappers (stragglers):
   1. A DM event is sent out with the spill details (see the sketch after
      this list).
   2. The final merge is completely avoided on the mapper side for pipelined
      transfers. Less IO.
   3. The fetcher pulls the data in incremental fashion.
   4. It is quite possible that the final event is fetched a lot faster than
      the other spill events. Hence a spill_id is maintained in the DM events
      for pipelined cases, and appropriate checks are done on the shuffle side.
4. However, certain changes need to be made on the consumer/fetcher side:
   1. What should we do when a source goes down after parts of its spills have
      been downloaded? These spills could already have been merged on the
      consumer side (as the merger thread runs in parallel).
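A sketch of the per-spill event payload and the spill_id check from 3.4 (field
names here are hypothetical; in Tez the DM event payload is a protobuf, which
the patch presumably extends along these lines):

{code:java}
// Illustrative sketch only -- field names are hypothetical.
import java.util.BitSet;

final class SpillEventInfo {
  final int producerAttempt; // which task attempt produced this spill
  final int spillId;         // 0-based, monotonically increasing per attempt
  final boolean lastSpill;   // set only on the final spill's event
  final String fetchUrl;     // where the consumer pulls this chunk from

  SpillEventInfo(int producerAttempt, int spillId, boolean lastSpill,
      String fetchUrl) {
    this.producerAttempt = producerAttempt;
    this.spillId = spillId;
    this.lastSpill = lastSpill;
    this.fetchUrl = fetchUrl;
  }

  /**
   * Consumer-side completeness check for 3.4: the last-spill event can be
   * fetched before earlier ones, so count what has actually landed rather
   * than trusting arrival order.
   */
  static boolean allSpillsReceived(BitSet received, int lastSpillId) {
    return received.cardinality() == lastSpillId + 1;
  }
}
{code}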
If we make changes to ShuffleVertexManager (i.e. an event getting triggered in
the VM to do pipelining), #4 mentioned above still applies. Please let me know
if I am missing something here.
Having the shuffle vertex manager monitor stats on the mappers can also be
extremely beneficial for prioritizing fetcher scheduling (i.e. identifying the
mapper which is churning out lots of data and using that to download its data
much earlier).
=========================================================================================================
From: Bikas
I see what you are saying. However, if there is a large (ETL) job with lots of
data per mapper, then it's likely they will all spill to some extent, thus
triggering extra DM events for pipelining without being able to benefit from
it. In the worst case, we may not be able to turn this awesome feature on by
default due to false positives.
My suggestion to turn this on via events to specific mappers, instead of for
all mappers via a config option, would allow us to turn this on always (after
testing) such that it does its magic when needed without affecting the rest of
the jobs.
Case 2) below. Does this mean that final merge avoidance in the mapper requires
the use of pipelined DM events and cannot be done without them? Could that be
changed by introducing a new encoding in the fetch URL that gives the same
information as the different spill events do? Even without overloading the URL,
the payload of the shuffle DM event could be enhanced to give all the info
about the spills in one final event, so as to avoid the final merge in the
mapper. Essentially, the fetch mechanism becomes entirely spill based, with the
difference between pipelining and non-pipelining being whether the spill info
is sent in multiple events or in one event.
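Reusing the SpillEventInfo shape from the earlier sketch, the non-pipelined
variant would just batch the same records into one final payload (again, all
names illustrative):

{code:java}
// Illustrative: pipelined vs non-pipelined differ only in *when* spill
// records are sent, not in what they contain. A non-pipelined producer can
// still skip its final merge by sending every spill record in one event.
import java.util.List;

final class FinalOutputPayload {
  final int producerAttempt;
  final List<SpillEventInfo> spills; // one record per spill, sent at task end

  FinalOutputPayload(int producerAttempt, List<SpillEventInfo> spills) {
    this.producerAttempt = producerAttempt;
    this.spills = spills;
  }
}
{code}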
Case 4) below. Failures can only be handled in our re-execution based fault
tolerance by making the compute deterministic. So the question is whether our
spills are deterministic. If the same mapper runs on the same data and produces
the same output, would we produce the same spills? If yes, we are good, because
the DM event spill encoding and reducer-side checks would ensure that we don't
fetch any spills that have already been fetched. This would be similar to the
existing logic that does not fetch a rerun mapper's output if that output has
already been fetched from the previous version of the mapper. Essentially, the
shuffle fetch side logic becomes mapper+spill dependent instead of just mapper
dependent.
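A sketch of that reducer-side check (hypothetical names): bookkeeping is keyed
on (task, spill) rather than on the task alone, under the assumption that a
rerun attempt reproduces byte-identical spills:

{code:java}
// Illustrative: fetch tracking keyed on (producer task, spill id), so spills
// already fetched from a failed attempt are skipped when the rerun attempt's
// events arrive -- valid only if spills are deterministic across attempts.
import java.util.HashSet;
import java.util.Set;

final class SpillFetchTracker {
  private final Set<Long> fetched = new HashSet<>();

  private static long key(int producerTaskId, int spillId) {
    return ((long) producerTaskId << 32) | (spillId & 0xffffffffL);
  }

  /** Returns true if this spill still needs to be fetched. */
  boolean shouldFetch(int producerTaskId, int spillId) {
    return !fetched.contains(key(producerTaskId, spillId));
  }

  void markFetched(int producerTaskId, int spillId) {
    fetched.add(key(producerTaskId, spillId));
  }
}
{code}

Note the key deliberately uses the task id, not the attempt id: the whole point
is that attempt N+1's spill k replaces attempt N's spill k byte-for-byte.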
On the other hand, if spills are not deterministic, then the solution would be
to find out how to make them so. I would expect our spills to be deterministic
because they are based on fixed sizes, and for deterministic output records the
size-based calculations should trigger identical spills. Right? Alternatively,
if we move to the mapper+spills model, then the reducer would know whether it
is pulling all spills or partial spills based on the metadata in the DM event.
Partial spills could be fetched and merged separately from complete spills. In
any case, the final KVs presented to the reducer are a view over multiple
merged files, so we can afford to have multiple merge files. The partial-spill
merge file could be dropped when a mapper is retried. However, the better
solution would be to make the spills deterministic.
This is in fact needed because, if pipelining is targeting the skew/straggler
scenario, then it is very likely that a speculative instance will be launched
for the straggling mapper. Thus we cannot limit the versions of such mappers to
1, and the reducer also needs to be able to fetch the results from the
speculated mapper if that finishes first (while the original is still sending
pipelined events). Not being able to handle multiple retries of the mapper
would therefore make the feature a non-starter for its primary use case.
=========================================================================================================
From: Rajesh
Case 2)
- For pipelining, we definitely need additional DM events in order to tell
  fetchers that some partial data/spill is available. If we encode this
  information in the URL, fetchers can begin pulling the data only after the
  source has finished. That would, in a way, defeat the purpose of pipelining.
- But I see your point that we can avoid the final merge even without
  pipelining, plus some additional changes on the fetcher side (a sketch
  follows after this list):
  - Let PipelinedSorter spill (assume 4 spills).
  - Spills are stored in different directories locally.
  - Skip the final merge.
  - Add the number of spills to the fetcher URL.
  - A final DM event (single event) is sent out with the URL details.
  - Need to handle the empty-partitions case, where we don't set a URL in the
    DM event.
  - The fetcher parses the URL and the spill details (as > 1).
  - The fetcher behaves as if it got 4 different events and enqueues 4 items
    to its fetcher queue.
  - When all 4 parts are received, it sets the SUCCESS flag for that attempt.
- If we have to enable pipelining (i.e. to let fetchers pull spills early), we
  probably need to add more info to the URL to indicate that more events will
  be coming through.
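A sketch of that URL expansion on the fetcher side (the spill parameter name is
made up; the real shuffle URL format is owned by the shuffle handler):

{code:java}
// Illustrative only: expand one final event carrying a spill count into one
// fetch work-item per spill. The "spill" URL parameter is hypothetical.
import java.util.ArrayList;
import java.util.List;

final class SpillUrlExpander {
  /** e.g. baseUrl = "http://host:13562/mapOutput?job=j1&map=m7&reduce=3" */
  static List<String> expand(String baseUrl, int numSpills) {
    List<String> fetches = new ArrayList<>();
    for (int spill = 0; spill < numSpills; spill++) {
      fetches.add(baseUrl + "&spill=" + spill); // hypothetical parameter
    }
    // Enqueue each item; mark the attempt SUCCESS only when all complete.
    return fetches;
  }
}
{code}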
Case 4)
- I am not 100% sure that spills can be deterministic. For the majority of use
  cases they would be. However, there can be other weird scenarios in Hive
  where this is not deterministic (maybe the sampling use case or so). If a
  mapper fails, re-execution on another machine runs over the same data, but
  based on the random/sample function it might end up generating different
  spill boundaries. It might also end up producing different results in case
  of failures, which might be acceptable for whatever reasons. Gopal is the
  best person for providing more corner cases. :)
- The idea was to start supporting partial fetch failures in a follow-up JIRA.
  At a high level, we can mark the partially fetched outputs so that they are
  not qualified for merging in parallel. Then, even when a source dies, we can
  fetch the entire set again without issues (though some amount of network
  bandwidth was wasted on the earlier fetch).
If we prioritize the fixes (i.e. first fixing "avoid partially downloaded
spills from getting merged"; a rough sketch follows below), then skipping the
final merge for mappers would be possible and production ready in
PipelinedSorter. :)
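One possible shape for that guard (hypothetical; partially fetched outputs are
parked away from the parallel merger until the source attempt is complete):

{code:java}
// Illustrative: keep partially fetched spill outputs out of the parallel
// merger until every spill of the source has landed; if the source dies
// mid-stream, discard the parked chunks and refetch from the rerun attempt.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartialSpillGate {
  private static final class Pending {
    final List<byte[]> chunks = new ArrayList<>();
    int expected = -1; // unknown until the last-spill event arrives
  }

  private final Map<Integer, Pending> parked = new HashMap<>();

  /** Returns all chunks once the set is complete (merge-safe), else null. */
  List<byte[]> onSpillFetched(int taskId, byte[] data, boolean isLastSpill,
      int spillId) {
    Pending p = parked.computeIfAbsent(taskId, k -> new Pending());
    p.chunks.add(data);
    if (isLastSpill) {
      p.expected = spillId + 1; // spill ids are 0-based
    }
    if (p.expected >= 0 && p.chunks.size() == p.expected) {
      return parked.remove(taskId).chunks; // safe to hand to the merger now
    }
    return null; // still partial: not eligible for merging
  }

  /** Source died mid-stream: drop partial data; caller re-enqueues fetch. */
  void onSourceFailed(int taskId) {
    parked.remove(taskId);
  }
}
{code}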
=========================================================================================================
From: Bikas
If the task is non-deterministic, then Tez cannot guarantee deterministic
results under failures. So if task non-determinism is making spills
non-deterministic, then IMO that's a won't-fix/unsupported. If spills are
deterministic for deterministic tasks, then we can use that assumption to build
a much better solution than setting aside partial merges and re-fetching the
same data again from the re-run.
=========================================================================================================
From: Gopal
Tasks are logically deterministic within any partition/reducer output.
That's an axiom, but there is no forcing function to ensure they are
byte-for-byte deterministic, because the spill indexes needn't match up; the
shuffle handler does the mapping from index -> offsets.
The current Hive model triggers 10% off-loads from the in-memory aggregations
to get a higher hit-rate for reductions, instead of collecting all keys as they
are encountered. Which 10% fit depends entirely on the traversal order of the
buckets in an in-memory hash table.
We could revert to the slow path of MRv2 and turn off all map-side in-memory
reductions in Hive to get predictable spilling, but that's not the way we want
to go.
FYI, this is a non-issue right now if you merely test out the code base, but
only because the hashtable traversal happens to be stable on JDK8 after the
removal of jdk.map.althashing.threshold; it is not deterministically forced.
=========================================================================================================
From: Siddharth
I'm not sure this should be unsupported. If tasks generate non-deterministic
output - either byte-for-byte, or in the final output across different attempts
(which is possible with some non-deterministic Hive operations like random) -
then the number of attempts can simply be set to 1. Once we support
vertex-level configuration, this will no longer have to apply to the entire
DAG.
There can be different strategies for handling failure - avoiding the merge, or
keeping track of merged spills and not re-fetching them. The same applies to
the unordered case as well - and it can be configured per vertex.
Using the VertexManager is nice - but it seems like a follow-on item, along
with the failure-handling semantics.
> Support pipelined data transfer for ordered output
> --------------------------------------------------
>
> Key: TEZ-2001
> URL: https://issues.apache.org/jira/browse/TEZ-2001
> Project: Apache Tez
> Issue Type: Improvement
> Reporter: Rajesh Balamohan
> Assignee: Rajesh Balamohan
> Attachments: TEZ-2001.1.patch, TEZ-2001.2.patch
>
>