Repository: helix Updated Branches: refs/heads/helix-0.6.x 22bee7298 -> f9b94bc9b
Add test for testing submessage fail but update status. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/f9b94bc9 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/f9b94bc9 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/f9b94bc9 Branch: refs/heads/helix-0.6.x Commit: f9b94bc9b713f251141502362de3b4634af50974 Parents: 22bee72 Author: Junkai Xue <[email protected]> Authored: Mon Apr 10 18:23:24 2017 -0700 Committer: dasahcc <[email protected]> Committed: Sat Apr 15 22:37:09 2017 -0700 ---------------------------------------------------------------------- .../integration/TestBatchMessageHandling.java | 108 +++++++++++++++++++ 1 file changed, 108 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/f9b94bc9/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java new file mode 100644 index 0000000..1d7ac5e --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestBatchMessageHandling.java @@ -0,0 +1,108 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import org.apache.helix.HelixException;
import org.apache.helix.NotificationContext;
import org.apache.helix.mock.participant.MockMSStateModel;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.builder.FullAutoModeISBuilder;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Integration test for batch-message handling when some sub messages fail.
 *
 * <p>A resource with batch message mode enabled is added while only one participant is
 * left running. The registered state model lets a fixed number of OFFLINE->ONLINE
 * transitions succeed and throws for the rest; the test then checks how many partitions
 * the external view reports as ONLINE and as ERROR.
 */
public class TestBatchMessageHandling extends ZkStandAloneCMTestBase {
  /** Number of partitions of the test resource (one sub message per partition). */
  private static final int NUM_PARTITIONS = 10;
  /** Number of OFFLINE->ONLINE transitions allowed to succeed before failures start. */
  private static final int NUM_SUCCESS_BEFORE_FAIL = 6;
  /** Upper bound on how long to wait for the external view to reach the expected counts. */
  private static final long VERIFY_TIMEOUT_MS = 10000L;
  /** Pause between two reads of the external view while waiting. */
  private static final long POLL_INTERVAL_MS = 100L;

  /**
   * Verifies that partitions whose sub message failed are reported as ERROR while the
   * partitions whose sub message succeeded are reported as ONLINE.
   *
   * @throws InterruptedException if the thread is interrupted while waiting for the
   *           external view to be updated
   */
  @Test
  public void testSubMessageFailed() throws InterruptedException {
    TestOnlineOfflineStateModel._numOfSuccessBeforeFail = NUM_SUCCESS_BEFORE_FAIL;

    // Let one instance handle all the batch messages.
    _participants[0].getStateMachineEngine().registerStateModelFactory("OnlineOffline",
        new TestOnlineOfflineStateModelFactory(), "TestFactory");
    for (int i = 1; i < _participants.length; i++) {
      _participants[i].syncStop();
    }

    // Add 1 db with batch message enabled. Each db has 10 partitions.
    // So it will have 1 batch message and 10 sub messages.
    String dbName = "TestDBSubMessageFail";
    IdealState idealState = new FullAutoModeISBuilder(dbName).setStateModel("OnlineOffline")
        .setStateModelFactoryName("TestFactory").setNumPartitions(NUM_PARTITIONS)
        .setNumReplica(1).build();
    idealState.setBatchMessageMode(true);
    _setupTool.getClusterManagementTool().addResource(CLUSTER_NAME, dbName, idealState);
    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, 1);

    final int expectedOnlines = NUM_SUCCESS_BEFORE_FAIL;
    final int expectedErrors = NUM_PARTITIONS - NUM_SUCCESS_BEFORE_FAIL;

    // Poll instead of sleeping a fixed amount: the test finishes as soon as the expected
    // counts are visible and tolerates a slow environment up to VERIFY_TIMEOUT_MS. The two
    // expected counts add up to the partition count, so no further partition can change the
    // tally once both match.
    final long deadline = System.currentTimeMillis() + VERIFY_TIMEOUT_MS;
    int numOfOnlines;
    int numOfErrors;
    do {
      Thread.sleep(POLL_INTERVAL_MS);
      ExternalView externalView =
          _setupTool.getClusterManagementTool().getResourceExternalView(CLUSTER_NAME, dbName);
      numOfOnlines = countPartitionsWithState(externalView, "ONLINE");
      numOfErrors = countPartitionsWithState(externalView, "ERROR");
    } while ((numOfOnlines != expectedOnlines || numOfErrors != expectedErrors)
        && System.currentTimeMillis() < deadline);

    Assert.assertEquals(numOfErrors, expectedErrors);
    Assert.assertEquals(numOfOnlines, expectedOnlines);
  }

  /**
   * Counts the partitions that have at least one replica in the given state.
   *
   * @param externalView external view to inspect; {@code null} is treated as empty
   * @param state state name to look for, e.g. {@code "ONLINE"}
   * @return number of partitions whose state map contains {@code state}
   */
  private static int countPartitionsWithState(ExternalView externalView, String state) {
    if (externalView == null) {
      return 0;
    }
    int count = 0;
    for (String partition : externalView.getPartitionSet()) {
      if (externalView.getStateMap(partition).values().contains(state)) {
        count++;
      }
    }
    return count;
  }

  /** Factory that hands out a fresh {@link TestOnlineOfflineStateModel} on every call. */
  public static class TestOnlineOfflineStateModelFactory extends
      StateModelFactory<TestOnlineOfflineStateModel> {
    @Override
    public TestOnlineOfflineStateModel createNewStateModel(String resourceName, String stateUnitKey) {
      return new TestOnlineOfflineStateModel();
    }
  }

  /**
   * OnlineOffline state model whose OFFLINE->ONLINE transition succeeds a configurable
   * number of times (shared across all instances) and throws afterwards.
   */
  public static class TestOnlineOfflineStateModel extends StateModel {
    private static final Logger LOG = Logger.getLogger(TestOnlineOfflineStateModel.class);

    /**
     * Remaining number of OFFLINE->ONLINE transitions that may still succeed. Shared by
     * all instances; read-modify-write access is guarded by the class monitor.
     */
    public static volatile int _numOfSuccessBeforeFail;

    /**
     * Succeeds while the shared budget is positive, otherwise fails the transition.
     *
     * @param message the state transition message
     * @param context the notification context
     * @throws HelixException once the shared success budget is used up
     */
    public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
      // The counter is static while the factory creates a separate model per call, so an
      // instance-level lock would not serialize the decrement; lock on the class instead.
      synchronized (TestOnlineOfflineStateModel.class) {
        if (_numOfSuccessBeforeFail-- > 0) {
          LOG.info("State transition from Offline to Online");
          return;
        }
      }
      throw new HelixException("Number of Success reached");
    }

    /**
     * Logs the ONLINE->OFFLINE transition; always succeeds.
     *
     * @param message the state transition message
     * @param context the notification context
     */
    public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
      LOG.info("State transition from Online to Offline");
    }

    /**
     * Logs the OFFLINE->DROPPED transition; always succeeds.
     *
     * @param message the state transition message
     * @param context the notification context
     */
    public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
      LOG.info("State transition from Offline to Dropped");
    }
  }
}
