[
https://issues.apache.org/jira/browse/TEZ-2442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274676#comment-15274676
]
Hitesh Shah commented on TEZ-2442:
----------------------------------
Comments:
- Naming convention: "shuffle" should really be "intermediate data", since
Tez also supports non-shuffle edges.
- TEZ_FS_BASED_SHUFFLE_ENABLED seems like a global setting, but it need not
be. First, it belongs in TezRuntimeConfiguration, as it is Input/Output
related. Second, it should be tunable on a per-edge basis, which is not the
case today. That said, we do need to figure out how to enable the
FsBasedShuffleDataCleaner and how to optimize for the case where it is not used
at all, so a global enable flag will likely be needed in addition to a setting
that can be configured per edge.
- TEZ_FS_BASED_SHUFFLE_LOCATION_DEFAULT - Is there a reason why this cannot
just default to a sub-dir of the Tez staging dir?
- The spurious change in
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
can be removed.
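A minimal sketch of how the configuration points above could fit together. All key names and the sub-directory name are hypothetical (none of them exist in Tez today), and a plain Map<String, String> stands in for Hadoop's Configuration. A global flag decides whether the AM needs the cleaner at all, a per-edge flag is read by each Input/Output from its own configuration, and the default location is derived from the per-application staging dir instead of a separate constant.

{code:java}
import java.util.Map;

/** Sketch only: hypothetical keys, Map<String, String> standing in for Configuration. */
final class IntermediateDataConfigSketch {
    // Hypothetical AM-wide switch: when false, the AM need not start the DFS data cleaner.
    static final String GLOBAL_ENABLED_KEY = "tez.am.dfs-intermediate-data.enabled";
    // Hypothetical per-edge key, read by each Input/Output from its own configuration.
    static final String EDGE_ENABLED_KEY = "tez.runtime.dfs-intermediate-data.enabled";
    // Hypothetical sub-directory name under the per-application staging dir.
    static final String DEFAULT_SUB_DIR = "intermediate-data";

    private IntermediateDataConfigSketch() {}

    /** True if any edge may use DFS-backed intermediate data, so the cleaner is needed. */
    static boolean cleanerNeeded(Map<String, String> amConf) {
        return Boolean.parseBoolean(amConf.getOrDefault(GLOBAL_ENABLED_KEY, "false"));
    }

    /** An edge writes to DFS only if the global switch is on and the edge opts in. */
    static boolean useDfsForEdge(Map<String, String> amConf, Map<String, String> edgeConf) {
        return cleanerNeeded(amConf)
            && Boolean.parseBoolean(edgeConf.getOrDefault(EDGE_ENABLED_KEY, "false"));
    }

    /** Default location: a sub-dir of the per-application staging dir. */
    static String defaultLocation(String appStagingDir) {
        // Drop trailing slashes so ".../app_1/" and ".../app_1" give the same result.
        String base = appStagingDir.replaceAll("/+$", "");
        return base + "/" + DEFAULT_SUB_DIR;
    }
}
{code}

Requiring both flags keeps the cleaner off, and the common local-disk path unchanged, unless something has opted in globally.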
FsBasedShuffleDataCleaner.java:
- "FileSystem fs = path.getFileSystem(getConfig());" - this should be a
one-time call and not done every time a folder cleanup is invoked.
- "LOG.error("Failed to clean shuffle dir: " + ie);" - the message should
specify which dir could not be deleted.
- Deleting the folder in serviceStop() is incorrect. This should only be done
when the app unregisters, not on AM shutdown.
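A minimal sketch of the three cleaner points above: resolve the FileSystem once, name the directory when a delete fails, and tie the final delete to app unregistration rather than serviceStop(). DeletableFs is a hypothetical stand-in for the part of Hadoop's FileSystem the cleaner uses, and the resolver function plays the role of path.getFileSystem(getConfig()); the class and method names are illustrative only.

{code:java}
import java.io.IOException;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical stand-in for the part of Hadoop's FileSystem the cleaner uses. */
interface DeletableFs {
    boolean delete(String path, boolean recursive) throws IOException;
}

final class IntermediateDataCleanerSketch {
    private static final Logger LOG =
        Logger.getLogger(IntermediateDataCleanerSketch.class.getName());

    private final String appRootDir;
    private final DeletableFs fs; // resolved once, then reused for every cleanup

    IntermediateDataCleanerSketch(String appRootDir, Function<String, DeletableFs> fsResolver) {
        this.appRootDir = appRootDir;
        // One-time lookup, playing the role of path.getFileSystem(getConfig()).
        this.fs = fsResolver.apply(appRootDir);
    }

    /** Deletes one directory with the cached fs; on error, logs which dir failed. */
    boolean cleanDir(String dir) {
        try {
            return fs.delete(dir, true);
        } catch (IOException ie) {
            LOG.log(Level.SEVERE, "Failed to clean intermediate data dir: " + dir, ie);
            return false;
        }
    }

    /** To be called when the application unregisters, not from serviceStop(). */
    boolean onAppUnregistered() {
        return cleanDir(appRootDir);
    }
}
{code}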
{code}
// No need to check if pathComponent starts with a certain prefix
// because in HDFS based shuffle it carries the fully qualified URL.
{code}
- What about the non-HDFS case? Isn't the prefix check still required there?
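A minimal sketch of keeping a check in both cases instead of dropping it: the existing prefix check for local/HTTP shuffle, and a "fully qualified URI" check when the data lives on a FS. LOCAL_PREFIX is a hypothetical placeholder for whatever prefix the existing code expects, and the class name is illustrative.

{code:java}
import java.net.URI;
import java.net.URISyntaxException;

final class PathComponentValidator {
    // Hypothetical placeholder for the prefix the existing local/HTTP path expects.
    static final String LOCAL_PREFIX = "attempt_";

    private PathComponentValidator() {}

    static boolean isValid(String pathComponent, boolean fsBased) {
        if (!fsBased) {
            // Local/HTTP shuffle: the original prefix check still applies.
            return pathComponent.startsWith(LOCAL_PREFIX);
        }
        // FS-based: expect a fully qualified URI with a scheme and an absolute path.
        try {
            URI uri = new URI(pathComponent);
            return uri.getScheme() != null
                && uri.getPath() != null
                && uri.getPath().startsWith("/");
        } catch (URISyntaxException e) {
            return false;
        }
    }
}
{code}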
{code}
Constructor<?> ctor = clazz.getConstructor(Configuration.class,
    TaskContext.class);
{code}
- this is an incompatible change for anyone who has overridden the
TEZ_RUNTIME_TASK_OUTPUT_MANAGER property.
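One way to keep existing overrides working is to try the new constructor first and fall back to the old one. This is a sketch with hypothetical stand-in classes; the old signature is shown as a single-argument (Configuration) constructor purely for illustration, so substitute whatever the pre-patch constructor actually takes.

{code:java}
import java.lang.reflect.Constructor;

/** Hypothetical stand-ins for Hadoop's Configuration and Tez's TaskContext. */
class ConfigurationStandIn {}
class TaskContextStandIn {}

/** An existing user override that only has the old constructor (assumed signature). */
class LegacyOutputManager {
    public LegacyOutputManager(ConfigurationStandIn conf) {}
}

/** A new-style implementation that also accepts the task context. */
class ContextAwareOutputManager {
    final TaskContextStandIn context;

    public ContextAwareOutputManager(ConfigurationStandIn conf, TaskContextStandIn context) {
        this.context = context;
    }
}

final class OutputManagerFactory {
    private OutputManagerFactory() {}

    /** Prefers the (conf, context) constructor; falls back to the legacy (conf) one. */
    static Object create(Class<?> clazz, ConfigurationStandIn conf, TaskContextStandIn context)
            throws ReflectiveOperationException {
        try {
            Constructor<?> ctor =
                clazz.getConstructor(ConfigurationStandIn.class, TaskContextStandIn.class);
            return ctor.newInstance(conf, context);
        } catch (NoSuchMethodException e) {
            // Legacy override: only the old constructor exists.
            Constructor<?> legacy = clazz.getConstructor(ConfigurationStandIn.class);
            return legacy.newInstance(conf);
        }
    }
}
{code}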
{code}
LOG.info("Using Filesystem based fetcher");
LOG.info("path=" + path.toString());
{code}
- These log messages can be removed, as they do not seem to provide any useful
info. The fetcher-specific message gives no context on what is being fetched,
and the type of fetch should likely be covered when logging the actual fetch
details.
{code}
ShuffleUtils.isFsBasedShuffleEnabled(conf)
{code}
- it might be good to cache this value wherever possible instead of querying
the conf object multiple times
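A minimal sketch of caching the flag: read it once when the component is constructed and keep it in a final field. The key name is hypothetical and a Map<String, String> stands in for the conf object; in the patch the read would be the single ShuffleUtils.isFsBasedShuffleEnabled(conf) call.

{code:java}
import java.util.Map;

final class FetcherSettings {
    // Hypothetical key; stands in for whatever ShuffleUtils.isFsBasedShuffleEnabled reads.
    static final String FS_BASED_KEY = "tez.runtime.dfs-intermediate-data.enabled";

    private final boolean fsBased; // evaluated exactly once, at construction

    FetcherSettings(Map<String, String> conf) {
        this.fsBased = Boolean.parseBoolean(conf.getOrDefault(FS_BASED_KEY, "false"));
    }

    boolean isFsBased() {
        return fsBased;
    }
}
{code}

Because the value is a snapshot, later changes to the conf object are not observed; for a per-task setting that is usually what is wanted.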
FileSystemFetchedInput.java:
- The toString() function needs changing to denote local vs. FS-based input.
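A minimal sketch of a toString() that states where the fetched data lives. The class, enum, and field names are hypothetical and not taken from the patch's FileSystemFetchedInput.

{code:java}
final class FetchedInputDescription {
    enum Location { LOCAL_DISK, DFS }

    private final Location location;
    private final String path;
    private final long sizeBytes;

    FetchedInputDescription(Location location, String path, long sizeBytes) {
        this.location = location;
        this.path = path;
        this.sizeBytes = sizeBytes;
    }

    @Override
    public String toString() {
        // Include the location so local and FS-based inputs are distinguishable in logs.
        return "FetchedInput[location=" + location + ", path=" + path
            + ", sizeBytes=" + sizeBytes + "]";
    }
}
{code}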
PipelinedSorter:
- copyLocalFileToHdfs() - why would this be needed?
{code}
private static final String OUTPUT_LOCATION_ENV_NAME =
"TEZ_FS_BASED_SHUFFLE_LOCATION";
{code}
- I am not sure I understand why we are allowing the shuffle location to be
overridden via the environment.
{code}
String subPath = context.getApplicationId().toString() + Path.SEPARATOR
+ context.getDagIdentifier() + Path.SEPARATOR +
context.getUniqueIdentifier();
{code}
- How does this scale for the various FS impls when there are multiple
vertices and each vertex could have 100K tasks?
I have not yet looked into the full internals of the changes to the actual
fetcher/output write logic. I am guessing [~rajesh.balamohan] and [~sseth] may
take a more in-depth look.
Some general comments:
- The failure heuristics for DFS-based intermediate output are completely
different from those for local-disk-based output. The overall design for more
reliable intermediate data storage should therefore account for changing the
edge type to PERSISTED_RELIABLE and use that as the deciding factor for whether
to write data to DFS or to the local FS. At the same time, the failure
heuristics should be changed to recognize that a DFS is more reliable, so node
failures should not be treated as a reason to re-run tasks, etc.
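A minimal sketch of letting a single edge property drive both decisions. The enum reuses the constant names of Tez's EdgeProperty.DataSourceType (PERSISTED, PERSISTED_RELIABLE, EPHEMERAL) but is a local stand-in; the class, its methods, and the mapping of the non-reliable types to local disk are assumptions for illustration.

{code:java}
final class EdgeFailurePolicy {
    /** Local stand-in mirroring the constant names of EdgeProperty.DataSourceType. */
    enum DataSourceType { PERSISTED, PERSISTED_RELIABLE, EPHEMERAL }

    enum StorageTarget { LOCAL_DISK, DFS }

    private EdgeFailurePolicy() {}

    /** Only edges declared reliable write their intermediate data to the DFS. */
    static StorageTarget storageFor(DataSourceType type) {
        return type == DataSourceType.PERSISTED_RELIABLE
            ? StorageTarget.DFS
            : StorageTarget.LOCAL_DISK;
    }

    /** Should producer tasks that ran on a failed node be re-run? */
    static boolean rerunProducersOnNodeFailure(DataSourceType type) {
        // Data on a reliable DFS survives the loss of the node that wrote it.
        return type != DataSourceType.PERSISTED_RELIABLE;
    }
}
{code}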
> Support DFS based shuffle in addition to HTTP shuffle
> -----------------------------------------------------
>
> Key: TEZ-2442
> URL: https://issues.apache.org/jira/browse/TEZ-2442
> Project: Apache Tez
> Issue Type: Improvement
> Affects Versions: 0.5.3
> Reporter: Kannan Rajah
> Assignee: shanyu zhao
> Attachments: FS_based_shuffle_v2.pdf, Tez Shuffle using DFS.pdf,
> hdfs_broadcast_hack.txt, tez-2442-trunk.2.patch, tez-2442-trunk.3.patch,
> tez-2442-trunk.patch, tez_hdfs_shuffle.patch
>
>
> In Tez, Shuffle is a mechanism by which intermediate data can be shared
> between stages. Shuffle data is written to local disk and fetched by any
> remote node using HTTP. A DFS like the MapR file system can support writing
> this shuffle data directly to the DFS using a notion of local volumes, and
> retrieving it from a remote node using the HDFS API. The current Shuffle
> implementation assumes local data can only be managed by LocalFileSystem, so
> it uses RawLocalFileSystem and LocalDirAllocator. If we remove this
> assumption and introduce an abstraction to manage local disks, we can reuse
> most of the shuffle logic (store, sort) and inject HDFS API-based retrieval
> instead of HTTP.
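A minimal sketch of the abstraction the description asks for, assuming hypothetical names: the store/sort side writes through one interface, and the implementation decides where the bytes land. Only a local-disk implementation using java.nio is shown; a DFS-backed one would wrap a Hadoop FileSystem, and retrieval would then go through the DFS client instead of HTTP.

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical abstraction over where intermediate data is stored. */
interface IntermediateDataStore {
    OutputStream create(String relativePath) throws IOException;

    InputStream open(String relativePath) throws IOException;
}

/** Local-disk implementation rooted at one directory. */
final class LocalDirStore implements IntermediateDataStore {
    private final Path root;

    LocalDirStore(Path root) {
        this.root = root;
    }

    @Override
    public OutputStream create(String relativePath) throws IOException {
        Path target = root.resolve(relativePath);
        // Make sure the per-attempt parent directory exists before writing.
        Files.createDirectories(target.getParent());
        return Files.newOutputStream(target);
    }

    @Override
    public InputStream open(String relativePath) throws IOException {
        return Files.newInputStream(root.resolve(relativePath));
    }
}
{code}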
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)