This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new ff55219 Add stale message map to improve P2P message rate (#1124)
ff55219 is described below
commit ff55219f815e67aab39c60d514da53060f6a8cea
Author: Meng Zhang <[email protected]>
AuthorDate: Wed Jul 22 11:41:56 2020 -0700
Add stale message map to improve P2P message rate (#1124)
Add stale message cache to keep processed but not deleted messages.
Controller will not need to wait for these messages to be gone before sending
more messages to participants.
This change will improve the rate of P2P messages and help reduce
mastership handoff latency.
---
.../helix/common/caches/InstanceMessagesCache.java | 46 +++++++++++++++++++
.../dataproviders/BaseControllerDataProvider.java | 18 ++++++++
.../stages/CurrentStateComputationStage.java | 50 +++++++++++++++++----
.../controller/stages/CurrentStateOutput.java | 2 +-
.../controller/stages/MessageGenerationPhase.java | 51 ++++++++++++++++------
.../helix/spectator/RoutingTableProvider.java | 2 +-
.../stages/TestCurrentStateComputationStage.java | 20 ++++++++-
.../controller/stages/TestRebalancePipeline.java | 48 +++++++++++++++++---
.../messaging/TestP2PNoDuplicatedMessage.java | 10 ++---
9 files changed, 208 insertions(+), 39 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index 0740966..1bd3107 100644
---
a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++
b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -29,6 +29,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -55,6 +56,10 @@ public class InstanceMessagesCache {
// <instance -> {<MessageId, Message>}>
private Map<String, Map<String, Message>> _messageCache = Maps.newHashMap();
+ // maintain a cache of stale messages
+ // <instance -> {<MessageId, Message>}>
+ private Map<String, Map<String, Message>> _staleMessageCache =
Maps.newHashMap();
+
// maintain a set of valid pending P2P messages.
// <instance -> {<MessageId, Message>}>
private Map<String, Map<String, Message>> _relayMessageCache =
Maps.newHashMap();
@@ -148,9 +153,29 @@ public class InstanceMessagesCache {
LOG.info(
"END: InstanceMessagesCache.refresh(), {} of Messages read from
ZooKeeper. took {} ms. ",
newMessageKeys.size(), (System.currentTimeMillis() - startTime));
+
+ refreshStaleMessageCache();
return true;
}
+ @VisibleForTesting
+ public Map<String, Map<String, Message>> getStaleMessageCache() {
+ return _staleMessageCache;
+ }
+
+ public Set<Message> getStaleMessagesByInstance(String instanceName) {
+ Map<String, Message> staleMessageMap =
_staleMessageCache.get(instanceName);
+ if (staleMessageMap != null) {
+ return new HashSet<>(staleMessageMap.values());
+ }
+ return Collections.emptySet();
+ }
+
+ public void addStaleMessage(String instanceName, Message staleMessage) {
+ _staleMessageCache.putIfAbsent(instanceName, new HashMap<>());
+ _staleMessageCache.get(instanceName).putIfAbsent(staleMessage.getMsgId(),
staleMessage);
+ }
+
/**
* Refresh relay message cache by updating relay messages read from ZK, and
remove all expired relay messages.
*/
@@ -516,6 +541,27 @@ public class InstanceMessagesCache {
_relayHostMessageCache.put(relayMessage.getMsgId(), hostMessage);
}
+ // filter stale message cache by message cache to remove already deleted
messages
+ private void refreshStaleMessageCache() {
+ LOG.info("Start to refresh stale message cache");
+ Map<String, Set<String>> toRemoveMessages = new HashMap<>();
+ for (String instanceName : _staleMessageCache.keySet()) {
+ for (String messageId : _staleMessageCache.get(instanceName).keySet()) {
+ if (!_messageCache.getOrDefault(instanceName, Collections.emptyMap())
+ .containsKey(messageId)) {
+ toRemoveMessages.computeIfAbsent(instanceName, k -> new
HashSet<>()).add(messageId);
+ }
+ }
+ }
+
+ toRemoveMessages.entrySet().stream().forEach(entry -> {
+ entry.getValue().stream().forEach(id ->
_staleMessageCache.get(entry.getKey()).remove(id));
+ if (_staleMessageCache.get(entry.getKey()).size() == 0) {
+ _staleMessageCache.remove(entry.getKey());
+ }
+ });
+ }
+
@Override public String toString() {
return "InstanceMessagesCache{" +
"_messageMap=" + _messageMap +
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
index e2f2ac2..3031a0d 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/BaseControllerDataProvider.java
@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
@@ -545,6 +546,23 @@ public class BaseControllerDataProvider implements
ControlContextProvider {
}
/**
+ * This function is supposed to be only used by testing purpose for safety.
For "get" usage,
+ * please use getStaleMessagesByInstance.
+ */
+ @VisibleForTesting
+ public Map<String, Map<String, Message>> getStaleMessages() {
+ return _instanceMessagesCache.getStaleMessageCache();
+ }
+
+ public Set<Message> getStaleMessagesByInstance(String instanceName) {
+ return _instanceMessagesCache.getStaleMessagesByInstance(instanceName);
+ }
+
+ public void addStaleMessage(String instanceName, Message staleMessage) {
+ _instanceMessagesCache.addStaleMessage(instanceName, staleMessage);
+ }
+
+ /**
* Provides a list of current outstanding pending relay messages on a given
instance.
* @param instanceName
* @return
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 62fda33..2d883d9 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -23,12 +23,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import
org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.util.ResourceUsageCalculator;
@@ -56,11 +58,19 @@ import org.slf4j.LoggerFactory;
*/
public class CurrentStateComputationStage extends AbstractBaseStage {
private static Logger LOG =
LoggerFactory.getLogger(CurrentStateComputationStage.class);
+ private boolean _isTaskFrameworkPipeline = false;
@Override
public void process(ClusterEvent event) throws Exception {
_eventId = event.getEventId();
BaseControllerDataProvider cache =
event.getAttribute(AttributeName.ControllerDataProvider.name());
+
+ // TODO: remove the explicit checking of type, since this could
potentially complicates
+ // pipeline separation
+ if (cache instanceof WorkflowControllerDataProvider) {
+ _isTaskFrameworkPipeline = true;
+ }
+
final Map<String, Resource> resourceMap =
event.getAttribute(AttributeName.RESOURCES.name());
final Map<String, Resource> resourceToRebalance =
event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
@@ -77,15 +87,17 @@ public class CurrentStateComputationStage extends
AbstractBaseStage {
String instanceName = instance.getInstanceName();
String instanceSessionId = instance.getEphemeralOwner();
- // update pending messages
- Map<String, Message> messages = cache.getMessages(instanceName);
- Map<String, Message> relayMessages =
cache.getRelayMessages(instanceName);
- updatePendingMessages(instance, messages.values(), currentStateOutput,
relayMessages.values(), resourceMap);
-
// update current states.
Map<String, CurrentState> currentStateMap =
cache.getCurrentState(instanceName,
instanceSessionId);
updateCurrentStates(instance, currentStateMap.values(),
currentStateOutput, resourceMap);
+
+ Set<Message> existingStaleMessages =
cache.getStaleMessagesByInstance(instanceName);
+ // update pending messages
+ Map<String, Message> messages = cache.getMessages(instanceName);
+ Map<String, Message> relayMessages =
cache.getRelayMessages(instanceName);
+ updatePendingMessages(instance, cache, messages.values(),
relayMessages.values(),
+ existingStaleMessages, currentStateOutput, resourceMap);
}
event.addAttribute(AttributeName.CURRENT_STATE.name(), currentStateOutput);
@@ -101,16 +113,22 @@ public class CurrentStateComputationStage extends
AbstractBaseStage {
}
// update all pending messages to CurrentStateOutput.
- private void updatePendingMessages(LiveInstance instance,
Collection<Message> pendingMessages,
- CurrentStateOutput currentStateOutput, Collection<Message>
pendingRelayMessages,
+ private void updatePendingMessages(LiveInstance instance,
BaseControllerDataProvider cache,
+ Collection<Message> pendingMessages, Collection<Message>
pendingRelayMessages,
+ Set<Message> existingStaleMessages, CurrentStateOutput
currentStateOutput,
Map<String, Resource> resourceMap) {
String instanceName = instance.getInstanceName();
String instanceSessionId = instance.getEphemeralOwner();
// update all pending messages
for (Message message : pendingMessages) {
+ // ignore existing stale messages
+ if (existingStaleMessages.contains(message)) {
+ continue;
+ }
if
(!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
- &&
!MessageType.STATE_TRANSITION_CANCELLATION.name().equalsIgnoreCase(message.getMsgType()))
{
+ && !MessageType.STATE_TRANSITION_CANCELLATION.name()
+ .equalsIgnoreCase(message.getMsgType())) {
continue;
}
if (!instanceSessionId.equals(message.getTgtSessionId())) {
@@ -129,7 +147,13 @@ public class CurrentStateComputationStage extends
AbstractBaseStage {
String partitionName = message.getPartitionName();
Partition partition = resource.getPartition(partitionName);
if (partition != null) {
- setMessageState(currentStateOutput, resourceName, partition,
instanceName, message);
+ String currentState =
currentStateOutput.getCurrentState(resourceName, partition,
+ instanceName);
+ if (_isTaskFrameworkPipeline || !isStaleMessage(message,
currentState)) {
+ setMessageState(currentStateOutput, resourceName, partition,
instanceName, message);
+ } else {
+ cache.addStaleMessage(instanceName, message);
+ }
} else {
LogUtil.logInfo(LOG, _eventId, String
.format("Ignore a pending message %s for a non-exist resource %s
and partition %s",
@@ -191,6 +215,14 @@ public class CurrentStateComputationStage extends
AbstractBaseStage {
}
}
+ private boolean isStaleMessage(Message message, String currentState) {
+ if (currentState == null || message.getFromState() == null ||
message.getToState() == null) {
+ return false;
+ }
+ return !message.getFromState().equalsIgnoreCase(currentState) ||
message.getToState()
+ .equalsIgnoreCase(currentState);
+ }
+
// update current states in CurrentStateOutput
private void updateCurrentStates(LiveInstance instance,
Collection<CurrentState> currentStates,
CurrentStateOutput currentStateOutput, Map<String, Resource>
resourceMap) {
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
index 752a760..26c4eed 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateOutput.java
@@ -193,7 +193,7 @@ public class CurrentStateOutput {
}
return null;
}
-
+
public Long getEndTime(String resourceName, Partition partition, String
instanceName) {
Map<Partition, Map<String, Long>> partitionInfo =
_currentStateEndTimeMap.get(resourceName);
if (partitionInfo != null) {
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
index 4223c37..85a2958 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
@@ -67,6 +68,8 @@ public abstract class MessageGenerationPhase extends
AbstractBaseStage {
// transition
public final static long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil
.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60
* 1000);
+ private final static String PENDING_MESSAGE = "pending message";
+ private final static String STALE_MESSAGE = "stale message";
private static Logger logger =
LoggerFactory.getLogger(MessageGenerationPhase.class);
@@ -77,8 +80,9 @@ public abstract class MessageGenerationPhase extends
AbstractBaseStage {
BaseControllerDataProvider cache =
event.getAttribute(AttributeName.ControllerDataProvider.name());
Map<String, Resource> resourceMap =
event.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
- Map<String, Map<String, Message>> pendingMessagesToCleanUp = new
HashMap<>();
CurrentStateOutput currentStateOutput =
event.getAttribute(AttributeName.CURRENT_STATE.name());
+
+ Map<String, Map<String, Message>> messagesToCleanUp = new HashMap<>();
if (manager == null || cache == null || resourceMap == null ||
currentStateOutput == null
|| resourcesStateMap == null) {
throw new StageException("Missing attributes in event:" + event
@@ -96,7 +100,7 @@ public abstract class MessageGenerationPhase extends
AbstractBaseStage {
for (Resource resource : resourceMap.values()) {
try {
generateMessage(resource, cache, resourcesStateMap,
currentStateOutput, manager,
- sessionIdMap, event.getEventType(), output,
pendingMessagesToCleanUp);
+ sessionIdMap, event.getEventType(), output, messagesToCleanUp);
} catch (HelixException ex) {
LogUtil.logError(logger, _eventId,
"Failed to generate message for resource " +
resource.getResourceName(), ex);
@@ -104,8 +108,8 @@ public abstract class MessageGenerationPhase extends
AbstractBaseStage {
}
// Asynchronously GC pending messages if necessary
- if (!pendingMessagesToCleanUp.isEmpty()) {
- schedulePendingMessageCleanUp(pendingMessagesToCleanUp,
cache.getAsyncTasksThreadPool(),
+ if (!messagesToCleanUp.isEmpty()) {
+ schedulePendingMessageCleanUp(messagesToCleanUp,
cache.getAsyncTasksThreadPool(),
manager.getHelixDataAccessor());
}
event.addAttribute(AttributeName.MESSAGES_ALL.name(), output);
@@ -115,7 +119,7 @@ public abstract class MessageGenerationPhase extends
AbstractBaseStage {
final ResourcesStateMap resourcesStateMap, final CurrentStateOutput
currentStateOutput,
final HelixManager manager, final Map<String, String> sessionIdMap,
final ClusterEventType eventType, MessageOutput output,
- Map<String, Map<String, Message>> pendingMessagesToCleanUp) {
+ Map<String, Map<String, Message>> messagesToCleanUp) {
String resourceName = resource.getResourceName();
StateModelDefinition stateModelDef =
cache.getStateModelDef(resource.getStateModelDefRef());
@@ -147,6 +151,9 @@ public abstract class MessageGenerationPhase extends
AbstractBaseStage {
Map<String, List<Message>> messageMap = new HashMap<>();
for (String instanceName : instanceStateMap.keySet()) {
+
+ Set<Message> staleMessages =
cache.getStaleMessagesByInstance(instanceName);
+
String desiredState = instanceStateMap.get(instanceName);
String currentState =
@@ -188,14 +195,17 @@ public abstract class MessageGenerationPhase extends
AbstractBaseStage {
if (pendingMessage != null &&
shouldCleanUpPendingMessage(pendingMessage, currentState,
currentStateOutput.getEndTime(resourceName, partition,
instanceName))) {
- LogUtil.logInfo(logger, _eventId, String.format(
- "Adding pending message %s on instance %s to clean up. Msg:
%s->%s, current state of resource %s:%s is %s",
- pendingMessage.getMsgId(), instanceName,
pendingMessage.getFromState(),
- pendingMessage.getToState(), resourceName, partition,
currentState));
- if (!pendingMessagesToCleanUp.containsKey(instanceName)) {
- pendingMessagesToCleanUp.put(instanceName, new HashMap<String,
Message>());
+ logAndAddToCleanUp(messagesToCleanUp, pendingMessage, instanceName,
resourceName,
+ partition, currentState, PENDING_MESSAGE);
+ }
+
+ for (Message staleMessage : staleMessages) {
+ if (System.currentTimeMillis() - currentStateOutput
+ .getEndTime(resourceName, partition, instanceName)
+ > DEFAULT_OBSELETE_MSG_PURGE_DELAY) {
+ logAndAddToCleanUp(messagesToCleanUp, staleMessage, instanceName,
resourceName,
+ partition, currentState, STALE_MESSAGE);
}
-
pendingMessagesToCleanUp.get(instanceName).put(pendingMessage.getMsgId(),
pendingMessage);
}
if (desiredState.equals(NO_DESIRED_STATE) ||
desiredState.equalsIgnoreCase(currentState)) {
@@ -259,6 +269,20 @@ public abstract class MessageGenerationPhase extends
AbstractBaseStage {
} // end of for-each-partition
}
+ private void logAndAddToCleanUp(Map<String, Map<String, Message>>
messagesToCleanUp,
+ Message message, String instanceName, String resourceName, Partition
partition,
+ String currentState, String cleanUpMessageType) {
+ String logMsg = String.format(
+ "Adding %s %s on instance %s to clean up. Msg: %s->%s, current state"
+ + " of resource %s:%s is %s", cleanUpMessageType,
message.getMsgId(), instanceName,
+ message.getFromState(), message.getToState(), resourceName, partition,
currentState);
+ LogUtil.logInfo(logger, _eventId, logMsg);
+ if (!messagesToCleanUp.containsKey(instanceName)) {
+ messagesToCleanUp.put(instanceName, new HashMap<String, Message>());
+ }
+ messagesToCleanUp.get(instanceName).put(message.getMsgId(), message);
+ }
+
private Message generateCancellationMessageForPendingMessage(final String
desiredState, final String currentState,
final String nextState, final Message pendingMessage, final HelixManager
manager,
final Resource resource, final Partition partition, final Map<String,
String> sessionIdMap,
@@ -337,7 +361,8 @@ public abstract class MessageGenerationPhase extends
AbstractBaseStage {
final Map<String, Map<String, Message>> pendingMessagesToPurge,
ExecutorService workerPool,
final HelixDataAccessor accessor) {
workerPool.submit(new Callable<Object>() {
- @Override public Object call() {
+ @Override
+ public Object call() {
for (Map.Entry<String, Map<String, Message>> entry :
pendingMessagesToPurge.entrySet()) {
String instanceName = entry.getKey();
for (Message msg : entry.getValue().values()) {
diff --git
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 704f4c2..0331b44 100644
---
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -33,9 +33,9 @@ import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
-import com.google.common.collect.ImmutableMap;
import javax.management.JMException;
+import com.google.common.collect.ImmutableMap;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index 1f354d7..52d84d0 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -57,8 +57,8 @@ public class TestCurrentStateComputationStage extends
BaseStageTest {
event.addAttribute(AttributeName.RESOURCES.name(), resourceMap);
event.addAttribute(AttributeName.RESOURCES_TO_REBALANCE.name(),
resourceMap);
- event.addAttribute(AttributeName.ControllerDataProvider.name(),
- new ResourceControllerDataProvider());
+ ResourceControllerDataProvider dataCache = new
ResourceControllerDataProvider();
+ event.addAttribute(AttributeName.ControllerDataProvider.name(), dataCache);
CurrentStateComputationStage stage = new CurrentStateComputationStage();
runStage(event, new ReadClusterDataStage());
runStage(event, stage);
@@ -112,6 +112,22 @@ public class TestCurrentStateComputationStage extends
BaseStageTest {
"localhost_3");
AssertJUnit.assertEquals(currentState, "OFFLINE");
+ // Add another state transition message which is stale
+ message = new Message(Message.MessageType.STATE_TRANSITION, "msg2");
+ message.setFromState("SLAVE");
+ message.setToState("OFFLINE");
+ message.setResourceName("testResourceName");
+ message.setPartitionName("testResourceName_1");
+ message.setTgtName("localhost_3");
+ message.setTgtSessionId("session_3");
+ accessor.setProperty(keyBuilder.message("localhost_" + 3,
message.getId()), message);
+
+ runStage(event, new ReadClusterDataStage());
+ runStage(event, stage);
+ CurrentStateOutput output4 =
event.getAttribute(AttributeName.CURRENT_STATE.name());
+ AssertJUnit.assertEquals(dataCache.getStaleMessages().size(), 1);
+
AssertJUnit.assertTrue(dataCache.getStaleMessages().containsKey("localhost_3"));
+
AssertJUnit.assertTrue(dataCache.getStaleMessages().get("localhost_3").containsKey("msg2"));
}
}
diff --git
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
index dde80c6..79c5e72 100644
---
a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
+++
b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java
@@ -22,6 +22,7 @@ package org.apache.helix.controller.stages;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -53,7 +54,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase {
private final String _className = getShortClassName();
@Test
- public void testDuplicateMsg() {
+ public void testDuplicateMsg() throws Exception {
String clusterName = "CLUSTER_" + _className + "_dup";
System.out.println("START " + clusterName + " at " + new
Date(System.currentTimeMillis()));
@@ -115,16 +116,51 @@ public class TestRebalancePipeline extends ZkUnitTestBase
{
Assert.assertEquals(message.getTgtName(), "localhost_0");
// round2: updates node0 currentState to SLAVE but keep the
- // message, make sure controller should not send S->M until removal is done
- setCurrentState(clusterName, "localhost_0", resourceName, resourceName +
"_0", liveInstances.get(0).getEphemeralOwner(),
- "SLAVE");
+ // message, make sure controller should not wait for the message to be
deleted, but should
+ // send out a S -> M message to node0
+ setCurrentState(clusterName, "localhost_0", resourceName, resourceName +
"_0",
+ liveInstances.get(0).getEphemeralOwner(), "SLAVE");
runPipeline(event, dataRefresh);
refreshClusterConfig(clusterName, accessor);
- runPipeline(event, rebalancePipeline);
+
+ Pipeline computationPipeline = new Pipeline();
+ computationPipeline.addStage(new ResourceComputationStage());
+ computationPipeline.addStage(new CurrentStateComputationStage());
+
+ Pipeline messagePipeline = new Pipeline();
+ messagePipeline.addStage(new BestPossibleStateCalcStage());
+ messagePipeline.addStage(new IntermediateStateCalcStage());
+ messagePipeline.addStage(new ResourceMessageGenerationPhase());
+ messagePipeline.addStage(new MessageSelectionStage());
+ messagePipeline.addStage(new MessageThrottleStage());
+ messagePipeline.addStage(new ResourceMessageDispatchStage());
+
+ runPipeline(event, computationPipeline);
+
+ Map<String, Map<String, Message>> staleMessages =
dataCache.getStaleMessages();
+ Assert.assertEquals(staleMessages.size(), 1);
+ Assert.assertTrue(staleMessages.containsKey("localhost_0"));
+
Assert.assertTrue(staleMessages.get("localhost_0").containsKey(message.getMsgId()));
+
+ runPipeline(event, messagePipeline);
+
msgSelOutput = event.getAttribute(AttributeName.MESSAGES_SELECTED.name());
messages = msgSelOutput.getMessages(resourceName, new
Partition(resourceName + "_0"));
- Assert.assertEquals(messages.size(), 0, "Should NOT output 1 message:
SLAVE-MASTER for node1");
+ Assert.assertEquals(messages.size(), 1, "Should output 1 message:
SLAVE-MASTER for node0");
+
Assert.assertTrue(messages.get(0).getTgtName().equalsIgnoreCase("localhost_0"));
+
Assert.assertTrue(messages.get(0).getFromState().equalsIgnoreCase("SLAVE"));
+ Assert.assertTrue(messages.get(0).getToState().equalsIgnoreCase("MASTER"));
+
+ runPipeline(event, dataRefresh);
+
+ // Verify the stale message should be deleted
+ Assert.assertTrue(TestHelper.verify(() -> {
+ if (dataCache.getStaleMessages().size() != 0) {
+ return false;
+ }
+ return true;
+ }, 2000));
deleteLiveInstances(clusterName);
deleteCluster(clusterName);
diff --git
a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
index 0ef34a7..ed9aa90 100644
---
a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
+++
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PNoDuplicatedMessage.java
@@ -62,12 +62,12 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase {
final String CLASS_NAME = getShortClassName();
final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
- static final int PARTICIPANT_NUMBER = 6;
+ static final int PARTICIPANT_NUMBER = 10;
static final int PARTICIPANT_START_PORT = 12918;
static final int DB_COUNT = 2;
- static final int PARTITION_NUMBER = 50;
+ static final int PARTITION_NUMBER = 100;
static final int REPLICA_NUMBER = 3;
final String _controllerName = CONTROLLER_PREFIX + "_0";
@@ -171,11 +171,7 @@ public class TestP2PNoDuplicatedMessage extends ZkTestBase
{
verifyP2PEnabled(startTime);
}
- double ratio = ((double) p2pTrigged) / ((double) total);
- Assert.assertTrue(ratio > 0.6, String
- .format("Only %d out of %d percent transitions to Master were triggered
by expected host!",
- p2pTrigged, total));
-
+ Assert.assertEquals(p2pTrigged, total);
Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessagesInProgress, 0,
"There are duplicated transition messages sent while participant is
handling the state-transition!");
Assert.assertEquals(MockHelixTaskExecutor.duplicatedMessages, 0,