This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new ca245316 [#1045] fix(server): shuffle server may hang when restarting
worker due to multiple require-memory calls and no require-memory release (#1058)
ca245316 is described below
commit ca245316c3f5002de117adb7f550d3198fb7f6e2
Author: Qing <[email protected]>
AuthorDate: Tue Aug 8 14:51:25 2023 +0800
[#1045] fix(server): shuffle server may hang when restarting worker due to
multiple require-memory calls and no require-memory release (#1058)
### What changes were proposed in this pull request?
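The shuffle server may hang after a worker restart, because memory is required
repeatedly and the required memory is never released.
This change makes the server reject a require-buffer request for an application
that is not registered on it: instead of allocating a buffer anyway, it now
returns a new NO_REGISTER status. The client treats that status as a
non-retryable error and fails fast.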
### Why are the changes needed?
Without this fix, the shuffle server may hang after a worker restart, because
memory is required repeatedly for an unregistered application and is never
released.
Fix: https://github.com/apache/incubator-uniffle/issues/1045
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
unit test
---
.../library/common/shuffle/impl/RssTezFetcher.java | 6 ++--
.../common/shuffle/impl/RssTezFetcherTask.java | 2 +-
.../orderedgrouped/RssShuffleScheduler.java | 5 +---
.../shuffle/orderedgrouped/RssTezBypassWriter.java | 8 +++++
.../apache/uniffle/test/ShuffleServerGrpcTest.java | 10 +++----
.../client/impl/grpc/ShuffleServerGrpcClient.java | 19 ++++++++++--
.../impl/grpc/ShuffleServerGrpcNettyClient.java | 5 +++-
.../uniffle/server/ShuffleServerGrpcService.java | 6 +++-
.../apache/uniffle/server/ShuffleTaskManager.java | 18 ++++++-----
.../server/buffer/RequireBufferStatusCode.java | 35 ++++++++++++++++++++++
.../uniffle/server/ShuffleTaskManagerTest.java | 14 +++++----
11 files changed, 97 insertions(+), 31 deletions(-)
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcher.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcher.java
index 82c2568e..071d5314 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcher.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcher.java
@@ -133,7 +133,7 @@ public class RssTezFetcher {
hasPendingData = false;
uncompressedData = null;
} else {
- LOG.info("uncompressedData is null");
+ LOG.info("UncompressedData is null");
// if reserve fail, return and wait
waitCount++;
startWait = System.currentTimeMillis();
@@ -144,13 +144,13 @@ public class RssTezFetcher {
copyBlockCount++;
copyTime = readTime + decompressTime + serializeTime + waitTime;
} else {
- LOG.info("uncompressedData is null");
+ LOG.info("UncompressedData is null");
// finish reading data, close related reader and check data consistent
shuffleReadClient.close();
shuffleReadClient.checkProcessedBlockIds();
shuffleReadClient.logStatics();
LOG.info(
- "reduce task partition:"
+ "Reduce task partition:"
+ partitionId
+ " read block cnt: "
+ copyBlockCount
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
index a40601ae..7e57060f 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssTezFetcherTask.java
@@ -157,7 +157,7 @@ public class RssTezFetcherTask extends
CallableWithNdc<FetchResult> {
Roaring64NavigableMap taskIdBitmap =
RssTezUtils.fetchAllRssTaskIds(new HashSet<>(inputs),
numPhysicalInputs, appAttemptId);
LOG.info(
- "inputs:{}, num input:{}, appAttemptId:{}, taskIdBitmap:{}",
+ "Inputs:{}, num input:{}, appAttemptId:{}, taskIdBitmap:{}",
inputs,
numPhysicalInputs,
appAttemptId,
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
index 2d6de7f7..e6ae3ff2 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
@@ -1273,10 +1273,7 @@ class RssShuffleScheduler extends ShuffleScheduler {
String inputHostName, int port, int partitionId,
CompositeInputAttemptIdentifier srcAttempt) {
LOG.info(
- "AddKnownMapOutput thread:{}, obj:{}, RssShuffleScheduler,
addKnownMapOutput, inputHostName length:{}, "
- + "port:{}, partitionId:{}, srcAttempt:{}, inputHostName:{}",
- Thread.currentThread().getName(),
- this,
+ "AddKnownMapOutput inputHostName length:{}, port:{}, partitionId:{},
srcAttempt:{}, inputHostName:{}",
inputHostName.length(),
port,
partitionId,
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriter.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriter.java
index f100a76f..4354c76a 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriter.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssTezBypassWriter.java
@@ -37,6 +37,10 @@ public class RssTezBypassWriter {
private static final byte[] HEADER = new byte[] {(byte) 'T', (byte) 'I',
(byte) 'F', (byte) 0};
public static void write(MapOutput mapOutput, byte[] buffer) {
+ LOG.info(
+ "RssTezBypassWriter write mapOutput, type:{}, buffer length:{}",
+ mapOutput.getType(),
+ buffer.length);
// Write and commit uncompressed data to MapOutput.
// In the majority of cases, merger allocates memory to accept data,
// but when data size exceeds the threshold, merger can also allocate disk.
@@ -57,6 +61,10 @@ public class RssTezBypassWriter {
}
public static void write(final FetchedInput mapOutput, byte[] buffer) throws
IOException {
+ LOG.info(
+ "RssTezBypassWriter write mapOutput, type:{}, buffer length:{}",
+ mapOutput.getType(),
+ buffer.length);
// Write and commit uncompressed data to MapOutput.
// In the majority of cases, merger allocates memory to accept data,
// but when data size exceeds the threshold, merger can also allocate disk.
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
index fbed4144..44bb813e 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerGrpcTest.java
@@ -616,7 +616,7 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
.getCounterMap()
.get(ShuffleServerGrpcMetrics.REQUIRE_BUFFER_METHOD)
.get();
- shuffleServerClient.requirePreAllocation(100, 10, 1000);
+ shuffleServerClient.requirePreAllocation(appId, 100, 10, 1000);
newValue =
shuffleServers
.get(0)
@@ -852,11 +852,11 @@ public class ShuffleServerGrpcTest extends
IntegrationTestBase {
oldValue = ShuffleServerMetrics.counterTotalRequireBufferFailed.get();
// the next two allocations will fail
- assertEquals(shuffleServerClient.requirePreAllocation(GB, 0, 10), -1);
- assertEquals(shuffleServerClient.requirePreAllocation(GB, 0, 10), -1);
+ assertEquals(shuffleServerClient.requirePreAllocation(appId, GB, 0, 10),
-1);
+ assertEquals(shuffleServerClient.requirePreAllocation(appId, GB, 0, 10),
-1);
// the next two allocations will success
- assertNotEquals(shuffleServerClient.requirePreAllocation(10, 0, 10), -1);
- assertNotEquals(shuffleServerClient.requirePreAllocation(10, 0, 10), -1);
+ assertNotEquals(shuffleServerClient.requirePreAllocation(appId, 10, 0,
10), -1);
+ assertNotEquals(shuffleServerClient.requirePreAllocation(appId, 10, 0,
10), -1);
newValue = ShuffleServerMetrics.counterTotalRequireBufferFailed.get();
assertEquals((int) newValue, (int) oldValue + 2);
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
index 6ef1f2c4..b7f976ea 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcClient.java
@@ -194,10 +194,10 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
// Only for tests
@VisibleForTesting
- public long requirePreAllocation(int requireSize, int retryMax, long
retryIntervalMax)
- throws Exception {
+ public long requirePreAllocation(
+ String appId, int requireSize, int retryMax, long retryIntervalMax)
throws Exception {
return requirePreAllocation(
- "EMPTY", 0, Collections.emptyList(), requireSize, retryMax,
retryIntervalMax);
+ appId, 0, Collections.emptyList(), requireSize, retryMax,
retryIntervalMax);
}
public long requirePreAllocation(
@@ -265,6 +265,19 @@ public class ShuffleServerGrpcClient extends GrpcClient
implements ShuffleServer
port,
System.currentTimeMillis() - start);
result = rpcResponse.getRequireBufferId();
+ } else if (rpcResponse.getStatus() == RssProtos.StatusCode.NO_REGISTER) {
+ String msg =
+ "Can't require "
+ + requireSize
+ + " bytes from "
+ + host
+ + ":"
+ + port
+ + ", statusCode="
+ + rpcResponse.getStatus()
+ + ", errorMsg:"
+ + rpcResponse.getRetMsg();
+ throw new NotRetryException(msg);
}
return result;
}
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index d453e7a9..1a867fa2 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -90,7 +90,10 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
() -> {
long requireId =
requirePreAllocation(
- allocateSize, request.getRetryMax(),
request.getRetryIntervalMax());
+ request.getAppId(),
+ allocateSize,
+ request.getRetryMax(),
+ request.getRetryIntervalMax());
if (requireId == FAILED_REQUIRE_ID) {
throw new RssException(
String.format(
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index dc83aa93..394293b9 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -81,6 +81,7 @@ import
org.apache.uniffle.proto.RssProtos.ShuffleRegisterRequest;
import org.apache.uniffle.proto.RssProtos.ShuffleRegisterResponse;
import org.apache.uniffle.proto.ShuffleServerGrpc.ShuffleServerImplBase;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
+import org.apache.uniffle.server.buffer.RequireBufferStatusCode;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.common.StorageReadMetrics;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -402,9 +403,12 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
}
StatusCode status = StatusCode.SUCCESS;
- if (requireBufferId == -1) {
+ if (requireBufferId == RequireBufferStatusCode.NO_BUFFER.statusCode()) {
status = StatusCode.NO_BUFFER;
ShuffleServerMetrics.counterTotalRequireBufferFailed.inc();
+ } else if (requireBufferId ==
RequireBufferStatusCode.NO_REGISTER.statusCode()) {
+ status = StatusCode.NO_REGISTER;
+ ShuffleServerMetrics.counterTotalRequireBufferFailed.inc();
}
RequireBufferResponse response =
RequireBufferResponse.newBuilder()
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 17bb39f0..75b59279 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -58,6 +58,7 @@ import org.apache.uniffle.common.util.JavaUtils;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
+import org.apache.uniffle.server.buffer.RequireBufferStatusCode;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.event.AppPurgeEvent;
@@ -409,14 +410,15 @@ public class ShuffleTaskManager {
public long requireBuffer(
String appId, int shuffleId, List<Integer> partitionIds, int
requireSize) {
ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.get(appId);
- if (shuffleTaskInfo != null) {
- for (int partitionId : partitionIds) {
- long partitionUsedDataSize = getPartitionDataSize(appId, shuffleId,
partitionId);
- if (shuffleBufferManager.limitHugePartition(
- appId, shuffleId, partitionId, partitionUsedDataSize)) {
-
ShuffleServerMetrics.counterTotalRequireBufferFailedForHugePartition.inc();
- return -1;
- }
+ if (null == shuffleTaskInfo) {
+ return RequireBufferStatusCode.NO_REGISTER.statusCode();
+ }
+ for (int partitionId : partitionIds) {
+ long partitionUsedDataSize = getPartitionDataSize(appId, shuffleId,
partitionId);
+ if (shuffleBufferManager.limitHugePartition(
+ appId, shuffleId, partitionId, partitionUsedDataSize)) {
+
ShuffleServerMetrics.counterTotalRequireBufferFailedForHugePartition.inc();
+ return RequireBufferStatusCode.NO_BUFFER.statusCode();
}
}
return requireBuffer(requireSize);
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/RequireBufferStatusCode.java
b/server/src/main/java/org/apache/uniffle/server/buffer/RequireBufferStatusCode.java
new file mode 100644
index 00000000..761a896e
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/RequireBufferStatusCode.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server.buffer;
+
+/**
+ * Sentinel values returned by the server's require-buffer path in place of a real
+ * require-buffer id when the request cannot be granted.
+ *
+ * <p>ShuffleServerGrpcService compares the returned id against these values to pick the
+ * gRPC status (NO_BUFFER / NO_REGISTER). NOTE(review): this assumes real buffer ids are never
+ * negative, so they cannot collide with these sentinels — confirm against the buffer manager.
+ */
+public enum RequireBufferStatusCode {
+  // The buffer could not be granted. ShuffleTaskManager.requireBuffer also returns this when a
+  // partition exceeds the huge-partition limit. -1 is the value callers already checked for
+  // before this enum existed.
+  NO_BUFFER(-1L),
+  // The app has no ShuffleTaskInfo on this server, i.e. it is not registered here (the commit
+  // describes this happening after a worker restart). The gRPC client treats the matching
+  // NO_REGISTER status as non-retryable. Why -4 was chosen is not visible here.
+  NO_REGISTER(-4L);
+
+  // Numeric sentinel returned in place of a require-buffer id.
+  private final long statusCode;
+
+  RequireBufferStatusCode(long code) {
+    this.statusCode = code;
+  }
+
+  /** Returns the numeric sentinel that stands in for a require-buffer id. */
+  public long statusCode() {
+    return statusCode;
+  }
+}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 76e86c2e..091b473a 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -109,6 +109,10 @@ public class ShuffleTaskManagerTest extends HadoopTestBase
{
String appId = "hugePartitionMemoryUsageLimitTest_appId";
int shuffleId = 1;
+ // case1, return -4 means not registered.
+ long requiredId = shuffleTaskManager.requireBuffer(appId, 1,
Arrays.asList(1), 500);
+ assertEquals(-4, requiredId);
+
shuffleTaskManager.registerShuffle(
appId,
shuffleId,
@@ -116,18 +120,18 @@ public class ShuffleTaskManagerTest extends
HadoopTestBase {
RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
StringUtils.EMPTY);
- // case1
- long requiredId = shuffleTaskManager.requireBuffer(appId, 1,
Arrays.asList(1), 500);
+ // case2
+ requiredId = shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1),
500);
assertNotEquals(-1, requiredId);
- // case2
+ // case3
ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 500);
shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData0);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0.getBlockList());
requiredId = shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1),
500);
assertNotEquals(-1, requiredId);
- // case3
+ // case4
partitionedData0 = createPartitionedData(1, 1, 500);
shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData0);
shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0.getBlockList());
@@ -141,7 +145,7 @@ public class ShuffleTaskManagerTest extends HadoopTestBase {
assertEquals(1, ShuffleServerMetrics.gaugeHugePartitionNum.get());
assertEquals(1, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());
- // case4
+ // case5
shuffleTaskManager.removeResources(appId);
assertEquals(0, ShuffleServerMetrics.gaugeHugePartitionNum.get());
assertEquals(0, ShuffleServerMetrics.gaugeAppWithHugePartitionNum.get());