Repository: incubator-gobblin Updated Branches: refs/heads/master 1fb2bb732 -> 85202fab2
[GOBBLIN-548] fix test case fix test case update doc Closes #2409 from arjun4084346/bugFix Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/85202fab Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/85202fab Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/85202fab Branch: refs/heads/master Commit: 85202fab2be3f2938e48f12f3ed4339683e8020b Parents: 1fb2bb7 Author: Arjun <[email protected]> Authored: Tue Jul 31 20:33:33 2018 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Tue Jul 31 20:33:33 2018 -0700 ---------------------------------------------------------------------- .../cluster/GobblinHelixJobLauncherTest.java | 63 +++++++++++++------- 1 file changed, 41 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85202fab/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java index ef5fc4f..c66a9e8 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java @@ -186,6 +186,8 @@ public class GobblinHelixJobLauncherTest { properties.setProperty(ConfigurationKeys.WRITER_FILE_PATH, jobName); + properties.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS, "2"); + return properties; } @@ -295,59 +297,61 @@ public class GobblinHelixJobLauncherTest { final TaskDriver taskDriver = new TaskDriver(this.helixManager); - final String jobName = properties.getProperty(ConfigurationKeys.JOB_NAME_KEY); - final String jobIdKey = properties.getProperty(ConfigurationKeys.JOB_ID_KEY); - final String jobContextName = jobName + "_" + jobIdKey; - final String jobName2 = properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY); + final String jobIdKey1 = properties.getProperty(ConfigurationKeys.JOB_ID_KEY); + final String jobIdKey2 = properties2.getProperty(ConfigurationKeys.JOB_ID_KEY); + org.apache.helix.task.JobContext jobContext1 = taskDriver.getJobContext(jobIdKey1); + org.apache.helix.task.JobContext jobContext2 = taskDriver.getJobContext(jobIdKey2); - org.apache.helix.task.JobContext jobContext = taskDriver.getJobContext(jobContextName); + waitForWorkFlowStartup(taskDriver, jobIdKey1); + waitForWorkFlowStartup(taskDriver, jobIdKey2); // job context should be present until close - Assert.assertNotNull(jobContext); + Assert.assertNotNull(jobContext1); + Assert.assertNotNull(jobContext2); gobblinHelixJobLauncher.close(); - // job queue deleted asynchronously after close - waitForQueueCleanup(taskDriver, jobName); + // workflow deleted asynchronously after close + waitForWorkFlowCleanup(taskDriver, jobIdKey1); - jobContext = taskDriver.getJobContext(jobContextName); + jobContext1 = taskDriver.getJobContext(jobIdKey1); // job context should have been deleted - Assert.assertNull(jobContext); + Assert.assertNull(jobContext1); - // job queue should have been deleted - WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(jobName); + // workflow should have been deleted + WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(jobIdKey1); Assert.assertNull(workflowConfig); - WorkflowContext workflowContext = taskDriver.getWorkflowContext(jobName); + WorkflowContext workflowContext = taskDriver.getWorkflowContext(jobIdKey1); Assert.assertNull(workflowContext); - // second job queue with shared prefix should not be deleted when the first job queue is cleaned up - workflowConfig = taskDriver.getWorkflowConfig(jobName2); + // second workflow with shared prefix should not be deleted when the first workflow is cleaned up + workflowConfig = taskDriver.getWorkflowConfig(jobIdKey2); Assert.assertNotNull(workflowConfig); gobblinHelixJobLauncher2.close(); - // job queue deleted asynchronously after close - waitForQueueCleanup(taskDriver, jobName2); + // workflow deleted asynchronously after close + waitForWorkFlowCleanup(taskDriver, jobIdKey2); - workflowConfig = taskDriver.getWorkflowConfig(jobName2); + workflowConfig = taskDriver.getWorkflowConfig(jobIdKey2); Assert.assertNull(workflowConfig); // check that workunit and taskstate directory for the job are cleaned up final File workunitsDir = new File(this.appWorkDir + File.separator + GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME - + File.separator + jobIdKey); + + File.separator + jobIdKey1); final File taskstatesDir = new File(this.appWorkDir + File.separator + GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME - + File.separator + jobIdKey); + + File.separator + jobIdKey1); Assert.assertFalse(workunitsDir.exists()); Assert.assertFalse(taskstatesDir.exists()); // check that job.state file is cleaned up - final File jobStateFile = new File(GobblinClusterUtils.getJobStateFilePath(true, this.appWorkDir, jobIdKey).toString()); + final File jobStateFile = new File(GobblinClusterUtils.getJobStateFilePath(true, this.appWorkDir, jobIdKey1).toString()); Assert.assertFalse(jobStateFile.exists()); } @@ -364,7 +368,7 @@ public class GobblinHelixJobLauncherTest { } } - private void waitForQueueCleanup(TaskDriver taskDriver, String queueName) { + private void waitForWorkFlowCleanup(TaskDriver taskDriver, String queueName) { for (int i = 0; i < 60; i++) { WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(queueName); @@ -378,4 +382,19 @@ public class GobblinHelixJobLauncherTest { } } } + + private void waitForWorkFlowStartup(TaskDriver taskDriver, String workflow) { + for (int i = 0; i < 5; i++) { + WorkflowConfig workflowConfig = taskDriver.getWorkflowConfig(workflow); + + if (workflowConfig != null) { + break; + } + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + } }
