This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 146adff Fix MaintenanceRecoveryStage Hanging (#1792)
146adff is described below
commit 146adff02e3622fb7ea0f8815312dca7c04323ae
Author: Junkai Xue <[email protected]>
AuthorDate: Thu Jun 10 17:45:03 2021 -0700
Fix MaintenanceRecoveryStage Hanging (#1792)
The maintenance recovery stage should be ordered after
IntermediateStateCalcStage (and MessageThrottleStage), since it requires their output.
---
.../helix/controller/GenericHelixController.java | 7 +--
.../controller/TestClusterMaintenanceMode.java | 55 ++++++++++++----------
2 files changed, 33 insertions(+), 29 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index b8fac92..11cd9ee 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -511,15 +511,16 @@ public class GenericHelixController implements
IdealStateChangeListener, LiveIns
// rebalance pipeline
Pipeline rebalancePipeline = new Pipeline(pipelineName);
rebalancePipeline.addStage(new BestPossibleStateCalcStage());
- // Need to add MaintenanceRecoveryStage here because
MAX_PARTITIONS_PER_INSTANCE check could
- // only occur after IntermediateStateCalcStage calculation
- rebalancePipeline.addStage(new MaintenanceRecoveryStage());
rebalancePipeline.addStage(new MessageGenerationPhase());
rebalancePipeline.addStage(new MessageSelectionStage());
// The IntermediateStateCalcStage should be applied after message
selection
// Messages are throttled already removed by IntermediateStateCalcStage
in MessageSelection output
rebalancePipeline.addStage(new IntermediateStateCalcStage());
rebalancePipeline.addStage(new MessageThrottleStage());
+ // Need to add MaintenanceRecoveryStage here because
MAX_PARTITIONS_PER_INSTANCE check could
+ // only occur after IntermediateStateCalcStage calculation and
MessageThrottleStage. We will have
+ // IntermediateStateCalcStage merged with MessageThrottleStage
eventually.
+ rebalancePipeline.addStage(new MaintenanceRecoveryStage());
rebalancePipeline.addStage(new ResourceMessageDispatchStage());
rebalancePipeline.addStage(new PersistAssignmentStage());
rebalancePipeline.addStage(new TargetExteralViewCalcStage());
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
index 201729e..ebfb03e 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterMaintenanceMode.java
@@ -85,15 +85,14 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
@Test(dependsOnMethods = "testNotInMaintenanceMode")
public void testInMaintenanceMode() {
- _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME,
true, "Test");
- boolean isInMaintenanceMode =
-
_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME);
+ _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME,
true, TestHelper.getTestMethodName());
+ boolean isInMaintenanceMode =
_gSetupTool.getClusterManagementTool().isInMaintenanceMode(CLUSTER_NAME);
Assert.assertTrue(isInMaintenanceMode);
}
@Test(dependsOnMethods = "testInMaintenanceMode")
public void testMaintenanceModeAddNewInstance() {
- _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME,
true, "Test");
+ _gSetupTool.getClusterManagementTool().enableMaintenanceMode(CLUSTER_NAME,
true, TestHelper.getTestMethodName());
ExternalView prevExternalView = _gSetupTool.getClusterManagementTool()
.getResourceExternalView(CLUSTER_NAME,
WorkflowGenerator.DEFAULT_TGT_DB);
String instanceName = PARTICIPANT_PREFIX + "_" + (_startPort + 10);
@@ -164,7 +163,7 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
* Test that the auto-exit functionality works.
*/
@Test(dependsOnMethods = "testExitMaintenanceModeNewResourceRecovery")
- public void testAutoExitMaintenanceMode() throws InterruptedException {
+ public void testAutoExitMaintenanceMode() throws Exception {
// Set the config for auto-exiting maintenance mode
ClusterConfig clusterConfig =
_manager.getConfigAccessor().getClusterConfig(CLUSTER_NAME);
clusterConfig.setMaxOfflineInstancesAllowed(2);
@@ -175,7 +174,7 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
for (int i = 0; i < 3; i++) {
_participants[i].syncStop();
}
- Thread.sleep(500L);
+ TestHelper.verify(() ->
_dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 0, 2000L);
// Check that the cluster is in maintenance
MaintenanceSignal maintenanceSignal =
_dataAccessor.getProperty(_keyBuilder.maintenance());
@@ -187,7 +186,7 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
_participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
instanceName);
_participants[i].syncStart();
}
- Thread.sleep(500L);
+ TestHelper.verify(() ->
_dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 3, 2000L);
// Check that the cluster is no longer in maintenance (auto-recovered)
maintenanceSignal = _dataAccessor.getProperty(_keyBuilder.maintenance());
@@ -195,7 +194,7 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
}
@Test(dependsOnMethods = "testAutoExitMaintenanceMode")
- public void testNoAutoExitWhenManuallyPutInMaintenance() throws
InterruptedException {
+ public void testNoAutoExitWhenManuallyPutInMaintenance() throws Exception {
// Manually put the cluster in maintenance
_gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME,
true, null,
null);
@@ -204,7 +203,7 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
for (int i = 0; i < 2; i++) {
_participants[i].syncStop();
}
- Thread.sleep(500L);
+ TestHelper.verify(() ->
_dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 0, 2000L);
// Now bring up all instances
for (int i = 0; i < 3; i++) {
@@ -212,7 +211,7 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
_participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
instanceName);
_participants[i].syncStart();
}
- Thread.sleep(500L);
+ TestHelper.verify(() ->
_dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 3, 2000L);
// The cluster should still be in maintenance because it was enabled
manually
MaintenanceSignal maintenanceSignal =
_dataAccessor.getProperty(_keyBuilder.maintenance());
@@ -224,7 +223,7 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
* @throws InterruptedException
*/
@Test(dependsOnMethods = "testNoAutoExitWhenManuallyPutInMaintenance")
- public void testManualEnablingOverridesAutoEnabling() throws
InterruptedException {
+ public void testManualEnablingOverridesAutoEnabling() throws Exception {
// Exit maintenance mode manually
_gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME,
false, null,
null);
@@ -233,7 +232,7 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
for (int i = 0; i < 3; i++) {
_participants[i].syncStop();
}
- Thread.sleep(500L);
+ TestHelper.verify(() ->
_dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 0, 2000L);
// Check that maintenance signal was triggered by Controller
MaintenanceSignal maintenanceSignal =
_dataAccessor.getProperty(_keyBuilder.maintenance());
@@ -246,7 +245,7 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
"TRIGGERED_BY", "SHOULD NOT BE RECORDED");
_gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME,
true, null,
customFields);
- Thread.sleep(500L);
+ TestHelper.verify(() ->
_dataAccessor.getProperty(_keyBuilder.maintenance()) != null, 2000L);
// Check that maintenance mode has successfully overwritten with the right
TRIGGERED_BY field
maintenanceSignal = _dataAccessor.getProperty(_keyBuilder.maintenance());
@@ -272,7 +271,7 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
// Manually exit maintenance mode
_gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME,
false, null,
null);
- Thread.sleep(500L);
+ TestHelper.verify(() ->
_dataAccessor.getProperty(_keyBuilder.maintenance()) != null, 2000L);
// Since 3 instances are missing, the cluster should have gone back under
maintenance
// automatically
@@ -289,7 +288,7 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
_participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
instanceName);
_participants[i].syncStart();
}
- Thread.sleep(500L);
+ TestHelper.verify(() ->
_dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 3, 2000L);
// Check that the cluster exited maintenance
maintenanceSignal = _dataAccessor.getProperty(_keyBuilder.maintenance());
@@ -299,7 +298,7 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
for (int i = 0; i < 3; i++) {
_participants[i].syncStop();
}
- Thread.sleep(500L);
+ TestHelper.verify(() ->
_dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 0, 2000L);
// Check that cluster is back under maintenance
maintenanceSignal = _dataAccessor.getProperty(_keyBuilder.maintenance());
@@ -315,7 +314,10 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
// automatically because the instances currently have more than 1
clusterConfig.setMaxPartitionsPerInstance(1);
_manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
- Thread.sleep(500L);
+ TestHelper.verify(
+ () -> ((ClusterConfig)
_dataAccessor.getProperty(_keyBuilder.clusterConfig())).getMaxPartitionsPerInstance()
== 1,
+ 2000L);
+
// Now bring up all instances
for (int i = 0; i < 3; i++) {
@@ -323,7 +325,7 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
_participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME,
instanceName);
_participants[i].syncStart();
}
- Thread.sleep(500L);
+ TestHelper.verify(() ->
_dataAccessor.getChildNames(_keyBuilder.liveInstances()).size() == 3, 2000L);
// Check that the cluster is still in maintenance (should not have
auto-exited because it would
// fail the MaxPartitionsPerInstance check)
@@ -383,7 +385,7 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
* @throws InterruptedException
*/
@Test(dependsOnMethods = "testMaxPartitionLimit")
- public void testMaintenanceHistory() throws InterruptedException,
IOException {
+ public void testMaintenanceHistory() throws Exception {
// In maintenance mode, by controller, for
MAX_PARTITION_PER_INSTANCE_EXCEEDED
ControllerHistory history =
_dataAccessor.getProperty(_keyBuilder.controllerLeaderHistory());
Map<String, String> lastHistoryEntry = convertStringToMap(
@@ -400,7 +402,7 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
clusterConfig.setMaxPartitionsPerInstance(-1);
_manager.getConfigAccessor().setClusterConfig(CLUSTER_NAME, clusterConfig);
- Thread.sleep(500L);
+ TestHelper.verify(() ->
_dataAccessor.getProperty(_keyBuilder.maintenance()) == null, 2000L);
// Now check that the cluster exited maintenance
// EXIT, CONTROLLER, for MAX_PARTITION_PER_INSTANCE_EXCEEDED
@@ -414,16 +416,17 @@ public class TestClusterMaintenanceMode extends
TaskTestBase {
// Manually put the cluster in maintenance with a custom field
Map<String, String> customFieldMap = ImmutableMap.of("k1", "v1", "k2",
"v2");
-
_gSetupTool.getClusterManagementTool().manuallyEnableMaintenanceMode(CLUSTER_NAME,
true, "TEST",
- customFieldMap);
- Thread.sleep(500L);
+ _gSetupTool.getClusterManagementTool()
+ .manuallyEnableMaintenanceMode(CLUSTER_NAME, true,
TestHelper.getTestMethodName(), customFieldMap);
+ TestHelper.verify(() ->
_dataAccessor.getProperty(_keyBuilder.maintenance()) != null, 2000L);
+
// ENTER, USER, for reason TEST, no internalReason
history = _dataAccessor.getProperty(_keyBuilder.controllerLeaderHistory());
- lastHistoryEntry = convertStringToMap(
-
history.getMaintenanceHistoryList().get(history.getMaintenanceHistoryList().size()
- 1));
+ lastHistoryEntry =
+
convertStringToMap(history.getMaintenanceHistoryList().get(history.getMaintenanceHistoryList().size()
- 1));
Assert.assertEquals(lastHistoryEntry.get("OPERATION_TYPE"), "ENTER");
Assert.assertEquals(lastHistoryEntry.get("TRIGGERED_BY"), "USER");
- Assert.assertEquals(lastHistoryEntry.get("REASON"), "TEST");
+ Assert.assertEquals(lastHistoryEntry.get("REASON"),
TestHelper.getTestMethodName());
Assert.assertNull(lastHistoryEntry.get("AUTO_TRIGGER_REASON"));
}