[
https://issues.apache.org/jira/browse/TEZ-2001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14336814#comment-14336814
]
Siddharth Seth commented on TEZ-2001:
-------------------------------------
Comments on the patch
- TEZ_RUNTIME_DISABLE_FINAL_MERGE_IN_SORTER - Should the config be a positive
instead of a negative (ENABLE instead of DISABLE). That's normally easier to
reason about.
- ShuffleUtils.generateEventOnSpill - Instead if sending in Path indexFile, can
we send the TezIndexRecord to this API directly. For DefaultSorter /
PipelinedSorter - this may already be cached and avoids a disk read. Other
places it can be constructed before the API call.
- "boolean sendEmptyPartitionDetails = conf.getBoolean(" - Send in as a
parameter instead of reading from conf
- In copySucceeded, the commit logic does not check if the specific spillId has
previously been fetched. There's some parts of the patch which try handling
this - like the getMapsForHost code. (This may be part of a future jira though)
- "if (eventInfo != null && srcAttemptIdentifier.getAttemptNumber() > 0) {" -
this fails for KILLED attempts as well, which I believe is intentional ?
- After the exception is reported (shuffleScheduler.reportException), what
should happen ? Should the rest of the code be executed, or should we just
return.
- Similar to the failFast in copySucceeded, addKnownMapOutput should have the
same check ? (fails earlier than after a copy is successful)
- "if (oldId.getSpillEventId() == id.getSpillEventId()) {" - This is not
checking versions for different eventIds. Ideally, there should be no
repetition of event Ids for the same source due to previous fail fast code. (In
the future - this becomes more important if we allow multiple attempts in case
of deterministic spills)
- The sanity check inside LOG.debug - Don't think this is very useful. It's
based on InputAttemptIdentifier.toString - which includes verison. The intent
of de-duping is to make sure multiple versions don't exist. Could probably skip
this entire check.
- "+ "Pipelined shuffle will be disabled in this run,");" - In case of
incorrect configs for FINAL_MERGE, which one wins - enable pipeline or disable
final merge ? My first thought was that it's better to disable final merge if
pipelining is enabled.
- In PipelineSorter - indexCacheList is populated only during merge. That could
be done earlier during the spill to keep the IndexRecord in memory.
(DefaultSorter already does this). Future jira item though.
- DefaultSorter - using "spillFileIndexPaths.get" may not work. Spilled Records
are cached in memory - and a single one is written to disk during final merge.
- DefaultSorter - the finalOutputBytesCounter is not incremented if the final
merge is disabled.
- TezTaskOutputFiles - In the examples in the javadocs, please include the
appId / attemptId like it exists today.
- failureCounts in ShuffleScheduler - does anything need to change here ? It'll
count each spill failing separately.
- OPTIMIZE_LOCAL_FETCH will not work for PipelineShuffle. It tries to construct
FileNames in the Fetcher, and then makes use of the InputIdentifier in the
MergeManager. Fix here or in a follow up ?
Minor:
- Speculative execution needs to be turned "off" missing in Javadoc
- Changing remaining to a List from a Set in the Fetcher leads to some
inefficiency - since the size of this list can be ~30, and remove() calls can
be expensive. We may want to fix this later - by using the spillId in the
hashCode - or a wrapping structure for just this.
- ShuffleEventInfo should be a static class
- ShuffleEventInfo.id - not sure what this is used for.
- DefaultSorter: Log a message if pipelining is enabled, but DefaultSorter is
being used. Also a log message for config being used in terms of final spill.
Nits:
- "LOG.trace("Patch..incremental" - Remove Patch from the trace line
- addIdentifier - variable not required.
- "LOG.info("Final merge disabled. Number of events sent " + events.size());" -
Format similar to the individual spill Log message in PipelineSorter
Future Jiras:
- Fix optimize local fetch for pipelined shuffle
- Allow multiple source tasks - (Leave it up to users to enable / disable the
feature based on deterministic spill behaviour)
- Fault tolerance strategies - merge + restart task / merge on entire input
complete.
> Support pipelined data transfer for ordered output
> --------------------------------------------------
>
> Key: TEZ-2001
> URL: https://issues.apache.org/jira/browse/TEZ-2001
> Project: Apache Tez
> Issue Type: Improvement
> Reporter: Rajesh Balamohan
> Assignee: Rajesh Balamohan
> Attachments: TEZ-2001.1.patch, TEZ-2001.2.patch, TEZ-2001.3.patch,
> benchmark_q17_10TB.png, dag_plan.jpg
>
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)