Repository: helix
Updated Branches:
  refs/heads/master befb1036f -> 73c3f0ad8


Fix redundant DROPPED message sent to participant

It was caused by the combination of two pieces of Helix logic:

1. Helix caches the best possible mapping and won't recompute it unless there
are changes to IdealState, LiveInstance, ResourceConfig or InstanceConfig.
2. In message generation, if the current state does not exist, Helix assumes
the replica is in the INITIAL (OFFLINE) state.

This commit applies two fixes:

1. If the current state is null and the target state is DROPPED, Helix will no
longer send an OFFLINE -> DROPPED message.
2. If Helix sees a recurring OFFLINE -> DROPPED message, it will clear the
cached best possible mapping for that resource so that it is recomputed.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1e1cc41e
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1e1cc41e
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1e1cc41e

Branch: refs/heads/master
Commit: 1e1cc41ea1c2d911ee8d010495121bd73e5fb4d2
Parents: befb103
Author: Junkai Xue <j...@linkedin.com>
Authored: Wed Oct 17 16:26:38 2018 -0700
Committer: Junkai Xue <j...@linkedin.com>
Committed: Thu Nov 1 17:10:35 2018 -0700

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     |  8 +++
 .../stages/MessageGenerationPhase.java          |  8 +++
 .../controller/TestRedundantDroppedMessage.java | 72 ++++++++++++++++++++
 .../paticipant/TestStateTransitionTimeout.java  |  2 +-
 .../TestStateTransitionTimeoutWithResource.java |  2 +-
 5 files changed, 90 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1e1cc41e/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 6de6d51..08e98cc 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -871,6 +871,14 @@ public class ClusterDataCache extends AbstractDataCache {
   }
 
   /**
+   * Invalid the cached resourceAssignment (ideal mapping) for a resource
+   * @param resource
+   */
+  public void invalidCachedIdealStateMapping(String resource) {
+    _idealMappingCache.remove(resource);
+  }
+
+  /**
    * Get cached idealmapping
    * @return
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/1e1cc41e/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index b1013d1..c829082 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -28,6 +28,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.SystemPropertyKeys;
@@ -150,6 +151,13 @@ public abstract class MessageGenerationPhase extends 
AbstractBaseStage {
             currentStateOutput.getCurrentState(resourceName, partition, 
instanceName);
         if (currentState == null) {
           currentState = stateModelDef.getInitialState();
+          if (desiredState.equals(HelixDefinedState.DROPPED.name())) {
+            LogUtil.logDebug(logger, _eventId, String
+                .format("No current state for partition %s in resource %s, 
skip the drop message",
+                    partition.getPartitionName(), resourceName));
+            cache.invalidCachedIdealStateMapping(resourceName);
+            continue;
+          }
         }
 
         Message pendingMessage =

http://git-wip-us.apache.org/repos/asf/helix/blob/1e1cc41e/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
new file mode 100644
index 0000000..438ad4f
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/controller/TestRedundantDroppedMessage.java
@@ -0,0 +1,72 @@
+package org.apache.helix.integration.controller;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.IntermediateStateCalcStage;
+import org.apache.helix.controller.stages.MessageGenerationPhase;
+import org.apache.helix.controller.stages.MessageOutput;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import 
org.apache.helix.controller.stages.resource.ResourceMessageGenerationPhase;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.task.TaskSynchronizedTestBase;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRedundantDroppedMessage extends TaskSynchronizedTestBase {
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    _numNodes = 2;
+    _numReplicas = 1;
+    _numDbs = 1;
+    _numPartitions = 1;
+    super.beforeClass();
+  }
+
+  @Test
+  public void testNoRedundantDropMessage() throws Exception {
+    String resourceName = "TEST_RESOURCE";
+    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, 
resourceName, 1, "MasterSlave",
+        IdealState.RebalanceMode.CUSTOMIZED.name());
+    String partitionName = "P_0";
+    ClusterEvent event = new ClusterEvent(CLUSTER_NAME, 
ClusterEventType.Unknown, "ID");
+    ClusterDataCache cache = new ClusterDataCache(CLUSTER_NAME);
+    cache.refresh(_manager.getHelixDataAccessor());
+    IdealState idealState = cache.getIdealState(resourceName);
+    idealState.setReplicas("2");
+    Map<String, String> stateMap = new HashMap<>();
+    stateMap.put(_participants[0].getInstanceName(), "SLAVE");
+    stateMap.put(_participants[1].getInstanceName(), "DROPPED");
+    idealState.setInstanceStateMap(partitionName, stateMap);
+
+    cache.setIdealStates(Arrays.asList(idealState));
+    cache.setCachedIdealMapping(idealState.getResourceName(), 
idealState.getRecord());
+
+    event.addAttribute(AttributeName.ClusterDataCache.name(), cache);
+    event.addAttribute(AttributeName.helixmanager.name(), _manager);
+
+    runStage(event, new ResourceComputationStage());
+    runStage(event, new CurrentStateComputationStage());
+    runStage(event, new BestPossibleStateCalcStage());
+    runStage(event, new IntermediateStateCalcStage());
+    Assert.assertEquals(cache.getCachedIdealMapping().size(), 1);
+    runStage(event, new ResourceMessageGenerationPhase());
+
+    MessageOutput messageOutput = 
event.getAttribute(AttributeName.MESSAGES_ALL.name());
+    Assert
+        .assertEquals(messageOutput.getMessages(resourceName, new 
Partition(partitionName)).size(),
+            1);
+    Assert.assertEquals(cache.getCachedIdealMapping().size(), 0);
+  }
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1e1cc41e/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
index 2f562e1..74cf9a2 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeout.java
@@ -160,7 +160,7 @@ public class TestStateTransitionTimeout extends 
ZkStandAloneCMTestBase {
 
     boolean result =
         ClusterStateVerifier
-            .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, 
CLUSTER_NAME));
+            .verifyByPolling(new MasterNbInExtViewVerifier(ZK_ADDR, 
CLUSTER_NAME));
     Assert.assertTrue(result);
     HelixDataAccessor accessor = _participants[0].getHelixDataAccessor();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/1e1cc41e/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
index bf3c84e..cd8f882 100644
--- 
a/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/paticipant/TestStateTransitionTimeoutWithResource.java
@@ -189,7 +189,7 @@ public class TestStateTransitionTimeoutWithResource extends 
ZkStandAloneCMTestBa
     _gSetupTool.getClusterManagementTool().enableResource(CLUSTER_NAME, 
TEST_DB + 1, true);
     boolean result =
         ClusterStateVerifier
-            .verifyByZkCallback(new MasterNbInExtViewVerifier(ZK_ADDR, 
CLUSTER_NAME));
+            .verifyByPolling(new MasterNbInExtViewVerifier(ZK_ADDR, 
CLUSTER_NAME));
     Assert.assertTrue(result);
     verify(TEST_DB + 1);
   }

Reply via email to