[ 
https://issues.apache.org/jira/browse/TEZ-2001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajesh Balamohan updated TEZ-2001:
----------------------------------
    Attachment: TEZ-2001.4.patch

Addressing review comments

TEZ_RUNTIME_DISABLE_FINAL_MERGE_IN_SORTER - Should the config be a positive 
instead of a negative (ENABLE instead of DISABLE). That's normally easier to 
reason about.
- Done. Renamed to TEZ_RUNTIME_ENABLE_FINAL_MERGE_IN_SORTER and made changes 
accordingly in other places.

ShuffleUtils.generateEventOnSpill - Instead if sending in Path indexFile, can 
we send the TezIndexRecord to this API directly. For DefaultSorter / 
PipelinedSorter - this may already be cached and avoids a disk read. Other 
places it can be constructed before the API call.
- Fixed. 

"boolean sendEmptyPartitionDetails = conf.getBoolean(" - Send in as a parameter 
instead of reading from conf
- Fixed. 

In copySucceeded, the commit logic does not check if the specific spillId has 
previously been fetched. There's some parts of the patch which try handling 
this - like the getMapsForHost code. (This may be part of a future jira though)
"if (eventInfo != null && srcAttemptIdentifier.getAttemptNumber() > 0) {" - 
this fails for KILLED attempts as well, which I believe is intentional ?
- Right. This is intentional. 

After the exception is reported (shuffleScheduler.reportException), what should 
happen ? Should the rest of the code be executed, or should we just return.
- Fixed.  Added return statement. 

Similar to the failFast in copySucceeded, addKnownMapOutput should have the 
same check ? (fails earlier than after a copy is successful)
- Added fail fast. 

"if (oldId.getSpillEventId() == id.getSpillEventId()) {" - This is not checking 
versions for different eventIds. Ideally, there should be no repetition of 
event Ids for the same source due to previous fail fast code. (In the future - 
this becomes more important if we allow multiple attempts in case of 
deterministic spills)
- Yes, added TODO to revisit when deterministic spills are fixed. 

The sanity check inside LOG.debug - Don't think this is very useful. It's based 
on InputAttemptIdentifier.toString - which includes verison. The intent of 
de-duping is to make sure multiple versions don't exist. Could probably skip 
this entire check.
- Fixed. Removed this section. 

"+ "Pipelined shuffle will be disabled in this run,");" - In case of incorrect 
configs for FINAL_MERGE, which one wins - enable pipeline or disable final 
merge ? My first thought was that it's better to disable final merge if 
pipelining is enabled.
- Fixed. Checking this explicitly in OrderedPartitionedKVOutput itself. It is 
better to ensure that final merge option is disabled when pipelining is enabled 
& throw exceptions when needed. 
- Added test cases in TestOnFileSortedOutput.

In PipelineSorter - indexCacheList is populated only during merge. That could 
be done earlier during the spill to keep the IndexRecord in memory. 
(DefaultSorter already does this). Future jira item though.
- Addressed as part of fixing #2. 

DefaultSorter - using "spillFileIndexPaths.get" may not work. Spilled Records 
are cached in memory - and a single one is written to disk during final merge.
- Fixed. 

DefaultSorter - the finalOutputBytesCounter is not incremented if the final 
merge is disabled.
- Fixed. 

TezTaskOutputFiles - In the examples in the javadocs, please include the appId 
/ attemptId like it exists today.
- Added examples in javadoc. 

failureCounts in ShuffleScheduler - does anything need to change here ? It'll 
count each spill failing separately.
- Yes, did this intentionally for fail fast.  Need to revisit this as a part of 
failure-handling scenarios.

OPTIMIZE_LOCAL_FETCH will not work for PipelineShuffle. It tries to construct 
FileNames in the Fetcher, and then makes use of the InputIdentifier in the 
MergeManager. Fix here or in a follow up ?
- For e.g, it would try to fetch from 
output/attempt_1418684642047_0006_1_00_000000_0_10003_0/file.out when 
OPTIMIZE_LOCAL_FETCH is enabled. I haven't seen any issue here. Am I mising 
something?

Minor:
Speculative execution needs to be turned "off" missing in Javadoc
- Fixed. 

Changing remaining to a List from a Set in the Fetcher leads to some 
inefficiency - since the size of this list can be ~30, and remove() calls can 
be expensive. We may want to fix this later - by using the spillId in the 
hashCode - or a wrapping structure for just this.
ShuffleEventInfo should be a static class
- Fixed. 

ShuffleEventInfo.id - not sure what this is used for.
- Added only for debugging (toString())

DefaultSorter: Log a message if pipelining is enabled, but DefaultSorter is 
being used. Also a log message for config being used in terms of final spill.
- Added warn log message when pipelining is enabled with DefaultSorter. 

Nits:
"LOG.trace("Patch..incremental" - Remove Patch from the trace line
- Fixed.
addIdentifier - variable not required. 
- Fixed.

"LOG.info("Final merge disabled. Number of events sent " + events.size());" - 
Format similar to the individual spill Log message in PipelineSorter
- Removed this log.


> Support pipelined data transfer for ordered output
> --------------------------------------------------
>
>                 Key: TEZ-2001
>                 URL: https://issues.apache.org/jira/browse/TEZ-2001
>             Project: Apache Tez
>          Issue Type: Improvement
>            Reporter: Rajesh Balamohan
>            Assignee: Rajesh Balamohan
>         Attachments: TEZ-2001.1.patch, TEZ-2001.2.patch, TEZ-2001.3.patch, 
> TEZ-2001.4.patch, benchmark_q17_10TB.png, dag_plan.jpg
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to