Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 106d1ba69 -> af141db59


[GOBBLIN-360] Fix cleanup of the job context from PROPERTYSTORE/TaskRebalancer

Closes #2232 from htran1/helix_cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/af141db5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/af141db5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/af141db5

Branch: refs/heads/master
Commit: af141db599c06934ea8ac9e5ac39b0576cb7a798
Parents: 106d1ba
Author: Hung Tran <[email protected]>
Authored: Mon Jan 8 22:47:05 2018 -0800
Committer: Hung Tran <[email protected]>
Committed: Mon Jan 8 22:47:05 2018 -0800

----------------------------------------------------------------------
 .../gobblin/cluster/GobblinHelixTaskDriver.java |   2 +-
 .../cluster/GobblinHelixJobLauncherTest.java    | 148 ++++++++++++-------
 2 files changed, 94 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/af141db5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
index 9160610..bb5c551 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java
@@ -186,7 +186,7 @@ public class GobblinHelixTaskDriver {
     removeJobStateFromQueue(queueName, jobName);
 
     // Delete the job from property store
-    removeJobContext(_propertyStore, jobName);
+    removeJobContext(_propertyStore, namespacedJobName);
   }
 
   /** Remove the job name from the DAG from the queue configuration */

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/af141db5/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
index 3483c62..64be11b 100644
--- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.task.TaskDriver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -88,16 +89,6 @@ public class GobblinHelixJobLauncherTest {
 
   private Path appWorkDir;
 
-  private String jobName;
-
-  private File jobOutputFile;
-
-  private GobblinHelixJobLauncher gobblinHelixJobLauncher;
-
-  private GobblinHelixJobLauncher gobblinHelixJobLauncher1;
-
-  private GobblinHelixJobLauncher gobblinHelixJobLauncher2;
-
   private GobblinTaskRunner gobblinTaskRunner;
 
   private DatasetStateStore datasetStateStore;
@@ -106,6 +97,8 @@ public class GobblinHelixJobLauncherTest {
 
   private final Closer closer = Closer.create();
 
+  private Config baseConfig;
+
   @BeforeClass
   public void setUp() throws Exception {
     TestingServer testingZKServer = this.closer.register(new TestingServer(-1));
@@ -115,13 +108,21 @@ public class GobblinHelixJobLauncherTest {
         GobblinHelixJobLauncherTest.class.getSimpleName() + ".conf");
     Assert.assertNotNull(url, "Could not find resource " + url);
 
-    Config config = ConfigFactory.parseURL(url)
+    this.appWorkDir = new Path(GobblinHelixJobLauncherTest.class.getSimpleName());
+
+    // Prepare the source Json file
+    File sourceJsonFile = new File(this.appWorkDir.toString(), TestHelper.TEST_JOB_NAME + ".json");
+    TestHelper.createSourceJsonFile(sourceJsonFile);
+
+    baseConfig = ConfigFactory.parseURL(url)
         .withValue("gobblin.cluster.zk.connection.string",
                    ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString()))
+        .withValue(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL,
+            ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath()))
         .resolve();
 
-    String zkConnectingString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
-    String helixClusterName = config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    String zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY);
+    String helixClusterName = baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
 
     HelixUtils.createGobblinHelixCluster(zkConnectingString, helixClusterName);
 
@@ -136,11 +137,8 @@ public class GobblinHelixJobLauncherTest {
     });
     this.helixManager.connect();
 
-    Properties properties = ConfigUtils.configToProperties(config);
-
     this.localFs = FileSystem.getLocal(new Configuration());
 
-    this.appWorkDir = new Path(GobblinHelixJobLauncherTest.class.getSimpleName());
     this.closer.register(new Closeable() {
       @Override
       public void close() throws IOException {
@@ -150,39 +148,11 @@ public class GobblinHelixJobLauncherTest {
       }
     });
 
-    this.jobName = config.getString(ConfigurationKeys.JOB_NAME_KEY);
-
-    this.jobOutputFile = new File(config.getString(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR),
-        config.getString(ConfigurationKeys.WRITER_FILE_PATH) + File.separator + config
-            .getString(ConfigurationKeys.WRITER_FILE_NAME));
-
-    // Prepare the source Json file
-    File sourceJsonFile = new File(this.appWorkDir.toString(), TestHelper.TEST_JOB_NAME + ".json");
-    TestHelper.createSourceJsonFile(sourceJsonFile);
-    properties.setProperty(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, sourceJsonFile.getAbsolutePath());
-
-    ConcurrentHashMap<String, Boolean> runningMap = new ConcurrentHashMap<>();
-
-    // Normal job launcher
-    properties.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_" + this.jobName + "_1504201348470");
-    this.gobblinHelixJobLauncher = this.closer.register(
-        new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap));
-
-    // Job launcher(1) to test parallel job running
-    properties.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_" + this.jobName + "_1504201348471");
-    this.gobblinHelixJobLauncher1 = this.closer.register(
-        new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap));
-
-    // Job launcher(2) to test parallel job running
-    properties.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_" + this.jobName + "_1504201348472");
-    this.gobblinHelixJobLauncher2 = this.closer.register(
-        new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap));
-
     this.gobblinTaskRunner =
         new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_HELIX_INSTANCE_NAME,
-            TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, config, Optional.of(appWorkDir));
+            TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, baseConfig, Optional.of(appWorkDir));
 
-    String stateStoreType = properties.getProperty(ConfigurationKeys.STATE_STORE_TYPE_KEY,
+    String stateStoreType = ConfigUtils.getString(baseConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY,
         ConfigurationKeys.DEFAULT_STATE_STORE_TYPE);
 
     ClassAliasResolver<DatasetStateStore.Factory> resolver =
@@ -191,7 +161,7 @@ public class GobblinHelixJobLauncherTest {
     DatasetStateStore.Factory stateStoreFactory =
         resolver.resolveClass(stateStoreType).newInstance();
 
-    this.datasetStateStore = stateStoreFactory.createStateStore(config);
+    this.datasetStateStore = stateStoreFactory.createStateStore(baseConfig);
 
     this.thread = new Thread(new Runnable() {
       @Override
@@ -202,15 +172,43 @@ public class GobblinHelixJobLauncherTest {
     this.thread.start();
   }
 
-  public void testLaunchJob() throws JobException, IOException {
-    this.gobblinHelixJobLauncher.launchJob(null);
+  private Properties generateJobProperties(Config baseConfig, String jobNameSuffix, String jobIdSuffix) {
+    Properties properties = ConfigUtils.configToProperties(baseConfig);
 
-    Assert.assertTrue(this.jobOutputFile.exists());
+    String jobName = properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) + jobNameSuffix;
+
+    properties.setProperty(ConfigurationKeys.JOB_NAME_KEY, jobName);
+
+    properties.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_" + jobName + jobIdSuffix);
+
+    properties.setProperty(ConfigurationKeys.WRITER_FILE_PATH, jobName);
+
+    return properties;
+  }
+
+  private File getJobOutputFile(Properties properties) {
+    return new File(properties.getProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR),
+        properties.getProperty(ConfigurationKeys.WRITER_FILE_PATH) + File.separator + properties
+            .getProperty(ConfigurationKeys.WRITER_FILE_NAME));
+  }
+
+  public void testLaunchJob() throws Exception {
+    final ConcurrentHashMap<String, Boolean> runningMap = new ConcurrentHashMap<>();
+
+    // Normal job launcher
+    final Properties properties = generateJobProperties(this.baseConfig, "1", "_1504201348470");
+    final GobblinHelixJobLauncher gobblinHelixJobLauncher = this.closer.register(
+        new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap));
+
+    gobblinHelixJobLauncher.launchJob(null);
+
+    final File jobOutputFile = getJobOutputFile(properties);
+    Assert.assertTrue(jobOutputFile.exists());
 
     Schema schema = new Schema.Parser().parse(TestHelper.SOURCE_SCHEMA);
-    TestHelper.assertGenericRecords(this.jobOutputFile, schema);
+    TestHelper.assertGenericRecords(jobOutputFile, schema);
 
-    List<JobState.DatasetState> datasetStates = this.datasetStateStore.getAll(this.jobName,
+    List<JobState.DatasetState> datasetStates = this.datasetStateStore.getAll(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY),
         FsDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + FsDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX);
     Assert.assertEquals(datasetStates.size(), 1);
     JobState.DatasetState datasetState = datasetStates.get(0);
@@ -243,14 +241,26 @@ public class GobblinHelixJobLauncherTest {
     }
   }
 
-  public void testLaunchMultipleJobs() throws JobException, IOException, InterruptedException {
+  public void testLaunchMultipleJobs() throws Exception {
+    final ConcurrentHashMap<String, Boolean> runningMap = new ConcurrentHashMap<>();
+
+    // Job launcher(1) to test parallel job running
+    final Properties properties1 = generateJobProperties(this.baseConfig, "2", "_1504201348471");
+    final GobblinHelixJobLauncher gobblinHelixJobLauncher1 = this.closer.register(
+        new GobblinHelixJobLauncher(properties1, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap));
+
+    // Job launcher(2) to test parallel job running
+    final Properties properties2 = generateJobProperties(this.baseConfig, "2", "_1504201348472");
+    final GobblinHelixJobLauncher gobblinHelixJobLauncher2 = this.closer.register(
+        new GobblinHelixJobLauncher(properties2, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap));
+
     CountDownLatch stg1 = new CountDownLatch(1);
     CountDownLatch stg2 = new CountDownLatch(1);
     CountDownLatch stg3 = new CountDownLatch(1);
     SuspendJobListener testListener = new SuspendJobListener(stg1, stg2);
     (new Thread(() -> {
       try {
-        GobblinHelixJobLauncherTest.this.gobblinHelixJobLauncher1.launchJob(testListener);
+        gobblinHelixJobLauncher1.launchJob(testListener);
         stg3.countDown();
       } catch (JobException e) {
       }
@@ -259,13 +269,41 @@ public class GobblinHelixJobLauncherTest {
     // Wait for the first job to start
     stg1.await();
     // When first job is in the middle of running, launch the second job (which should do NOOP because previous job is still running)
-    this.gobblinHelixJobLauncher2.launchJob(testListener);
+    gobblinHelixJobLauncher2.launchJob(testListener);
     stg2.countDown();
     // Wait for the first job to finish
     stg3.await();
     Assert.assertEquals(testListener.getCompletes().get() == 1, true);
   }
 
+  public void testJobContextCleanup() throws Exception {
+    final ConcurrentHashMap<String, Boolean> runningMap = new ConcurrentHashMap<>();
+
+    final Properties properties = generateJobProperties(this.baseConfig, "3", "_1504201348473");
+    final GobblinHelixJobLauncher gobblinHelixJobLauncher =
+        new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap);
+
+    gobblinHelixJobLauncher.launchJob(null);
+
+    final TaskDriver taskDriver = new TaskDriver(this.helixManager);
+
+    final String jobName = properties.getProperty(ConfigurationKeys.JOB_NAME_KEY);
+    final String jobIdKey = properties.getProperty(ConfigurationKeys.JOB_ID_KEY);
+    final String jobContextName = jobName + "_" + jobIdKey;
+
+    org.apache.helix.task.JobContext jobContext = taskDriver.getJobContext(jobContextName);
+
+    // job context should be present until close
+    Assert.assertNotNull(jobContext);
+
+    gobblinHelixJobLauncher.close();
+
+    jobContext = taskDriver.getJobContext(jobContextName);
+
+    // job context should have been deleted
+    Assert.assertNull(jobContext);
+  }
+
   @AfterClass
   public void tearDown() throws IOException {
     try {

Reply via email to