[ https://issues.apache.org/jira/browse/TEZ-2442?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274676#comment-15274676 ]

Hitesh Shah commented on TEZ-2442:
----------------------------------

Comments:
   - Naming convention: "shuffle" should really be "intermediate data", since 
Tez supports non-shuffle edges.
   - TEZ_FS_BASED_SHUFFLE_ENABLED looks like a global setting, but it need not 
be. First, it belongs in TezRuntimeConfiguration, since it is Input/Output 
related. Second, it should be tunable on a per-edge basis, which it currently 
is not. That said, we do need to figure out how to enable the 
FsBasedShuffleDataCleaner and optimize for the case where it is not used at 
all, so a global enable flag will likely still be needed, along with a separate 
per-edge setting.
   - TEZ_FS_BASED_SHUFFLE_LOCATION_DEFAULT - Is there a reason why this cannot 
just default to a sub-directory of the Tez staging dir?

   - Spurious change in 
tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/Constants.java
 could be removed. 
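A minimal sketch of how the global switch, a per-edge opt-in and the staging-dir default discussed above could fit together. It assumes a plain `Map` in place of `TezRuntimeConfiguration`/`Configuration`; every property name and the `intermediate-data` sub-directory are hypothetical. Real code would build the location with Hadoop's `Path(Path parent, String child)` constructor rather than string concatenation.

```java
import java.util.Map;

// Hypothetical sketch: a Map stands in for the Tez/Hadoop configuration and
// all property names below are made up for illustration.
final class IntermediateDataSettings {
    static final String GLOBAL_ENABLED = "tez.runtime.intermediate-data.fs-based.enabled";
    static final String EDGE_ENABLED = "tez.runtime.intermediate-data.fs-based.edge.enabled";
    static final String LOCATION = "tez.runtime.intermediate-data.fs-based.location";
    // Hypothetical name of the sub-directory created under the Tez staging dir.
    static final String DEFAULT_SUB_DIR = "intermediate-data";

    private IntermediateDataSettings() {}

    /** Global switch: decides whether the AM needs to run a data cleaner at all. */
    static boolean isGloballyEnabled(Map<String, String> amConf) {
        return Boolean.parseBoolean(amConf.getOrDefault(GLOBAL_ENABLED, "false"));
    }

    /** Per-edge opt-in, honoured only when the global switch is on (so cleanup is running). */
    static boolean isEnabledForEdge(Map<String, String> amConf, Map<String, String> edgeConf) {
        return isGloballyEnabled(amConf)
            && Boolean.parseBoolean(edgeConf.getOrDefault(EDGE_ENABLED, "false"));
    }

    /** Uses an explicit location if one is set, otherwise a sub-directory of the staging dir. */
    static String location(Map<String, String> amConf, String stagingDir) {
        String configured = amConf.get(LOCATION);
        if (configured != null && !configured.isEmpty()) {
            return configured;
        }
        String base = stagingDir.endsWith("/")
            ? stagingDir.substring(0, stagingDir.length() - 1)
            : stagingDir;
        return base + "/" + DEFAULT_SUB_DIR;
    }
}
```

Requiring the global switch for any per-edge opt-in is one possible design choice: it keeps the AM's decision about starting the cleaner independent of individual edge settings.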

FsBasedShuffleDataCleaner.java:
  - "FileSystem fs = path.getFileSystem(getConfig());" - this should be a 
one-time call and not done every time a folder cleanup is invoked. 
  - "LOG.error("Failed to clean shuffle dir: " + ie);" - should name the 
directory that could not be deleted.
  - Deleting the folder in serviceStop() is incorrect. This should only be done 
when the app unregisters, not on AM shutdown.
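A sketch of a cleaner that follows the three points above, assuming `java.nio.file` stands in for Hadoop's `FileSystem` so that it runs on its own; the class and method names are hypothetical. The root is resolved once in the constructor, failures are logged together with the directory that could not be deleted, and full cleanup sits in an unregister hook rather than in `serviceStop()`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical cleaner: java.nio stands in for Hadoop's FileSystem.
final class IntermediateDataCleaner {
    private static final Logger LOG = Logger.getLogger(IntermediateDataCleaner.class.getName());

    // Resolved once in the constructor, the analogue of calling path.getFileSystem(conf) a single time.
    private final Path appRoot;

    IntermediateDataCleaner(Path appRoot) {
        this.appRoot = appRoot;
    }

    /** Recursively deletes one sub-directory; a failure is logged together with its path. */
    boolean cleanDir(String relativeDir) {
        Path dir = appRoot.resolve(relativeDir);
        if (!Files.exists(dir)) {
            return true;
        }
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order visits children before their parent directories.
            List<Path> paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            for (Path p : paths) {
                Files.delete(p);
            }
            return true;
        } catch (IOException e) {
            LOG.severe("Failed to clean intermediate data dir " + dir + ": " + e);
            return false;
        }
    }

    /** Meant to be called when the application unregisters, not from serviceStop(). */
    boolean onApplicationUnregistered() {
        return cleanDir("");
    }
}
```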

{code}
// No need to check if pathComponent starts with a certain prefix
// because in HDFS based shuffle it carries the fully qualified URL.
{code}
  - Shouldn't the check still be required for non-HDFS-based shuffle?
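One way to keep the check on the HTTP path while skipping it only for FS-based shuffle, sketched under the assumption that FS-based path components are fully qualified URIs; the class name and the boolean flag parameter are hypothetical.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical validation helper for incoming path components.
final class PathComponentCheck {
    private PathComponentCheck() {}

    /** True if the component parses as a URI with a scheme, e.g. "hdfs://nn:8020/...". */
    static boolean hasScheme(String pathComponent) {
        try {
            return new URI(pathComponent).getScheme() != null;
        } catch (URISyntaxException e) {
            return false; // not a parseable URI, so treat it as a plain path component
        }
    }

    static boolean isValid(String pathComponent, String expectedPrefix, boolean fsBasedShuffle) {
        if (fsBasedShuffle) {
            // FS-based shuffle: the component is expected to carry a fully qualified URI.
            return hasScheme(pathComponent);
        }
        // HTTP shuffle: keep the original prefix check.
        return pathComponent.startsWith(expectedPrefix);
    }
}
```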

{code}
Constructor<?> ctor = clazz.getConstructor(Configuration.class, TaskContext.class);
{code}
  - this is an incompatible change for anyone who has overridden the 
TEZ_RUNTIME_TASK_OUTPUT_MANAGER property. 
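A sketch of a backward-compatible lookup: try the new two-argument constructor and fall back to the older one when it is absent. `ConfStub` and `TaskContextStub` stand in for `Configuration` and `TaskContext`, and the single-argument legacy signature is an assumption, not necessarily the one Tez used.

```java
import java.lang.reflect.Constructor;

// Stand-ins for Configuration and TaskContext so the sketch compiles on its own.
class ConfStub {}
class TaskContextStub {}

// Example implementations: one written against the assumed legacy constructor,
// one against the new two-argument constructor.
class LegacyOutputManager {
    public LegacyOutputManager(ConfStub conf) {}
}

class NewOutputManager {
    public NewOutputManager(ConfStub conf, TaskContextStub context) {}
}

final class OutputManagerFactory {
    private OutputManagerFactory() {}

    /** Tries the new constructor first and falls back to the legacy one, keeping old overrides working. */
    static Object create(Class<?> clazz, ConfStub conf, TaskContextStub context)
            throws ReflectiveOperationException {
        try {
            Constructor<?> ctor = clazz.getConstructor(ConfStub.class, TaskContextStub.class);
            return ctor.newInstance(conf, context);
        } catch (NoSuchMethodException e) {
            Constructor<?> legacy = clazz.getConstructor(ConfStub.class);
            return legacy.newInstance(conf);
        }
    }
}
```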

{code}
      LOG.info("Using Filesystem based fetcher");

      LOG.info("path=" + path.toString());
{code}
  - These log messages can be removed, as they do not provide any useful info. 
The fetcher-specific message gives no context on what is being fetched, and the 
fetch type is better covered as part of logging the actual fetch details.

"ShuffleUtils.isFsBasedShuffleEnabled(conf)"
   - it might be good to cache this value wherever possible instead of querying 
the conf object multiple times
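A minimal sketch of caching the flag, assuming a `Map` stands in for the Hadoop `Configuration` (where `getBoolean(String, boolean)` would be used) and a hypothetical key name: the value is read once at construction and kept in a final field.

```java
import java.util.Map;

// Hypothetical holder that reads the flag once instead of re-querying the conf.
final class FetcherSettings {
    // Hypothetical key name.
    static final String FS_BASED_ENABLED = "tez.runtime.intermediate-data.fs-based.enabled";

    private final boolean fsBasedEnabled;

    FetcherSettings(Map<String, String> conf) {
        this.fsBasedEnabled = Boolean.parseBoolean(conf.getOrDefault(FS_BASED_ENABLED, "false"));
    }

    boolean isFsBasedEnabled() {
        return fsBasedEnabled;
    }
}
```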

FileSystemFetchedInput.java:
   - toString() should be changed to indicate whether the input is local or 
FS-based.
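A sketch of a `toString()` that makes the storage type explicit; the class names only loosely mirror Tez's fetched-input classes and are hypothetical.

```java
// Hypothetical base class; the real Tez FetchedInput hierarchy differs.
abstract class FetchedInputSketch {
    private final String path;
    private final String inputAttempt;

    FetchedInputSketch(String path, String inputAttempt) {
        this.path = path;
        this.inputAttempt = inputAttempt;
    }

    /** Names where the fetched data lives, so log lines can tell the two cases apart. */
    abstract String storage();

    @Override
    public String toString() {
        return getClass().getSimpleName() + " [storage=" + storage()
            + ", path=" + path + ", inputAttempt=" + inputAttempt + "]";
    }
}

final class LocalDiskFetchedInputSketch extends FetchedInputSketch {
    LocalDiskFetchedInputSketch(String path, String inputAttempt) {
        super(path, inputAttempt);
    }

    @Override
    String storage() {
        return "local";
    }
}

final class FileSystemFetchedInputSketch extends FetchedInputSketch {
    FileSystemFetchedInputSketch(String path, String inputAttempt) {
        super(path, inputAttempt);
    }

    @Override
    String storage() {
        return "fs";
    }
}
```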

 PipelinedSorter: 
   - copyLocalFileToHdfs() - why would this be needed? 

{code}
  private static final String OUTPUT_LOCATION_ENV_NAME = 
"TEZ_FS_BASED_SHUFFLE_LOCATION";
{code}
  - not sure I understand why we are allowing the shuffle location to be 
overridden via the environment? 

{code}
String subPath = context.getApplicationId().toString() + Path.SEPARATOR
                + context.getDagIdentifier() + Path.SEPARATOR + 
context.getUniqueIdentifier();
{code}
  - How does this scale across FS implementations when there are multiple 
vertices and each vertex could have 100K tasks?

I have not yet looked into the full internals of the changes to the actual 
fetcher/output write logic. I am guessing [~rajesh.balamohan] and [~sseth] may 
take a more in-depth look.

Some general comments: 
  - The failure heuristics for DFS-based intermediate output are completely 
different from those for local-disk-based output. The overall design for more 
reliable storage of intermediate data should therefore account for changing 
the edge type to PERSISTED_RELIABLE and use that as the defining factor for 
whether data is written to DFS or to the local FS. At the same time, the 
failure heuristics should recognize that a DFS is more reliable, so node 
failures should not be treated as a reason to re-run tasks, etc.
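A sketch of letting the edge's data source type drive both decisions, assuming a local enum that mirrors two of the names in Tez's `EdgeProperty.DataSourceType` (`PERSISTED`, `PERSISTED_RELIABLE`); the policy class and `StorageTarget` are hypothetical.

```java
// Local enum mirroring two EdgeProperty.DataSourceType names so the sketch compiles on its own.
enum DataSourceKind { PERSISTED, PERSISTED_RELIABLE }

enum StorageTarget { LOCAL_DISK, DFS }

// Hypothetical policy: the edge type decides where output goes and how node loss is treated.
final class IntermediateDataPolicy {
    private IntermediateDataPolicy() {}

    static StorageTarget storageFor(DataSourceKind kind) {
        return kind == DataSourceKind.PERSISTED_RELIABLE ? StorageTarget.DFS : StorageTarget.LOCAL_DISK;
    }

    /** Whether losing the producer's node should trigger re-running its completed tasks. */
    static boolean rerunOnNodeFailure(DataSourceKind kind) {
        // Output on a reliable DFS outlives the node, so node loss alone is no reason to re-run.
        return kind != DataSourceKind.PERSISTED_RELIABLE;
    }
}
```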

> Support DFS based shuffle in addition to HTTP shuffle
> -----------------------------------------------------
>
>                 Key: TEZ-2442
>                 URL: https://issues.apache.org/jira/browse/TEZ-2442
>             Project: Apache Tez
>          Issue Type: Improvement
>    Affects Versions: 0.5.3
>            Reporter: Kannan Rajah
>            Assignee: shanyu zhao
>         Attachments: FS_based_shuffle_v2.pdf, Tez Shuffle using DFS.pdf, 
> hdfs_broadcast_hack.txt, tez-2442-trunk.2.patch, tez-2442-trunk.3.patch, 
> tez-2442-trunk.patch, tez_hdfs_shuffle.patch
>
>
> In Tez, Shuffle is a mechanism by which intermediate data can be shared 
> between stages. Shuffle data is written to local disk and fetched from any 
> remote node using HTTP. A DFS like the MapR file system can support writing this 
> shuffle data directly to its DFS using a notion of local volumes and retrieve 
> it using HDFS API from remote node. The current Shuffle implementation 
> assumes local data can only be managed by LocalFileSystem. So it uses 
> RawLocalFileSystem and LocalDirAllocator. If we can remove this assumption 
> and introduce an abstraction to manage local disks, then we can reuse most of 
> the shuffle logic (store, sort) and inject an HDFS API based retrieval instead 
> of HTTP.
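A minimal sketch of the kind of abstraction the description asks for: store/sort logic writes through one interface, and only the retrieval side changes. The interface and class names are hypothetical, and the local implementation uses `java.nio.file`; a DFS-backed variant would wrap Hadoop's `FileSystem` instead.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical abstraction over where intermediate data is stored.
interface IntermediateStore {
    OutputStream create(String relativePath) throws IOException;

    InputStream open(String relativePath) throws IOException;
}

// Local-disk implementation; paths are resolved under a single root directory.
final class LocalDirStore implements IntermediateStore {
    private final Path root;

    LocalDirStore(Path root) {
        this.root = root;
    }

    @Override
    public OutputStream create(String relativePath) throws IOException {
        Path target = root.resolve(relativePath);
        Files.createDirectories(target.getParent()); // make sure parent directories exist
        return Files.newOutputStream(target);
    }

    @Override
    public InputStream open(String relativePath) throws IOException {
        return Files.newInputStream(root.resolve(relativePath));
    }
}
```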



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)