[ https://issues.apache.org/jira/browse/TEZ-3334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15950550#comment-15950550 ]
Siddharth Seth commented on TEZ-3334:
-------------------------------------
Looked at parts of the patch which change existing code (except for the Ordered
case). Comments, questions, etc. on the patch are below. (I don't think there
are many big change requests.) The improved shuffle performance, and the
deletion of completed dags in long-running sessions, will be nice to try out;
the second one is especially useful on compute-heavy nodes which may not have a
lot of local storage.
- The config TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID - Does this belong to
TezConfiguration or TezRuntimeConfiguration? (On a related note: is the config
per AM or per dag when session mode is in use?)
- mapreduce_shuffle - Leave this as a constant in Constants. Also add a
constant for tez_shuffle, so that the possible values are easy to find. (Or
just document them in the configuration class.)
- TEZ_AM_DAG_DELETE_ENABLED - The property is not really used. Could be renamed
to something like tez.dag.cleanup.after.dag.completion
- TEZ_AM_DELETION_TRACKER_CLASS - Not sure why this is needed. Can
cleanupEnabled=true mean set up the default class, and false mean don't set it
up (or set up a NoOp implementation of the interface)?
- DagDeletionRunnable accesses TezRuntimeUtils, which is in
tez-runtime-library. Ideally tez-dag should not be accessing anything from
tez-runtime-library. I believe there are 2 places where it is currently used,
which is why the dependency exists. Not adding another one is a nice to have.
- Does the cleanup have any implications for the next dag being submitted to
the session? It doesn't look like it, other than some threads running to make
http calls.
- In DeletionTracker, can we use a CachedThreadPool with a max size based on
the configuration, a min size of 0, and a thread TTL of 10-15 seconds? The
threads are only required after a dag completes.
- LocalContainerLauncher: shufflePort=..deserialize. Will this information
always be available from the environment?
- shouldDelete is read but not respected
- TezContainerLauncherImpl - ByteBuffer portInfo = response.getAllServicesMetaData().get(conf.get(TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID,TezConfiguration.TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID_DEFAULT));
- Could be split into multiple calls, in case there's a possibility of an NPE.
- AMContainerHelpers - auxiliaryService is read from Configuration each time.
This should be passed in as a parameter, and read only once somewhere higher up
in the dag.
- Are all the new dependencies required? (nm, mapreduce, etc)
- TestShuffleHandlerJobs - avoid using "/tmp" (I think this is on hdfs?). The
parameter name mrrTez should really be tez*. Some more tests to exercise the
UnorderedCase would be useful. Are the tests exercising multiple partition
fetches at the same time?
- constructURL -> &dag=. Rename to &dagIdx=?
- Bunch of code has moved from ShuffleUtils to TezRuntimeUtils?
- Existing: ShuffleUtils deserializes shuffleMetadata for each event that it
generates. Seems a little unnecessary.
- ShuffleManager has moved to using a BitSet instead of a ConcurrentSet. We may
need to look at the synchronization points required for the BitSet (access to
the map had some synchronized and some non-synchronized access).
- Variable renames. pendingInputsOfOnePartition is no longer a valid name.
- Configuration seems to be passed in as a parameter to several classes. This
was explicitly avoided, or at least an attempt was made to avoid it. Can only
the relevant bits of information be passed as parameters, instead of the entire
Configuration object?
- TezTaskOutputFiles - Configuration should not be passed in. Should we just
add new implementations of TezTaskOutput, one for the MR shuffle handler and
another for the Tez shuffle handler (inheriting from a common class, with a
small change for the dag directory)?
- Will all source tasks belonging to the same host always send over the same
partitions? It looks like that's what the Fetcher is requesting.
- In optimizeLocalFetch - report success only after an entire source is done
(i.e. all partitions for it). Otherwise the same input may be reported multiple
times, which could be problematic.
-
{code}
- // Sanity check
- // we are guaranteed that key is not null
- if (srcAttemptsRemaining.get(srcAttemptId.toString()) == null) {
- // wrongMapErrs.increment(1);
- LOG.warn("Invalid input. Received output for headerPathComponent: "
- + pathComponent + "nextRemainingSrcAttemptId: "
- + getNextRemainingAttempt() + ", mappedSrcAttemptId: " + srcAttemptId);
- return false;
- }
{code}
Why was this removed?
- InputHost.clearAndGetOnePartition - rename. This no longer returns only a
single partition.
- Unrelated: numDmeEvents.addAndGet seems to be miscounted
- What impact does this have on the error reporting mechanics in the
OrderedCase?
- ShuffleInputEventHandler.processCompositeRoutedDataMovementEvent - this sends
empty partition information, and then sends a CompositeEvent which could have
the same partitions (previously reported as empty). Does this have any impact?
- Related: this will lead to additional entries in shuffleInfoEventsMap, even
after the entire shuffle has completed. Not sure if this will cause issues.
- pendingInputsOfOnePartition - name is likely not valid any longer
- {code}
+ if(input instanceof CompositeInputAttemptIdentifier) {
+ // LLL Is there an off by 0 case here. i.e. getInputIdentiferCount will be set to 1 for a regular event.
+ // We start scanning the bitst early. Also depends on how completedInputSet is setup.
+ if (completedInputSet.nextClearBit(input.getInputIdentifier()) >=
+ input.getInputIdentifier() + ((CompositeInputAttemptIdentifier) input).getInputIdentifierCount()) {
+ inputIter.remove();
+ continue;
+ }
+ } else {
+ if (completedInputSet.get(input.getInputIdentifier())) {
+ inputIter.remove();
+ continue;
+ }
{code}
Is there an off by 1 error here? Specifically, input.getInputIdentifier() +
((CompositeInputAttemptIdentifier) input).getInputIdentifierCount(). I think
getInputIdentifierCount is always at least 1.
- Eventually, plan to move to a cleaner Tez-specific interface (and use the
current one with jobId=, reduce= as a backward compatibility layer).
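On the mapreduce_shuffle / tez_shuffle point: a minimal sketch of keeping the known values for TEZ_AM_SHUFFLE_AUXILIARY_SERVICE_ID together so they are easy to find. The class name ShuffleServiceIds and the isKnown helper are hypothetical; only the two string values come from the discussion above, and in the patch they would presumably live in the existing Constants class.

```java
// Hypothetical holder for the auxiliary service IDs a user may configure.
// Class and method names are illustrative; only the string values come from the review.
final class ShuffleServiceIds {
  /** Auxiliary service ID of the MapReduce ShuffleHandler. */
  static final String MAPREDUCE_SHUFFLE = "mapreduce_shuffle";
  /** Auxiliary service ID for the Tez custom shuffle handler. */
  static final String TEZ_SHUFFLE = "tez_shuffle";

  private ShuffleServiceIds() {}

  /** Returns true if the given ID is one of the known shuffle services. */
  static boolean isKnown(String serviceId) {
    return MAPREDUCE_SHUFFLE.equals(serviceId) || TEZ_SHUFFLE.equals(serviceId);
  }
}
```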
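For the TEZ_AM_DELETION_TRACKER_CLASS question, a rough sketch of letting a single cleanupEnabled flag pick the implementation, so no separate class-name config is needed. The DeletionTracker method names, NoOpDeletionTracker, CountingDeletionTracker (a stand-in for the real default implementation) and DeletionTrackers.create are all hypothetical, not the interface in the patch.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical shape of the tracker; method names are illustrative only.
interface DeletionTracker {
  void dagComplete(String dagId);
  void shutdown();
}

// Used when cleanup is disabled: accepts calls and does nothing.
final class NoOpDeletionTracker implements DeletionTracker {
  @Override public void dagComplete(String dagId) {}
  @Override public void shutdown() {}
}

// Stand-in for the default implementation; it only counts deletion requests here.
final class CountingDeletionTracker implements DeletionTracker {
  final AtomicInteger requests = new AtomicInteger();
  @Override public void dagComplete(String dagId) { requests.incrementAndGet(); }
  @Override public void shutdown() {}
}

final class DeletionTrackers {
  private DeletionTrackers() {}

  // The boolean alone decides which implementation callers get.
  static DeletionTracker create(boolean cleanupEnabled) {
    return cleanupEnabled ? new CountingDeletionTracker() : new NoOpDeletionTracker();
  }
}
```

Callers can then invoke dagComplete unconditionally, without null checks, whichever value the flag has.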
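On the DeletionTracker thread pool: one way to get "min 0, max from config, 10-15 second TTL" with plain java.util.concurrent. Executors.newCachedThreadPool() has no upper bound, and a ThreadPoolExecutor with corePoolSize 0 and an unbounded queue never runs more than one thread, so this sketch sets core == max and enables allowCoreThreadTimeOut instead. DeletionPools and its parameter names are hypothetical; maxThreads would come from whichever config key the patch settles on.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class DeletionPools {
  private DeletionPools() {}

  /**
   * Runs at most maxThreads tasks at once and keeps no threads once idle.
   * idleSeconds must be > 0, since allowCoreThreadTimeOut rejects a zero keep-alive.
   */
  static ThreadPoolExecutor create(int maxThreads, long idleSeconds) {
    ThreadPoolExecutor pool = new ThreadPoolExecutor(
        maxThreads, maxThreads, idleSeconds, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>()); // extra work queues instead of being rejected
    pool.allowCoreThreadTimeOut(true); // idle threads exit after idleSeconds
    return pool;
  }
}
```

For example, DeletionPools.create(maxThreads, 10) starts threads only when deletions are submitted after a dag completes, and lets them exit 10 seconds after the work runs out.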
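For the portInfo lookup in TezContainerLauncherImpl, a sketch of splitting the one-liner so each missing piece fails with a clear message instead of an NPE further down. It takes the Map<String, ByteBuffer> (what getAllServicesMetaData() returns on a YARN StartContainersResponse) and the already-resolved service ID as plain parameters. ShufflePortLookup is a hypothetical name, and whether to throw or fall back is a design choice left to the patch.

```java
import java.nio.ByteBuffer;
import java.util.Map;

final class ShufflePortLookup {
  private ShufflePortLookup() {}

  /** Looks up the shuffle service metadata, failing with a descriptive message if absent. */
  static ByteBuffer portInfo(Map<String, ByteBuffer> servicesMetaData, String serviceId) {
    if (servicesMetaData == null) {
      throw new IllegalStateException("No auxiliary service metadata in container start response");
    }
    ByteBuffer portInfo = servicesMetaData.get(serviceId);
    if (portInfo == null) {
      // Listing the available IDs makes a misconfigured service ID easy to spot.
      throw new IllegalStateException("No metadata for auxiliary service '" + serviceId
          + "'; available: " + servicesMetaData.keySet());
    }
    return portInfo;
  }
}
```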
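On the ShuffleManager BitSet: java.util.BitSet is not safe for multithreaded use without external synchronization, so once it replaces a concurrent set, every read and write needs the same lock. A minimal sketch, assuming completed inputs are tracked by integer index; CompletedInputs and its methods are hypothetical.

```java
import java.util.BitSet;

/** Wraps a BitSet so that all access goes through one monitor. */
final class CompletedInputs {
  private final BitSet completed = new BitSet();

  synchronized void markCompleted(int inputIndex) {
    completed.set(inputIndex);
  }

  synchronized boolean isCompleted(int inputIndex) {
    return completed.get(inputIndex);
  }

  /** Number of inputs marked completed so far. */
  synchronized int completedCount() {
    return completed.cardinality();
  }
}
```

The point is that reads need the lock too; a mix of synchronized writes and unsynchronized reads can observe a BitSet mid-update.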
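For the TezTaskOutputFiles point, a sketch of the suggested split: a common base class where the two handlers differ only in the dag directory. OutputPathLayout, MrShuffleLayout, TezShuffleLayout and every path component here are placeholders, not the layout used by the patch or by TezTaskOutput; the point is only that neither class needs the full Configuration.

```java
/** Common part: everything except the dag directory is shared. */
abstract class OutputPathLayout {
  private final String uniqueId;

  OutputPathLayout(String uniqueId) {
    this.uniqueId = uniqueId;
  }

  /** The only piece that differs between the two shuffle handlers. */
  protected abstract String dagDirectory();

  /** Relative path of this attempt's output file (placeholder layout). */
  final String outputFile() {
    return dagDirectory() + "output/" + uniqueId + "/file.out";
  }
}

/** MR shuffle handler: no per-dag directory. */
final class MrShuffleLayout extends OutputPathLayout {
  MrShuffleLayout(String uniqueId) {
    super(uniqueId);
  }

  @Override protected String dagDirectory() {
    return "";
  }
}

/** Tez shuffle handler: outputs grouped under a per-dag directory. */
final class TezShuffleLayout extends OutputPathLayout {
  private final int dagIndex;

  TezShuffleLayout(String uniqueId, int dagIndex) {
    super(uniqueId);
    this.dagIndex = dagIndex;
  }

  @Override protected String dagDirectory() {
    return "dag_" + dagIndex + "/";
  }
}
```

Grouping outputs under a per-dag directory would also let a completed dag's data be deleted as a single directory.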
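For optimizeLocalFetch, a sketch of reporting a source as successful only once all of its partitions are done, so the same input is never reported twice. It assumes the expected partitions for each source are known before fetching starts; SourceCompletionTracker, sourceId and the partition numbering are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class SourceCompletionTracker {
  // Partitions still outstanding, per source.
  private final Map<String, Set<Integer>> pending = new HashMap<>();

  /** Registers the partitions expected from a source before fetching begins. */
  synchronized void expect(String sourceId, Set<Integer> partitions) {
    pending.put(sourceId, new HashSet<>(partitions));
  }

  /**
   * Records one fetched partition. Returns true exactly once per source, when its
   * last outstanding partition completes; duplicates and unknown partitions return false.
   */
  synchronized boolean partitionDone(String sourceId, int partition) {
    Set<Integer> remaining = pending.get(sourceId);
    if (remaining == null || !remaining.remove(partition)) {
      return false;
    }
    if (remaining.isEmpty()) {
      pending.remove(sourceId); // later calls for this source return false
      return true;
    }
    return false;
  }
}
```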
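On the possible off by 1 in the CompositeInputAttemptIdentifier branch: a standalone comparison of the nextClearBit condition against a bit-by-bit loop. It assumes getInputIdentifier() is the index of the first input and getInputIdentifierCount() the number of consecutive inputs, each tracked as one bit in completedInputSet. Under that assumption, a count of 1 reduces the condition to completedInputSet.get(id), the same check as the non-composite branch. Whether the assumption holds depends on how completedInputSet is populated, as the inline comment in the patch notes. RangeCheck and its methods are hypothetical names.

```java
import java.util.BitSet;

final class RangeCheck {
  private RangeCheck() {}

  /** The condition from the patch: true if every bit in [first, first + count) is set. */
  static boolean allSetViaNextClearBit(BitSet bits, int first, int count) {
    return bits.nextClearBit(first) >= first + count;
  }

  /** Reference version that checks each bit in the same range individually. */
  static boolean allSetByLoop(BitSet bits, int first, int count) {
    for (int i = first; i < first + count; i++) {
      if (!bits.get(i)) {
        return false;
      }
    }
    return true;
  }
}
```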
> Tez Custom Shuffle Handler
> --------------------------
>
> Key: TEZ-3334
> URL: https://issues.apache.org/jira/browse/TEZ-3334
> Project: Apache Tez
> Issue Type: New Feature
> Reporter: Jonathan Eagles
> Attachments: TEZ-3334.1.patch
>
>
> For conditions where auto-parallelism is reduced (e.g. TEZ-3222), a custom
> shuffle handler could help reduce the number of fetches and could more
> efficiently fetch data. In particular if a reducer is fetching 100 pieces
> serially from the same mapper it could do this in one fetch call.