This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 95c9eb4e [#720][FOLLOW-UP] Correct the shuffle server id (#792)
95c9eb4e is described below

commit 95c9eb4ef307faede9a344de6895440ca48ea979
Author: roryqi <[email protected]>
AuthorDate: Wed Apr 5 16:14:51 2023 +0800

    [#720][FOLLOW-UP] Correct the shuffle server id (#792)
    
    ### What changes were proposed in this pull request?
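    Assign the correct shuffle server id. The id is now built in
    `ShuffleServer.start()` after the servers have started, so it uses the
    actual netty port, and it is stored in the server conf under
    `rss.server.id` instead of being passed to the `ShuffleFlushManager`
    constructor.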
    
    ### Why are the changes needed?
    This is a follow-up PR to #720.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UTs
---
 .../apache/uniffle/server/ShuffleFlushManager.java | 12 ++++++-----
 .../org/apache/uniffle/server/ShuffleServer.java   | 25 +++++++++++++---------
 .../apache/uniffle/server/ShuffleServerConf.java   |  1 +
 .../ShuffleFlushManagerOnKerberizedHdfsTest.java   |  2 +-
 .../uniffle/server/ShuffleFlushManagerTest.java    | 17 +++++++--------
 .../uniffle/server/TestShuffleFlushManager.java    |  2 +-
 .../server/buffer/ShuffleBufferManagerTest.java    |  6 +++---
 7 files changed, 36 insertions(+), 29 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index b81453cf..8fa9e2c3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -56,7 +56,6 @@ public class ShuffleFlushManager {
   protected final BlockingQueue<ShuffleDataFlushEvent> flushQueue = 
Queues.newLinkedBlockingQueue();
   private final Executor threadPoolExecutor;
   private final List<String> storageBasePaths;
-  private final String shuffleServerId;
   private final String storageType;
   private final int storageDataReplica;
   private final ShuffleServerConf shuffleServerConf;
@@ -71,9 +70,8 @@ public class ShuffleFlushManager {
   private int processPendingEventIndex = 0;
   private final int maxConcurrencyOfSingleOnePartition;
 
-  public ShuffleFlushManager(ShuffleServerConf shuffleServerConf, String 
shuffleServerId, ShuffleServer shuffleServer,
+  public ShuffleFlushManager(ShuffleServerConf shuffleServerConf, 
ShuffleServer shuffleServer,
                              StorageManager storageManager) {
-    this.shuffleServerId = shuffleServerId;
     this.shuffleServer = shuffleServer;
     this.shuffleServerConf = shuffleServerConf;
     this.storageManager = storageManager;
@@ -219,7 +217,7 @@ public class ShuffleFlushManager {
             event.getStartPartition(),
             event.getEndPartition(),
             storageBasePaths.toArray(new String[storageBasePaths.size()]),
-            shuffleServerId,
+            getShuffleServerId(),
             hadoopConf,
             storageDataReplica,
             user,
@@ -259,7 +257,11 @@ public class ShuffleFlushManager {
       }
     }
   }
-  
+
+  private String getShuffleServerId() {
+    return shuffleServerConf.getString(ShuffleServerConf.SHUFFLE_SERVER_ID, 
"shuffleServerId");
+  }
+
   private void updateCommittedBlockIds(String appId, int shuffleId, 
List<ShufflePartitionedBlock> blocks) {
     if (blocks == null || blocks.size() == 0) {
       return;
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 0a1d2cd2..c7cf9484 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -125,13 +125,22 @@ public class ShuffleServer {
   }
 
   public void start() throws Exception {
-    registerHeartBeat.startHeartBeat();
     jettyServer.start();
     server.start();
     if (nettyServerEnabled) {
       nettyPort = streamServer.start();
     }
 
+    if (nettyServerEnabled) {
+      id = ip + "-" + grpcPort + "-" + nettyPort;
+    } else {
+      id = ip + "-" + grpcPort;
+    }
+    shuffleServerConf.setString(ShuffleServerConf.SHUFFLE_SERVER_ID, id);
+    LOG.info("Start to shuffle server with id {}", id);
+    initMetricsReporter();
+
+    registerHeartBeat.startHeartBeat();
     Runtime.getRuntime().addShutdownHook(new Thread() {
       @Override
       public void run() {
@@ -195,13 +204,7 @@ public class ShuffleServer {
     }
     grpcPort = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
     nettyPort = 
shuffleServerConf.getInteger(ShuffleServerConf.NETTY_SERVER_PORT);
-    if (nettyPort >= 0) {
-      // when nettyPort is zero,actual netty port will be changed,but id can't 
be change.
-      id = ip + "-" + grpcPort + "-" + nettyPort;
-    } else {
-      id = ip + "-" + grpcPort;
-    }
-    LOG.info("Start to initialize server {}", id);
+
     jettyServer = new JettyServer(shuffleServerConf);
     registerMetrics();
 
@@ -228,7 +231,7 @@ public class ShuffleServer {
     }
 
     registerHeartBeat = new RegisterHeartBeat(this);
-    shuffleFlushManager = new ShuffleFlushManager(shuffleServerConf, id, this, 
storageManager);
+    shuffleFlushManager = new ShuffleFlushManager(shuffleServerConf, this, 
storageManager);
     shuffleBufferManager = new ShuffleBufferManager(shuffleServerConf, 
shuffleFlushManager);
     shuffleTaskManager = new ShuffleTaskManager(shuffleServerConf, 
shuffleFlushManager,
         shuffleBufferManager, storageManager);
@@ -262,7 +265,7 @@ public class ShuffleServer {
     }
   }
 
-  private void registerMetrics() throws Exception {
+  private void registerMetrics() {
     LOG.info("Register metrics");
     CollectorRegistry shuffleServerCollectorRegistry = new 
CollectorRegistry(true);
     ShuffleServerMetrics.register(shuffleServerCollectorRegistry);
@@ -291,7 +294,9 @@ public class ShuffleServer {
     jettyServer.addServlet(
         new CommonMetricsServlet(JvmMetrics.getCollectorRegistry(), true),
         "/prometheus/metrics/jvm");
+  }
 
+  private void initMetricsReporter() throws Exception {
     metricReporter = 
MetricReporterFactory.getMetricReporter(shuffleServerConf, id);
     if (metricReporter != null) {
       
metricReporter.addCollectorRegistry(ShuffleServerMetrics.getCollectorRegistry());
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 5f7b21e1..566e4e7a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -31,6 +31,7 @@ import org.apache.uniffle.common.util.RssUtils;
 public class ShuffleServerConf extends RssBaseConf {
 
   public static final String PREFIX_HADOOP_CONF = "rss.server.hadoop";
+  public static final String SHUFFLE_SERVER_ID = "rss.server.id";
 
   public static final ConfigOption<Long> SERVER_BUFFER_CAPACITY = ConfigOptions
       .key("rss.server.buffer.capacity")
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
index 2c730aa8..45a69ada 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
@@ -111,7 +111,7 @@ public class ShuffleFlushManagerOnKerberizedHdfsTest 
extends KerberizedHdfsBase
     storageManager.registerRemoteStorage(appId1, remoteStorage);
     storageManager.registerRemoteStorage(appId2, remoteStorage);
     ShuffleFlushManager manager =
-        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", 
mockShuffleServer, storageManager);
+        new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, 
storageManager);
     ShuffleDataFlushEvent event1 =
         createShuffleDataFlushEvent(appId1, 1, 0, 1, null);
     manager.addToFlushQueue(event1);
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index 1645b9d1..aa8f88b6 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -113,7 +113,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     StorageManager storageManager =
         
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
     ShuffleFlushManager manager =
-        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", 
mockShuffleServer, storageManager);
+        new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, 
storageManager);
     assertEquals("2", manager.getHadoopConf().get("dfs.replication"));
     assertEquals("value", manager.getHadoopConf().get("a.b"));
   }
@@ -135,7 +135,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
         
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
     storageManager.registerRemoteStorage(appId, remoteStorage);
     ShuffleFlushManager manager =
-        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", 
mockShuffleServer, storageManager);
+        new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, 
storageManager);
 
     for (int i = 0; i < 10; i++) {
       ShuffleDataFlushEvent shuffleDataFlushEvent = 
createShuffleDataFlushEvent(appId, i, 1, 1, null);
@@ -163,7 +163,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
         
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
     storageManager.registerRemoteStorage(appId, remoteStorage);
     ShuffleFlushManager manager =
-        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", 
mockShuffleServer, storageManager);
+        new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, 
storageManager);
 
     IntStream.range(0, 20).forEach(x -> {
       ShuffleDataFlushEvent event = createShuffleDataFlushEvent(appId, 1, 1, 
1, null);
@@ -191,7 +191,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     assertEquals(0.0, 
ShuffleServerMetrics.counterRemoteStorageFailedWrite.get(storageHost).get(), 
0.5);
     assertEquals(0.0, 
ShuffleServerMetrics.counterRemoteStorageSuccessWrite.get(storageHost).get(), 
0.5);
     ShuffleFlushManager manager =
-        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", 
mockShuffleServer, storageManager);
+        new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, 
storageManager);
     ShuffleDataFlushEvent event1 =
         createShuffleDataFlushEvent(appId, 1, 1, 1, null);
     final List<ShufflePartitionedBlock> blocks1 = event1.getShuffleBlocks();
@@ -236,7 +236,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     List<ShufflePartitionedBlock> expectedBlocks = Lists.newArrayList();
     List<ShuffleDataFlushEvent> flushEvents1 = Lists.newArrayList();
     List<ShuffleDataFlushEvent> flushEvents2 = Lists.newArrayList();
-    ShuffleFlushManager manager = new ShuffleFlushManager(shuffleServerConf, 
"shuffleServerId",
+    ShuffleFlushManager manager = new ShuffleFlushManager(shuffleServerConf,
         mockShuffleServer, storageManager);
     for (int i = 0; i < 30; i++) {
       ShuffleDataFlushEvent flushEvent1 = createShuffleDataFlushEvent(appId, 
1, 1, 1, null);
@@ -275,7 +275,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     storageManager.registerRemoteStorage(appId1, remoteStorage);
     storageManager.registerRemoteStorage(appId2, remoteStorage);
     ShuffleFlushManager manager =
-        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", 
mockShuffleServer, storageManager);
+        new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, 
storageManager);
     ShuffleDataFlushEvent event1 =
         createShuffleDataFlushEvent(appId1, 1, 0, 1, null);
     manager.addToFlushQueue(event1);
@@ -342,7 +342,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     StorageManager storageManager =
         StorageManagerFactory.getInstance().createStorageManager(serverConf);
     ShuffleFlushManager manager =
-        new ShuffleFlushManager(serverConf, "shuffleServerId", 
mockShuffleServer, storageManager);
+        new ShuffleFlushManager(serverConf, mockShuffleServer, storageManager);
     ShuffleDataFlushEvent event1 =
         createShuffleDataFlushEvent(appId1, 1, 0, 1, null);
     manager.addToFlushQueue(event1);
@@ -513,7 +513,6 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
 
     ShuffleFlushManager flushManager = new ShuffleFlushManager(
         shuffleServerConf,
-        "shuffle-server-id",
         mockShuffleServer,
         storageManager
     );
@@ -555,7 +554,7 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     StorageManager storageManager =
         
StorageManagerFactory.getInstance().createStorageManager(shuffleServerConf);
     ShuffleFlushManager manager =
-        new ShuffleFlushManager(shuffleServerConf, "shuffleServerId", 
mockShuffleServer, storageManager);
+        new ShuffleFlushManager(shuffleServerConf, mockShuffleServer, 
storageManager);
     ShuffleDataFlushEvent event = new ShuffleDataFlushEvent(1, "1", 1, 1, 1, 
100, null, null, null);
     assertEquals(0, manager.getPendingEventsSize());
     manager.addPendingEvents(event);
diff --git a/server/src/test/java/org/apache/uniffle/server/TestShuffleFlushManager.java b/server/src/test/java/org/apache/uniffle/server/TestShuffleFlushManager.java
index 675b765e..3ce5efd0 100644
--- a/server/src/test/java/org/apache/uniffle/server/TestShuffleFlushManager.java
+++ b/server/src/test/java/org/apache/uniffle/server/TestShuffleFlushManager.java
@@ -24,7 +24,7 @@ import org.apache.uniffle.server.storage.StorageManager;
 public class TestShuffleFlushManager extends ShuffleFlushManager {
   public TestShuffleFlushManager(ShuffleServerConf shuffleServerConf, String 
shuffleServerId,
                                  ShuffleServer shuffleServer, StorageManager 
storageManager) {
-    super(shuffleServerConf, shuffleServerId, shuffleServer, storageManager);
+    super(shuffleServerConf, shuffleServer, storageManager);
   }
 
   @Override
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index a5faffa6..ef456088 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -391,7 +391,7 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
     StorageManager storageManager = 
StorageManagerFactory.getInstance().createStorageManager(conf);
     ShuffleFlushManager shuffleFlushManager = new ShuffleFlushManager(conf,
-        "serverId", mockShuffleServer, storageManager);
+        mockShuffleServer, storageManager);
     shuffleBufferManager = new ShuffleBufferManager(conf, shuffleFlushManager);
 
     when(mockShuffleServer
@@ -464,7 +464,7 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
     StorageManager storageManager = 
StorageManagerFactory.getInstance().createStorageManager(shuffleConf);
     ShuffleFlushManager shuffleFlushManager =
-        new ShuffleFlushManager(shuffleConf, "serverId", mockShuffleServer, 
storageManager);
+        new ShuffleFlushManager(shuffleConf, mockShuffleServer, 
storageManager);
     shuffleBufferManager = new ShuffleBufferManager(shuffleConf, 
shuffleFlushManager);
     ShuffleTaskManager shuffleTaskManager =
         new ShuffleTaskManager(shuffleConf, shuffleFlushManager, 
shuffleBufferManager, storageManager);
@@ -528,7 +528,7 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     ShuffleServer mockShuffleServer = mock(ShuffleServer.class);
     StorageManager storageManager = 
StorageManagerFactory.getInstance().createStorageManager(shuffleConf);
     ShuffleFlushManager shuffleFlushManager =
-            new ShuffleFlushManager(shuffleConf, "serverId", 
mockShuffleServer, storageManager);
+            new ShuffleFlushManager(shuffleConf, mockShuffleServer, 
storageManager);
     shuffleBufferManager = new ShuffleBufferManager(shuffleConf, 
shuffleFlushManager);
 
     when(mockShuffleServer

Reply via email to