This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 5875eb37 Introduce startup-silent-period mechanism to avoid partial
assignments (#247)
5875eb37 is described below
commit 5875eb37e8e038d72afd7483c4d4807d05d9e08a
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Oct 10 11:28:00 2022 +0800
Introduce startup-silent-period mechanism to avoid partial assignments
(#247)
### What changes were proposed in this pull request?
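Introduce a startup-silent-period mechanism: when enabled, the coordinator
rejects getShuffleAssignments requests for a configurable duration after it
starts, in order to avoid partial assignments.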
### Why are the changes needed?
When a coordinator is restarted (for example, after changing its conf), it
accepts client getAssignment requests immediately, but it serves those
requests based on only the partially registered shuffle-servers. As a result,
some jobs get fewer shuffle-servers than they require, which slows them down.
I think we should make the coordinator wait for more than one shuffle-server
heartbeat interval before serving clients. While it is out of service, requests
from clients will fall back to the slave coordinator.
### Does this PR introduce _any_ user-facing change?
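Yes. Two new coordinator configuration options are added:
`rss.coordinator.startup-silent-period.enabled` (default: false) and
`rss.coordinator.startup-silent-period.duration` (default: 20000 ms).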
### How was this patch tested?
Unit tests (SimpleClusterManagerTest) and an integration test
(CoordinatorAssignmentTest).
---
.../apache/uniffle/coordinator/ClusterManager.java | 5 +++
.../uniffle/coordinator/CoordinatorConf.java | 13 ++++++
.../coordinator/CoordinatorGrpcService.java | 4 ++
.../uniffle/coordinator/SimpleClusterManager.java | 38 +++++++++++++++++
.../coordinator/SimpleClusterManagerTest.java | 13 ++++++
docs/coordinator_guide.md | 8 ++--
...berTest.java => CoordinatorAssignmentTest.java} | 49 +++++++++++++++++++---
.../apache/uniffle/test/IntegrationTestBase.java | 1 +
8 files changed, 122 insertions(+), 9 deletions(-)
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
index b8005c0a..8db4abc5 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ClusterManager.java
@@ -49,4 +49,9 @@ public interface ClusterManager extends Closeable {
List<ServerNode> list();
int getShuffleNodesMax();
+
+ /**
+ * @return whether to be ready for serving
+ */
+ boolean isReadyForServe();
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index b343ac64..70408cb5 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -159,6 +159,19 @@ public class CoordinatorConf extends RssBaseConf {
.enumType(AbstractAssignmentStrategy.HostAssignmentStrategy.class)
.defaultValue(AbstractAssignmentStrategy.HostAssignmentStrategy.PREFER_DIFF)
.withDescription("Strategy for selecting shuffle servers");
+ public static final ConfigOption<Boolean>
COORDINATOR_START_SILENT_PERIOD_ENABLED = ConfigOptions
+ .key("rss.coordinator.startup-silent-period.enabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Enable the startup-silent-period to reject the
assignment requests "
+ + "for avoiding partial assignments. To avoid service interruption,
this mechanism is disabled by default. "
+ + "Especially it's recommended to use in coordinator HA mode when
restarting single coordinator.");
+ public static final ConfigOption<Long>
COORDINATOR_START_SILENT_PERIOD_DURATION = ConfigOptions
+ .key("rss.coordinator.startup-silent-period.duration")
+ .longType()
+ .defaultValue(20 * 1000L)
+ .withDescription("The waiting duration(ms) when conf of "
+ + COORDINATOR_START_SILENT_PERIOD_ENABLED + " is enabled.");
public CoordinatorConf() {
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
index d2c3fc28..987c85a9 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorGrpcService.java
@@ -120,6 +120,10 @@ public class CoordinatorGrpcService extends
CoordinatorServerGrpc.CoordinatorSer
GetShuffleAssignmentsResponse response;
try {
+ if (!coordinatorServer.getClusterManager().isReadyForServe()) {
+ throw new Exception("Coordinator is out-of-service when in starting.");
+ }
+
final PartitionRangeAssignment pra =
coordinatorServer
.getAssignmentStrategy()
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index c60a0d8d..cc9b553e 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -65,6 +65,11 @@ public class SimpleClusterManager implements ClusterManager {
private long outputAliveServerCount = 0;
private final long periodicOutputIntervalTimes;
+ private long startTime;
+ private boolean startupSilentPeriodEnabled;
+ private long startupSilentPeriodDurationMs;
+ private boolean readyForServe = false;
+
public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf)
throws Exception {
this.shuffleNodesMax =
conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
this.heartbeatTimeout =
conf.getLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT);
@@ -72,6 +77,9 @@ public class SimpleClusterManager implements ClusterManager {
scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.getThreadFactory("SimpleClusterManager-%d"));
+ this.startupSilentPeriodEnabled =
conf.get(CoordinatorConf.COORDINATOR_START_SILENT_PERIOD_ENABLED);
+ this.startupSilentPeriodDurationMs =
conf.get(CoordinatorConf.COORDINATOR_START_SILENT_PERIOD_DURATION);
+
periodicOutputIntervalTimes =
conf.get(CoordinatorConf.COORDINATOR_NODES_PERIODIC_OUTPUT_INTERVAL_TIMES);
scheduledExecutorService.scheduleAtFixedRate(
() -> nodesCheck(), heartbeatTimeout / 3,
@@ -86,6 +94,8 @@ public class SimpleClusterManager implements ClusterManager {
checkNodesExecutorService.scheduleAtFixedRate(
() -> updateExcludeNodes(excludeNodesPath), updateNodesInterval,
updateNodesInterval, TimeUnit.MILLISECONDS);
}
+
+ this.startTime = System.currentTimeMillis();
}
void nodesCheck() {
@@ -224,6 +234,19 @@ public class SimpleClusterManager implements
ClusterManager {
return shuffleNodesMax;
}
+ @Override
+ public boolean isReadyForServe() {
+ if (!startupSilentPeriodEnabled) {
+ return true;
+ }
+
+ if (!readyForServe && System.currentTimeMillis() - startTime >
startupSilentPeriodDurationMs) {
+ readyForServe = true;
+ }
+
+ return readyForServe;
+ }
+
@Override
public void close() throws IOException {
if (hadoopFileSystem != null) {
@@ -238,4 +261,19 @@ public class SimpleClusterManager implements
ClusterManager {
checkNodesExecutorService.shutdown();
}
}
+
+ @VisibleForTesting
+ public void setStartTime(long startTime) {
+ this.startTime = startTime;
+ }
+
+ @VisibleForTesting
+ public void setReadyForServe(boolean readyForServe) {
+ this.readyForServe = readyForServe;
+ }
+
+ @VisibleForTesting
+ public void setStartupSilentPeriodEnabled(boolean
startupSilentPeriodEnabled) {
+ this.startupSilentPeriodEnabled = startupSilentPeriodEnabled;
+ }
}
diff --git
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
index be3fe2ce..df73bfa2 100644
---
a/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
+++
b/coordinator/src/test/java/org/apache/uniffle/coordinator/SimpleClusterManagerTest.java
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SimpleClusterManagerTest {
@@ -56,6 +57,18 @@ public class SimpleClusterManagerTest {
CoordinatorMetrics.clear();
}
+ @Test
+ public void startupSilentPeriodTest() throws Exception {
+ CoordinatorConf coordinatorConf = new CoordinatorConf();
+
coordinatorConf.set(CoordinatorConf.COORDINATOR_START_SILENT_PERIOD_ENABLED,
true);
+
coordinatorConf.set(CoordinatorConf.COORDINATOR_START_SILENT_PERIOD_DURATION,
20 * 1000L);
+ SimpleClusterManager manager = new SimpleClusterManager(coordinatorConf,
new Configuration());
+ assertFalse(manager.isReadyForServe());
+
+ manager.setStartTime(System.currentTimeMillis() - 30 * 1000L);
+ assertTrue(manager.isReadyForServe());
+ }
+
@Test
public void getServerListTest() throws Exception {
CoordinatorConf ssc = new CoordinatorConf();
diff --git a/docs/coordinator_guide.md b/docs/coordinator_guide.md
index 79f9209d..13a21906 100644
--- a/docs/coordinator_guide.md
+++ b/docs/coordinator_guide.md
@@ -96,9 +96,11 @@ This document will introduce how to deploy Uniffle
coordinators.
|rss.rpc.server.port|-|RPC port for coordinator|
|rss.jetty.http.port|-|Http port for coordinator|
|rss.coordinator.remote.storage.select.strategy|APP_BALANCE|Strategy for
selecting the remote path|
-|rss.coordinator.remote.storage.schedule.time|60000|The time of scheduling the
read and write time of the paths to obtain different HDFS|
-|rss.coordinator.remote.storage.schedule.file.size|204800000|The size of the
file that the scheduled thread reads and writes|
-|rss.coordinator.remote.storage.schedule.access.times|3|The number of times to
read and write HDFS files|
+|rss.coordinator.remote.storage.io.sample.schedule.time|60000|The time of
scheduling the read and write time of the paths to obtain different HDFS|
+|rss.coordinator.remote.storage.io.sample.file.size|204800000|The size of the
file that the scheduled thread reads and writes|
+|rss.coordinator.remote.storage.io.sample.access.times|3|The number of times
to read and write HDFS files|
+|rss.coordinator.startup-silent-period.enabled|false|Enable the
startup-silent-period to reject the assignment requests for avoiding partial
assignments. To avoid service interruption, this mechanism is disabled by
default. Especially it's recommended to use in coordinator HA mode when
restarting single coordinator.|
+|rss.coordinator.startup-silent-period.duration|20000|The waiting duration(ms)
when conf of rss.coordinator.startup-silent-period.enabled is enabled.|
### AccessClusterLoadChecker settings
|Property Name|Default| Description|
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentServerNodesNumberTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
similarity index 66%
rename from
integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentServerNodesNumberTest.java
rename to
integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
index 2ed3e45f..c45c2e19 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentServerNodesNumberTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
@@ -34,23 +34,34 @@ import org.apache.uniffle.client.util.ClientType;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.coordinator.CoordinatorConf;
+import org.apache.uniffle.coordinator.SimpleClusterManager;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class AssignmentServerNodesNumberTest extends CoordinatorTestBase {
- private static final Logger LOG =
LoggerFactory.getLogger(AssignmentServerNodesNumberTest.class);
+public class CoordinatorAssignmentTest extends CoordinatorTestBase {
+ private static final Logger LOG =
LoggerFactory.getLogger(CoordinatorAssignmentTest.class);
private static final int SHUFFLE_NODES_MAX = 10;
private static final int SERVER_NUM = 10;
private static final HashSet<String> TAGS = Sets.newHashSet("t1");
+ private static final String QUORUM =
+ LOCALHOST + ":" + COORDINATOR_PORT_1 + "," + LOCALHOST + ":" +
COORDINATOR_PORT_2;
+
@BeforeAll
public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
- coordinatorConf.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 2000);
- coordinatorConf.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX,
SHUFFLE_NODES_MAX);
- createCoordinatorServer(coordinatorConf);
+ CoordinatorConf coordinatorConf1 = getCoordinatorConf();
+ coordinatorConf1.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 2000);
+ coordinatorConf1.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX,
SHUFFLE_NODES_MAX);
+ createCoordinatorServer(coordinatorConf1);
+
+ CoordinatorConf coordinatorConf2 = getCoordinatorConf();
+ coordinatorConf2.setLong(CoordinatorConf.COORDINATOR_APP_EXPIRED, 2000);
+ coordinatorConf2.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX,
SHUFFLE_NODES_MAX);
+ coordinatorConf2.setInteger(CoordinatorConf.RPC_SERVER_PORT,
COORDINATOR_PORT_2);
+ coordinatorConf2.setInteger(CoordinatorConf.JETTY_HTTP_PORT, JETTY_PORT_2);
+ createCoordinatorServer(coordinatorConf2);
for (int i = 0; i < SERVER_NUM; i++) {
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
@@ -65,6 +76,7 @@ public class AssignmentServerNodesNumberTest extends
CoordinatorTestBase {
shuffleServerConf.setInteger(RssBaseConf.RPC_SERVER_PORT, 18001 + i);
shuffleServerConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, 19010 + i);
shuffleServerConf.set(ShuffleServerConf.TAGS, new ArrayList<>(TAGS));
+ shuffleServerConf.setString("rss.coordinator.quorum", QUORUM);
createShuffleServer(shuffleServerConf);
}
startServers();
@@ -72,6 +84,31 @@ public class AssignmentServerNodesNumberTest extends
CoordinatorTestBase {
Thread.sleep(1000 * 5);
}
+ @Test
+ public void testSilentPeriod() throws Exception {
+ ShuffleWriteClientImpl shuffleWriteClient = new
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
+ 1, 1, 1, true, 1, 1);
+ shuffleWriteClient.registerCoordinators(QUORUM);
+
+ // Case1: Disable silent period
+ ShuffleAssignmentsInfo info =
shuffleWriteClient.getShuffleAssignments("app1", 0, 10, 1, TAGS, -1);
+ assertEquals(SHUFFLE_NODES_MAX,
info.getServerToPartitionRanges().keySet().size());
+
+ // Case2: Enable silent period mechanism, it should fallback to slave
coordinator.
+ SimpleClusterManager clusterManager = (SimpleClusterManager)
coordinators.get(0).getClusterManager();
+ clusterManager.setReadyForServe(false);
+ clusterManager.setStartupSilentPeriodEnabled(true);
+ clusterManager.setStartTime(System.currentTimeMillis() - 1);
+
+ if (clusterManager.getNodesNum() < 10) {
+ info = shuffleWriteClient.getShuffleAssignments("app1", 0, 10, 1, TAGS,
-1);
+ assertEquals(SHUFFLE_NODES_MAX,
info.getServerToPartitionRanges().keySet().size());
+ }
+
+ // recover
+ clusterManager.setReadyForServe(true);
+ }
+
@Test
public void testAssignmentServerNodesNumber() throws Exception {
ShuffleWriteClientImpl shuffleWriteClient = new
ShuffleWriteClientImpl(ClientType.GRPC.name(), 3, 1000, 1,
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
index dff9848b..06e3be7b 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
@@ -44,6 +44,7 @@ public abstract class IntegrationTestBase extends
HdfsTestBase {
protected static final int COORDINATOR_PORT_1 = 19999;
protected static final int COORDINATOR_PORT_2 = 20030;
protected static final int JETTY_PORT_1 = 19998;
+ protected static final int JETTY_PORT_2 = 20040;
protected static final String COORDINATOR_QUORUM = LOCALHOST + ":" +
COORDINATOR_PORT_1;
protected static List<ShuffleServer> shuffleServers = Lists.newArrayList();