zuston opened a new issue, #1459:
URL: https://github.com/apache/incubator-uniffle/issues/1459

   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of 
Conduct](https://www.apache.org/foundation/policies/conduct)
   
   
   ### Search before asking
   
   - [X] I have searched in the 
[issues](https://github.com/apache/incubator-uniffle/issues?q=is%3Aissue) and 
found no similar issues.
   
   
   ### What would you like to be improved?
   
   In #775, it separate flush thread pools for different storage type. But I'm 
not satisfied with current impl.
   
   In `DefaultFlushEventHandler`, we consume the `ShuffleDataFlushEvent` by the 
function from `ShuffleFlushManager#flushToFile`. But in the `flushToFile` 
method, we will modify the internal state of event,  this is not proper.
   
   I hope the flushToFile only flush or check whether retry or discard the 
event, all the state of event should be modified or maintained in the 
`DefaultFlushEventHandler`. The following code is here:
   
   ```
   private void flushToFile(ShuffleDataFlushEvent event) {
       long start = System.currentTimeMillis();
       boolean writeSuccess = false;
   
       try {
         if (!event.isValid()) {
           LOG.warn(
               "AppId {} was removed already, event {} should be dropped", 
event.getAppId(), event);
           return;
         }
   
         List<ShufflePartitionedBlock> blocks = event.getShuffleBlocks();
         if (blocks == null || blocks.isEmpty()) {
           LOG.info("There is no block to be flushed: {}", event);
           return;
         }
   
         Storage storage = event.getUnderStorage();
         if (storage == null) {
           LOG.error("Storage selected is null and this should not happen. 
event: {}", event);
           throw new EventDiscardException();
         }
   
         if (event.isPended()
             && System.currentTimeMillis() - event.getStartPendingTime()
                 > pendingEventTimeoutSec * 1000L) {
           ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
           LOG.error(
               "Flush event cannot be flushed for {} sec, the event {} is 
dropped",
               pendingEventTimeoutSec,
               event);
           throw new EventDiscardException();
         }
   ```
   
   We could introduce the `EventDiscardException` or `EventRetryException` and 
so on for next processing in `DefaultFlushEventHandler`.
   
   
   ### How should we improve?
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to