This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 523cc4f8 [#1013] fix(tez): Wait when return MapOutput of type wait
(#1063)
523cc4f8 is described below
commit 523cc4f8e99231489e8fdcb42dff6718b1c4475d
Author: Qing <[email protected]>
AuthorDate: Wed Aug 23 16:00:59 2023 +0800
[#1013] fix(tez): Wait when return MapOutput of type wait (#1063)
### What changes were proposed in this pull request?
When there is not enough memory, the merge manager returns a MapOutput of
type WAIT; the fetcher should then wait until there is enough memory.
### Why are the changes needed?
Fix: https://github.com/apache/incubator-uniffle/issues/1013
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
unit test
---
.../library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcher.java | 2 +-
common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java | 2 ++
2 files changed, 3 insertions(+), 1 deletion(-)
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcher.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcher.java
index 0e40e832..86c4be64 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcher.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezShuffleDataFetcher.java
@@ -240,7 +240,7 @@ public class RssTezShuffleDataFetcher extends
CallableWithNdc<Void> {
throw ioe;
}
// Check if we can shuffle *now* ...
- if (mapOutput == null) {
+ if (mapOutput == null || mapOutput.getType() == MapOutput.Type.WAIT) {
LOG.info("RssMRFetcher" + " - MergeManager returned status WAIT ...");
// Not an error but wait to process data.
// Use a retry flag to avoid re-fetch and re-uncompress.
diff --git a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
index 680c829b..38ae7a88 100644
--- a/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
+++ b/common/src/main/java/org/apache/uniffle/common/rpc/GrpcServer.java
@@ -204,6 +204,7 @@ public class GrpcServer implements ServerInterface {
LOG.info("Grpc server started, configured port: {}, listening on {}.",
port, listenPort);
}
+ @Override
public void stop() throws InterruptedException {
if (server != null) {
server.shutdown().awaitTermination(10, TimeUnit.SECONDS);
@@ -214,6 +215,7 @@ public class GrpcServer implements ServerInterface {
}
}
+ @Override
public void blockUntilShutdown() throws InterruptedException {
if (server != null) {
server.awaitTermination();