Repository: helix Updated Branches: refs/heads/helix-0.6.x 7bbb20be6 -> 1798e7935
http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java index 38c9113..79adcd5 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java @@ -24,13 +24,17 @@ import java.util.Calendar; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.helix.ExternalViewChangeListener; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; +import org.apache.helix.NotificationContext; import org.apache.helix.PropertyKey; import org.apache.helix.TestHelper; import org.apache.helix.ZNRecord; @@ -195,7 +199,6 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { return new JobQueue.Builder(jobQueueName).fromMap(cfgMap); } - private JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) { return buildRecurrentJobQueue(jobQueueName, 0); } @@ -299,9 +302,8 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { // ensure job 1 is started before deleting it String deletedJob1 = currentJobNames.get(0); String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1); - TestUtil - .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS, - TaskState.COMPLETED); + TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS, + TaskState.COMPLETED); // stop the queue LOG.info("Pausing job-queue: " + scheduledQueue); @@ -417,54 +419,68 @@ public class TestRecurringJobQueue extends ZkIntegrationTestBase { public void testJobsDisableExternalView() throws Exception { String queueName = TestHelper.getTestMethodName(); + ExternviewChecker externviewChecker = new ExternviewChecker(); + _manager.addExternalViewChangeListener(externviewChecker); + // Create a queue LOG.info("Starting job-queue: " + queueName); JobQueue.Builder queueBuilder = buildRecurrentJobQueue(queueName); - // create jobs - Map<String, String> commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500)); + JobConfig.Builder job1 = new JobConfig.Builder().setCommand("Reindex") + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) + .setTargetPartitionStates(Sets.newHashSet("SLAVE")); - JobConfig.Builder job1 = - new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig) - .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) - .setTargetPartitionStates(Sets.newHashSet("SLAVE")) - .setDisableExternalView(true); + JobConfig.Builder job2 = new JobConfig.Builder().setCommand("Reindex") + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) + .setTargetPartitionStates(Sets.newHashSet("SLAVE")).setDisableExternalView(true); - JobConfig.Builder job2 = - new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig) - .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) - .setTargetPartitionStates(Sets.newHashSet("MASTER")); + JobConfig.Builder job3 = new JobConfig.Builder().setCommand("Reindex") + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) + .setTargetPartitionStates(Sets.newHashSet("MASTER")).setDisableExternalView(false); // enqueue both jobs queueBuilder.enqueueJob("job1", job1); queueBuilder.enqueueJob("job2", job2); + queueBuilder.enqueueJob("job3", job3); _driver.createQueue(queueBuilder.build()); - WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName); String scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); - // ensure job1 is started - String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, "job1"); - TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS, - TaskState.COMPLETED); - - PropertyKey.Builder keyBuilder = _accessor.keyBuilder(); - // verify external view for job does not exists - ExternalView externalView = _accessor.getProperty(keyBuilder.externalView(namedSpaceJob1)); - Assert.assertNull(externalView, "External View for " + namedSpaceJob1 + " shoudld not exist!"); + // ensure all jobs are completed + String namedSpaceJob3 = String.format("%s_%s", scheduledQueue, "job3"); + TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob3, TaskState.COMPLETED); - // ensure job2 is completed + Set<String> seenExternalViews = externviewChecker.getSeenExternalViews(); + String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, "job1"); String namedSpaceJob2 = String.format("%s_%s", scheduledQueue, "job2"); - TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob2, TaskState.IN_PROGRESS, - TaskState.COMPLETED); - // verify external view for job does not exists - externalView = _accessor.getProperty(keyBuilder.externalView(namedSpaceJob2)); - Assert.assertNotNull(externalView, "Can not find external View for " + namedSpaceJob2 + "!"); + Assert.assertTrue(seenExternalViews.contains(namedSpaceJob1), + "Can not find external View for " + namedSpaceJob1 + "!"); + Assert.assertTrue(!seenExternalViews.contains(namedSpaceJob2), + "External View for " + namedSpaceJob2 + " shoudld not exist!"); + Assert.assertTrue(seenExternalViews.contains(namedSpaceJob3), + "Can not find external View for " + namedSpaceJob3 + "!"); + + _manager + .removeListener(new PropertyKey.Builder(CLUSTER_NAME).externalViews(), externviewChecker); } + private static class ExternviewChecker implements ExternalViewChangeListener { + private Set<String> _seenExternalViews = new HashSet<String>(); + + @Override public void onExternalViewChange(List<ExternalView> externalViewList, + NotificationContext changeContext) { + for (ExternalView view : externalViewList) { + _seenExternalViews.add(view.getResourceName()); + } + } + + public Set<String> getSeenExternalViews() { + return _seenExternalViews; + } + } private void verifyJobDeleted(String queueName, String jobName) throws Exception { HelixDataAccessor accessor = _manager.getHelixDataAccessor(); http://git-wip-us.apache.org/repos/asf/helix/blob/1798e793/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java index f49f941..f402b82 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java @@ -185,7 +185,7 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase { // Wait for job to finish and expire TestUtil.pollForWorkflowState(_manager, jobName, TaskState.COMPLETED); Thread.sleep(expiry); - _driver.invokeRebalance(); + TaskUtil.invokeRebalance(_manager.getHelixDataAccessor(), flow.getName()); Thread.sleep(expiry); // Ensure workflow config and context were cleaned up by now
