This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 541d9dcc [#1111] fix: Shuffle server can not delete remote storage path of secured HDFS cluster (#1122)
541d9dcc is described below
commit 541d9dcce5c54c0d0d0b570e4c1d24dc4ce91794
Author: zhengchenyu <[email protected]>
AuthorDate: Fri Aug 11 10:09:09 2023 +0800
[#1111] fix: Shuffle server can not delete remote storage path of secured HDFS cluster (#1122)
### What changes were proposed in this pull request?
Fix the bug where the shuffle server cannot delete the remote storage path of an application on a secured (Kerberized) HDFS cluster.
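In short, removeResources resolved the owning user only after the app's ShuffleTaskInfo had already been removed, so the purge event was built without a user and the deletion could not run as the owning user on a Kerberized cluster. A minimal sketch of the ordering issue (assuming getUserByAppId reads the user from the shuffleTaskInfos map, as the reordering in the diff below implies; blockIds stands in for the cached block-id set):

    // Before (buggy): the task info is removed first, so the later user
    // lookup finds nothing and the purge event carries no user.
    ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.remove(appId);
    storageManager.removeResources(
        new AppPurgeEvent(appId, getUserByAppId(appId), blockIds));

    // After (fixed): capture the user before removing the task info.
    String user = getUserByAppId(appId);
    ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.remove(appId);
    storageManager.removeResources(new AppPurgeEvent(appId, user, blockIds));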
### Why are the changes needed?
Fix: #1111
### How was this patch tested?
Unit tests, plus a manual test in a cluster.
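For reference, the new unit test can typically be run on its own via Maven's Surefire filter (module path taken from the diff; the exact invocation may vary with the build setup):

    mvn -pl server test -Dtest=KerberizedShuffleTaskManagerTest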
---
.../apache/uniffle/server/ShuffleTaskManager.java | 4 +-
.../server/KerberizedShuffleTaskManagerTest.java | 183 +++++++++++++++++++++
2 files changed, 185 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 06014738..3579d4a5 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -651,6 +651,7 @@ public class ShuffleTaskManager {
public void removeResources(String appId) {
LOG.info("Start remove resource for appId[" + appId + "]");
final long start = System.currentTimeMillis();
+ String user = getUserByAppId(appId);
ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.remove(appId);
if (shuffleTaskInfo == null) {
LOG.info("Resource for appId[" + appId + "] had been removed before.");
@@ -663,8 +664,7 @@ public class ShuffleTaskManager {
shuffleFlushManager.removeResources(appId);
if (!shuffleToCachedBlockIds.isEmpty()) {
storageManager.removeResources(
- new AppPurgeEvent(
- appId, getUserByAppId(appId), new ArrayList<>(shuffleToCachedBlockIds.keySet())));
+ new AppPurgeEvent(appId, user, new ArrayList<>(shuffleToCachedBlockIds.keySet())));
}
if (shuffleTaskInfo.hasHugePartition()) {
ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec();
diff --git a/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java
new file mode 100644
index 00000000..4c4fe38f
--- /dev/null
+++ b/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.RangeMap;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.KerberizedHadoopBase;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.ShufflePartitionedData;
+import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.server.buffer.ShuffleBuffer;
+import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.storage.util.ShuffleStorageUtils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KerberizedShuffleTaskManagerTest extends KerberizedHadoopBase {
+
+ private static final AtomicInteger ATOMIC_INT = new AtomicInteger(0);
+
+ private ShuffleServer shuffleServer;
+ protected static String hdfsUri;
+ protected static FileSystem fs;
+
+ @BeforeAll
+ public static void beforeAll() throws Exception {
+ testRunner = KerberizedShuffleTaskManagerTest.class;
+ KerberizedHadoopBase.init();
+ fs = kerberizedHadoop.getFileSystem();
+ hdfsUri = fs.getUri().toString();
+ }
+
+ @BeforeEach
+ public void beforeEach() {
+ ShuffleServerMetrics.clear();
+ ShuffleServerMetrics.register();
+ }
+
+ @AfterEach
+ public void afterEach() throws Exception {
+ if (shuffleServer != null) {
+ shuffleServer.stopServer();
+ shuffleServer = null;
+ }
+ }
+
+ /**
+ * Clean up the stage-level shuffle data for a single app.
+ *
+ * @throws Exception if test setup or HDFS cleanup fails
+ */
+ @Test
+ public void removeShuffleDataWithHdfsTest() throws Exception {
+ String confFile = ClassLoader.getSystemResource("server.conf").getFile();
+ ShuffleServerConf conf = new ShuffleServerConf(confFile);
+ String storageBasePath = hdfsUri + "/rss/removeShuffleDataWithHdfsTest";
+ conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+ conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+ conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+ conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
+ conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
+ conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
+ conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
+ conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
+ conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
+ conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, Long.MAX_VALUE);
+ conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
+ String user = UserGroupInformation.getCurrentUser().getShortUserName();
+ conf.set(ShuffleServerConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE, true);
+ conf.set(
+ ShuffleServerConf.RSS_SECURITY_HADOOP_KRB5_CONF_FILE, kerberizedHadoop.getKrb5ConfFile());
+ conf.set(
+ ShuffleServerConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE,
+ kerberizedHadoop.getHdfsKeytab());
+ conf.set(
+ ShuffleServerConf.RSS_SECURITY_HADOOP_KERBEROS_PRINCIPAL,
+ kerberizedHadoop.getHdfsPrincipal());
+ conf.setString(
+ ShuffleServerConf.PREFIX_HADOOP_CONF + ".hadoop.security.authentication", "kerberos");
+
+ shuffleServer = new ShuffleServer(conf);
+
+ ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+
+ String appId = "removeShuffleDataTest1";
+ for (int i = 0; i < 4; i++) {
+ shuffleTaskManager.registerShuffle(
+ appId,
+ i,
+ Lists.newArrayList(new PartitionRange(0, 1)),
+ new RemoteStorageInfo(storageBasePath, Maps.newHashMap()),
+ user);
+ }
+ shuffleTaskManager.refreshAppId(appId);
+
+ assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+ ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
+
+ shuffleTaskManager.requireBuffer(35);
+ shuffleTaskManager.requireBuffer(35);
+ shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
+ shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0.getBlockList());
+ shuffleTaskManager.cacheShuffleData(appId, 1, false, partitionedData0);
+ shuffleTaskManager.updateCachedBlockIds(appId, 1, partitionedData0.getBlockList());
+ shuffleTaskManager.refreshAppId(appId);
+ shuffleTaskManager.checkResourceStatus();
+
+ assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+ ShuffleBufferManager shuffleBufferManager = shuffleServer.getShuffleBufferManager();
+ RangeMap<Integer, ShuffleBuffer> rangeMap =
+ shuffleBufferManager.getBufferPool().get(appId).get(0);
+ assertFalse(rangeMap.asMapOfRanges().isEmpty());
+ shuffleTaskManager.commitShuffle(appId, 0);
+
+ // Before removing shuffle resources
+ String appBasePath = ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath, appId);
+ String shufflePath0 = ShuffleStorageUtils.getFullShuffleDataFolder(appBasePath, "0");
+ assertTrue(fs.exists(new Path(shufflePath0)));
+
+ // After removing resources for shuffle id 0
+ shuffleTaskManager.removeShuffleDataSync(appId, 0);
+ assertFalse(fs.exists(new Path(shufflePath0)));
+ assertTrue(fs.exists(new Path(appBasePath)));
+ assertNull(shuffleBufferManager.getBufferPool().get(appId).get(0));
+ assertNotNull(shuffleBufferManager.getBufferPool().get(appId).get(1));
+ shuffleTaskManager.removeResources(appId);
+ assertFalse(fs.exists(new Path(appBasePath)));
+ assertNull(shuffleBufferManager.getBufferPool().get(appId));
+ }
+
+ private ShufflePartitionedData createPartitionedData(
+ int partitionId, int blockNum, int dataLength) {
+ ShufflePartitionedBlock[] blocks = createBlock(blockNum, dataLength);
+ return new ShufflePartitionedData(partitionId, blocks);
+ }
+
+ private ShufflePartitionedBlock[] createBlock(int num, int length) {
+ ShufflePartitionedBlock[] blocks = new ShufflePartitionedBlock[num];
+ for (int i = 0; i < num; i++) {
+ byte[] buf = new byte[length];
+ new Random().nextBytes(buf);
+ blocks[i] =
+ new ShufflePartitionedBlock(
+ length, length, ChecksumUtils.getCrc32(buf), ATOMIC_INT.incrementAndGet(), 0, buf);
+ }
+ return blocks;
+ }
+}