[ 
https://issues.apache.org/jira/browse/TEZ-2442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15275815#comment-15275815
 ] 

shanyu zhao commented on TEZ-2442:
----------------------------------

Thanks [~hitesh] for reviewing the patch!

◾Naming convention: "shuffle" - this should really be intermediate data and not 
shuffle, as Tez supports non-shuffle edges.
[shanyu]: Point taken. I guess this comes from the shuffle phase of MR, where 
data is moved from one task to another. I agree that intermediate data is a 
more accurate description.

◾TEZ_FS_BASED_SHUFFLE_ENABLED seems like a global setting but it need not be. 
Firstly, it should belong in TezRuntimeConfiguration as this is Input/Output 
related. Secondly, this should be tunable on a per-edge basis, which is not the 
case today. That said, we do need to figure out how to enable the 
FsBasedShuffleDataCleaner and optimize for the case where it is not used at 
all, so a global flag to enable it will likely be needed, but something to 
configure on a per-edge basis is also needed.
[shanyu]: Originally I put it in TezRuntimeConfiguration, but since I need to 
enable the FS cleaner in the Tez AM I had to make it a global configuration.

◾TEZ_FS_BASED_SHUFFLE_LOCATION_DEFAULT - Is there a reason why this cannot just 
default to a sub-dir of the tez staging dir?
[shanyu]: this is a good point. If the shuffle location is not provided, we can 
use a ".shuffle" sub-dir of the staging dir.

◾Spurious change in 
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
 could be removed.
[shanyu]: will do.

FsBasedShuffleDataCleaner.java:
◾"FileSystem fs = path.getFileSystem(getConfig());" - this should be a one-time 
call and not done every time a folder cleanup is invoked.
[shanyu]: will do.

◾"LOG.error("Failed to clean shuffle dir: " + ie);" - should specify which dir 
failed to be deleted.
[shanyu]: will do.
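A rough sketch of both fixes above, assuming the cleaner is a Hadoop 
AbstractService with a base path field (field and method names are 
illustrative, not taken from the patch):

    private FileSystem fs;

    @Override
    protected void serviceInit(Configuration conf) throws Exception {
      super.serviceInit(conf);
      // resolve the FileSystem once instead of on every cleanup call
      this.fs = basePath.getFileSystem(conf);
    }

    private void cleanup(Path dir) {
      try {
        fs.delete(dir, true);
      } catch (IOException ie) {
        // name the directory that failed, and keep the stack trace
        LOG.error("Failed to clean intermediate data dir: " + dir, ie);
      }
    }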

◾deleting folder in serviceStop() - this is actually very incorrect. This 
should only be done when the app unregisters and not on AM shutdown.
[shanyu]: The deletion happens in two cases: 1) the app finishes/unregisters, 
which is handled in handle(event) when the event is DAG_FINISHED; 2) AM 
shutdown. serviceStop() covers the second case.

// No need to check if pathComponent starts with a certain prefix
// because in HDFS based shuffle it carries the fully qualified URL.

◾what about requiring the check for non-hdfs based shuffle?
[shanyu]: InputAttemptIdentifier does not have the full context of the Tez 
configuration, and it need not, so we don't know whether FS shuffle is enabled 
or not. Furthermore, I don't see the need to check the path component here, 
since every code path that generates the path component either uses the right 
prefix or uses FS based shuffle.

      Constructor<?> ctor = clazz.getConstructor(Configuration.class, 
TaskContext.class); 
◾this is an incompatible change for anyone who has overridden the 
TEZ_RUNTIME_TASK_OUTPUT_MANAGER property.
[shanyu]: Yes, you are right. I'll fix this.
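One possible backwards-compatible lookup (a sketch only; the old 
(Configuration, String) signature is an assumption about what existing 
TEZ_RUNTIME_TASK_OUTPUT_MANAGER implementations were written against, and the 
variable names are illustrative):

    Constructor<?> ctor;
    Object outputManager;
    try {
      // new signature introduced by this patch
      ctor = clazz.getConstructor(Configuration.class, TaskContext.class);
      outputManager = ctor.newInstance(conf, taskContext);
    } catch (NoSuchMethodException nsme) {
      // fall back for implementations built against the old signature
      ctor = clazz.getConstructor(Configuration.class, String.class);
      outputManager = ctor.newInstance(conf, uniqueId);
    }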

      LOG.info("Using Filesystem based fetcher");

      LOG.info("path=" + path.toString());

◾Log messages can be removed as they do not seem to be providing any useful 
info. The fetcher specific info provides no context on what is being fetched 
and the type of fetch should likely be covered as part of logging the actual 
fetch details.
[shanyu]: will do.

"ShuffleUtils.isFsBasedShuffleEnabled(conf)"
◾it might be good to cache this value wherever possible instead of querying the 
conf object multiple times
[shanyu]: Originally I had the caching here, but caching it globally is 
troublesome for unit tests, so the caller needs to do the caching if needed. 
I'll modify Fetcher and FetcherOrderedGrouped to cache it properly.
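Something like the following in the fetcher constructors is all that is meant 
here (the field and constructor shown are illustrative):

    private final boolean fsBasedShuffleEnabled;

    Fetcher(Configuration conf /* , other constructor args */) {
      // read the flag once per fetcher instead of querying the conf per fetch
      this.fsBasedShuffleEnabled = ShuffleUtils.isFsBasedShuffleEnabled(conf);
    }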

FileSystemFetchedInput.java:
◾toString function needs changing to denote local vs fs based
[shanyu]: Based on the path we can tell whether this is local or FS based; for 
the FS based case the inputFile is a fully qualified FileSystem URL.

PipelinedSorter: 
◾copyLocalFileToHdfs() - why would this be needed?
[shanyu]: Because if there is only one spill, we need to copy this local spill 
file to the file system. I'll rename it to copyLocalFileToFileSystem().
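A sketch of what the renamed helper boils down to (method and parameter names 
are illustrative, not the actual patch):

    private void copyLocalFileToFileSystem(Path localSpillFile, Path targetFile,
        Configuration conf) throws IOException {
      FileSystem targetFs = targetFile.getFileSystem(conf);
      // only needed for the single-spill case, where no merge pass rewrites
      // the output onto the target file system
      targetFs.copyFromLocalFile(false /* delSrc */, true /* overwrite */,
          localSpillFile, targetFile);
    }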

  private static final String OUTPUT_LOCATION_ENV_NAME = 
"TEZ_FS_BASED_SHUFFLE_LOCATION";

◾not sure I understand why we are allowing the shuffle location to be 
overridden via the environment?
[shanyu]: It just provides another way to supply the shuffle location. If we 
use an aux service in the NM to do proper cleanup, we can let the NM generate 
the shuffle location and clean it up properly. The NM can provide the shuffle 
location via the environment.
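The lookup order being described is roughly the following (the config key is 
hypothetical, as in the earlier sketch):

    // Prefer an NM-provided location from the environment, then fall back to
    // the configuration (and ultimately the staging-dir default sketched above).
    String location = System.getenv(OUTPUT_LOCATION_ENV_NAME);
    if (location == null || location.isEmpty()) {
      location = conf.get("tez.runtime.fs.based.shuffle.location");  // hypothetical key
    }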

      String subPath = context.getApplicationId().toString() + Path.SEPARATOR
          + context.getDagIdentifier() + Path.SEPARATOR
          + context.getUniqueIdentifier();

◾how does this scale for various FS impls when there are multiple vertices and 
each vertex could have 100K tasks?
[shanyu]: Are you worried about 100K sub-folders in one HDFS folder? I'm going 
to do a scaling test and share the results later.
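For reference, the per-attempt layout produced by the snippet above is roughly 
the following (placeholder names and file names are illustrative):

    <shuffle-location>/
      <applicationId>/
        <dagIdentifier>/
          <uniqueIdentifier, one per task attempt>/
            file.out, file.out.index

so the fan-out in question is the number of task-attempt sub-directories under 
a single DAG directory.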

I have not yet looked into the full internals of the changes to the actual 
fetcher/output write logic. I am guessing Rajesh Balamohan and Siddharth Seth 
may take a more in-depth look. 

Some general comments: 
◾the failure heuristics for a DFS based intermediate output are completely 
different from those for a local disk based output. The overall design for 
enabling more reliable storage of intermediate data should therefore account 
for changing the edge type to PERSISTED_RELIABLE, use that as the deciding 
factor for whether data is written to the DFS or the local FS, and at the same 
time change the failure heuristics to recognize that a DFS is more reliable, so 
node failures should not be treated as a reason to re-run tasks, etc.
[shanyu]: You have a very good point here. One piece of future work is to 
implement a "persisted_reliable" edge on top of the FS based shuffle. But the 
purpose of this JIRA is not to add an additional edge type; instead, it aims at 
breaking the dependency on the MapReduce shuffle aux service. In practice we 
can put the intermediate data in an HDFS with a replication factor of one, and 
the failure heuristics should then be exactly the same as for a "persisted" 
edge.
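For reference, the edge-type knob being discussed is the DataSourceType on an 
EdgeProperty; a hypothetical PERSISTED_RELIABLE edge would be declared roughly 
like this when building the DAG (descriptor names are illustrative):

    Edge edge = Edge.create(mapVertex, reduceVertex,
        EdgeProperty.create(
            DataMovementType.SCATTER_GATHER,
            DataSourceType.PERSISTED_RELIABLE,   // instead of PERSISTED
            SchedulingType.SEQUENTIAL,
            outputDescriptor, inputDescriptor));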

Again, thank you so much [~hitesh] for the detailed review!


> Support DFS based shuffle in addition to HTTP shuffle
> -----------------------------------------------------
>
>                 Key: TEZ-2442
>                 URL: https://issues.apache.org/jira/browse/TEZ-2442
>             Project: Apache Tez
>          Issue Type: Improvement
>    Affects Versions: 0.5.3
>            Reporter: Kannan Rajah
>            Assignee: shanyu zhao
>         Attachments: FS_based_shuffle_v2.pdf, Tez Shuffle using DFS.pdf, 
> hdfs_broadcast_hack.txt, tez-2442-trunk.2.patch, tez-2442-trunk.3.patch, 
> tez-2442-trunk.patch, tez_hdfs_shuffle.patch
>
>
> In Tez, Shuffle is a mechanism by which intermediate data can be shared 
> between stages. Shuffle data is written to local disk and fetched from any 
> remote node using HTTP. A DFS like the MapR file system can support writing 
> this shuffle data directly to its DFS using a notion of local volumes, and 
> retrieving it using the HDFS API from a remote node. The current Shuffle 
> implementation assumes local data can only be managed by LocalFileSystem, so 
> it uses RawLocalFileSystem and LocalDirAllocator. If we can remove this 
> assumption and introduce an abstraction to manage local disks, then we can 
> reuse most of the shuffle logic (store, sort) and inject an HDFS API based 
> retrieval instead of HTTP.


