Repository: helix Updated Branches: refs/heads/helix-0.6.x d704af442 -> 3a4ff21b4
[HELIX-574] fix bucketize resource bug in current state carryover, rb=31970 Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3a4ff21b Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3a4ff21b Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3a4ff21b Branch: refs/heads/helix-0.6.x Commit: 3a4ff21b4424be359205d53e16e9504217bcae8f Parents: d704af4 Author: zzhang <[email protected]> Authored: Thu Mar 12 00:31:28 2015 -0700 Committer: zzhang <[email protected]> Committed: Thu Mar 12 00:31:28 2015 -0700 ---------------------------------------------------------------------- .../controller/rebalancer/CustomRebalancer.java | 4 +- .../util/ConstraintBasedAssignment.java | 4 +- .../manager/zk/ParticipantManagerHelper.java | 41 ++- .../helix/manager/zk/ZKHelixDataAccessor.java | 10 + .../integration/TestBucketizedResource.java | 100 ++++++ .../helix/integration/TestSchedulerMessage.java | 330 +------------------ .../integration/TestSchedulerMessage2.java | 137 ++++++++ .../integration/TestSchedulerMsgContraints.java | 190 +++++++++++ .../integration/TestSchedulerMsgUsingQueue.java | 133 ++++++++ 9 files changed, 613 insertions(+), 336 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java index 0f3fbc4..21ad2ff 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java @@ -104,8 +104,8 @@ public class CustomRebalancer implements Rebalancer, MappingCalculator { if (currentStateMap != null) { for (String instance : currentStateMap.keySet()) { if ((idealStateMap == null || !idealStateMap.containsKey(instance)) - && !disabledInstancesForPartition.contains(instance) && isResourceEnabled) { - // if dropped and not disabled, transit to DROPPED + && !disabledInstancesForPartition.contains(instance)) { + // if dropped (whether disabled or not), transit to DROPPED instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString()); } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals( HelixDefinedState.ERROR.name())) http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java index bab357b..a520803 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java +++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java @@ -80,8 +80,8 @@ public class ConstraintBasedAssignment { if (currentStateMap != null) { for (String instance : currentStateMap.keySet()) { if ((instancePreferenceList == null || !instancePreferenceList.contains(instance)) - && !disabledInstancesForPartition.contains(instance) && isResourceEnabled) { - // if dropped and not disabled, transit to DROPPED + && !disabledInstancesForPartition.contains(instance)) { + // if dropped (whether disabled or not), transit to DROPPED instanceStateMap.put(instance, HelixDefinedState.DROPPED.toString()); } else if ((currentStateMap.get(instance) == null || !currentStateMap.get(instance).equals( HelixDefinedState.ERROR.name())) http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java index 1bee2fe..36a669a 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManagerHelper.java @@ -20,11 +20,15 @@ package org.apache.helix.manager.zk; */ import java.lang.management.ManagementFactory; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; +import org.I0Itec.zkclient.DataUpdater; import org.I0Itec.zkclient.exception.ZkNodeExistsException; import org.apache.helix.AccessOption; +import org.apache.helix.BaseDataAccessor; import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixException; @@ -33,8 +37,10 @@ import org.apache.helix.InstanceType; import org.apache.helix.LiveInstanceInfoProvider; import org.apache.helix.PropertyKey; import org.apache.helix.ZNRecord; +import org.apache.helix.ZNRecordBucketizer; import org.apache.helix.messaging.DefaultMessagingService; import org.apache.helix.model.CurrentState; +import org.apache.helix.model.CurrentState.CurrentStateProperty; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; @@ -242,12 +248,41 @@ public class ParticipantManagerHelper { StateModelDefinition stateModel = _dataAccessor.getProperty(_keyBuilder.stateModelDef(stateModelDefRef)); + BaseDataAccessor<ZNRecord> baseAccessor = _dataAccessor.getBaseDataAccessor(); String curStatePath = _keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName()) .getPath(); - _dataAccessor.getBaseDataAccessor().update(curStatePath, - new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialState(), lastCurState), - AccessOption.PERSISTENT); + + String initState = stateModel.getInitialState(); + if (lastCurState.getBucketSize() > 0) { + // update parent node + ZNRecord metaRecord = new ZNRecord(lastCurState.getId()); + metaRecord.setSimpleFields(lastCurState.getRecord().getSimpleFields()); + DataUpdater<ZNRecord> metaRecordUpdater = + new CurStateCarryOverUpdater(_sessionId, initState, new CurrentState(metaRecord)); + boolean success = + baseAccessor.update(curStatePath, metaRecordUpdater, AccessOption.PERSISTENT); + if (success) { + // update current state buckets + ZNRecordBucketizer bucketizer = new ZNRecordBucketizer(lastCurState.getBucketSize()); + + Map<String, ZNRecord> map = bucketizer.bucketize(lastCurState.getRecord()); + List<String> paths = new ArrayList<String>(); + List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>(); + for (String bucketName : map.keySet()) { + paths.add(curStatePath + "/" + bucketName); + updaters.add(new CurStateCarryOverUpdater(_sessionId, initState, new CurrentState(map + .get(bucketName)))); + } + + baseAccessor.updateChildren(paths, updaters, AccessOption.PERSISTENT); + } + + } else { + _dataAccessor.getBaseDataAccessor().update(curStatePath, + new CurStateCarryOverUpdater(_sessionId, initState, lastCurState), + AccessOption.PERSISTENT); + } } } http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java index 8c9fc8d..ed434a1 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java @@ -187,6 +187,11 @@ public class ZKHelixDataAccessor implements HelixDataAccessor { int bucketSize = property.getBucketSize(); if (bucketSize > 0) { + // @see HELIX-574 + // clean up list and map fields in case we write to parent node by mistake + property.getRecord().getMapFields().clear(); + property.getRecord().getListFields().clear(); + List<ZNRecord> childRecords = _baseDataAccessor.getChildren(path, null, options); ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords); @@ -239,6 +244,11 @@ public class ZKHelixDataAccessor implements HelixDataAccessor { int bucketSize = property.getBucketSize(); if (bucketSize > 0) { + // @see HELIX-574 + // clean up list and map fields in case we write to parent node by mistake + property.getRecord().getMapFields().clear(); + property.getRecord().getListFields().clear(); + List<ZNRecord> childRecords = _baseDataAccessor.getChildren(path, null, options); ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords); http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java index 2b5e2bc..9b72e41 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestBucketizedResource.java @@ -19,14 +19,18 @@ package org.apache.helix.integration; * under the License. */ +import java.util.Arrays; import java.util.Date; +import java.util.List; +import org.apache.helix.HelixAdmin; import org.apache.helix.PropertyKey; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.integration.manager.ClusterControllerManager; import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZkBaseDataAccessor; import org.apache.helix.model.ExternalView; @@ -34,6 +38,7 @@ import org.apache.helix.model.IdealState; import org.apache.helix.tools.ClusterStateVerifier; import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier; import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier; +import org.apache.helix.tools.DefaultIdealStateCalculator; import org.testng.Assert; import org.testng.annotations.Test; @@ -115,4 +120,99 @@ public class TestBucketizedResource extends ZkIntegrationTestBase { System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); } + + @Test + public void testBounceDisableAndDrop() throws Exception { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + String dbName = "TestDB0"; + int n = 5; + int r = 3; + List<String> instanceNames = + Arrays.asList("localhost_0", "localhost_1", "localhost_2", "localhost_3", "localhost_4"); + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + // create cluster and add nodes to cluster + MockParticipantManager[] participants = new MockParticipantManager[n]; + _gSetupTool.addCluster(clusterName, true); + _gSetupTool.addInstancesToCluster(clusterName, + instanceNames.toArray(new String[instanceNames.size()])); + + // add a bucketized resource + ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor); + Builder keyBuilder = accessor.keyBuilder(); + ZNRecord idealStateRec = + DefaultIdealStateCalculator.calculateIdealState(instanceNames, 10, r - 1, dbName, "MASTER", + "SLAVE"); + IdealState idealState = new IdealState(idealStateRec); + idealState.setBucketSize(2); + idealState.setStateModelDefRef("MasterSlave"); + idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED); + idealState.setReplicas(Integer.toString(r)); + accessor.setProperty(keyBuilder.idealStates(dbName), idealState); + + // start controller + ClusterControllerManager controller = + new ClusterControllerManager(ZK_ADDR, clusterName, "controller"); + controller.syncStart(); + + // start participants + for (int i = 0; i < n; i++) { + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceNames.get(i)); + participants[i].syncStart(); + } + + boolean result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // bounce + participants[0].syncStop(); + participants[0] = new MockParticipantManager(ZK_ADDR, clusterName, instanceNames.get(0)); + participants[0].syncStart(); + + result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // make sure participants[0]'s current state is bucketzied correctly during carryover + String path = + keyBuilder.currentState(instanceNames.get(0), participants[0].getSessionId(), dbName) + .getPath(); + ZNRecord record = baseAccessor.get(path, null, 0); + Assert.assertTrue(record.getMapFields().size() == 0); + + // disable the bucketize resource + HelixAdmin admin = new ZKHelixAdmin(_gZkClient); + admin.enableResource(clusterName, dbName, false); + result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // drop the bucketize resource + _gSetupTool.dropResourceFromCluster(clusterName, dbName); + result = + ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR, + clusterName)); + Assert.assertTrue(result); + + // make sure external-view is cleaned up + path = keyBuilder.externalView(dbName).getPath(); + result = baseAccessor.exists(path, 0); + Assert.assertFalse(result); + + // clean up + controller.syncStop(); + for (MockParticipantManager participant : participants) { + participant.syncStop(); + } + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + + } } http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java index 70713f3..ef2e18d 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java @@ -19,7 +19,6 @@ package org.apache.helix.integration; * under the License. */ -import java.io.IOException; import java.io.StringWriter; import java.util.ArrayList; import java.util.List; @@ -55,8 +54,6 @@ import org.apache.helix.monitoring.ZKPathDataDumpTask; import org.apache.helix.util.HelixUtil; import org.apache.log4j.Level; import org.apache.log4j.Logger; -import org.codehaus.jackson.JsonGenerationException; -import org.codehaus.jackson.map.JsonMappingException; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; import org.testng.Assert; @@ -64,7 +61,7 @@ import org.testng.annotations.Test; public class TestSchedulerMessage extends ZkStandAloneCMTestBase { - class MockAsyncCallback extends AsyncCallback { + public static class MockAsyncCallback extends AsyncCallback { Message _message; public MockAsyncCallback() { @@ -189,95 +186,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase { } @Test() - public void testSchedulerMsgUsingQueue() throws Exception { - Logger.getRootLogger().setLevel(Level.INFO); - _factory._results.clear(); - Thread.sleep(2000); - HelixManager manager = null; - for (int i = 0; i < NODE_NR; i++) { - _participants[i].getMessagingService().registerMessageHandlerFactory( - _factory.getMessageType(), _factory); - - manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager; - } - - Message schedulerMessage = - new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString()); - schedulerMessage.setTgtSessionId("*"); - schedulerMessage.setTgtName("CONTROLLER"); - // TODO: change it to "ADMIN" ? - schedulerMessage.setSrcName("CONTROLLER"); - schedulerMessage.getRecord().setSimpleField( - DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg"); - // Template for the individual message sent to each participant - Message msg = new Message(_factory.getMessageType(), "Template"); - msg.setTgtSessionId("*"); - msg.setMsgState(MessageState.NEW); - - // Criteria to send individual messages - Criteria cr = new Criteria(); - cr.setInstanceName("localhost_%"); - cr.setRecipientInstanceType(InstanceType.PARTICIPANT); - cr.setSessionSpecific(false); - cr.setResource("%"); - cr.setPartition("%"); - - ObjectMapper mapper = new ObjectMapper(); - SerializationConfig serializationConfig = mapper.getSerializationConfig(); - serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true); - - StringWriter sw = new StringWriter(); - mapper.writeValue(sw, cr); - - String crString = sw.toString(); - - schedulerMessage.getRecord().setSimpleField("Criteria", crString); - schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields()); - schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1"); - - HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor(); - Builder keyBuilder = helixDataAccessor.keyBuilder(); - helixDataAccessor.createProperty(keyBuilder.controllerMessage(schedulerMessage.getMsgId()), - schedulerMessage); - - for (int i = 0; i < 30; i++) { - Thread.sleep(2000); - if (_PARTITIONS == _factory._results.size()) { - break; - } - } - - Assert.assertEquals(_PARTITIONS, _factory._results.size()); - PropertyKey controllerTaskStatus = - keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), - schedulerMessage.getMsgId()); - - int messageResultCount = 0; - for (int i = 0; i < 10; i++) { - ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord(); - Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount") - .equals("" + (_PARTITIONS * 3))); - for (String key : statusUpdate.getMapFields().keySet()) { - if (key.startsWith("MessageResult ")) { - messageResultCount++; - } - } - if (messageResultCount == _PARTITIONS * 3) { - break; - } else { - Thread.sleep(2000); - } - } - Assert.assertEquals(messageResultCount, _PARTITIONS * 3); - int count = 0; - for (Set<String> val : _factory._results.values()) { - count += val.size(); - } - Assert.assertEquals(count, _PARTITIONS * 3); - - } - - @Test() public void testSchedulerMsg() throws Exception { Logger.getRootLogger().setLevel(Level.INFO); _factory._results.clear(); @@ -418,98 +326,6 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase { } @Test() - public void testSchedulerMsg2() throws Exception { - _factory._results.clear(); - Thread.sleep(2000); - HelixManager manager = null; - for (int i = 0; i < NODE_NR; i++) { - _participants[i].getMessagingService().registerMessageHandlerFactory( - _factory.getMessageType(), _factory); - - manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager; - } - - Message schedulerMessage = - new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString()); - schedulerMessage.setTgtSessionId("*"); - schedulerMessage.setTgtName("CONTROLLER"); - // TODO: change it to "ADMIN" ? - schedulerMessage.setSrcName("CONTROLLER"); - - // Template for the individual message sent to each participant - Message msg = new Message(_factory.getMessageType(), "Template"); - msg.setTgtSessionId("*"); - msg.setMsgState(MessageState.NEW); - - // Criteria to send individual messages - Criteria cr = new Criteria(); - cr.setInstanceName("localhost_%"); - cr.setRecipientInstanceType(InstanceType.PARTICIPANT); - cr.setSessionSpecific(false); - cr.setResource("%"); - cr.setPartition("%"); - - ObjectMapper mapper = new ObjectMapper(); - SerializationConfig serializationConfig = mapper.getSerializationConfig(); - serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true); - - StringWriter sw = new StringWriter(); - mapper.writeValue(sw, cr); - - String crString = sw.toString(); - - schedulerMessage.getRecord().setSimpleField("Criteria", crString); - schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields()); - schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1"); - schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true"); - - Criteria cr2 = new Criteria(); - cr2.setRecipientInstanceType(InstanceType.CONTROLLER); - cr2.setInstanceName("*"); - cr2.setSessionSpecific(false); - - schedulerMessage.getRecord().setSimpleField( - DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg2"); - MockAsyncCallback callback = new MockAsyncCallback(); - manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1); - String msgId = - callback._message.getResultMap() - .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID); - - HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor(); - Builder keyBuilder = helixDataAccessor.keyBuilder(); - for (int i = 0; i < 10; i++) { - Thread.sleep(200); - PropertyKey controllerTaskStatus = - keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId); - ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord(); - if (statusUpdate.getMapFields().containsKey("Summary")) { - break; - } - } - - Assert.assertEquals(_PARTITIONS, _factory._results.size()); - PropertyKey controllerTaskStatus = - keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId); - ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord(); - Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount") - .equals("" + (_PARTITIONS * 3))); - int messageResultCount = 0; - for (String key : statusUpdate.getMapFields().keySet()) { - if (key.startsWith("MessageResult ")) { - messageResultCount++; - } - } - Assert.assertEquals(messageResultCount, _PARTITIONS * 3); - - int count = 0; - for (Set<String> val : _factory._results.values()) { - count += val.size(); - } - Assert.assertEquals(count, _PARTITIONS * 3); - } - - @Test() public void testSchedulerZeroMsg() throws Exception { TestMessagingHandlerFactory factory = new TestMessagingHandlerFactory(); HelixManager manager = null; @@ -849,148 +665,4 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase { // System.out.println(count); Assert.assertEquals(count, _PARTITIONS * 3 * 2); } - - @Test - public void testSchedulerMsgContraints() throws JsonGenerationException, JsonMappingException, - IOException, InterruptedException { - TestMessagingHandlerFactoryLatch factory = new TestMessagingHandlerFactoryLatch(); - HelixManager manager = null; - for (int i = 0; i < NODE_NR; i++) { - _participants[i].getMessagingService().registerMessageHandlerFactory( - factory.getMessageType(), factory); - - _participants[i].getMessagingService().registerMessageHandlerFactory( - factory.getMessageType(), factory); - - manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager; - } - - Message schedulerMessage = - new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString()); - schedulerMessage.setTgtSessionId("*"); - schedulerMessage.setTgtName("CONTROLLER"); - // TODO: change it to "ADMIN" ? - schedulerMessage.setSrcName("CONTROLLER"); - - // Template for the individual message sent to each participant - Message msg = new Message(factory.getMessageType(), "Template"); - msg.setTgtSessionId("*"); - msg.setMsgState(MessageState.NEW); - - // Criteria to send individual messages - Criteria cr = new Criteria(); - cr.setInstanceName("localhost_%"); - cr.setRecipientInstanceType(InstanceType.PARTICIPANT); - cr.setSessionSpecific(false); - cr.setResource("%"); - cr.setPartition("%"); - - ObjectMapper mapper = new ObjectMapper(); - SerializationConfig serializationConfig = mapper.getSerializationConfig(); - serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true); - - StringWriter sw = new StringWriter(); - mapper.writeValue(sw, cr); - - String crString = sw.toString(); - - schedulerMessage.getRecord().setSimpleField("Criteria", crString); - schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields()); - schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1"); - schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true"); - schedulerMessage.getRecord().setSimpleField( - DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints"); - - Criteria cr2 = new Criteria(); - cr2.setRecipientInstanceType(InstanceType.CONTROLLER); - cr2.setInstanceName("*"); - cr2.setSessionSpecific(false); - - MockAsyncCallback callback = new MockAsyncCallback(); - mapper = new ObjectMapper(); - serializationConfig = mapper.getSerializationConfig(); - serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true); - - sw = new StringWriter(); - mapper.writeValue(sw, cr); - - HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor(); - Builder keyBuilder = helixDataAccessor.keyBuilder(); - - // Set contraints that only 1 msg per participant - Map<String, String> constraints = new TreeMap<String, String>(); - constraints.put("MESSAGE_TYPE", "STATE_TRANSITION"); - constraints.put("TRANSITION", "OFFLINE-COMPLETED"); - constraints.put("CONSTRAINT_VALUE", "1"); - constraints.put("INSTANCE", ".*"); - manager.getClusterManagmentTool().setConstraint(manager.getClusterName(), - ConstraintType.MESSAGE_CONSTRAINT, "constraint1", new ConstraintItem(constraints)); - - // Send scheduler message - crString = sw.toString(); - schedulerMessage.getRecord().setSimpleField("Criteria", crString); - manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1); - String msgId = - callback._message.getResultMap() - .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID); - - for (int j = 0; j < 10; j++) { - Thread.sleep(200); - PropertyKey controllerTaskStatus = - keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId); - ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord(); - if (statusUpdate.getMapFields().containsKey("SentMessageCount")) { - Assert.assertEquals( - statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), "" - + (_PARTITIONS * 3)); - break; - } - } - - for (int i = 0; i < _PARTITIONS * 3 / 5; i++) { - for (int j = 0; j < 10; j++) { - Thread.sleep(300); - if (factory._messageCount == 5 * (i + 1)) - break; - } - Thread.sleep(300); - Assert.assertEquals(factory._messageCount, 5 * (i + 1)); - factory.signal(); - // System.err.println(i); - } - - for (int j = 0; j < 10; j++) { - Thread.sleep(200); - PropertyKey controllerTaskStatus = - keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId); - ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord(); - if (statusUpdate.getMapFields().containsKey("Summary")) { - break; - } - } - - Assert.assertEquals(_PARTITIONS, factory._results.size()); - PropertyKey controllerTaskStatus = - keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId); - ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord(); - Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount") - .equals("" + (_PARTITIONS * 3))); - int messageResultCount = 0; - for (String key : statusUpdate.getMapFields().keySet()) { - if (key.startsWith("MessageResult ")) { - messageResultCount++; - } - } - Assert.assertEquals(messageResultCount, _PARTITIONS * 3); - - int count = 0; - for (Set<String> val : factory._results.values()) { - count += val.size(); - } - Assert.assertEquals(count, _PARTITIONS * 3); - - manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(), - ConstraintType.MESSAGE_CONSTRAINT, "constraint1"); - - } } http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage2.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage2.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage2.java new file mode 100644 index 0000000..b0ee961 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage2.java @@ -0,0 +1,137 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.StringWriter; +import java.util.Set; +import java.util.UUID; + +import org.apache.helix.Criteria; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.PropertyKey; +import org.apache.helix.ZNRecord; +import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory; +import org.apache.helix.model.Message; +import org.apache.helix.model.Message.MessageState; +import org.apache.helix.model.Message.MessageType; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestSchedulerMessage2 extends ZkStandAloneCMTestBase { + TestSchedulerMessage.TestMessagingHandlerFactory _factory = + new TestSchedulerMessage.TestMessagingHandlerFactory(); + + @Test() + public void testSchedulerMsg2() throws Exception { + _factory._results.clear(); + Thread.sleep(2000); + HelixManager manager = null; + for (int i = 0; i < NODE_NR; i++) { + _participants[i].getMessagingService().registerMessageHandlerFactory( + _factory.getMessageType(), _factory); + + manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager; + } + + Message schedulerMessage = + new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString()); + schedulerMessage.setTgtSessionId("*"); + schedulerMessage.setTgtName("CONTROLLER"); + // TODO: change it to "ADMIN" ? + schedulerMessage.setSrcName("CONTROLLER"); + + // Template for the individual message sent to each participant + Message msg = new Message(_factory.getMessageType(), "Template"); + msg.setTgtSessionId("*"); + msg.setMsgState(MessageState.NEW); + + // Criteria to send individual messages + Criteria cr = new Criteria(); + cr.setInstanceName("localhost_%"); + cr.setRecipientInstanceType(InstanceType.PARTICIPANT); + cr.setSessionSpecific(false); + cr.setResource("%"); + cr.setPartition("%"); + + ObjectMapper mapper = new ObjectMapper(); + SerializationConfig serializationConfig = mapper.getSerializationConfig(); + serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true); + + StringWriter sw = new StringWriter(); + mapper.writeValue(sw, cr); + + String crString = sw.toString(); + + schedulerMessage.getRecord().setSimpleField("Criteria", crString); + schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields()); + schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1"); + schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true"); + + Criteria cr2 = new Criteria(); + cr2.setRecipientInstanceType(InstanceType.CONTROLLER); + cr2.setInstanceName("*"); + cr2.setSessionSpecific(false); + + schedulerMessage.getRecord().setSimpleField( + DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg2"); + TestSchedulerMessage.MockAsyncCallback callback = new TestSchedulerMessage.MockAsyncCallback(); + manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1); + String msgId = + callback._message.getResultMap() + .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID); + + HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor(); + PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder(); + for (int i = 0; i < 10; i++) { + Thread.sleep(200); + PropertyKey controllerTaskStatus = + keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId); + ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord(); + if (statusUpdate.getMapFields().containsKey("Summary")) { + break; + } + } + + Assert.assertEquals(_PARTITIONS, _factory._results.size()); + PropertyKey controllerTaskStatus = + keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId); + ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord(); + Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount") + .equals("" + (_PARTITIONS * 3))); + int messageResultCount = 0; + for (String key : statusUpdate.getMapFields().keySet()) { + if (key.startsWith("MessageResult ")) { + messageResultCount++; + } + } + Assert.assertEquals(messageResultCount, _PARTITIONS * 3); + + int count = 0; + for (Set<String> val : _factory._results.values()) { + count += val.size(); + } + Assert.assertEquals(count, _PARTITIONS * 3); + } + +} http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java new file mode 100644 index 0000000..51a225e --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgContraints.java @@ -0,0 +1,190 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.StringWriter; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; + +import org.apache.helix.Criteria; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.PropertyKey; +import org.apache.helix.ZNRecord; +import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory; +import org.apache.helix.model.ClusterConstraints.ConstraintType; +import org.apache.helix.model.ConstraintItem; +import org.apache.helix.model.Message; +import org.apache.helix.model.Message.MessageState; +import org.apache.helix.model.Message.MessageType; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestSchedulerMsgContraints extends ZkStandAloneCMTestBase { + + @Test + public void testSchedulerMsgContraints() throws Exception { + TestSchedulerMessage.TestMessagingHandlerFactoryLatch factory = + new TestSchedulerMessage.TestMessagingHandlerFactoryLatch(); + HelixManager manager = null; + for (int i = 0; i < NODE_NR; i++) { + _participants[i].getMessagingService().registerMessageHandlerFactory( + factory.getMessageType(), factory); + + _participants[i].getMessagingService().registerMessageHandlerFactory( + factory.getMessageType(), factory); + + manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager; + } + + Message schedulerMessage = + new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString()); + schedulerMessage.setTgtSessionId("*"); + schedulerMessage.setTgtName("CONTROLLER"); + // TODO: change it to "ADMIN" ? + schedulerMessage.setSrcName("CONTROLLER"); + + // Template for the individual message sent to each participant + Message msg = new Message(factory.getMessageType(), "Template"); + msg.setTgtSessionId("*"); + msg.setMsgState(MessageState.NEW); + + // Criteria to send individual messages + Criteria cr = new Criteria(); + cr.setInstanceName("localhost_%"); + cr.setRecipientInstanceType(InstanceType.PARTICIPANT); + cr.setSessionSpecific(false); + cr.setResource("%"); + cr.setPartition("%"); + + ObjectMapper mapper = new ObjectMapper(); + SerializationConfig serializationConfig = mapper.getSerializationConfig(); + serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true); + + StringWriter sw = new StringWriter(); + mapper.writeValue(sw, cr); + + String crString = sw.toString(); + + schedulerMessage.getRecord().setSimpleField("Criteria", crString); + schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields()); + schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1"); + schedulerMessage.getRecord().setSimpleField("WAIT_ALL", "true"); + schedulerMessage.getRecord().setSimpleField( + DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsgContraints"); + + Criteria cr2 = new Criteria(); + cr2.setRecipientInstanceType(InstanceType.CONTROLLER); + cr2.setInstanceName("*"); + cr2.setSessionSpecific(false); + + TestSchedulerMessage.MockAsyncCallback callback = new TestSchedulerMessage.MockAsyncCallback(); + mapper = new ObjectMapper(); + serializationConfig = mapper.getSerializationConfig(); + serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true); + + sw = new StringWriter(); + mapper.writeValue(sw, cr); + + HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor(); + PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder(); + + // Set constraints that only 1 message per participant + Map<String, String> constraints = new TreeMap<String, String>(); + constraints.put("MESSAGE_TYPE", "STATE_TRANSITION"); + constraints.put("TRANSITION", "OFFLINE-COMPLETED"); + constraints.put("CONSTRAINT_VALUE", "1"); + constraints.put("INSTANCE", ".*"); + manager.getClusterManagmentTool().setConstraint(manager.getClusterName(), + ConstraintType.MESSAGE_CONSTRAINT, "constraint1", new ConstraintItem(constraints)); + + // Send scheduler message + crString = sw.toString(); + schedulerMessage.getRecord().setSimpleField("Criteria", crString); + manager.getMessagingService().sendAndWait(cr2, schedulerMessage, callback, -1); + String msgId = + callback._message.getResultMap() + .get(DefaultSchedulerMessageHandlerFactory.SCHEDULER_MSG_ID); + + for (int j = 0; j < 10; j++) { + Thread.sleep(200); + PropertyKey controllerTaskStatus = + keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId); + ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord(); + if (statusUpdate.getMapFields().containsKey("SentMessageCount")) { + Assert.assertEquals( + statusUpdate.getMapFields().get("SentMessageCount").get("MessageCount"), "" + + (_PARTITIONS * 3)); + break; + } + } + + for (int i = 0; i < _PARTITIONS * 3 / 5; i++) { + for (int j = 0; j < 10; j++) { + Thread.sleep(300); + if (factory._messageCount == 5 * (i + 1)) + break; + } + Thread.sleep(300); + Assert.assertEquals(factory._messageCount, 5 * (i + 1)); + factory.signal(); + // System.err.println(i); + } + + for (int j = 0; j < 10; j++) { + Thread.sleep(200); + PropertyKey controllerTaskStatus = + keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId); + ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord(); + if (statusUpdate.getMapFields().containsKey("Summary")) { + break; + } + } + + Assert.assertEquals(_PARTITIONS, factory._results.size()); + PropertyKey controllerTaskStatus = + keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), msgId); + ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord(); + Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount") + .equals("" + (_PARTITIONS * 3))); + int messageResultCount = 0; + for (String key : statusUpdate.getMapFields().keySet()) { + if (key.startsWith("MessageResult ")) { + messageResultCount++; + } + } + Assert.assertEquals(messageResultCount, _PARTITIONS * 3); + + int count = 0; + for (Set<String> val : factory._results.values()) { + count += val.size(); + } + Assert.assertEquals(count, _PARTITIONS * 3); + + manager.getClusterManagmentTool().removeConstraint(manager.getClusterName(), + ConstraintType.MESSAGE_CONSTRAINT, "constraint1"); + + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/3a4ff21b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java new file mode 100644 index 0000000..d5b5680 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java @@ -0,0 +1,133 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.io.StringWriter; +import java.util.Set; +import java.util.UUID; + +import org.apache.helix.Criteria; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.InstanceType; +import org.apache.helix.PropertyKey; +import org.apache.helix.ZNRecord; +import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory; +import org.apache.helix.model.Message; +import org.apache.helix.model.Message.MessageState; +import org.apache.helix.model.Message.MessageType; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.SerializationConfig; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestSchedulerMsgUsingQueue extends ZkStandAloneCMTestBase { + TestSchedulerMessage.TestMessagingHandlerFactory _factory = + new TestSchedulerMessage.TestMessagingHandlerFactory(); + + @Test() + public void testSchedulerMsgUsingQueue() throws Exception { + // Logger.getRootLogger().setLevel(Level.INFO); + _factory._results.clear(); + Thread.sleep(2000); + HelixManager manager = null; + for (int i = 0; i < NODE_NR; i++) { + _participants[i].getMessagingService().registerMessageHandlerFactory( + _factory.getMessageType(), _factory); + + manager = _participants[i]; // _startCMResultMap.get(hostDest)._manager; + } + + Message schedulerMessage = + new Message(MessageType.SCHEDULER_MSG + "", UUID.randomUUID().toString()); + schedulerMessage.setTgtSessionId("*"); + schedulerMessage.setTgtName("CONTROLLER"); + // TODO: change it to "ADMIN" ? + schedulerMessage.setSrcName("CONTROLLER"); + schedulerMessage.getRecord().setSimpleField( + DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, "TestSchedulerMsg"); + // Template for the individual message sent to each participant + Message msg = new Message(_factory.getMessageType(), "Template"); + msg.setTgtSessionId("*"); + msg.setMsgState(MessageState.NEW); + + // Criteria to send individual messages + Criteria cr = new Criteria(); + cr.setInstanceName("localhost_%"); + cr.setRecipientInstanceType(InstanceType.PARTICIPANT); + cr.setSessionSpecific(false); + cr.setResource("%"); + cr.setPartition("%"); + + ObjectMapper mapper = new ObjectMapper(); + SerializationConfig serializationConfig = mapper.getSerializationConfig(); + serializationConfig.set(SerializationConfig.Feature.INDENT_OUTPUT, true); + + StringWriter sw = new StringWriter(); + mapper.writeValue(sw, cr); + + String crString = sw.toString(); + + schedulerMessage.getRecord().setSimpleField("Criteria", crString); + schedulerMessage.getRecord().setMapField("MessageTemplate", msg.getRecord().getSimpleFields()); + schedulerMessage.getRecord().setSimpleField("TIMEOUT", "-1"); + + HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor(); + PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder(); + helixDataAccessor.createProperty(keyBuilder.controllerMessage(schedulerMessage.getMsgId()), + schedulerMessage); + + for (int i = 0; i < 30; i++) { + Thread.sleep(2000); + if (_PARTITIONS == _factory._results.size()) { + break; + } + } + + Assert.assertEquals(_PARTITIONS, _factory._results.size()); + PropertyKey controllerTaskStatus = + keyBuilder.controllerTaskStatus(MessageType.SCHEDULER_MSG.toString(), + schedulerMessage.getMsgId()); + + int messageResultCount = 0; + for (int i = 0; i < 10; i++) { + ZNRecord statusUpdate = helixDataAccessor.getProperty(controllerTaskStatus).getRecord(); + Assert.assertTrue(statusUpdate.getMapField("SentMessageCount").get("MessageCount") + .equals("" + (_PARTITIONS * 3))); + for (String key : statusUpdate.getMapFields().keySet()) { + if (key.startsWith("MessageResult ")) { + messageResultCount++; + } + } + if (messageResultCount == _PARTITIONS * 3) { + break; + } else { + Thread.sleep(2000); + } + } + Assert.assertEquals(messageResultCount, _PARTITIONS * 3); + int count = 0; + for (Set<String> val : _factory._results.values()) { + count += val.size(); + } + Assert.assertEquals(count, _PARTITIONS * 3); + + } +}
