This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 523cc4f8 [#1013] fix(tez): Wait when return MapOutput of type wait 
(#1063)
523cc4f8 is described below

commit 523cc4f8e99231489e8fdcb42dff6718b1c4475d
Author: Qing <[email protected]>
AuthorDate: Wed Aug 23 16:00:59 2023 +0800

    [#1013] fix(tez): Wait when return MapOutput of type wait (#1063)
    
    ### What changes were proposed in this pull request?
    
    When there is not enough memory, it will return a MapOutput of type WAIT, 
and the fetcher should wait until there is enough memory.
    
    
    ### Why are the changes needed?
    
    Fix: https://github.com/apache/incubator-uniffle/issues/1013
    
    ### Does this PR introduce _any_ user-facing change?
    
    
    No.
    
    ### How was this patch tested?
    
    unit test
---
 .../library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcher.java | 2 +-
 common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java      | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcher.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcher.java
index 0e40e832..86c4be64 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcher.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcher.java
@@ -240,7 +240,7 @@ public class RssTezShuffleDataFetcher extends 
CallableWithNdc<Void> {
       throw ioe;
     }
     // Check if we can shuffle *now* ...
-    if (mapOutput == null) {
+    if (mapOutput == null || mapOutput.getType() == MapOutput.Type.WAIT) {
       LOG.info("RssMRFetcher" + " - MergeManager returned status WAIT ...");
       // Not an error but wait to process data.
       // Use a retry flag to avoid re-fetch and re-uncompress.
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java 
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index 680c829b..38ae7a88 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -204,6 +204,7 @@ public class GrpcServer implements ServerInterface {
     LOG.info("Grpc server started, configured port: {}, listening on {}.", 
port, listenPort);
   }
 
+  @Override
   public void stop() throws InterruptedException {
     if (server != null) {
       server.shutdown().awaitTermination(10, TimeUnit.SECONDS);
@@ -214,6 +215,7 @@ public class GrpcServer implements ServerInterface {
     }
   }
 
+  @Override
   public void blockUntilShutdown() throws InterruptedException {
     if (server != null) {
       server.awaitTermination();

Reply via email to