zuston commented on issue #142:
URL: 
https://github.com/apache/incubator-uniffle/issues/142#issuecomment-1208955697

   I have a question that why it will influence the dynamic allocation? Could u 
help give more detail?
   
   I think a shuffle dependency will hold a shuffle handle(rss or sort handle) 
which is decided by the method of registerShuffle. This means that a shuffle 
only will be managed by one shuffle manager(rss or sort) and served for writer 
or reader. The co-operation dont exist in sort and rss.
   
   > SortShuffleManager need ESS to support dynamic allocation, but 
RssShuffleManager can't work with ESS.
   
   I dont understand what's the meaning.
   
   Attach the code draft implementation.
   ### Code Draft
   ```java
   public class DelegationRssShuffleManager implements ShuffleManager {
    
     private ShuffleManager rssShuffleManager;
     private ShuffleManager sortShuffleManager;
   
      public DelegationRssShuffleManager(SparkConf sparkConf, boolean isDriver) 
throws Exception {
       this.sparkConf = sparkConf;
       // dont initialize the shuffle manager in the constructor.
     }
   
    private synchronized ShuffleManager createOrGetShuffleManager(boolean 
useRss) {
      if (useRss) {
         if (rssShuffleManager != null) {
           return rssShuffleManager;
         } else {
           try {
             final ShuffleManager shuffleManager = new 
RssShuffleManager(sparkConf, true);
             sparkConf.set(RssSparkConfig.RSS_ENABLED.key(), "true");
             LOG.info("Use RssShuffleManager");
             this.rssShuffleManager = shuffleManager;
             return rssShuffleManager;
           } catch (Exception exception) {
             LOG.warn("Fail to create RssShuffleManager, fallback to 
SortShuffleManager {}", exception.getMessage());
           }
         }
       }
   
       if (sortShuffleManager == null) {
         try {
           final ShuffleManager shuffleManager =
               
RssSparkShuffleUtils.loadShuffleManager(Constants.SORT_SHUFFLE_MANAGER_NAME, 
sparkConf, true);
           LOG.info("Use SortShuffleManager");
           this.sortShuffleManager = shuffleManager;
         } catch (Exception e) {
           throw new RssException(e.getMessage());
         }
       }
       return sortShuffleManager;
    }
     
     @Override
     public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, 
ShuffleDependency<K, V, C> dependency) {
       ShuffleManager shuffleManager = 
createOrGetShuffleManager(tryAccessCluster());
       if (!(shuffleManager instanceof RssShuffleManager)) {
         shuffleIdsUsingSort.add(shuffleId);
       }
       return shuffleManager.registerShuffle(shuffleId, dependency);
     }
     
     @Override
     public <K, V> ShuffleWriter<K, V> getWriter(
         ShuffleHandle handle,
         long mapId,
         TaskContext context,
         ShuffleWriteMetricsReporter metrics) {
       ShuffleManager shuffleManager = createOrGetShuffleManager(handle 
instanceof RssShuffleHandle);
       return shuffleManager.getWriter(handle, mapId, context, metrics);
     }
   
      @Override
     public <K, C> ShuffleReader<K, C> getReader(
         ShuffleHandle handle,
         int startPartition,
         int endPartition,
         TaskContext context,
         ShuffleReadMetricsReporter metrics) {
       ShuffleManager shuffleManager = createOrGetShuffleManager(handle 
instanceof RssShuffleHandle);
       return shuffleManager.getReader(handle,
           startPartition, endPartition, context, metrics);
     }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to