[
https://issues.apache.org/jira/browse/TEZ-2442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15275815#comment-15275815
]
shanyu zhao commented on TEZ-2442:
----------------------------------
Thanks [~hitesh] for reviewing the patch!
◾Naming convention: "shuffle" - this should really be intermediate data and not
shuffle, as Tez supports non-shuffle edges.
[shanyu]: point taken. I guess this comes from the shuffle phase of MR where
data is moved from one task to another. I agree that intermediate data is a
more accurate description.
◾TEZ_FS_BASED_SHUFFLE_ENABLED seems like a global setting but it need not be.
Firstly, it should belong in TezRuntimeConfiguration as this is Input/Output
related. Secondly, this should be tunable on a per-edge basis, which is
not the case today. That said, we do need to figure out how to enable
the FsBasedShuffleDataCleaner and optimize for the case where it is not used at
all, so a global flag to enable it will likely be needed, but something else to
configure on a per-edge basis is also needed.
[shanyu]: Originally I put it in TezRuntimeConfiguration, but since I
need to enable the FS cleaner in the Tez AM I had to make it a global
configuration.
◾TEZ_FS_BASED_SHUFFLE_LOCATION_DEFAULT - Is there a reason why this cannot just
default to a sub-dir of the tez staging dir?
[shanyu]: this is a good point. If the shuffle location is not provided, we can
use a ".shuffle" sub dir of the staging dir.
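A minimal sketch of that fallback, using plain strings instead of Hadoop Path
objects so it compiles on its own. The class and method names are hypothetical
and are not taken from the patch; only the ".shuffle" sub dir name comes from
the reply above.

```java
final class ShuffleLocationSketch {
    // Sub-directory name suggested in the reply above.
    static final String DEFAULT_SUBDIR = ".shuffle";

    /**
     * Returns the configured shuffle location if one was supplied,
     * otherwise a ".shuffle" sub-directory of the staging directory.
     * Both parameters are hypothetical stand-ins for values that would
     * come from the Tez configuration.
     */
    static String resolve(String configuredLocation, String stagingDir) {
        if (configuredLocation != null && !configuredLocation.trim().isEmpty()) {
            return configuredLocation.trim();
        }
        // Avoid a doubled separator when the staging dir ends with '/'.
        String base = stagingDir.endsWith("/")
            ? stagingDir.substring(0, stagingDir.length() - 1)
            : stagingDir;
        return base + "/" + DEFAULT_SUBDIR;
    }

    public static void main(String[] args) {
        // No explicit location: fall back to the staging dir.
        System.out.println(resolve(null, "/user/someone/.staging"));
        // Explicit location: used as-is.
        System.out.println(resolve("/tmp/shuffle", "/user/someone/.staging"));
    }
}
```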
◾Spurious change in
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
could be removed.
[shanyu]: will do.
FsBasedShuffleDataCleaner.java:
◾"FileSystem fs = path.getFileSystem(getConfig());" - this should be a one-time
call and not done every time a folder cleanup is invoked.
[shanyu]: will do.
◾"LOG.error("Failed to clean shuffle dir: " + ie);" - should specify which dir
failed to be deleted.
[shanyu]: will do.
◾deleting folder in serviceStop() - this is actually very incorrect. This
should only be done when the app unregisters and not on AM shutdown.
[shanyu]: The deleting happens in two cases: 1) app finishes/unregisters, this
is in handle(event) where the event is DAG_FINISHED; 2) AM shutdown. This is
the second case.
// No need to check if pathComponent starts with a certain prefix
// because in HDFS based shuffle it carries the fully qualified URL.
◾what about requiring the check for non-hdfs based shuffle?
[shanyu]: InputAttemptIdentifier does not have the full context of the Tez
configuration, and it does not need to. So we don't know if FS shuffle is
enabled or not. Furthermore, I don't see the necessity of checking the path
component here, since every code path that generates a path component either
uses the right prefix or uses FS based shuffle.
Constructor<?> ctor = clazz.getConstructor(Configuration.class,
TaskContext.class);
◾this is an incompatible change for anyone who has overridden the
TEZ_RUNTIME_TASK_OUTPUT_MANAGER property.
[shanyu]: Yes, you are right. I'll fix this.
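One possible way to keep existing overrides working is to try the new
constructor first and fall back to the older one. The sketch below only
illustrates that idea and is not necessarily the fix that will go into the
patch. Conf and Ctx are stand-ins for Configuration and TaskContext, and the
(Conf, String) legacy signature is an assumption made for illustration.

```java
final class OutputManagerFactorySketch {
    // Stand-ins for Configuration and TaskContext so the sketch compiles alone.
    public static final class Conf {}
    public static final class Ctx {
        final String uniqueId;
        public Ctx(String uniqueId) { this.uniqueId = uniqueId; }
    }

    // A user-supplied manager that only has the assumed legacy constructor.
    public static class LegacyManager {
        final String id;
        public LegacyManager(Conf conf, String uniqueId) { this.id = uniqueId; }
    }

    // A manager that provides the new constructor.
    public static class NewManager {
        final String id;
        public NewManager(Conf conf, Ctx ctx) { this.id = ctx.uniqueId; }
    }

    /** Tries the new (Conf, Ctx) constructor, then the assumed legacy one. */
    static Object instantiate(Class<?> clazz, Conf conf, Ctx ctx) {
        try {
            try {
                return clazz.getConstructor(Conf.class, Ctx.class)
                            .newInstance(conf, ctx);
            } catch (NoSuchMethodException e) {
                // Class predates the new signature: use the older constructor.
                return clazz.getConstructor(Conf.class, String.class)
                            .newInstance(conf, ctx.uniqueId);
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                "Cannot instantiate " + clazz.getName(), e);
        }
    }

    public static void main(String[] args) {
        Object m = instantiate(LegacyManager.class, new Conf(), new Ctx("attempt_1"));
        System.out.println(m.getClass().getSimpleName());
    }
}
```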
LOG.info("Using Filesystem based fetcher");
LOG.info("path=" + path.toString());
◾Log messages can be removed as they do not seem to be providing any useful
info. The fetcher specific info provides no context on what is being fetched
and the type of fetch should likely be covered as part of logging the actual
fetch details.
[shanyu]: will do.
"ShuffleUtils.isFsBasedShuffleEnabled(conf)"
◾it might be good to cache this value wherever possible instead of querying the
conf object multiple times
[shanyu]: Originally I had the caching here, but caching it globally is
troublesome for unit tests. So the caller needs to do proper caching if needed.
I'll modify Fetcher and FetcherOrderedGrouped to do proper caching.
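A minimal sketch of caller-side caching, assuming the flag is read once per
instance in the constructor rather than held in a global static. CountingConf
and FetcherStandIn are stand-ins written for this example, and the property
key is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

final class CachedFlagSketch {
    // Hypothetical property key; the real key name in the patch may differ.
    static final String FS_SHUFFLE_ENABLED = "tez.fs.based.shuffle.enabled";

    // Stand-in for a Configuration object that counts lookups.
    static final class CountingConf {
        private final Map<String, String> values = new HashMap<>();
        int lookups = 0;

        void set(String key, String value) { values.put(key, value); }

        boolean getBoolean(String key, boolean defaultValue) {
            lookups++;
            String v = values.get(key);
            return v == null ? defaultValue : Boolean.parseBoolean(v);
        }
    }

    // Stand-in for a fetcher: reads the flag once and keeps it in a final field.
    static final class FetcherStandIn {
        private final boolean fsBasedShuffle;

        FetcherStandIn(CountingConf conf) {
            this.fsBasedShuffle = conf.getBoolean(FS_SHUFFLE_ENABLED, false);
        }

        // Called many times per fetch; never touches the conf again.
        String fetchMode() { return fsBasedShuffle ? "fs" : "http"; }
    }

    public static void main(String[] args) {
        CountingConf conf = new CountingConf();
        conf.set(FS_SHUFFLE_ENABLED, "true");
        FetcherStandIn fetcher = new FetcherStandIn(conf);
        for (int i = 0; i < 3; i++) {
            System.out.println(fetcher.fetchMode());
        }
        System.out.println("conf lookups: " + conf.lookups);
    }
}
```

Because the cached value lives in the instance, each unit test can build its
own conf and fetcher without resetting shared state.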
FileSystemFetchedInput.java:
◾toString function needs changing to denote local vs fs based
[shanyu]: based on the path we can tell if this is local vs fs based. For fs
based, the inputFile is a fully qualified FileSystem URL.
PipelinedSorter:
◾copyLocalFileToHdfs() - why would this be needed?
[shanyu]: because if there is only 1 spill we need to copy this local spill
file to the FileSystem. I'll rename it to copyLocalFileToFileSystem().
private static final String OUTPUT_LOCATION_ENV_NAME =
"TEZ_FS_BASED_SHUFFLE_LOCATION";
◾not sure I understand why we are allowing the shuffle location to be
overridden via the environment?
[shanyu]: Just providing another way to supply shuffle location. If we use an
AUX service in NM to do proper clean up, we can let NM generate the shuffle
location and properly clean it up. NM can provide the shuffle location via
environment.
String subPath = context.getApplicationId().toString() + Path.SEPARATOR
+ context.getDagIdentifier() + Path.SEPARATOR +
context.getUniqueIdentifier();
◾how does this scale for various FS impls when there are multiple vertices and
each vertex could have 100K tasks?
[shanyu]: are you worried about 100K sub folders in one HDFS folder? I'm going
to do a scaling test and share the results later.
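To make the concern concrete, here is a small sketch of the layout that the
subPath expression produces. It assumes the unique identifier is distinct per
task, so every task of a DAG gets its own directory directly under
<applicationId>/<dagIdentifier>. The class name, method names and parameter
types are hypothetical, and the numbers are only arithmetic on the figures
quoted above, not test results.

```java
final class SubPathSketch {
    // Same value as Hadoop's Path.SEPARATOR.
    static final String SEPARATOR = "/";

    /** Mirrors the subPath expression quoted above, using plain strings. */
    static String subPath(String applicationId, int dagIdentifier,
                          String uniqueIdentifier) {
        return applicationId + SEPARATOR + dagIdentifier + SEPARATOR
            + uniqueIdentifier;
    }

    /**
     * Number of sibling directories directly under <appId>/<dagId>,
     * assuming one directory per task and no retried attempts.
     */
    static long siblingsPerDag(int vertices, int tasksPerVertex) {
        return (long) vertices * tasksPerVertex;
    }

    public static void main(String[] args) {
        System.out.println(subPath("application_1_0001", 1, "attempt_a"));
        // Three vertices with 100K tasks each share one parent directory.
        System.out.println(siblingsPerDag(3, 100_000));
    }
}
```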
I have not yet looked into the full internals of the changes to the actual
fetcher/output write logic. I am guessing Rajesh Balamohan and Siddharth Seth
may take a more in-depth look.
Some general comments:
◾the failure heuristics for a DFS based intermediate output are completely
different to a local disk based output. The overall design to enable a more
reliable storage for intermediate data should therefore account for changing
the edge type to be PERSISTED_RELIABLE and use that as the defining factor of
whether to write data to DFS or local FS and also at the same time change the
failure heuristics to understand that a DFS is more reliable hence node
failures should not be considered as a case for re-running tasks, etc.
[shanyu]: You have a very good point here. One piece of future work is to
implement a "persisted_reliable" edge on top of the FS based shuffle. But the
purpose of this JIRA is not to add an additional edge type. Instead, it aims at
breaking the dependency on the MapReduce shuffle AUX service. In practice we can
put intermediate data in HDFS with a replication factor of one, in which case
the failure heuristics should be exactly the same as for a "persisted" edge.
Again, thank you so much [~hitesh] for the detailed review!
> Support DFS based shuffle in addition to HTTP shuffle
> -----------------------------------------------------
>
> Key: TEZ-2442
> URL: https://issues.apache.org/jira/browse/TEZ-2442
> Project: Apache Tez
> Issue Type: Improvement
> Affects Versions: 0.5.3
> Reporter: Kannan Rajah
> Assignee: shanyu zhao
> Attachments: FS_based_shuffle_v2.pdf, Tez Shuffle using DFS.pdf,
> hdfs_broadcast_hack.txt, tez-2442-trunk.2.patch, tez-2442-trunk.3.patch,
> tez-2442-trunk.patch, tez_hdfs_shuffle.patch
>
>
> In Tez, Shuffle is a mechanism by which intermediate data can be shared
> between stages. Shuffle data is written to local disk and fetched from any
> remote node using HTTP. A DFS like MapR file system can support writing this
> shuffle data directly to its DFS using a notion of local volumes and retrieve
> it using HDFS API from remote node. The current Shuffle implementation
> assumes local data can only be managed by LocalFileSystem. So it uses
> RawLocalFileSystem and LocalDirAllocator. If we can remove this assumption
> and introduce an abstraction to manage local disks, then we can reuse most of
> the shuffle logic (store, sort) and inject a HDFS API based retrieval instead
> of HTTP.