[ https://issues.apache.org/jira/browse/TEZ-2001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Rajesh Balamohan updated TEZ-2001:
----------------------------------
Attachment: TEZ-2001.4.patch
Addressing review comments
TEZ_RUNTIME_DISABLE_FINAL_MERGE_IN_SORTER - Should the config be a positive
instead of a negative (ENABLE instead of DISABLE)? That's normally easier to
reason about.
- Done. Renamed to TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER and made changes
accordingly in other places.
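To illustrate why the positive name reads better: call sites become `if (finalMergeEnabled)` instead of `if (!disableFinalMerge)`. A minimal sketch, assuming a plain java.util.Properties in place of Tez's Configuration; the key string and the default of true are hypothetical, not the real Tez property.

```java
import java.util.Properties;

// Sketch only: a positively named boolean flag read from java.util.Properties.
// The key string and default below are hypothetical, not Tez's real property.
final class FinalMergeConfig {
    static final String ENABLE_FINAL_MERGE_KEY =
        "example.runtime.enable.final-merge.in.sorter"; // hypothetical key
    static final boolean ENABLE_FINAL_MERGE_DEFAULT = true; // assumed default

    private FinalMergeConfig() {}

    static boolean isFinalMergeEnabled(Properties conf) {
        String value = conf.getProperty(ENABLE_FINAL_MERGE_KEY);
        // Unset -> default; otherwise "true" in any case is true, everything else false.
        return value == null ? ENABLE_FINAL_MERGE_DEFAULT : Boolean.parseBoolean(value.trim());
    }
}
```

With the flag unset, final merge stays on; setting it to "false" turns it off, and no call site needs a double negation.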
ShuffleUtils.generateEventOnSpill - Instead of sending in Path indexFile, can
we send the TezIndexRecord to this API directly? For DefaultSorter /
PipelinedSorter, this may already be cached, which avoids a disk read. In other
places it can be constructed before the API call.
- Fixed.
"boolean sendEmptyPartitionDetails = conf.getBoolean(" - Send in as a parameter
instead of reading from conf
- Fixed.
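The two changes above together give the API a shape roughly like the following sketch: the caller hands in per-partition index entries it already holds (no index-file read) and the empty-partition flag as a parameter (no conf lookup inside the utility). All types and names here are hypothetical stand-ins, and treating partLength == 0 as "empty" is an assumption of the sketch, not TezIndexRecord's actual rule.

```java
import java.util.BitSet;

// Hypothetical sketch of the suggested generateEventOnSpill signature.
final class SpillEventSketch {

    /** Hypothetical stand-in for an index entry describing one partition in a spill. */
    static final class IndexRecord {
        final long startOffset;
        final long rawLength;
        final long partLength;

        IndexRecord(long startOffset, long rawLength, long partLength) {
            this.startOffset = startOffset;
            this.rawLength = rawLength;
            this.partLength = partLength;
        }

        // Assumption for this sketch: no bytes on disk means the partition is empty.
        boolean hasData() {
            return partLength > 0;
        }
    }

    /** Hypothetical event payload produced for one spill. */
    static final class SpillEvent {
        final int spillId;
        final boolean lastEvent;
        final BitSet emptyPartitions;

        SpillEvent(int spillId, boolean lastEvent, BitSet emptyPartitions) {
            this.spillId = spillId;
            this.lastEvent = lastEvent;
            this.emptyPartitions = emptyPartitions;
        }
    }

    private SpillEventSketch() {}

    static SpillEvent generateEventOnSpill(int spillId, boolean lastEvent,
                                           IndexRecord[] perPartition,
                                           boolean sendEmptyPartitionDetails) {
        BitSet empty = new BitSet(perPartition.length);
        if (sendEmptyPartitionDetails) {
            // Index entries are already in memory, so no disk read is needed here.
            for (int i = 0; i < perPartition.length; i++) {
                if (!perPartition[i].hasData()) {
                    empty.set(i);
                }
            }
        }
        return new SpillEvent(spillId, lastEvent, empty);
    }
}
```

Passing the flag in means the caller reads the config once at initialization, and the utility stays trivially testable.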
In copySucceeded, the commit logic does not check if the specific spillId has
previously been fetched. There are some parts of the patch which try handling
this, like the getMapsForHost code. (This may be part of a future jira, though.)
"if (eventInfo != null && srcAttemptIdentifier.getAttemptNumber() > 0) {" -
this fails for KILLED attempts as well, which I believe is intentional?
- Right. This is intentional.
After the exception is reported (shuffleScheduler.reportException), what should
happen? Should the rest of the code be executed, or should we just return?
- Fixed. Added return statement.
Similar to the failFast in copySucceeded, should addKnownMapOutput have the
same check? (It would fail earlier, rather than after a copy is successful.)
- Added fail fast.
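The three points above (fail on a later attempt, return right after reporting, and run the same check at the earliest hook) can be sketched as one shared guard. This assumes that `eventInfo != null` means pipelined events were already seen for that source; the class, its methods, and the String that stands in for shuffleScheduler.reportException are all hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: one fail-fast guard shared by both scheduler hooks.
final class PipelinedFailFast {
    private final Set<Integer> pipelinedSources = new HashSet<>();
    private String reportedError; // stands in for shuffleScheduler.reportException(...)

    synchronized void markPipelined(int inputIndex) {
        pipelinedSources.add(inputIndex);
    }

    /** Mirrors addKnownMapOutput: the earliest point where the check can run. */
    synchronized boolean addKnownOutput(int inputIndex, int attemptNumber) {
        if (!passesFailFast(inputIndex, attemptNumber)) {
            return false; // already reported: return instead of continuing
        }
        // ... register the output for fetching ...
        return true;
    }

    /** Mirrors copySucceeded: same check, applied after a copy completes. */
    synchronized boolean copySucceeded(int inputIndex, int attemptNumber) {
        if (!passesFailFast(inputIndex, attemptNumber)) {
            return false;
        }
        // ... commit the fetched data ...
        return true;
    }

    synchronized String reportedError() {
        return reportedError;
    }

    private boolean passesFailFast(int inputIndex, int attemptNumber) {
        // Assumed meaning of "eventInfo != null && attemptNumber > 0": a pipelined
        // source arriving from a later attempt (failed or KILLED predecessor) is fatal.
        if (pipelinedSources.contains(inputIndex) && attemptNumber > 0) {
            reportedError = "Pipelined source " + inputIndex
                + " re-sent by attempt " + attemptNumber;
            return false;
        }
        return true;
    }
}
```

Sharing one guard keeps the two hooks from drifting apart when the rule is revisited for deterministic spills.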
"if (oldId.getSpillEventId() == id.getSpillEventId()) {" - This is not checking
versions for different eventIds. Ideally, there should be no repetition of
event Ids for the same source due to previous fail fast code. (In the future -
this becomes more important if we allow multiple attempts in case of
deterministic spills)
- Yes, added TODO to revisit when deterministic spills are fixed.
The sanity check inside LOG.debug - I don't think this is very useful. It's based
on InputAttemptIdentifier.toString, which includes the version. The intent of
de-duping is to make sure multiple versions don't exist. We could probably skip
this entire check.
- Fixed. Removed this section.
"+ "Pipelined shuffle will be disabled in this run,");" - In case of incorrect
configs for FINAL_MERGE, which one wins: enable pipeline or disable final
merge? My first thought was that it's better to disable final merge if
pipelining is enabled.
- Fixed. Checking this explicitly in OrderedPartitionedKVOutput itself. It is
better to ensure that the final merge option is disabled when pipelining is
enabled, and to throw exceptions when needed.
- Added test cases in TestOnFileSortedOutput.
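One way to express that rule, as a sketch: an explicit "enable both" is rejected with an exception, while an unset final-merge option simply follows pipelining. The nullable Boolean for "not set" and the default-when-unset behaviour are assumptions of this sketch; the names are hypothetical and this is not the OrderedPartitionedKVOutput code.

```java
// Hypothetical sketch: resolve the final-merge setting against pipelining.
final class PipelineConfigCheck {
    private PipelineConfigCheck() {}

    /**
     * @param pipelined          whether pipelined shuffle is enabled
     * @param finalMergeSetting  explicit user setting, or null if not configured
     * @return whether the sorter should run a final merge
     */
    static boolean resolveFinalMerge(boolean pipelined, Boolean finalMergeSetting) {
        if (finalMergeSetting == null) {
            // Assumed default: merge unless pipelining is on.
            return !pipelined;
        }
        if (pipelined && finalMergeSetting) {
            // Conflicting explicit configs: fail instead of silently picking a winner.
            throw new IllegalArgumentException(
                "Final merge must be disabled when pipelined shuffle is enabled");
        }
        return finalMergeSetting;
    }
}
```

Failing loudly on the explicit conflict avoids the ambiguity of "which one wins", since the user sees the misconfiguration at setup time.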
In PipelinedSorter - indexCacheList is populated only during merge. That could
be done earlier, during the spill, to keep the IndexRecord in memory
(DefaultSorter already does this). Future jira item, though.
- Addressed as part of fixing #2.
DefaultSorter - using "spillFileIndexPaths.get" may not work. Spill records
are cached in memory, and only a single one is written to disk during final merge.
- Fixed.
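The two sorter points above both come down to populating index entries at spill time and reading them from memory rather than from per-spill index file paths. A minimal sketch, where a long[] of partition lengths stands in for the real index records and all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: per-spill index entries cached in memory as each spill is written.
final class SpillIndexCache {
    private final List<long[]> partLengthsBySpill = new ArrayList<>();

    /** Called right after a spill is written; returns the spill id assigned to it. */
    synchronized int recordSpill(long[] partLengths) {
        partLengthsBySpill.add(partLengths.clone()); // defensive copy
        return partLengthsBySpill.size() - 1;
    }

    /** Looks up a partition's length for a spill without touching disk. */
    synchronized long partLength(int spillId, int partition) {
        return partLengthsBySpill.get(spillId)[partition];
    }

    synchronized int numSpills() {
        return partLengthsBySpill.size();
    }
}
```

A real sorter would also bound this cache by memory and spill index entries to disk past a limit; the sketch leaves that out.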
DefaultSorter - the finalOutputBytesCounter is not incremented if the final
merge is disabled.
- Fixed.
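A sketch of the counter fix, under the assumption that with final merge disabled every spill file is itself final output, whereas with final merge enabled only the merged file counts. AtomicLong stands in for the Tez counter; the class and methods are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: where final output bytes get counted, depending on final merge.
final class OutputBytesAccounting {
    private final boolean finalMergeEnabled;
    private final AtomicLong finalOutputBytes = new AtomicLong(); // stand-in for a counter

    OutputBytesAccounting(boolean finalMergeEnabled) {
        this.finalMergeEnabled = finalMergeEnabled;
    }

    void onSpillWritten(long spillFileBytes) {
        if (!finalMergeEnabled) {
            // No merge will follow: each spill is final output, so count it now.
            finalOutputBytes.addAndGet(spillFileBytes);
        }
    }

    void onFinalMergeWritten(long mergedFileBytes) {
        if (finalMergeEnabled) {
            finalOutputBytes.addAndGet(mergedFileBytes);
        }
    }

    long finalOutputBytes() {
        return finalOutputBytes.get();
    }
}
```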
TezTaskOutputFiles - In the examples in the javadocs, please include the appId
/ attemptId like it exists today.
- Added examples in javadoc.
failureCounts in ShuffleScheduler - does anything need to change here ? It'll
count each spill failing separately.
- Yes, did this intentionally for fail fast. Need to revisit this as a part of
failure-handling scenarios.
OPTIMIZE_LOCAL_FETCH will not work for PipelineShuffle. It tries to construct
FileNames in the Fetcher, and then makes use of the InputIdentifier in the
MergeManager. Fix here or in a follow up ?
- For example, it would try to fetch from
output/attempt_1418684642047_0006_1_00_000000_0_10003_0/file.out when
OPTIMIZE_LOCAL_FETCH is enabled. I haven't seen any issue here. Am I missing
something?
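For reference, the example path above suggests the per-spill layout output/<pathComponent>_<spillId>/file.out, assuming the trailing "_0" is the spill id. The helper below is a hypothetical sketch built only from that example, not the TezTaskOutputFiles implementation.

```java
// Hypothetical sketch: local path for one spill, inferred from the example path above.
final class LocalSpillPath {
    private LocalSpillPath() {}

    static String spillOutputPath(String pathComponent, int spillId) {
        if (spillId < 0) {
            throw new IllegalArgumentException("spillId must be >= 0: " + spillId);
        }
        // Assumed layout: the spill id is appended to the attempt's path component.
        return "output/" + pathComponent + "_" + spillId + "/file.out";
    }
}
```

If the fetcher built this name without the spill suffix, or the merge manager keyed local outputs by input identifier alone, two spills of one attempt could collide; that is the case worth checking.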
Minor:
The Javadoc is missing a note that speculative execution needs to be turned "off".
- Fixed.
Changing remaining to a List from a Set in the Fetcher leads to some
inefficiency - since the size of this list can be ~30, and remove() calls can
be expensive. We may want to fix this later - by using the spillId in the
hashCode - or a wrapping structure for just this.
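The wrapping-structure option could look roughly like this: a key whose equals/hashCode include the spill id, so `remaining` can stay a Set with hash-based remove() even when one attempt contributes several spills. The names are hypothetical; LinkedHashSet is a choice made here to keep iteration order predictable.

```java
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical sketch: a Set of remaining inputs keyed by (input, attempt, spill).
final class RemainingInputs {

    static final class SpillInput {
        final int inputIndex;
        final int attemptNumber;
        final int spillEventId;

        SpillInput(int inputIndex, int attemptNumber, int spillEventId) {
            this.inputIndex = inputIndex;
            this.attemptNumber = attemptNumber;
            this.spillEventId = spillEventId;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof SpillInput)) {
                return false;
            }
            SpillInput other = (SpillInput) o;
            return inputIndex == other.inputIndex
                && attemptNumber == other.attemptNumber
                && spillEventId == other.spillEventId;
        }

        @Override
        public int hashCode() {
            // Spill id is part of the hash, so spills of one attempt are distinct keys.
            return Objects.hash(inputIndex, attemptNumber, spillEventId);
        }
    }

    // Hash-based remove(); insertion order kept for predictable fetch order.
    private final Set<SpillInput> remaining = new LinkedHashSet<>();

    boolean add(SpillInput in) {
        return remaining.add(in);
    }

    boolean markFetched(SpillInput in) {
        return remaining.remove(in);
    }

    int size() {
        return remaining.size();
    }
}
```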
ShuffleEventInfo should be a static class
- Fixed.
ShuffleEventInfo.id - not sure what this is used for.
- Added only for debugging (toString())
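As an illustration of both points, here is a static nested class (so it holds no hidden reference to the enclosing scheduler) that tracks which spill events of one source were processed, with the id used only in toString(). The fields and the "done" rule (final event known and every spill 0..final seen) are assumptions of this sketch, not the patch's ShuffleEventInfo.

```java
import java.util.BitSet;

// Hypothetical sketch: per-source spill bookkeeping as a static nested class.
final class SchedulerSketch {

    static final class ShuffleEventInfo {
        private final String id;                 // used only for debugging output
        private final BitSet eventsProcessed = new BitSet();
        private int finalEventId = -1;           // unknown until the last spill event arrives

        ShuffleEventInfo(String id) {
            this.id = id;
        }

        void spillProcessed(int spillEventId, boolean isLastEvent) {
            eventsProcessed.set(spillEventId);
            if (isLastEvent) {
                finalEventId = spillEventId;
            }
        }

        /** Done once the final spill is known and spills 0..final are all processed. */
        boolean isDone() {
            return finalEventId >= 0 && eventsProcessed.nextClearBit(0) > finalEventId;
        }

        @Override
        public String toString() {
            return "[id=" + id + ", processed=" + eventsProcessed
                + ", finalEventId=" + finalEventId + "]";
        }
    }

    private SchedulerSketch() {}
}
```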
DefaultSorter: Log a message if pipelining is enabled, but DefaultSorter is
being used. Also a log message for config being used in terms of final spill.
- Added warn log message when pipelining is enabled with DefaultSorter.
Nits:
"LOG.trace("Patch..incremental" - Remove Patch from the trace line
- Fixed.
addIdentifier - variable not required.
- Fixed.
"LOG.info("Final merge disabled. Number of events sent " + events.size());" -
Format it like the individual spill log message in PipelinedSorter.
- Removed this log.
> Support pipelined data transfer for ordered output
> --------------------------------------------------
>
> Key: TEZ-2001
> URL: https://issues.apache.org/jira/browse/TEZ-2001
> Project: Apache Tez
> Issue Type: Improvement
> Reporter: Rajesh Balamohan
> Assignee: Rajesh Balamohan
> Attachments: TEZ-2001.1.patch, TEZ-2001.2.patch, TEZ-2001.3.patch,
> TEZ-2001.4.patch, benchmark_q17_10TB.png, dag_plan.jpg
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)