This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new ca245316 [#1045] fix(server): shuffle server may hang when restart 
worker due to multi require-memory and no require-memory release (#1058)
ca245316 is described below

commit ca245316c3f5002de117adb7f550d3198fb7f6e2
Author: Qing <[email protected]>
AuthorDate: Tue Aug 8 14:51:25 2023 +0800

    [#1045] fix(server): shuffle server may hang when restart worker due to 
multi require-memory and no require-memory release (#1058)
    
    
    ### What changes were proposed in this pull request?
    
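    The shuffle server may hang when a worker is restarted, due to multiple 
require-memory requests and no require-memory release.
    To fix this, requireBuffer now returns NO_REGISTER when the app is not registered, and the client throws NotRetryException for that status.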
    
    ### Why are the changes needed?
    
    The shuffle server may hang when a worker is restarted, due to multiple 
require-memory requests and no require-memory release.
    This PR fixes that bug.
    
    Fix:  https://github.com/apache/incubator-uniffle/issues/1045
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    unit test
---
 .../library/common/shuffle/impl/RssTezFetcher.java |  6 ++--
 .../common/shuffle/impl/RssTezFetcherTask.java     |  2 +-
 .../orderedgrouped/RssShuffleScheduler.java        |  5 +---
 .../shuffle/orderedgrouped/RssTezBypassWriter.java |  8 +++++
 .../apache/uniffle/test/ShuffleServerGrpcTest.java | 10 +++----
 .../client/impl/grpc/ShuffleServerGrpcClient.java  | 19 ++++++++++--
 .../impl/grpc/ShuffleServerGrpcNettyClient.java    |  5 +++-
 .../uniffle/server/ShuffleServerGrpcService.java   |  6 +++-
 .../apache/uniffle/server/ShuffleTaskManager.java  | 18 ++++++-----
 .../server/buffer/RequireBufferStatusCode.java     | 35 ++++++++++++++++++++++
 .../uniffle/server/ShuffleTaskManagerTest.java     | 14 +++++----
 11 files changed, 97 insertions(+), 31 deletions(-)

diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcher.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcher.java
index 82c2568e..071d5314 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcher.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcher.java
@@ -133,7 +133,7 @@ public class RssTezFetcher {
         hasPendingData = false;
         uncompressedData = null;
       } else {
-        LOG.info("uncompressedData is null");
+        LOG.info("UncompressedData is null");
         // if reserve fail, return and wait
         waitCount++;
         startWait = System.currentTimeMillis();
@@ -144,13 +144,13 @@ public class RssTezFetcher {
       copyBlockCount++;
       copyTime = readTime + decompressTime + serializeTime + waitTime;
     } else {
-      LOG.info("uncompressedData is null");
+      LOG.info("UncompressedData is null");
       // finish reading data, close related reader and check data consistent
       shuffleReadClient.close();
       shuffleReadClient.checkProcessedBlockIds();
       shuffleReadClient.logStatics();
       LOG.info(
-          "reduce task partition:"
+          "Reduce task partition:"
               + partitionId
               + " read block cnt: "
               + copyBlockCount
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
index a40601ae..7e57060f 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
@@ -157,7 +157,7 @@ public class RssTezFetcherTask extends 
CallableWithNdc<FetchResult> {
     Roaring64NavigableMap taskIdBitmap =
         RssTezUtils.fetchAllRssTaskIds(new HashSet<>(inputs), 
numPhysicalInputs, appAttemptId);
     LOG.info(
-        "inputs:{}, num input:{}, appAttemptId:{}, taskIdBitmap:{}",
+        "Inputs:{}, num input:{}, appAttemptId:{}, taskIdBitmap:{}",
         inputs,
         numPhysicalInputs,
         appAttemptId,
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
index 2d6de7f7..e6ae3ff2 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
@@ -1273,10 +1273,7 @@ class RssShuffleScheduler extends ShuffleScheduler {
       String inputHostName, int port, int partitionId, 
CompositeInputAttemptIdentifier srcAttempt) {
 
     LOG.info(
-        "AddKnownMapOutput thread:{}, obj:{}, RssShuffleScheduler, 
addKnownMapOutput, inputHostName length:{}, "
-            + "port:{}, partitionId:{}, srcAttempt:{}, inputHostName:{}",
-        Thread.currentThread().getName(),
-        this,
+        "AddKnownMapOutput inputHostName length:{}, port:{}, partitionId:{}, 
srcAttempt:{}, inputHostName:{}",
         inputHostName.length(),
         port,
         partitionId,
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriter.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriter.java
index f100a76f..4354c76a 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriter.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriter.java
@@ -37,6 +37,10 @@ public class RssTezBypassWriter {
   private static final byte[] HEADER = new byte[] {(byte) 'T', (byte) 'I', 
(byte) 'F', (byte) 0};
 
   public static void write(MapOutput mapOutput, byte[] buffer) {
+    LOG.info(
+        "RssTezBypassWriter write mapOutput, type:{}, buffer length:{}",
+        mapOutput.getType(),
+        buffer.length);
     // Write and commit uncompressed data to MapOutput.
     // In the majority of cases, merger allocates memory to accept data,
     // but when data size exceeds the threshold, merger can also allocate disk.
@@ -57,6 +61,10 @@ public class RssTezBypassWriter {
   }
 
   public static void write(final FetchedInput mapOutput, byte[] buffer) throws 
IOException {
+    LOG.info(
+        "RssTezBypassWriter write mapOutput, type:{}, buffer length:{}",
+        mapOutput.getType(),
+        buffer.length);
     // Write and commit uncompressed data to MapOutput.
     // In the majority of cases, merger allocates memory to accept data,
     // but when data size exceeds the threshold, merger can also allocate disk.
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index fbed4144..44bb813e 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -616,7 +616,7 @@ public class ShuffleServerGrpcTest extends 
IntegrationTestBase {
             .getCounterMap()
             .get(ShuffleServerGrpcMetrics.REQUIRE_BUFFER_METHOD)
             .get();
-    shuffleServerClient.requirePreAllocation(100, 10, 1000);
+    shuffleServerClient.requirePreAllocation(appId, 100, 10, 1000);
     newValue =
         shuffleServers
             .get(0)
@@ -852,11 +852,11 @@ public class ShuffleServerGrpcTest extends 
IntegrationTestBase {
 
     oldValue = ShuffleServerMetrics.counterTotalRequireBufferFailed.get();
     // the next two allocations will fail
-    assertEquals(shuffleServerClient.requirePreAllocation(GB, 0, 10), -1);
-    assertEquals(shuffleServerClient.requirePreAllocation(GB, 0, 10), -1);
+    assertEquals(shuffleServerClient.requirePreAllocation(appId, GB, 0, 10), 
-1);
+    assertEquals(shuffleServerClient.requirePreAllocation(appId, GB, 0, 10), 
-1);
     // the next two allocations will success
-    assertNotEquals(shuffleServerClient.requirePreAllocation(10, 0, 10), -1);
-    assertNotEquals(shuffleServerClient.requirePreAllocation(10, 0, 10), -1);
+    assertNotEquals(shuffleServerClient.requirePreAllocation(appId, 10, 0, 
10), -1);
+    assertNotEquals(shuffleServerClient.requirePreAllocation(appId, 10, 0, 
10), -1);
     newValue = ShuffleServerMetrics.counterTotalRequireBufferFailed.get();
     assertEquals((int) newValue, (int) oldValue + 2);
   }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 6ef1f2c4..b7f976ea 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -194,10 +194,10 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
 
   // Only for tests
   @VisibleForTesting
-  public long requirePreAllocation(int requireSize, int retryMax, long 
retryIntervalMax)
-      throws Exception {
+  public long requirePreAllocation(
+      String appId, int requireSize, int retryMax, long retryIntervalMax) 
throws Exception {
     return requirePreAllocation(
-        "EMPTY", 0, Collections.emptyList(), requireSize, retryMax, 
retryIntervalMax);
+        appId, 0, Collections.emptyList(), requireSize, retryMax, 
retryIntervalMax);
   }
 
   public long requirePreAllocation(
@@ -265,6 +265,19 @@ public class ShuffleServerGrpcClient extends GrpcClient 
implements ShuffleServer
           port,
           System.currentTimeMillis() - start);
       result = rpcResponse.getRequireBufferId();
+    } else if (rpcResponse.getStatus() == RssProtos.StatusCode.NO_REGISTER) {
+      String msg =
+          "Can't require "
+              + requireSize
+              + " bytes from "
+              + host
+              + ":"
+              + port
+              + ", statusCode="
+              + rpcResponse.getStatus()
+              + ", errorMsg:"
+              + rpcResponse.getRetMsg();
+      throw new NotRetryException(msg);
     }
     return result;
   }
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index d453e7a9..1a867fa2 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -90,7 +90,10 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
             () -> {
               long requireId =
                   requirePreAllocation(
-                      allocateSize, request.getRetryMax(), 
request.getRetryIntervalMax());
+                      request.getAppId(),
+                      allocateSize,
+                      request.getRetryMax(),
+                      request.getRetryIntervalMax());
               if (requireId == FAILED_REQUIRE_ID) {
                 throw new RssException(
                     String.format(
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index dc83aa93..394293b9 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -81,6 +81,7 @@ import 
org.apache.uniffle.proto.RssProtos.ShuffleRegisterRequest;
 import org.apache.uniffle.proto.RssProtos.ShuffleRegisterResponse;
 import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerImplBase;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
+import org.apache.uniffle.server.buffer.RequireBufferStatusCode;
 import org.apache.uniffle.storage.common.Storage;
 import org.apache.uniffle.storage.common.StorageReadMetrics;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -402,9 +403,12 @@ public class ShuffleServerGrpcService extends 
ShuffleServerImplBase {
     }
 
     StatusCode status = StatusCode.SUCCESS;
-    if (requireBufferId == -1) {
+    if (requireBufferId == RequireBufferStatusCode.NO_BUFFER.statusCode()) {
       status = StatusCode.NO_BUFFER;
       ShuffleServerMetrics.counterTotalRequireBufferFailed.inc();
+    } else if (requireBufferId == 
RequireBufferStatusCode.NO_REGISTER.statusCode()) {
+      status = StatusCode.NO_REGISTER;
+      ShuffleServerMetrics.counterTotalRequireBufferFailed.inc();
     }
     RequireBufferResponse response =
         RequireBufferResponse.newBuilder()
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 17bb39f0..75b59279 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -58,6 +58,7 @@ import org.apache.uniffle.common.util.JavaUtils;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
+import org.apache.uniffle.server.buffer.RequireBufferStatusCode;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 import org.apache.uniffle.server.event.AppPurgeEvent;
@@ -409,14 +410,15 @@ public class ShuffleTaskManager {
   public long requireBuffer(
       String appId, int shuffleId, List<Integer> partitionIds, int 
requireSize) {
     ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.get(appId);
-    if (shuffleTaskInfo != null) {
-      for (int partitionId : partitionIds) {
-        long partitionUsedDataSize = getPartitionDataSize(appId, shuffleId, 
partitionId);
-        if (shuffleBufferManager.limitHugePartition(
-            appId, shuffleId, partitionId, partitionUsedDataSize)) {
-          
ShuffleServerMetrics.counterTotalRequireBufferFailedForHugePartition.inc();
-          return -1;
-        }
+    if (null == shuffleTaskInfo) {
+      return RequireBufferStatusCode.NO_REGISTER.statusCode();
+    }
+    for (int partitionId : partitionIds) {
+      long partitionUsedDataSize = getPartitionDataSize(appId, shuffleId, 
partitionId);
+      if (shuffleBufferManager.limitHugePartition(
+          appId, shuffleId, partitionId, partitionUsedDataSize)) {
+        
ShuffleServerMetrics.counterTotalRequireBufferFailedForHugePartition.inc();
+        return RequireBufferStatusCode.NO_BUFFER.statusCode();
       }
     }
     return requireBuffer(requireSize);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/RequireBufferStatusCode.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/RequireBufferStatusCode.java
new file mode 100644
index 00000000..761a896e
--- /dev/null
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/RequireBufferStatusCode.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer;
+
+public enum RequireBufferStatusCode {
+  // means no buffer
+  NO_BUFFER(-1L),
+  // means app not registered, may be worker is down and should check.
+  NO_REGISTER(-4L);
+
+  private final long statusCode;
+
+  RequireBufferStatusCode(long code) {
+    this.statusCode = code;
+  }
+
+  public long statusCode() {
+    return statusCode;
+  }
+}
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 76e86c2e..091b473a 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -109,6 +109,10 @@ public class ShuffleTaskManagerTest extends HadoopTestBase 
{
     String appId = "hugePartitionMemoryUsageLimitTest_appId";
     int shuffleId = 1;
 
+    // case1, return -4 means not registered.
+    long requiredId = shuffleTaskManager.requireBuffer(appId, 1, 
Arrays.asList(1), 500);
+    assertEquals(-4, requiredId);
+
     shuffleTaskManager.registerShuffle(
         appId,
         shuffleId,
@@ -116,18 +120,18 @@ public class ShuffleTaskManagerTest extends 
HadoopTestBase {
         RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
         StringUtils.EMPTY);
 
-    // case1
-    long requiredId = shuffleTaskManager.requireBuffer(appId, 1, 
Arrays.asList(1), 500);
+    // case2
+    requiredId = shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 
500);
     assertNotEquals(-1, requiredId);
 
-    // case2
+    // case3
     ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 500);
     shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData0);
     shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0.getBlockList());
     requiredId = shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 
500);
     assertNotEquals(-1, requiredId);
 
-    // case3
+    // case4
     partitionedData0 = createPartitionedData(1, 1, 500);
     shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData0);
     shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0.getBlockList());
@@ -141,7 +145,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
     assertEquals(1, ShuffleServerMetrics.gaugeHugePartitionNum.get());
     assertEquals(1, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());
 
-    // case4
+    // case5
     shuffleTaskManager.removeResources(appId);
     assertEquals(0, ShuffleServerMetrics.gaugeHugePartitionNum.get());
     assertEquals(0, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());

Reply via email to