Repository: incubator-gobblin Updated Branches: refs/heads/master 106d1ba69 -> af141db59
[GOBBLIN-360] Fix cleanup of the job context from PROPERTYSTORE/TaskRebalancer Closes #2232 from htran1/helix_cleanup Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/af141db5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/af141db5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/af141db5 Branch: refs/heads/master Commit: af141db599c06934ea8ac9e5ac39b0576cb7a798 Parents: 106d1ba Author: Hung Tran <[email protected]> Authored: Mon Jan 8 22:47:05 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Mon Jan 8 22:47:05 2018 -0800 ---------------------------------------------------------------------- .../gobblin/cluster/GobblinHelixTaskDriver.java | 2 +- .../cluster/GobblinHelixJobLauncherTest.java | 148 ++++++++++++------- 2 files changed, 94 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/af141db5/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java index 9160610..bb5c551 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskDriver.java @@ -186,7 +186,7 @@ public class GobblinHelixTaskDriver { removeJobStateFromQueue(queueName, jobName); // Delete the job from property store - removeJobContext(_propertyStore, jobName); + removeJobContext(_propertyStore, namespacedJobName); } /** Remove the job name from the DAG from the queue configuration */ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/af141db5/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java index 3483c62..64be11b 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixJobLauncherTest.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; import org.apache.helix.InstanceType; +import org.apache.helix.task.TaskDriver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -88,16 +89,6 @@ public class GobblinHelixJobLauncherTest { private Path appWorkDir; - private String jobName; - - private File jobOutputFile; - - private GobblinHelixJobLauncher gobblinHelixJobLauncher; - - private GobblinHelixJobLauncher gobblinHelixJobLauncher1; - - private GobblinHelixJobLauncher gobblinHelixJobLauncher2; - private GobblinTaskRunner gobblinTaskRunner; private DatasetStateStore datasetStateStore; @@ -106,6 +97,8 @@ public class GobblinHelixJobLauncherTest { private final Closer closer = Closer.create(); + private Config baseConfig; + @BeforeClass public void setUp() throws Exception { TestingServer testingZKServer = this.closer.register(new TestingServer(-1)); @@ -115,13 +108,21 @@ public class GobblinHelixJobLauncherTest { GobblinHelixJobLauncherTest.class.getSimpleName() + ".conf"); Assert.assertNotNull(url, "Could not find resource " + url); - Config config = ConfigFactory.parseURL(url) + this.appWorkDir = new Path(GobblinHelixJobLauncherTest.class.getSimpleName()); + + // Prepare the source Json file + File sourceJsonFile = new File(this.appWorkDir.toString(), TestHelper.TEST_JOB_NAME + ".json"); + TestHelper.createSourceJsonFile(sourceJsonFile); + + baseConfig = ConfigFactory.parseURL(url) .withValue("gobblin.cluster.zk.connection.string", ConfigValueFactory.fromAnyRef(testingZKServer.getConnectString())) + .withValue(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, + ConfigValueFactory.fromAnyRef(sourceJsonFile.getAbsolutePath())) .resolve(); - String zkConnectingString = config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); - String helixClusterName = config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY); + String zkConnectingString = baseConfig.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); + String helixClusterName = baseConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY); HelixUtils.createGobblinHelixCluster(zkConnectingString, helixClusterName); @@ -136,11 +137,8 @@ public class GobblinHelixJobLauncherTest { }); this.helixManager.connect(); - Properties properties = ConfigUtils.configToProperties(config); - this.localFs = FileSystem.getLocal(new Configuration()); - this.appWorkDir = new Path(GobblinHelixJobLauncherTest.class.getSimpleName()); this.closer.register(new Closeable() { @Override public void close() throws IOException { @@ -150,39 +148,11 @@ public class GobblinHelixJobLauncherTest { } }); - this.jobName = config.getString(ConfigurationKeys.JOB_NAME_KEY); - - this.jobOutputFile = new File(config.getString(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR), - config.getString(ConfigurationKeys.WRITER_FILE_PATH) + File.separator + config - .getString(ConfigurationKeys.WRITER_FILE_NAME)); - - // Prepare the source Json file - File sourceJsonFile = new File(this.appWorkDir.toString(), TestHelper.TEST_JOB_NAME + ".json"); - TestHelper.createSourceJsonFile(sourceJsonFile); - properties.setProperty(ConfigurationKeys.SOURCE_FILEBASED_FILES_TO_PULL, sourceJsonFile.getAbsolutePath()); - - ConcurrentHashMap<String, Boolean> runningMap = new ConcurrentHashMap<>(); - - // Normal job launcher - properties.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_" + this.jobName + "_1504201348470"); - this.gobblinHelixJobLauncher = this.closer.register( - new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap)); - - // Job launcher(1) to test parallel job running - properties.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_" + this.jobName + "_1504201348471"); - this.gobblinHelixJobLauncher1 = this.closer.register( - new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap)); - - // Job launcher(2) to test parallel job running - properties.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_" + this.jobName + "_1504201348472"); - this.gobblinHelixJobLauncher2 = this.closer.register( - new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap)); - this.gobblinTaskRunner = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, TestHelper.TEST_HELIX_INSTANCE_NAME, - TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, config, Optional.of(appWorkDir)); + TestHelper.TEST_APPLICATION_ID, TestHelper.TEST_TASK_RUNNER_ID, baseConfig, Optional.of(appWorkDir)); - String stateStoreType = properties.getProperty(ConfigurationKeys.STATE_STORE_TYPE_KEY, + String stateStoreType = ConfigUtils.getString(baseConfig, ConfigurationKeys.STATE_STORE_TYPE_KEY, ConfigurationKeys.DEFAULT_STATE_STORE_TYPE); ClassAliasResolver<DatasetStateStore.Factory> resolver = @@ -191,7 +161,7 @@ public class GobblinHelixJobLauncherTest { DatasetStateStore.Factory stateStoreFactory = resolver.resolveClass(stateStoreType).newInstance(); - this.datasetStateStore = stateStoreFactory.createStateStore(config); + this.datasetStateStore = stateStoreFactory.createStateStore(baseConfig); this.thread = new Thread(new Runnable() { @Override @@ -202,15 +172,43 @@ public class GobblinHelixJobLauncherTest { this.thread.start(); } - public void testLaunchJob() throws JobException, IOException { - this.gobblinHelixJobLauncher.launchJob(null); + private Properties generateJobProperties(Config baseConfig, String jobNameSuffix, String jobIdSuffix) { + Properties properties = ConfigUtils.configToProperties(baseConfig); - Assert.assertTrue(this.jobOutputFile.exists()); + String jobName = properties.getProperty(ConfigurationKeys.JOB_NAME_KEY) + jobNameSuffix; + + properties.setProperty(ConfigurationKeys.JOB_NAME_KEY, jobName); + + properties.setProperty(ConfigurationKeys.JOB_ID_KEY, "job_" + jobName + jobIdSuffix); + + properties.setProperty(ConfigurationKeys.WRITER_FILE_PATH, jobName); + + return properties; + } + + private File getJobOutputFile(Properties properties) { + return new File(properties.getProperty(ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR), + properties.getProperty(ConfigurationKeys.WRITER_FILE_PATH) + File.separator + properties + .getProperty(ConfigurationKeys.WRITER_FILE_NAME)); + } + + public void testLaunchJob() throws Exception { + final ConcurrentHashMap<String, Boolean> runningMap = new ConcurrentHashMap<>(); + + // Normal job launcher + final Properties properties = generateJobProperties(this.baseConfig, "1", "_1504201348470"); + final GobblinHelixJobLauncher gobblinHelixJobLauncher = this.closer.register( + new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap)); + + gobblinHelixJobLauncher.launchJob(null); + + final File jobOutputFile = getJobOutputFile(properties); + Assert.assertTrue(jobOutputFile.exists()); Schema schema = new Schema.Parser().parse(TestHelper.SOURCE_SCHEMA); - TestHelper.assertGenericRecords(this.jobOutputFile, schema); + TestHelper.assertGenericRecords(jobOutputFile, schema); - List<JobState.DatasetState> datasetStates = this.datasetStateStore.getAll(this.jobName, + List<JobState.DatasetState> datasetStates = this.datasetStateStore.getAll(properties.getProperty(ConfigurationKeys.JOB_NAME_KEY), FsDatasetStateStore.CURRENT_DATASET_STATE_FILE_SUFFIX + FsDatasetStateStore.DATASET_STATE_STORE_TABLE_SUFFIX); Assert.assertEquals(datasetStates.size(), 1); JobState.DatasetState datasetState = datasetStates.get(0); @@ -243,14 +241,26 @@ public class GobblinHelixJobLauncherTest { } } - public void testLaunchMultipleJobs() throws JobException, IOException, InterruptedException { + public void testLaunchMultipleJobs() throws Exception { + final ConcurrentHashMap<String, Boolean> runningMap = new ConcurrentHashMap<>(); + + // Job launcher(1) to test parallel job running + final Properties properties1 = generateJobProperties(this.baseConfig, "2", "_1504201348471"); + final GobblinHelixJobLauncher gobblinHelixJobLauncher1 = this.closer.register( + new GobblinHelixJobLauncher(properties1, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap)); + + // Job launcher(2) to test parallel job running + final Properties properties2 = generateJobProperties(this.baseConfig, "2", "_1504201348472"); + final GobblinHelixJobLauncher gobblinHelixJobLauncher2 = this.closer.register( + new GobblinHelixJobLauncher(properties2, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap)); + CountDownLatch stg1 = new CountDownLatch(1); CountDownLatch stg2 = new CountDownLatch(1); CountDownLatch stg3 = new CountDownLatch(1); SuspendJobListener testListener = new SuspendJobListener(stg1, stg2); (new Thread(() -> { try { - GobblinHelixJobLauncherTest.this.gobblinHelixJobLauncher1.launchJob(testListener); + gobblinHelixJobLauncher1.launchJob(testListener); stg3.countDown(); } catch (JobException e) { } @@ -259,13 +269,41 @@ public class GobblinHelixJobLauncherTest { // Wait for the first job to start stg1.await(); // When first job is in the middle of running, launch the second job (which should do NOOP because previous job is still running) - this.gobblinHelixJobLauncher2.launchJob(testListener); + gobblinHelixJobLauncher2.launchJob(testListener); stg2.countDown(); // Wait for the first job to finish stg3.await(); Assert.assertEquals(testListener.getCompletes().get() == 1, true); } + public void testJobContextCleanup() throws Exception { + final ConcurrentHashMap<String, Boolean> runningMap = new ConcurrentHashMap<>(); + + final Properties properties = generateJobProperties(this.baseConfig, "3", "_1504201348473"); + final GobblinHelixJobLauncher gobblinHelixJobLauncher = + new GobblinHelixJobLauncher(properties, this.helixManager, this.appWorkDir, ImmutableList.<Tag<?>>of(), runningMap); + + gobblinHelixJobLauncher.launchJob(null); + + final TaskDriver taskDriver = new TaskDriver(this.helixManager); + + final String jobName = properties.getProperty(ConfigurationKeys.JOB_NAME_KEY); + final String jobIdKey = properties.getProperty(ConfigurationKeys.JOB_ID_KEY); + final String jobContextName = jobName + "_" + jobIdKey; + + org.apache.helix.task.JobContext jobContext = taskDriver.getJobContext(jobContextName); + + // job context should be present until close + Assert.assertNotNull(jobContext); + + gobblinHelixJobLauncher.close(); + + jobContext = taskDriver.getJobContext(jobContextName); + + // job context should have been deleted + Assert.assertNull(jobContext); + } + @AfterClass public void tearDown() throws IOException { try {
