Repository: helix Updated Branches: refs/heads/helix-0.8.1-hotfix 6d402143c -> 6ee5ffc55 (forced update)
Fix GroupCommit issue for adding back current state There was an issue where GroupCommit would add the DROPPED current state back if the same OFFLINE -> DROPPED message was sent twice. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/6ee5ffc5 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/6ee5ffc5 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/6ee5ffc5 Branch: refs/heads/helix-0.8.1-hotfix Commit: 6ee5ffc5522e31ab92823d2a0eea4a088f4d8926 Parents: 2a770b2 Author: Junkai Xue <[email protected]> Authored: Tue Jul 17 22:06:59 2018 -0700 Committer: Junkai Xue <[email protected]> Committed: Tue Jul 17 22:28:54 2018 -0700 ---------------------------------------------------------------------- .../main/java/org/apache/helix/GroupCommit.java | 4 +- .../messaging/TestGroupCommitAddBackData.java | 137 +++++++++++++++++++ 2 files changed, 139 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/6ee5ffc5/helix-core/src/main/java/org/apache/helix/GroupCommit.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/GroupCommit.java b/helix-core/src/main/java/org/apache/helix/GroupCommit.java index b6c0ff1..e775d2d 100644 --- a/helix-core/src/main/java/org/apache/helix/GroupCommit.java +++ b/helix-core/src/main/java/org/apache/helix/GroupCommit.java @@ -117,9 +117,9 @@ public class GroupCommit { */ if (merged == null) { merged = new ZNRecord(first._record); - } else { - merged.merge(first._record); } + merged.merge(first._record); + Iterator<Entry> it = queue._pending.iterator(); while (it.hasNext()) { Entry ent = it.next(); http://git-wip-us.apache.org/repos/asf/helix/blob/6ee5ffc5/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java 
---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java new file mode 100644 index 0000000..fadfd5e --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestGroupCommitAddBackData.java @@ -0,0 +1,137 @@ +package org.apache.helix.integration.messaging; + +import java.util.Date; +import java.util.Random; +import java.util.UUID; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.PropertyKey; +import org.apache.helix.integration.common.ZkStandAloneCMTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.integration.task.WorkflowGenerator; +import org.apache.helix.model.Message; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestGroupCommitAddBackData extends ZkStandAloneCMTestBase { // Regression test: a repeated OFFLINE -> DROPPED message must not leave the dropped current state behind (see GroupCommit). NOTE(review): Random, PropertyKey, ClusterControllerManager, ClusterStateVerifier, BestPossibleExternalViewVerifier and HelixClusterVerifier imports are unused + private static Logger LOG = LoggerFactory.getLogger(TestGroupCommitAddBackData.class); // NOTE(review): never used in this class; could be final or removed + private static final int START_PORT = 12918; + private static final int DEFAULT_TIMEOUT = 30 * 1000; // milliseconds; compared against System.currentTimeMillis() deltas in waitForMessageProcessed + + private HelixManager _manager; + private final String CLASS_NAME = getShortClassName(); + private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + + private MockParticipantManager _participant; +
private int _replica = 3; // NOTE(review): never read in this class + + @BeforeClass + public void beforeClass() throws Exception { + // One mock participant plus an ADMINISTRATOR manager that the test uses to write messages directly. + System.out.println("START " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + + // setup storage cluster + _gSetupTool.addCluster(CLUSTER_NAME, true); + String storageNodeName = PARTICIPANT_PREFIX + "_" + START_PORT; + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + _participant = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, storageNodeName); + _participant.syncStart(); + + // create cluster manager + _manager = HelixManagerFactory + .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR); + _manager.connect(); + } + + @AfterClass + public void afterClass() throws Exception { + if (_participant != null && _participant.isConnected()) { + _participant.syncStop(); + } + + if (_manager != null && _manager.isConnected()) { + _manager.disconnect(); + } + + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + try { + _gSetupTool.deleteCluster(CLUSTER_NAME); + } catch (Exception ex) { // best-effort cleanup: the failure is only printed to stderr + System.err.println( + "Failed to delete cluster " + CLUSTER_NAME + ", error: " + ex.getLocalizedMessage()); + } + } + + System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testGroupCommitAddCurrentStateBack() throws InterruptedException { // Drives partition P through OFFLINE -> ONLINE, ONLINE -> OFFLINE, then OFFLINE -> DROPPED ten times. + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + Message initMessage = generateMessage("OFFLINE", "ONLINE"); + accessor.setProperty( + accessor.keyBuilder().message(_participant.getInstanceName(), initMessage.getMsgId()), + initMessage); + Assert.assertTrue(waitForMessageProcessed(accessor, initMessage.getMsgId())); + Message toOffline = generateMessage("ONLINE", "OFFLINE"); + accessor.setProperty( + accessor.keyBuilder().message(_participant.getInstanceName(), toOffline.getMsgId()), + toOffline); + Assert.assertTrue(waitForMessageProcessed(accessor, 
toOffline.getMsgId())); + + // Send 10 consecutive OFFLINE -> DROPPED messages; after each one is processed, the current state for DEFAULT_TGT_DB must not exist. + for (int i = 0; i < 10; i++) { + Message dropped = generateMessage("OFFLINE", "DROPPED"); + accessor.setProperty( + accessor.keyBuilder().message(_participant.getInstanceName(), dropped.getMsgId()), + dropped); + Assert.assertTrue(waitForMessageProcessed(accessor, dropped.getMsgId())); + Assert.assertFalse(accessor.getBaseDataAccessor().exists(accessor.keyBuilder() + .currentState(_participant.getInstanceName(), _participant.getSessionId(), + WorkflowGenerator.DEFAULT_TGT_DB).getPath(), 0)); + } + } + + private Message generateMessage(String from, String to) { // Builds a NEW OnlineOffline STATE_TRANSITION message for partition P, addressed to the participant's current session. + String uuid = UUID.randomUUID().toString(); + Message message = new Message(Message.MessageType.STATE_TRANSITION, uuid); + message.setSrcName("ADMIN"); + message.setTgtName(_participant.getInstanceName()); + message.setMsgState(Message.MessageState.NEW); + message.setPartitionName("P"); + message.setResourceName(WorkflowGenerator.DEFAULT_TGT_DB); + message.setFromState(from); + message.setToState(to); + message.setTgtSessionId(_participant.getSessionId()); + message.setSrcSessionId(_manager.getSessionId()); + message.setStateModelDef("OnlineOffline"); + message.setStateModelFactoryName("DEFAULT"); + return message; + } + + private boolean waitForMessageProcessed(HelixDataAccessor accessor, String messageId) + throws InterruptedException { // Polls every 200 ms until the message znode is gone; returns false once more than DEFAULT_TIMEOUT ms have elapsed. + String path = + accessor.keyBuilder().message(_participant.getInstanceName(), messageId).getPath(); + long startTime = System.currentTimeMillis(); + while (accessor.getBaseDataAccessor().exists(path, 0)) { + if (System.currentTimeMillis() - startTime > DEFAULT_TIMEOUT) { + return false; + } + Thread.sleep(200); + } + return true; + } +}
