Repository: helix Updated Branches: refs/heads/master befb1036f -> 73c3f0ad8
Fix redundant DROPPED message sent to participant. It was caused by a combination of two pieces of Helix logic: 1. Helix caches the best possible mapping and won't recompute it unless there are changes to IdealState, LiveInstance, ResourceConfig or InstanceConfig. 2. In message generation, if the current state does not exist, Helix treats the partition as being in the INITIAL (OFFLINE) state. This commit makes two fixes: 1. If the current state is null and the target state is DROPPED, Helix no longer sends an OFFLINE -> DROPPED message. 2. In that same case, since such a redundant OFFLINE -> DROPPED transition indicates a stale cached mapping, Helix also clears the cached best possible mapping for this resource so that it is recomputed. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1e1cc41e Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1e1cc41e Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1e1cc41e Branch: refs/heads/master Commit: 1e1cc41ea1c2d911ee8d010495121bd73e5fb4d2 Parents: befb103 Author: Junkai Xue <j...@linkedin.com> Authored: Wed Oct 17 16:26:38 2018 -0700 Committer: Junkai Xue <j...@linkedin.com> Committed: Thu Nov 1 17:10:35 2018 -0700 ---------------------------------------------------------------------- .../controller/stages/ClusterDataCache.java | 8 +++ .../stages/MessageGenerationPhase.java | 8 +++ .../controller/TestRedundantDroppedMessage.java | 72 ++++++++++++++++++++ .../paticipant/TestStateTransitionTimeout.java | 2 +- .../TestStateTransitionTimeoutWithResource.java | 2 +- 5 files changed, 90 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/1e1cc41e/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java 
---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 6de6d51..08e98cc 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -871,6 +871,14 @@ public class ClusterDataCache extends AbstractDataCache { } /** + * Invalid the cached resourceAssignment (ideal mapping) for a resource + * @param resource + */ + public void invalidCachedIdealStateMapping(String resource) { + _idealMappingCache.remove(resource); + } + + /** * Get cached idealmapping * @return */ http://git-wip-us.apache.org/repos/asf/helix/blob/1e1cc41e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index b1013d1..c829082 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -28,6 +28,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixDefinedState; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; import org.apache.helix.SystemPropertyKeys; @@ -150,6 +151,13 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage { currentStateOutput.getCurrentState(resourceName, partition, instanceName); if (currentState == null) { currentState = stateModelDef.getInitialState(); + if (desiredState.equals(HelixDefinedState.DROPPED.name())) { + LogUtil.logDebug(logger, _eventId, String + .format("No current state for partition %s in resource %s, skip the drop 
message", + partition.getPartitionName(), resourceName)); + cache.invalidCachedIdealStateMapping(resourceName); + continue; + } } Message pendingMessage = http://git-wip-us.apache.org/repos/asf/helix/blob/1e1cc41e/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java new file mode 100644 index 0000000..438ad4f --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java @@ -0,0 +1,72 @@ +package org.apache.helix.integration.controller; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import org.apache.helix.controller.stages.AttributeName; +import org.apache.helix.controller.stages.BestPossibleStateCalcStage; +import org.apache.helix.controller.stages.ClusterDataCache; +import org.apache.helix.controller.stages.ClusterEvent; +import org.apache.helix.controller.stages.ClusterEventType; +import org.apache.helix.controller.stages.CurrentStateComputationStage; +import org.apache.helix.controller.stages.IntermediateStateCalcStage; +import org.apache.helix.controller.stages.MessageGenerationPhase; +import org.apache.helix.controller.stages.MessageOutput; +import org.apache.helix.controller.stages.ResourceComputationStage; +import org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.Partition; +import org.apache.helix.task.TaskSynchronizedTestBase; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestRedundantDroppedMessage extends TaskSynchronizedTestBase { + @BeforeClass + public void 
beforeClass() throws Exception { + _numNodes = 2; + _numReplicas = 1; + _numDbs = 1; + _numPartitions = 1; + super.beforeClass(); + } + + @Test + public void testNoRedundantDropMessage() throws Exception { + String resourceName = "TEST_RESOURCE"; + _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, resourceName, 1, "MasterSlave", + IdealState.RebalanceMode.CUSTOMIZED.name()); + String partitionName = "P_0"; + ClusterEvent event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.Unknown, "ID"); + ClusterDataCache cache = new ClusterDataCache(CLUSTER_NAME); + cache.refresh(_manager.getHelixDataAccessor()); + IdealState idealState = cache.getIdealState(resourceName); + idealState.setReplicas("2"); + Map<String, String> stateMap = new HashMap<>(); + stateMap.put(_participants[0].getInstanceName(), "SLAVE"); + stateMap.put(_participants[1].getInstanceName(), "DROPPED"); + idealState.setInstanceStateMap(partitionName, stateMap); + + cache.setIdealStates(Arrays.asList(idealState)); + cache.setCachedIdealMapping(idealState.getResourceName(), idealState.getRecord()); + + event.addAttribute(AttributeName.ClusterDataCache.name(), cache); + event.addAttribute(AttributeName.helixmanager.name(), _manager); + + runStage(event, new ResourceComputationStage()); + runStage(event, new CurrentStateComputationStage()); + runStage(event, new BestPossibleStateCalcStage()); + runStage(event, new IntermediateStateCalcStage()); + Assert.assertEquals(cache.getCachedIdealMapping().size(), 1); + runStage(event, new ResourceMessageGenerationPhase()); + + MessageOutput messageOutput = event.getAttribute(AttributeName.MESSAGES_ALL.name()); + Assert + .assertEquals(messageOutput.getMessages(resourceName, new Partition(partitionName)).size(), + 1); + Assert.assertEquals(cache.getCachedIdealMapping().size(), 0); + } + + + +} http://git-wip-us.apache.org/repos/asf/helix/blob/1e1cc41e/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java 
---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java index 2f562e1..74cf9a2 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java +++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java @@ -160,7 +160,7 @@ public class TestStateTransitionTimeout extends ZkStandAloneCMTestBase { boolean result = ClusterStateVerifier - .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME)); + .verifyByPolling(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME)); Assert.assertTrue(result); HelixDataAccessor accessor = _participants[0].getHelixDataAccessor(); http://git-wip-us.apache.org/repos/asf/helix/blob/1e1cc41e/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java index bf3c84e..cd8f882 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java +++ b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java @@ -189,7 +189,7 @@ public class TestStateTransitionTimeoutWithResource extends ZkStandAloneCMTestBa _gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME, TEST_DB + 1, true); boolean result = ClusterStateVerifier - .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, CLUSTER_NAME)); + .verifyByPolling(new MasterNbInExtViewVerifier(ZK_ADDR, 
CLUSTER_NAME)); Assert.assertTrue(result); verify(TEST_DB + 1); }