Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 1fb2bb732 -> 85202fab2


[GOBBLIN-548] fix test case

fix test case

update doc

Closes #2409 from arjun4084346/bugFix


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/85202fab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/85202fab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/85202fab

Branch: refs/heads/master
Commit: 85202fab2be3f2938e48f12f3ed4339683e8020b
Parents: 1fb2bb7
Author: Arjun <[email protected]>
Authored: Tue Jul 31 20:33:33 2018 -0700
Committer: Abhishek Tiwari <[email protected]>
Committed: Tue Jul 31 20:33:33 2018 -0700

----------------------------------------------------------------------
 .../cluster/GobblinHelixJobLauncherTest.java    | 63 +++++++++++++-------
 1 file changed, 41 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/85202fab/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
----------------------------------------------------------------------
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index ef5fc4f..c66a9e8 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -186,6 +186,8 @@ public class GobblinHelixJobLauncherTest {
 
     properties.setProperty(ConfigurationKeys.WRITER_FILE_PATH, jobName);
 
+    
properties.setProperty(GobblinClusterConfigurationKeys.HELIX_WORKFLOW_EXPIRY_TIME_SECONDS,
 "2");
+
     return properties;
   }
 
@@ -295,59 +297,61 @@ public class GobblinHelixJobLauncherTest {
 
     final TaskDriver taskDriver = new TaskDriver(this.helixManager);
 
-    final String jobName = 
properties.getProperty(ConfigurationKeys.JOB_NAME_KEY);
-    final String jobIdKey = 
properties.getProperty(ConfigurationKeys.JOB_ID_KEY);
-    final String jobContextName = jobName + "_" + jobIdKey;
-    final String jobName2 = 
properties2.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+    final String jobIdKey1 = 
properties.getProperty(ConfigurationKeys.JOB_ID_KEY);
+    final String jobIdKey2 = 
properties2.getProperty(ConfigurationKeys.JOB_ID_KEY);
+    org.apache.helix.task.JobContext jobContext1 = 
taskDriver.getJobContext(jobIdKey1);
+    org.apache.helix.task.JobContext jobContext2 = 
taskDriver.getJobContext(jobIdKey2);
 
-    org.apache.helix.task.JobContext jobContext = 
taskDriver.getJobContext(jobContextName);
+    waitForWorkFlowStartup(taskDriver, jobIdKey1);
+    waitForWorkFlowStartup(taskDriver, jobIdKey2);
 
     // job context should be present until close
-    Assert.assertNotNull(jobContext);
+    Assert.assertNotNull(jobContext1);
+    Assert.assertNotNull(jobContext2);
 
     gobblinHelixJobLauncher.close();
 
-    // job queue deleted asynchronously after close
-    waitForQueueCleanup(taskDriver, jobName);
+    // workflow deleted asynchronously after close
+    waitForWorkFlowCleanup(taskDriver, jobIdKey1);
 
-    jobContext = taskDriver.getJobContext(jobContextName);
+    jobContext1 = taskDriver.getJobContext(jobIdKey1);
 
     // job context should have been deleted
-    Assert.assertNull(jobContext);
+    Assert.assertNull(jobContext1);
 
-    // job queue should have been deleted
-    WorkflowConfig workflowConfig  = taskDriver.getWorkflowConfig(jobName);
+    // workflow should have been deleted
+    WorkflowConfig workflowConfig  = taskDriver.getWorkflowConfig(jobIdKey1);
     Assert.assertNull(workflowConfig);
 
-    WorkflowContext workflowContext = taskDriver.getWorkflowContext(jobName);
+    WorkflowContext workflowContext = taskDriver.getWorkflowContext(jobIdKey1);
     Assert.assertNull(workflowContext);
 
-    // second job queue with shared prefix should not be deleted when the 
first job queue is cleaned up
-    workflowConfig  = taskDriver.getWorkflowConfig(jobName2);
+    // second workflow with shared prefix should not be deleted when the first 
workflow is cleaned up
+    workflowConfig  = taskDriver.getWorkflowConfig(jobIdKey2);
     Assert.assertNotNull(workflowConfig);
 
     gobblinHelixJobLauncher2.close();
 
-    // job queue deleted asynchronously after close
-    waitForQueueCleanup(taskDriver, jobName2);
+    // workflow deleted asynchronously after close
+    waitForWorkFlowCleanup(taskDriver, jobIdKey2);
 
-    workflowConfig  = taskDriver.getWorkflowConfig(jobName2);
+    workflowConfig  = taskDriver.getWorkflowConfig(jobIdKey2);
     Assert.assertNull(workflowConfig);
 
     // check that workunit and taskstate directory for the job are cleaned up
     final File workunitsDir =
         new File(this.appWorkDir + File.separator + 
GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME
-        + File.separator + jobIdKey);
+        + File.separator + jobIdKey1);
 
     final File taskstatesDir =
         new File(this.appWorkDir + File.separator + 
GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME
-            + File.separator + jobIdKey);
+            + File.separator + jobIdKey1);
 
     Assert.assertFalse(workunitsDir.exists());
     Assert.assertFalse(taskstatesDir.exists());
 
     // check that job.state file is cleaned up
-    final File jobStateFile = new 
File(GobblinClusterUtils.getJobStateFilePath(true, this.appWorkDir, 
jobIdKey).toString());
+    final File jobStateFile = new 
File(GobblinClusterUtils.getJobStateFilePath(true, this.appWorkDir, 
jobIdKey1).toString());
 
     Assert.assertFalse(jobStateFile.exists());
   }
@@ -364,7 +368,7 @@ public class GobblinHelixJobLauncherTest {
     }
   }
 
-   private void waitForQueueCleanup(TaskDriver taskDriver, String queueName) {
+   private void waitForWorkFlowCleanup(TaskDriver taskDriver, String 
queueName) {
      for (int i = 0; i < 60; i++) {
        WorkflowConfig workflowConfig  = 
taskDriver.getWorkflowConfig(queueName);
 
@@ -378,4 +382,19 @@ public class GobblinHelixJobLauncherTest {
        }
      }
   }
+
+  private void waitForWorkFlowStartup(TaskDriver taskDriver, String workflow) {
+    for (int i = 0; i < 5; i++) {
+      WorkflowConfig workflowConfig  = taskDriver.getWorkflowConfig(workflow);
+
+      if (workflowConfig != null) {
+        break;
+      }
+
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+      }
+    }
+  }
 }

Reply via email to