This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 541d9dcc [#1111] fix: Shuffle server can not delete remote storage 
path of secured HDFS cluster (#1122)
541d9dcc is described below

commit 541d9dcce5c54c0d0d0b570e4c1d24dc4ce91794
Author: zhengchenyu <[email protected]>
AuthorDate: Fri Aug 11 10:09:09 2023 +0800

    [#1111] fix: Shuffle server can not delete remote storage path of secured 
HDFS cluster (#1122)
    
    ### What changes were proposed in this pull request?
    
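    Fix the bug where the shuffle server cannot delete the remote storage path on a
    secured HDFS cluster. `removeResources` now looks up the app's user before the
    app's entry is removed from `shuffleTaskInfos`, instead of after it.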
    
    ### Why are the changes needed?
    
    Fix: #1111
    
    ### How was this patch tested?
    
    Added a unit test (KerberizedShuffleTaskManagerTest) and tested on a cluster.
---
 .../apache/uniffle/server/ShuffleTaskManager.java  |   4 +-
 .../server/KerberizedShuffleTaskManagerTest.java   | 183 +++++++++++++++++++++
 2 files changed, 185 insertions(+), 2 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 06014738..3579d4a5 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -651,6 +651,7 @@ public class ShuffleTaskManager {
   public void removeResources(String appId) {
     LOG.info("Start remove resource for appId[" + appId + "]");
     final long start = System.currentTimeMillis();
+    String user = getUserByAppId(appId);
     ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.remove(appId);
     if (shuffleTaskInfo == null) {
       LOG.info("Resource for appId[" + appId + "] had been removed before.");
@@ -663,8 +664,7 @@ public class ShuffleTaskManager {
     shuffleFlushManager.removeResources(appId);
     if (!shuffleToCachedBlockIds.isEmpty()) {
       storageManager.removeResources(
-          new AppPurgeEvent(
-              appId, getUserByAppId(appId), new 
ArrayList<>(shuffleToCachedBlockIds.keySet())));
+          new AppPurgeEvent(appId, user, new 
ArrayList<>(shuffleToCachedBlockIds.keySet())));
     }
     if (shuffleTaskInfo.hasHugePartition()) {
       ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec();
diff --git 
a/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java
new file mode 100644
index 00000000..4c4fe38f
--- /dev/null
+++ 
b/server/src/test/java/org/apache/uniffle/server/KerberizedShuffleTaskManagerTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.RangeMap;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.common.KerberizedHadoopBase;
+import org.apache.uniffle.common.PartitionRange;
+import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.ShufflePartitionedBlock;
+import org.apache.uniffle.common.ShufflePartitionedData;
+import org.apache.uniffle.common.util.ChecksumUtils;
+import org.apache.uniffle.server.buffer.ShuffleBuffer;
+import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.storage.util.ShuffleStorageUtils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class KerberizedShuffleTaskManagerTest extends KerberizedHadoopBase {
+
+  private static final AtomicInteger ATOMIC_INT = new AtomicInteger(0);
+
+  private ShuffleServer shuffleServer;
+  protected static String hdfsUri;
+  protected static FileSystem fs;
+
+  @BeforeAll
+  public static void beforeAll() throws Exception {
+    testRunner = KerberizedShuffleTaskManagerTest.class;
+    KerberizedHadoopBase.init();
+    fs = kerberizedHadoop.getFileSystem();
+    hdfsUri = fs.getUri().toString();
+  }
+
+  @BeforeEach
+  public void beforeEach() {
+    ShuffleServerMetrics.clear();
+    ShuffleServerMetrics.register();
+  }
+
+  @AfterEach
+  public void afterEach() throws Exception {
+    if (shuffleServer != null) {
+      shuffleServer.stopServer();
+      shuffleServer = null;
+    }
+  }
+
+  /**
+   * Verifies that shuffle data on Kerberized HDFS can be removed per shuffle and per app.
+   *
+   * @throws Exception if server setup or any shuffle or file system operation fails
+   */
+  @Test
+  public void removeShuffleDataWithHdfsTest() throws Exception {
+    String confFile = ClassLoader.getSystemResource("server.conf").getFile();
+    ShuffleServerConf conf = new ShuffleServerConf(confFile);
+    String storageBasePath = hdfsUri + "/rss/removeShuffleDataWithHdfsTest";
+    conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+    conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+    conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+    conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
+    conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
+    conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 128L);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 
50.0);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 
0.0);
+    conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
+    conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 
Long.MAX_VALUE);
+    conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    conf.set(ShuffleServerConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE, true);
+    conf.set(
+        ShuffleServerConf.RSS_SECURITY_HADOOP_KRB5_CONF_FILE, 
kerberizedHadoop.getKrb5ConfFile());
+    conf.set(
+        ShuffleServerConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE,
+        kerberizedHadoop.getHdfsKeytab());
+    conf.set(
+        ShuffleServerConf.RSS_SECURITY_HADOOP_KERBEROS_PRINCIPAL,
+        kerberizedHadoop.getHdfsPrincipal());
+    conf.setString(
+        ShuffleServerConf.PREFIX_HADOOP_CONF + 
".hadoop.security.authentication", "kerberos");
+
+    shuffleServer = new ShuffleServer(conf);
+
+    ShuffleTaskManager shuffleTaskManager = 
shuffleServer.getShuffleTaskManager();
+
+    String appId = "removeShuffleDataTest1";
+    for (int i = 0; i < 4; i++) {
+      shuffleTaskManager.registerShuffle(
+          appId,
+          i,
+          Lists.newArrayList(new PartitionRange(0, 1)),
+          new RemoteStorageInfo(storageBasePath, Maps.newHashMap()),
+          user);
+    }
+    shuffleTaskManager.refreshAppId(appId);
+
+    assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+    ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
+
+    shuffleTaskManager.requireBuffer(35);
+    shuffleTaskManager.requireBuffer(35);
+    shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
+    shuffleTaskManager.updateCachedBlockIds(appId, 0, 
partitionedData0.getBlockList());
+    shuffleTaskManager.cacheShuffleData(appId, 1, false, partitionedData0);
+    shuffleTaskManager.updateCachedBlockIds(appId, 1, 
partitionedData0.getBlockList());
+    shuffleTaskManager.refreshAppId(appId);
+    shuffleTaskManager.checkResourceStatus();
+
+    assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+    ShuffleBufferManager shuffleBufferManager = 
shuffleServer.getShuffleBufferManager();
+    RangeMap<Integer, ShuffleBuffer> rangeMap =
+        shuffleBufferManager.getBufferPool().get(appId).get(0);
+    assertFalse(rangeMap.asMapOfRanges().isEmpty());
+    shuffleTaskManager.commitShuffle(appId, 0);
+
+    // Before removing shuffle resources
+    String appBasePath = 
ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePath, appId);
+    String shufflePath0 = 
ShuffleStorageUtils.getFullShuffleDataFolder(appBasePath, "0");
+    assertTrue(fs.exists(new Path(shufflePath0)));
+
+    // After removing the shuffle id of 0 resources
+    shuffleTaskManager.removeShuffleDataSync(appId, 0);
+    assertFalse(fs.exists(new Path(shufflePath0)));
+    assertTrue(fs.exists(new Path(appBasePath)));
+    assertNull(shuffleBufferManager.getBufferPool().get(appId).get(0));
+    assertNotNull(shuffleBufferManager.getBufferPool().get(appId).get(1));
+    shuffleTaskManager.removeResources(appId);
+    assertFalse(fs.exists(new Path(appBasePath)));
+    assertNull(shuffleBufferManager.getBufferPool().get(appId));
+  }
+
+  private ShufflePartitionedData createPartitionedData(
+      int partitionId, int blockNum, int dataLength) {
+    ShufflePartitionedBlock[] blocks = createBlock(blockNum, dataLength);
+    return new ShufflePartitionedData(partitionId, blocks);
+  }
+
+  private ShufflePartitionedBlock[] createBlock(int num, int length) {
+    ShufflePartitionedBlock[] blocks = new ShufflePartitionedBlock[num];
+    for (int i = 0; i < num; i++) {
+      byte[] buf = new byte[length];
+      new Random().nextBytes(buf);
+      blocks[i] =
+          new ShufflePartitionedBlock(
+              length, length, ChecksumUtils.getCrc32(buf), 
ATOMIC_INT.incrementAndGet(), 0, buf);
+    }
+    return blocks;
+  }
+}

Reply via email to