This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push: new 9ef461a [GOBBLIN-1107] Lazily initialize Helix TaskStateModelFactory in Gobbli… 9ef461a is described below commit 9ef461ad37bb54c023bd383608a3c4024adad06f Author: sv2000 <sudarsh...@gmail.com> AuthorDate: Fri Apr 3 09:29:02 2020 -0700 [GOBBLIN-1107] Lazily initialize Helix TaskStateModelFactory in Gobbli… Closes #2947 from sv2000/taskRunner --- .../apache/gobblin/cluster/GobblinTaskRunner.java | 96 +++++++++++----------- .../gobblin/cluster/ClusterIntegrationTest.java | 12 ++- .../cluster/ClusterIntegrationTestUtils.java | 76 +++++++++++++++++ .../gobblin/cluster/GobblinTaskRunnerTest.java | 93 +++++++++++++++++---- .../cluster/HelixAssignedParticipantCheckTest.java | 37 +++------ .../cluster/suite/IntegrationBasicSuite.java | 9 +- .../cluster/suite/IntegrationJobCancelSuite.java | 19 +---- .../suite/IntegrationJobRestartViaSpecSuite.java | 14 +--- 8 files changed, 236 insertions(+), 120 deletions(-) diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index 2d35dea..7ed555e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -120,47 +120,37 @@ public class GobblinTaskRunner implements StandardMetricsBridge { public static final String CLUSTER_APP_WORK_DIR = GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + "appWorkDir"; private static final Logger logger = LoggerFactory.getLogger(GobblinTaskRunner.class); - static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf"); + static final java.nio.file.Path CLUSTER_CONF_PATH = Paths.get("generated-gobblin-cluster.conf"); static final String GOBBLIN_TASK_FACTORY_NAME = "GobblinTaskFactory"; - static final String GOBBLIN_JOB_FACTORY_NAME = "GobblinJobFactory"; 
private final String helixInstanceName; - private final String clusterName; + private final Optional<ContainerMetrics> containerMetrics; + private final List<Service> services = Lists.newArrayList(); + private final Path appWorkPath; @Getter private HelixManager jobHelixManager; - private Optional<HelixManager> taskDriverHelixManager = Optional.absent(); - - private final ServiceManager serviceManager; - - private final TaskStateModelFactory taskStateModelFactory; - - private final Optional<ContainerMetrics> containerMetrics; - - protected final String taskRunnerId; - + private ServiceManager serviceManager; + private TaskStateModelFactory taskStateModelFactory; + private boolean isTaskDriver; + private boolean dedicatedTaskDriverCluster; + private Collection<StandardMetricsBridge.StandardMetrics> metricsCollection; + @Getter + private volatile boolean started = false; private volatile boolean stopInProgress = false; - private volatile boolean isStopped = false; + protected final String taskRunnerId; protected final EventBus eventBus = new EventBus(GobblinTaskRunner.class.getSimpleName()); - protected final Config clusterConfig; - @Getter protected final FileSystem fs; - private final List<Service> services = Lists.newArrayList(); protected final String applicationName; protected final String applicationId; - private final Path appWorkPath; - private boolean isTaskDriver; - private boolean dedicatedTaskDriverCluster; - - private final Collection<StandardMetricsBridge.StandardMetrics> metricsCollection; public GobblinTaskRunner(String applicationName, String helixInstanceName, @@ -191,6 +181,17 @@ public class GobblinTaskRunner implements StandardMetricsBridge { this.containerMetrics = buildContainerMetrics(); + logger.info("GobblinTaskRunner({}): applicationName {}, helixInstanceName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}", + this.isTaskDriver? 
"taskDriver" : "worker", + applicationName, + helixInstanceName, + applicationId, + taskRunnerId, + config, + appWorkDirOptional); + } + + private TaskRunnerSuiteBase initTaskRunnerSuiteBase() throws ReflectiveOperationException { String builderStr = ConfigUtils.getString(this.clusterConfig, GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER, TaskRunnerSuiteBase.Builder.class.getName()); @@ -203,10 +204,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge { } TaskRunnerSuiteBase.Builder builder = GobblinConstructorUtils.<TaskRunnerSuiteBase.Builder>invokeLongestConstructor( - new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class) - .resolveClass(builderStr), this.clusterConfig); + new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class) + .resolveClass(builderStr), this.clusterConfig); - TaskRunnerSuiteBase suite = builder.setAppWorkPath(this.appWorkPath) + return builder.setAppWorkPath(this.appWorkPath) .setContainerMetrics(this.containerMetrics) .setFileSystem(this.fs) .setJobHelixManager(this.jobHelixManager) @@ -216,27 +217,6 @@ public class GobblinTaskRunner implements StandardMetricsBridge { .setContainerId(taskRunnerId) .setHostName(hostName) .build(); - - this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactoryMap()); - this.metricsCollection = suite.getMetricsCollection(); - this.services.addAll(suite.getServices()); - - this.services.addAll(getServices()); - - if (this.services.isEmpty()) { - this.serviceManager = null; - } else { - this.serviceManager = new ServiceManager(services); - } - - logger.info("GobblinTaskRunner({}): applicationName {}, helixInstanceName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}", - this.isTaskDriver?"taskDriver" : "worker", - applicationName, - helixInstanceName, - applicationId, - taskRunnerId, - config, - appWorkDirOptional); } private Path initAppWorkDir(Config config, Optional<Path> appWorkDirOptional) { @@ -304,6 +284,27 @@ public class 
GobblinTaskRunner implements StandardMetricsBridge { connectHelixManagerWithRetry(); + TaskRunnerSuiteBase suite; + try { + suite = initTaskRunnerSuiteBase(); + synchronized (this) { + this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactoryMap()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + this.metricsCollection = suite.getMetricsCollection(); + this.services.addAll(suite.getServices()); + + this.services.addAll(getServices()); + + if (this.services.isEmpty()) { + this.serviceManager = null; + } else { + this.serviceManager = new ServiceManager(services); + } + addInstanceTags(); // Start metric reporting @@ -315,7 +316,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge { if (this.serviceManager != null) { this.serviceManager.startAsync(); + started = true; this.serviceManager.awaitStopped(); + } else { + started = true; } } diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java index e85413f..5d214a2 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java @@ -80,7 +80,9 @@ public class ClusterIntegrationTest { @Test void testJobShouldGetCancelled() throws Exception { - this.suite =new IntegrationJobCancelSuite(); + Config jobConfigOverrides = ClusterIntegrationTestUtils.buildSleepingJob(IntegrationJobCancelSuite.JOB_ID, + IntegrationJobCancelSuite.TASK_STATE_FILE); + this.suite =new IntegrationJobCancelSuite(jobConfigOverrides); HelixManager helixManager = getHelixManager(); suite.startCluster(); helixManager.connect(); @@ -121,7 +123,9 @@ public class ClusterIntegrationTest { */ @Test (dependsOnMethods = { "testJobShouldGetCancelled" }, groups = {"disabledOnTravis"}) public void testJobRestartViaSpec() throws Exception { - this.suite = 
new IntegrationJobRestartViaSpecSuite(); + Config jobConfigOverrides = ClusterIntegrationTestUtils.buildSleepingJob(IntegrationJobCancelSuite.JOB_ID, + IntegrationJobCancelSuite.TASK_STATE_FILE); + this.suite = new IntegrationJobRestartViaSpecSuite(jobConfigOverrides); HelixManager helixManager = getHelixManager(); IntegrationJobRestartViaSpecSuite restartViaSpecSuite = (IntegrationJobRestartViaSpecSuite) this.suite; @@ -132,10 +136,10 @@ public class ClusterIntegrationTest { helixManager.connect(); AssertWithBackoff.create().timeoutMs(30000).maxSleepMs(1000).backoffFactor(1). - assertTrue(isTaskStarted(helixManager, IntegrationJobCancelSuite.JOB_ID), "Waiting for the job to start..."); + assertTrue(isTaskStarted(helixManager, IntegrationJobRestartViaSpecSuite.JOB_ID), "Waiting for the job to start..."); AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1). - assertTrue(isTaskRunning(IntegrationJobCancelSuite.TASK_STATE_FILE), "Waiting for the task to enter running state"); + assertTrue(isTaskRunning(IntegrationJobRestartViaSpecSuite.TASK_STATE_FILE), "Waiting for the task to enter running state"); ZkClient zkClient = new ZkClient(this.zkConnectString); PathBasedZkSerializer zkSerializer = ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()).build(); diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTestUtils.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTestUtils.java new file mode 100644 index 0000000..28d030a --- /dev/null +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTestUtils.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import org.apache.helix.HelixManager; +import org.apache.helix.PropertyPathBuilder; +import org.apache.helix.manager.zk.ZkClient; +import org.testng.Assert; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.configuration.ConfigurationKeys; + +public class ClusterIntegrationTestUtils { + /** + * A utility method to build a job that uses the {@link SleepingCustomTaskSource} with the provided config overrides. + * @param jobId + * @param taskStateFile + * @return job config with overrides + */ + public static Config buildSleepingJob(String jobId, String taskStateFile) { + return buildSleepingJob(jobId, taskStateFile, 10L); + } + + /** + * A utility method to build a job that uses the {@link SleepingCustomTaskSource} with the provided config overrides. 
+ * @param jobId + * @param taskStateFile + * @param helixJobTimeoutSecs + * @return job config with overrides + */ + public static Config buildSleepingJob(String jobId, String taskStateFile, Long helixJobTimeoutSecs) { + Config jobConfig = ConfigFactory.empty().withValue(SleepingTask.TASK_STATE_FILE_KEY, ConfigValueFactory.fromAnyRef(taskStateFile)) + .withValue(ConfigurationKeys.JOB_ID_KEY, ConfigValueFactory.fromAnyRef(jobId)) + .withValue(ConfigurationKeys.SOURCE_CLASS_KEY, ConfigValueFactory.fromAnyRef(SleepingCustomTaskSource.class.getName())) + .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, ConfigValueFactory.fromAnyRef(Boolean.TRUE)) + .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, ConfigValueFactory.fromAnyRef(helixJobTimeoutSecs)); + return jobConfig; + } + + /** + * A utility method that creates a partial instance structure in ZK. + */ + public static void createPartialInstanceStructure(HelixManager helixManager, String zkConnectString) { + //Connect and disconnect the helixManager to create a Helix Instance set up. + try { + helixManager.connect(); + helixManager.disconnect(); + } catch (Exception e) { + Assert.fail("Failed to connect to ZK"); + } + + //Delete ERRORS/HISTORY/STATUSUPDATES znodes under INSTANCES to simulate partial instance set up. 
+ ZkClient zkClient = new ZkClient(zkConnectString); + zkClient.delete(PropertyPathBuilder.instanceError(helixManager.getClusterName(), helixManager.getInstanceName())); + zkClient.delete(PropertyPathBuilder.instanceHistory(helixManager.getClusterName(), helixManager.getInstanceName())); + zkClient.delete(PropertyPathBuilder.instanceStatusUpdate(helixManager.getClusterName(), helixManager.getInstanceName())); + } +} diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java index 0e48661..98d13df 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinTaskRunnerTest.java @@ -19,13 +19,16 @@ package org.apache.gobblin.cluster; import java.io.IOException; import java.net.URL; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.curator.test.TestingServer; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.helix.HelixException; -import org.apache.helix.PropertyPathBuilder; -import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.Assert; @@ -39,6 +42,7 @@ import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; +import org.apache.gobblin.cluster.suite.IntegrationBasicSuite; import org.apache.gobblin.testing.AssertWithBackoff; @@ -56,6 +60,9 @@ import org.apache.gobblin.testing.AssertWithBackoff; public class GobblinTaskRunnerTest { public final static Logger LOG = LoggerFactory.getLogger(GobblinTaskRunnerTest.class); + private static final String JOB_ID = "job_taskRunnerTestJob_" + 
System.currentTimeMillis(); + private static final String TASK_STATE_FILE = "/tmp/" + GobblinTaskRunnerTest.class.getSimpleName() + "/taskState/_RUNNING"; + public static final String HADOOP_OVERRIDE_PROPERTY_NAME = "prop"; private TestingServer testingZKServer; @@ -66,6 +73,7 @@ public class GobblinTaskRunnerTest { private GobblinTaskRunner corruptGobblinTaskRunner; private String clusterName; private String corruptHelixInstance; + private TaskAssignmentAfterConnectionRetry suite; @BeforeClass public void setUp() throws Exception { @@ -110,7 +118,19 @@ public class GobblinTaskRunnerTest { @Test public void testSendReceiveShutdownMessage() throws Exception { + ExecutorService service = Executors.newSingleThreadExecutor(); + service.submit(() -> GobblinTaskRunnerTest.this.gobblinTaskRunner.start()); + Logger log = LoggerFactory.getLogger("testSendReceiveShutdownMessage"); + + // Give Helix some time to start the task runner + AssertWithBackoff.create().logger(log).timeoutMs(20000) + .assertTrue(new Predicate<Void>() { + @Override public boolean apply(Void input) { + return GobblinTaskRunnerTest.this.gobblinTaskRunner.isStarted(); + } + }, "gobblinTaskRunner started"); + this.gobblinClusterManager.sendShutdownRequest(); // Give Helix some time to handle the message @@ -128,21 +148,13 @@ public class GobblinTaskRunnerTest { Assert.assertEquals(fileSystem.getConf().get(HADOOP_OVERRIDE_PROPERTY_NAME), "value"); } + @Test public void testConnectHelixManagerWithRetry() { - //Connect and disconnect the corrupt task runner to create a Helix Instance set up. 
- try { - this.corruptGobblinTaskRunner.connectHelixManager(); - this.corruptGobblinTaskRunner.disconnectHelixManager(); - } catch (Exception e) { - Assert.fail("Failed to connect to ZK"); - } + HelixManager instanceManager = HelixManagerFactory.getZKHelixManager( + clusterName, corruptHelixInstance, InstanceType.PARTICIPANT, testingZKServer.getConnectString()); - //Delete ERRORS/HISTORY/STATUSUPDATES znodes under INSTANCES to simulate partial instance set up. - ZkClient zkClient = new ZkClient(testingZKServer.getConnectString()); - zkClient.delete(PropertyPathBuilder.instanceError(clusterName, corruptHelixInstance)); - zkClient.delete(PropertyPathBuilder.instanceHistory(clusterName, corruptHelixInstance)); - zkClient.delete(PropertyPathBuilder.instanceStatusUpdate(clusterName, corruptHelixInstance)); + ClusterIntegrationTestUtils.createPartialInstanceStructure(instanceManager, testingZKServer.getConnectString()); //Ensure that the connecting to Helix without retry will throw a HelixException try { @@ -158,11 +170,62 @@ public class GobblinTaskRunnerTest { Assert.assertTrue(true); } + @Test (groups = {"disabledOnTravis"}) + public void testTaskAssignmentAfterHelixConnectionRetry() + throws Exception { + Config jobConfigOverrides = ClusterIntegrationTestUtils.buildSleepingJob(JOB_ID, TASK_STATE_FILE); + this.suite = new TaskAssignmentAfterConnectionRetry(jobConfigOverrides); + + String zkConnectString = suite.getManagerConfig().getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); + String clusterName = suite.getManagerConfig().getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY); + //A test manager instance for observing the state of the cluster + HelixManager helixManager = HelixManagerFactory.getZKHelixManager(clusterName, "TestManager", InstanceType.SPECTATOR, zkConnectString); + + suite.startCluster(); + + helixManager.connect(); + + //Ensure that Helix has created a workflow + 
AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1). + assertTrue(ClusterIntegrationTest.isTaskStarted(helixManager, JOB_ID), "Waiting for the job to start..."); + + //Ensure that the SleepingTask is running + AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1). + assertTrue(ClusterIntegrationTest.isTaskRunning(TASK_STATE_FILE),"Waiting for the task to enter running state"); + + helixManager.disconnect(); + } + + + public static class TaskAssignmentAfterConnectionRetry extends IntegrationBasicSuite { + TaskAssignmentAfterConnectionRetry(Config jobConfigOverrides) { + super(jobConfigOverrides); + } + + @Override + protected void createHelixCluster() throws Exception { + super.createHelixCluster(); + String clusterName = super.getManagerConfig().getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY); + String zkConnectString = super.getManagerConfig().getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); + HelixManager helixManager = HelixManagerFactory + .getZKHelixManager(clusterName, IntegrationBasicSuite.WORKER_INSTANCE_0, InstanceType.PARTICIPANT, zkConnectString); + + //Create a partial instance setup + ClusterIntegrationTestUtils.createPartialInstanceStructure(helixManager, zkConnectString); + } + } + + @AfterClass - public void tearDown() throws IOException { + public void tearDown() + throws IOException, InterruptedException { try { this.gobblinClusterManager.disconnectHelixManager(); this.gobblinTaskRunner.disconnectHelixManager(); + this.corruptGobblinTaskRunner.disconnectHelixManager(); + if (this.suite != null) { + this.suite.shutdownCluster(); + } } finally { this.testingZKServer.close(); } diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java index eec52b1..1aecd94 100644 --- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/HelixAssignedParticipantCheckTest.java @@ -17,7 +17,6 @@ package org.apache.gobblin.cluster; import java.io.IOException; -import java.util.Map; import org.apache.helix.HelixManager; import org.apache.helix.HelixManagerFactory; @@ -27,27 +26,28 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableMap; import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; import org.apache.gobblin.cluster.suite.IntegrationBasicSuite; import org.apache.gobblin.commit.CommitStepException; -import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.testing.AssertWithBackoff; public class HelixAssignedParticipantCheckTest { - private IntegrationJobSuite suite; + private static final String JOB_ID = "job_testJob_345"; + private static final String TASK_STATE_FILE = "/tmp/" + HelixAssignedParticipantCheckTest.class.getSimpleName() + "/taskState/_RUNNING"; + + private IntegrationBasicSuite suite; private HelixManager helixManager; private Config helixConfig; @BeforeClass public void setUp() throws Exception { - //Set up a Gobblin Helix cluster - suite = new IntegrationJobSuite(); + Config jobConfigOverrides = ClusterIntegrationTestUtils.buildSleepingJob(JOB_ID, TASK_STATE_FILE); + //Set up a Gobblin Helix cluster integration job + suite = new IntegrationBasicSuite(jobConfigOverrides); helixConfig = suite.getManagerConfig(); String clusterName = helixConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY); @@ -66,10 +66,10 @@ public class HelixAssignedParticipantCheckTest { //Ensure that Helix has created a workflow AssertWithBackoff.create().maxSleepMs(1000).backoffFactor(1). 
- assertTrue(ClusterIntegrationTest.isTaskStarted(helixManager, IntegrationJobSuite.JOB_ID), "Waiting for the job to start..."); + assertTrue(ClusterIntegrationTest.isTaskStarted(helixManager, JOB_ID), "Waiting for the job to start..."); //Instantiate config for HelixAssignedParticipantCheck - String helixJobId = Joiner.on("_").join(IntegrationJobSuite.JOB_ID, IntegrationJobSuite.JOB_ID); + String helixJobId = Joiner.on("_").join(JOB_ID, JOB_ID); helixConfig = helixConfig.withValue(GobblinClusterConfigurationKeys.HELIX_INSTANCE_NAME_KEY, ConfigValueFactory.fromAnyRef(IntegrationBasicSuite.WORKER_INSTANCE_0)) .withValue(GobblinClusterConfigurationKeys.HELIX_JOB_ID_KEY, ConfigValueFactory.fromAnyRef(helixJobId)) @@ -78,7 +78,7 @@ public class HelixAssignedParticipantCheckTest { //Ensure that the SleepingTask is running AssertWithBackoff.create().maxSleepMs(100).timeoutMs(2000).backoffFactor(1). - assertTrue(ClusterIntegrationTest.isTaskRunning(IntegrationJobSuite.TASK_STATE_FILE),"Waiting for the task to enter running state"); + assertTrue(ClusterIntegrationTest.isTaskRunning(TASK_STATE_FILE),"Waiting for the task to enter running state"); //Run the check. Ensure that the configured Helix instance is indeed the assigned participant // (i.e. no exceptions thrown). 
@@ -104,21 +104,4 @@ public class HelixAssignedParticipantCheckTest { helixManager.disconnect(); } } - - public static class IntegrationJobSuite extends IntegrationBasicSuite { - public static final String JOB_ID = "job_testJob_345"; - public static final String TASK_STATE_FILE = "/tmp/" + IntegrationJobSuite.class.getSimpleName() + "/taskState/_RUNNING"; - - - @Override - protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) { - Config newConfig = ConfigFactory.parseMap(ImmutableMap.of( - ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource", - ConfigurationKeys.JOB_ID_KEY, JOB_ID, - GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE, - GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L, SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE)) - .withFallback(rawJobConfig); - return ImmutableMap.of(JOB_NAME, newConfig); - } - } } \ No newline at end of file diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java index e6c4587..4f045bb 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java @@ -76,6 +76,8 @@ public class IntegrationBasicSuite { public static final String WORKER_INSTANCE_0 = "WorkerInstance_0"; public static final String TEST_INSTANCE_NAME_KEY = "worker.instance.name"; + protected final Config jobConfigOverrides; + // manager and workers protected Config managerConfig; protected Collection<Config> taskDriverConfigs = Lists.newArrayList(); @@ -93,6 +95,11 @@ public class IntegrationBasicSuite { protected TestingServer testingZKServer; public IntegrationBasicSuite() { + this(ConfigFactory.empty()); + } + + public IntegrationBasicSuite(Config jobConfigOverrides) { + this.jobConfigOverrides = 
jobConfigOverrides; try { initWorkDir(); initJobOutputDir(); @@ -163,7 +170,7 @@ public class IntegrationBasicSuite { } protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) { - return ImmutableMap.of(JOB_NAME, rawJobConfig); + return ImmutableMap.of(JOB_NAME, this.jobConfigOverrides.withFallback(rawJobConfig)); } private void writeJobConf(String jobName, Config jobConfig) throws IOException { diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java index d6c305e..b16febb 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobCancelSuite.java @@ -17,32 +17,17 @@ package org.apache.gobblin.cluster.suite; -import java.util.Map; - import org.junit.Assert; -import com.google.common.collect.ImmutableMap; import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; -import org.apache.gobblin.cluster.SleepingTask; -import org.apache.gobblin.configuration.ConfigurationKeys; public class IntegrationJobCancelSuite extends IntegrationBasicSuite { public static final String JOB_ID = "job_HelloWorldTestJob_1234"; public static final String TASK_STATE_FILE = "/tmp/IntegrationJobCancelSuite/taskState/_RUNNING"; - @Override - protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) { - Config newConfig = ConfigFactory.parseMap(ImmutableMap.of( - ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource", - ConfigurationKeys.JOB_ID_KEY, JOB_ID, - GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE, - GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 10L, SleepingTask.TASK_STATE_FILE_KEY, TASK_STATE_FILE)) - 
.withFallback(rawJobConfig); - return ImmutableMap.of(JOB_NAME, newConfig); + public IntegrationJobCancelSuite(Config jobConfigOverrides) { + super(jobConfigOverrides); } @Override diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java index c84c1e7..412603f 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobRestartViaSpecSuite.java @@ -27,7 +27,6 @@ import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import com.google.common.collect.ImmutableMap; import com.google.common.io.Files; import com.google.common.io.Resources; import com.typesafe.config.Config; @@ -36,10 +35,10 @@ import com.typesafe.config.ConfigParseOptions; import com.typesafe.config.ConfigSyntax; import com.typesafe.config.ConfigValueFactory; +import org.apache.gobblin.cluster.ClusterIntegrationTestUtils; import org.apache.gobblin.cluster.FsJobConfigurationManager; import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; import org.apache.gobblin.cluster.SleepingTask; -import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.runtime.api.FsSpecConsumer; import org.apache.gobblin.runtime.api.FsSpecProducer; import org.apache.gobblin.runtime.api.JobSpec; @@ -53,8 +52,8 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite private final SpecProducer _specProducer; - public IntegrationJobRestartViaSpecSuite() throws IOException { - super(); + public IntegrationJobRestartViaSpecSuite(Config jobConfigOverrides) throws IOException { + super(jobConfigOverrides); FileSystem fs = FileSystem.getLocal(new Configuration()); this._specProducer = new FsSpecProducer(fs, 
ConfigFactory.empty().withValue(FsSpecConsumer.SPEC_PATH_KEY, ConfigValueFactory.fromAnyRef(FS_SPEC_CONSUMER_DIR))); } @@ -66,12 +65,7 @@ public class IntegrationJobRestartViaSpecSuite extends IntegrationJobCancelSuite ConfigFactory.parseReader(reader, ConfigParseOptions.defaults().setSyntax(ConfigSyntax.CONF)); rawJobConfig = rawJobConfig.withFallback(getClusterConfig()); - Config newConfig = ConfigFactory.parseMap(ImmutableMap - .of(ConfigurationKeys.SOURCE_CLASS_KEY, "org.apache.gobblin.cluster.SleepingCustomTaskSource", - ConfigurationKeys.JOB_ID_KEY, JOB_ID, - GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, Boolean.TRUE, - GobblinClusterConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, 100L, - ConfigurationKeys.JOB_NAME_KEY, JOB_NAME)); + Config newConfig = ClusterIntegrationTestUtils.buildSleepingJob(JOB_ID, TASK_STATE_FILE, 100L); newConfig = newConfig.withValue(SleepingTask.TASK_STATE_FILE_KEY, ConfigValueFactory.fromAnyRef(TASK_STATE_FILE)); newConfig = newConfig.withFallback(rawJobConfig);