Repository: incubator-gobblin Updated Branches: refs/heads/master 11a1c46ab -> d10bae881
[GOBBLIN-649] Add task driver cluster Add task driver cluster Fix comments and add more metrics Closes #2518 from kyuamazon/td3 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/d10bae88 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/d10bae88 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/d10bae88 Branch: refs/heads/master Commit: d10bae881aead4ba9c91b06048434aefd7583285 Parents: 11a1c46 Author: Kuai Yu <[email protected]> Authored: Mon Dec 10 17:28:41 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Mon Dec 10 17:28:41 2018 -0800 ---------------------------------------------------------------------- .../GobblinClusterConfigurationKeys.java | 10 +- .../gobblin/cluster/GobblinClusterManager.java | 10 +- ...blinHelixDistributeJobExecutionLauncher.java | 28 +++-- .../cluster/GobblinHelixJobScheduler.java | 27 +++-- .../gobblin/cluster/GobblinHelixJobTask.java | 18 ++-- .../cluster/GobblinHelixMultiManager.java | 62 ++++++++++- .../GobblinHelixPlanningJobLauncherMetrics.java | 76 ++++++++++++++ .../gobblin/cluster/GobblinHelixTask.java | 36 ++++--- .../cluster/GobblinHelixTaskFactory.java | 26 +++-- .../gobblin/cluster/GobblinTaskRunner.java | 81 +++++++++++---- .../cluster/HelixRetriggeringJobCallable.java | 40 +++++-- .../gobblin/cluster/TaskRunnerSuiteBase.java | 12 ++- .../cluster/TaskRunnerSuiteThreadModel.java | 11 +- .../gobblin/cluster/ClusterIntegrationTest.java | 11 +- .../gobblin/cluster/GobblinHelixTaskTest.java | 20 +++- .../cluster/suite/IntegrationBasicSuite.java | 70 +++++++++++-- ...egrationDedicatedTaskDriverClusterSuite.java | 103 +++++++++++++++++++ .../cluster/suite/IntegrationJobTagSuite.java | 21 +--- .../src/test/resources/BasicTaskDriver.conf | 19 ++++ 19 files changed, 561 insertions(+), 120 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index 873793f..dba2a42 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -51,12 +51,18 @@ public class GobblinClusterConfigurationKeys { public static final boolean DEFAULT_DISTRIBUTED_JOB_LAUNCHER_ENABLED = false; public static final String DISTRIBUTED_JOB_LAUNCHER_BUILDER = GOBBLIN_CLUSTER_PREFIX + "distributedJobLauncherBuilder"; - // Helix configuration properties. + public static final String DEDICATED_JOB_CLUSTER_CONTROLLER_ENABLED = GOBBLIN_CLUSTER_PREFIX + "dedicatedJobClusterController.enabled"; public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "helix.cluster.name"; + public static final String MANAGER_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "manager.cluster.name"; public static final String DEDICATED_MANAGER_CLUSTER_ENABLED = GOBBLIN_CLUSTER_PREFIX + "dedicatedManagerCluster.enabled"; - public static final String DEDICATED_JOB_CLUSTER_CONTROLLER_ENABLED = GOBBLIN_CLUSTER_PREFIX + "dedicatedJobClusterController.enabled"; + + public static final String DEDICATED_TASK_DRIVER_CLUSTER_CONTROLLER_ENABLED = GOBBLIN_CLUSTER_PREFIX + "dedicatedTaskDriverClusterController.enabled"; + public static final String TASK_DRIVER_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "taskDriver.cluster.name"; + public static final String DEDICATED_TASK_DRIVER_CLUSTER_ENABLED = GOBBLIN_CLUSTER_PREFIX + "dedicatedTaskDriverCluster.enabled"; + public static final 
String TASK_DRIVER_ENABLED = GOBBLIN_CLUSTER_PREFIX + "taskDriver.enabled"; + public static final String ZK_CONNECTION_STRING_KEY = GOBBLIN_CLUSTER_PREFIX + "zk.connection.string"; public static final String WORK_UNIT_FILE_PATH = GOBBLIN_CLUSTER_PREFIX + "work.unit.file.path"; public static final String HELIX_INSTANCE_NAME_OPTION_NAME = "helix_instance_name"; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java index 437da37..1722cc3 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java @@ -315,8 +315,14 @@ public class GobblinClusterManager implements ApplicationLauncher, StandardMetri private GobblinHelixJobScheduler buildGobblinHelixJobScheduler(Config config, Path appWorkDir, List<? 
extends Tag<?>> metadataTags, SchedulerService schedulerService) throws Exception { Properties properties = ConfigUtils.configToProperties(config); - return new GobblinHelixJobScheduler(properties, this.multiManager.getJobClusterHelixManager(), this.eventBus, appWorkDir, metadataTags, - schedulerService, this.jobCatalog); + return new GobblinHelixJobScheduler(properties, + this.multiManager.getJobClusterHelixManager(), + this.multiManager.getTaskDriverHelixManager(), + this.eventBus, + appWorkDir, + metadataTags, + schedulerService, + this.jobCatalog); } /** http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java index 609639e..1d592c4 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java @@ -82,11 +82,12 @@ import org.apache.gobblin.util.PropertiesUtils; @Slf4j class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher, Closeable { - protected HelixManager helixManager; + protected HelixManager planningJobHelixManager; protected TaskDriver helixTaskDriver; protected Properties sysProps; protected Properties jobPlanningProps; protected HelixJobsMapping jobsMapping; + protected GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics; protected static final String JOB_PROPS_PREFIX = "gobblin.jobProps."; @@ -104,8 +105,12 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher private DistributeJobMonitor jobMonitor; public 
GobblinHelixDistributeJobExecutionLauncher(Builder builder) throws Exception { - this.helixManager = builder.manager; - this.helixTaskDriver = new TaskDriver(this.helixManager); + if (builder.taskDriverHelixManager.isPresent()) { + this.planningJobHelixManager = builder.taskDriverHelixManager.get(); + } else { + this.planningJobHelixManager = builder.jobHelixManager; + } + this.helixTaskDriver = new TaskDriver(this.planningJobHelixManager); this.sysProps = builder.sysProps; this.jobPlanningProps = builder.jobPlanningProps; this.jobSubmitted = false; @@ -147,8 +152,10 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher public static class Builder { Properties sysProps; Properties jobPlanningProps; - HelixManager manager; + HelixManager jobHelixManager; + Optional<HelixManager> taskDriverHelixManager; Path appWorkDir; + GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics; public GobblinHelixDistributeJobExecutionLauncher build() throws Exception { return new GobblinHelixDistributeJobExecutionLauncher(this); } @@ -217,12 +224,12 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher * @param jobConfigBuilder A job config builder which contains a single task. 
*/ private void submitJobToHelix(String jobName, String jobId, JobConfig.Builder jobConfigBuilder) throws Exception { - TaskDriver taskDriver = new TaskDriver(this.helixManager); + TaskDriver taskDriver = new TaskDriver(this.planningJobHelixManager); HelixUtils.submitJobToWorkFlow(jobConfigBuilder, jobName, jobId, taskDriver, - this.helixManager, + this.planningJobHelixManager, this.workFlowExpiryTimeSeconds); this.jobSubmitted = true; } @@ -243,7 +250,10 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher JobConfig.Builder builder = createJobBuilder(this.jobPlanningProps); try { submitJobToHelix(planningId, planningId, builder); - return waitForJobCompletion(planningId, planningId); + long startTime = System.currentTimeMillis(); + DistributeJobResult rst = waitForJobCompletion(planningId, planningId); + GobblinHelixDistributeJobExecutionLauncher.this.planningJobLauncherMetrics.updateTimeForHelixWait(startTime); + return rst; } catch (Exception e) { log.error(planningId + " is not able to submit."); return new DistributeJobResult(false); @@ -259,14 +269,14 @@ class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher try { HelixUtils.waitJobCompletion( - GobblinHelixDistributeJobExecutionLauncher.this.helixManager, + GobblinHelixDistributeJobExecutionLauncher.this.planningJobHelixManager, workFlowName, jobName, timeoutEnabled ? 
Optional.of(timeoutInSeconds) : Optional.empty()); return getResultFromUserContent(); } catch (TimeoutException te) { HelixUtils.handleJobTimeout(workFlowName, jobName, - helixManager, this, null); + planningJobHelixManager, this, null); return new DistributeJobResult(false); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java index 2e488c5..134d382 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java @@ -21,6 +21,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -77,7 +78,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe private static final Logger LOGGER = LoggerFactory.getLogger(GobblinHelixJobScheduler.class); private final Properties properties; - private final HelixManager helixManager; + private final HelixManager jobHelixManager; + private final Optional<HelixManager> taskDriverHelixManager; private final EventBus eventBus; private final Path appWorkDir; private final List<? 
extends Tag<?>> metadataTags; @@ -87,11 +89,13 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe final GobblinHelixJobSchedulerMetrics jobSchedulerMetrics; final GobblinHelixJobLauncherMetrics launcherMetrics; + final GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics; private boolean startServicesCompleted; public GobblinHelixJobScheduler(Properties properties, - HelixManager helixManager, + HelixManager jobHelixManager, + Optional<HelixManager> taskDriverHelixManager, EventBus eventBus, Path appWorkDir, List<? extends Tag<?>> metadataTags, SchedulerService schedulerService, @@ -99,7 +103,8 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe super(properties, schedulerService); this.properties = properties; - this.helixManager = helixManager; + this.jobHelixManager = jobHelixManager; + this.taskDriverHelixManager = taskDriverHelixManager; this.eventBus = eventBus; this.jobRunningMap = new ConcurrentHashMap<>(); this.appWorkDir = appWorkDir; @@ -119,12 +124,16 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe this.metricContext, metricsWindowSizeInMin); + this.planningJobLauncherMetrics = new GobblinHelixPlanningJobLauncherMetrics("planningLauncherInScheduler", + this.metricContext, + metricsWindowSizeInMin); + this.startServicesCompleted = false; } @Override public Collection<StandardMetrics> getStandardMetricsCollection() { - return ImmutableList.of(this.launcherMetrics, this.jobSchedulerMetrics); + return ImmutableList.of(this.launcherMetrics, this.jobSchedulerMetrics, this.planningJobLauncherMetrics); } @Override @@ -162,8 +171,10 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe this.properties, jobProps, jobListener, + this.planningJobLauncherMetrics, this.appWorkDir, - this.helixManager).call(); + this.jobHelixManager, + this.taskDriverHelixManager).call(); } @Override @@ -173,7 +184,7 @@ public class 
GobblinHelixJobScheduler extends JobScheduler implements StandardMe combinedProps.putAll(properties); combinedProps.putAll(jobProps); - return new GobblinHelixJobLauncher(combinedProps, this.helixManager, this.appWorkDir, this.metadataTags, this.jobRunningMap); + return new GobblinHelixJobLauncher(combinedProps, this.jobHelixManager, this.appWorkDir, this.metadataTags, this.jobRunningMap); } public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener) { @@ -181,8 +192,10 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe this.properties, jobProps, jobListener, + this.planningJobLauncherMetrics, this.appWorkDir, - this.helixManager); + this.jobHelixManager, + this.taskDriverHelixManager); final Future<?> future = this.jobExecutor.submit(retriggeringJob); return new Future() { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java index 0266413..9447b94 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java @@ -60,8 +60,10 @@ class GobblinHelixJobTask implements Task { private final Config sysConfig; private final Properties jobPlusSysConfig; private final HelixJobsMapping jobsMapping; + private final String applicationName; + private final String instanceName; private final String planningJobId; - private final HelixManager helixManager; + private final HelixManager jobHelixManager; private final Path appWorkDir; private final List<? 
extends Tag<?>> metadataTags; private GobblinHelixJobLauncher launcher; @@ -73,10 +75,12 @@ class GobblinHelixJobTask implements Task { TaskRunnerSuiteBase.Builder builder, GobblinHelixJobLauncherMetrics launcherMetrics, GobblinHelixJobTaskMetrics jobTaskMetrics) { + this.applicationName = builder.getApplicationName(); + this.instanceName = builder.getInstanceName(); this.jobTaskMetrics = jobTaskMetrics; this.taskConfig = context.getTaskConfig(); this.sysConfig = builder.getConfig(); - this.helixManager = builder.getHelixManager(); + this.jobHelixManager = builder.getJobHelixManager(); this.jobPlusSysConfig = ConfigUtils.configToProperties(sysConfig); this.jobLauncherListener = new GobblinHelixJobLauncherListener(launcherMetrics); @@ -124,7 +128,7 @@ class GobblinHelixJobTask implements Task { private GobblinHelixJobLauncher createJobLauncher() throws Exception { return new GobblinHelixJobLauncher(jobPlusSysConfig, - this.helixManager, + this.jobHelixManager, this.appWorkDir, this.metadataTags, new ConcurrentHashMap<>()); @@ -135,7 +139,7 @@ class GobblinHelixJobTask implements Task { */ @Override public TaskResult run() { - log.info("Running planning job {}", this.planningJobId); + log.info("Running planning job {} [{} {}]", this.planningJobId, this.applicationName, this.instanceName); this.jobTaskMetrics.updateTimeBetweenJobSubmissionAndExecution(this.jobPlusSysConfig); try (Closer closer = Closer.create()) { @@ -156,12 +160,12 @@ class GobblinHelixJobTask implements Task { Optional<String> actualJobIdFromStateStore = this.jobsMapping.getActualJobId(jobName); if (actualJobIdFromStateStore.isPresent()) { String previousActualJobId = actualJobIdFromStateStore.get(); - if (HelixUtils.isJobFinished(previousActualJobId, previousActualJobId, this.helixManager)) { + if (HelixUtils.isJobFinished(previousActualJobId, previousActualJobId, this.jobHelixManager)) { log.info("Previous actual job {} [plan: {}] finished, will launch a new job.", previousActualJobId, 
this.planningJobId); } else { log.info("Previous actual job {} [plan: {}] not finished, kill it now.", previousActualJobId, this.planningJobId); try { - HelixUtils.deleteWorkflow(previousActualJobId, this.helixManager, timeOut); + HelixUtils.deleteWorkflow(previousActualJobId, this.jobHelixManager, timeOut); } catch (HelixException e) { log.error("Helix cannot delete previous actual job id {} within 5 min.", previousActualJobId); return new TaskResult(TaskResult.Status.FAILED, ExceptionUtils.getFullStackTrace(e)); @@ -184,9 +188,11 @@ class GobblinHelixJobTask implements Task { } } } catch (Exception e) { + log.info("Failing planning job {}", this.planningJobId); return new TaskResult(TaskResult.Status.FAILED, "Exception occurred for job " + planningJobId + ":" + ExceptionUtils .getFullStackTrace(e)); } + log.info("Completing planning job {}", this.planningJobId); return new TaskResult(TaskResult.Status.COMPLETED, ""); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java index a7c5e2e..81ed7ce 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixMultiManager.java @@ -88,6 +88,13 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge { private HelixManager jobClusterHelixManager = null; /** + * Helix manager to handle planning job distribution. + * Corresponds to cluster with key name {@link GobblinClusterConfigurationKeys#TASK_DRIVER_CLUSTER_NAME_KEY}. 
+ */ + @Getter + private Optional<HelixManager> taskDriverHelixManager = Optional.empty(); + + /** * Helix controller for job distribution. Effective only iff below two conditions are established: * 1. In {@link GobblinHelixMultiManager#dedicatedManagerCluster} mode. * 2. {@link GobblinHelixMultiManager#dedicatedJobClusterController} is turned on. @@ -96,16 +103,31 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge { private Optional<HelixManager> jobClusterController = Optional.empty(); /** + * Helix controller for planning job distribution. Effective only iff below two conditions are established: + * 1. In {@link GobblinHelixMultiManager#dedicatedManagerCluster} mode. + * 2. {@link GobblinHelixMultiManager#dedicatedTaskDriverCluster} is turned on. + * Typically used for unit test and local deployment. + */ + private Optional<HelixManager> taskDriverClusterController = Optional.empty(); + + /** * Separate manager cluster and job distribution cluster iff this flag is turned on. Otherwise {@link GobblinHelixMultiManager#jobClusterHelixManager} * is same as {@link GobblinHelixMultiManager#managerClusterHelixManager}. */ private boolean dedicatedManagerCluster = false; + private boolean dedicatedTaskDriverCluster = false; + /** * Create a dedicated controller for job distribution. */ private boolean dedicatedJobClusterController = true; + /** + * Create a dedicated controller for planning job distribution. 
+ */ + private boolean dedicatedTaskDriverClusterController = true; + @Getter boolean isLeader = false; boolean isStandaloneMode = false; @@ -131,6 +153,8 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge { this.metrics = new HelixManagerMetrics(this.metricContext, this.config); this.dedicatedManagerCluster = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED,false); + this.dedicatedTaskDriverCluster = ConfigUtils.getBoolean(config, + GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_ENABLED, false); this.userDefinedMessageHandlerFactory = messageHandlerFactoryFunction.apply(null); initialize(); } @@ -180,6 +204,26 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge { .buildHelixManager(this.config, zkConnectionString, GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY, InstanceType.CONTROLLER)); } + + // This will create a dedicated controller for planning job distribution + if (this.dedicatedTaskDriverCluster) { + // This will create a Helix administrator to dispatch jobs to ZooKeeper + this.taskDriverHelixManager = Optional.of(buildHelixManager(this.config, + zkConnectionString, + GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, + InstanceType.ADMINISTRATOR)); + + this.dedicatedTaskDriverClusterController = ConfigUtils.getBoolean( + this.config, + GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_CONTROLLER_ENABLED, + true); + + if (this.dedicatedTaskDriverClusterController) { + this.taskDriverClusterController = Optional.of(GobblinHelixMultiManager + .buildHelixManager(this.config, zkConnectionString, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, + InstanceType.CONTROLLER)); + } + } } else { log.info("We will use same cluster to manage GobblinClusterManager and job distribution."); // This will create and register a Helix controller in ZooKeeper @@ -200,7 +244,15 @@ public class GobblinHelixMultiManager implements 
StandardMetricsBridge { if (jobClusterController.isPresent()) { this.jobClusterController.get().connect(); } + if (this.dedicatedTaskDriverCluster) { + if (taskDriverClusterController.isPresent()) { + this.taskDriverClusterController.get().connect(); + } + } this.jobClusterHelixManager.connect(); + if (this.taskDriverHelixManager.isPresent()) { + this.taskDriverHelixManager.get().connect(); + } } this.jobClusterHelixManager.addLiveInstanceChangeListener(new GobblinLiveInstanceChangeListener()); @@ -239,10 +291,17 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge { this.jobClusterHelixManager.disconnect(); } + if (taskDriverHelixManager.isPresent()) { + this.taskDriverHelixManager.get().disconnect(); + } + if (jobClusterController.isPresent() && jobClusterController.get().isConnected()) { this.jobClusterController.get().disconnect(); } + if (taskDriverClusterController.isPresent() && taskDriverClusterController.get().isConnected()) { + this.taskDriverClusterController.get().disconnect(); + } } } @@ -269,7 +328,8 @@ public class GobblinHelixMultiManager implements StandardMetricsBridge { void handleLeadershipChange(NotificationContext changeContext) { this.metrics.clusterLeadershipChange.update(1); if (this.managerClusterHelixManager.isLeader()) { - // can get multiple notifications on a leadership change, so only start the application launcher the first time + // can get multiple notifications on a leadership change, + // so only start the application launcher the first time // the notification is received log.info("Leader notification for {} isLeader {} HM.isLeader {}", managerClusterHelixManager.getInstanceName(), http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java ---------------------------------------------------------------------- diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java new file mode 100644 index 0000000..a2e0991 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixPlanningJobLauncherMetrics.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.cluster; + +import java.util.concurrent.TimeUnit; + +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.ContextAwareTimer; +import org.apache.gobblin.metrics.MetricContext; + + +public class GobblinHelixPlanningJobLauncherMetrics extends StandardMetricsBridge.StandardMetrics { + private final String metricsName; + + public static final String TIMER_FOR_COMPLETED_PLANNING_JOBS = "timeForCompletedPlanningJobs"; + public static final String TIMER_FOR_FAILED_PLANNING_JOBS = "timeForFailedPlanningJobs"; + public static final String TIMER_FOR_HELIX_WAIT = "timeForHelixWait"; + + final ContextAwareTimer timeForCompletedPlanningJobs; + final ContextAwareTimer timeForFailedPlanningJobs; + final ContextAwareTimer timeForHelixWait; + + public GobblinHelixPlanningJobLauncherMetrics(String metricsName, + final MetricContext metricContext, + int windowSizeInMin) { + + this.metricsName = metricsName; + + this.timeForCompletedPlanningJobs = metricContext.contextAwareTimer(TIMER_FOR_COMPLETED_PLANNING_JOBS, windowSizeInMin, TimeUnit.MINUTES); + this.timeForFailedPlanningJobs = metricContext.contextAwareTimer(TIMER_FOR_FAILED_PLANNING_JOBS, windowSizeInMin, TimeUnit.MINUTES); + this.timeForHelixWait = metricContext.contextAwareTimer(TIMER_FOR_HELIX_WAIT, windowSizeInMin, TimeUnit.MINUTES); + + this.contextAwareMetrics.add(timeForCompletedPlanningJobs); + this.contextAwareMetrics.add(timeForFailedPlanningJobs); + this.contextAwareMetrics.add(timeForHelixWait); + } + + public void updateTimeForCompletedPlanningJobs(long startTime) { + Instrumented.updateTimer( + com.google.common.base.Optional.of(this.timeForCompletedPlanningJobs), + System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); + } + + public void updateTimeForFailedPlanningJobs(long startTime) { + Instrumented.updateTimer( + 
com.google.common.base.Optional.of(this.timeForFailedPlanningJobs), + System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); + } + + public void updateTimeForHelixWait(long startTime) { + Instrumented.updateTimer( + com.google.common.base.Optional.of(this.timeForHelixWait), + System.currentTimeMillis() - startTime, TimeUnit.MILLISECONDS); + } + + @Override + public String getName() { + return this.metricsName; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java index a2516eb..bd1d3b2 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTask.java @@ -62,6 +62,9 @@ import org.apache.gobblin.util.Id; public class GobblinHelixTask implements Task { private final TaskConfig taskConfig; + private final String applicationName; + private final String instanceName; + private String jobName; private String jobId; private String jobKey; @@ -70,19 +73,27 @@ public class GobblinHelixTask implements Task { private SingleTask task; - public GobblinHelixTask(TaskCallbackContext taskCallbackContext, FileSystem fs, Path appWorkDir, - TaskAttemptBuilder taskAttemptBuilder, StateStores stateStores) - throws IOException { + public GobblinHelixTask(TaskRunnerSuiteBase.Builder builder, + TaskCallbackContext taskCallbackContext, + TaskAttemptBuilder taskAttemptBuilder, + StateStores stateStores) throws IOException { this.taskConfig = taskCallbackContext.getTaskConfig(); + this.applicationName = builder.getApplicationName(); + this.instanceName = builder.getInstanceName(); getInfoFromTaskConfig(); - Path jobStateFilePath = - 
GobblinClusterUtils.getJobStateFilePath(stateStores.haveJobStateStore(), appWorkDir, this.jobId); - - this.task = - new SingleTask(this.jobId, workUnitFilePath, jobStateFilePath, fs, taskAttemptBuilder, - stateStores); + Path jobStateFilePath = GobblinClusterUtils + .getJobStateFilePath(stateStores.haveJobStateStore(), + builder.getAppWorkPath(), + this.jobId); + + this.task = new SingleTask(this.jobId, + this.workUnitFilePath, + jobStateFilePath, + builder.getFs(), + taskAttemptBuilder, + stateStores); } private void getInfoFromTaskConfig() { @@ -97,18 +108,19 @@ public class GobblinHelixTask implements Task { @Override public TaskResult run() { - log.info("Actual task {} started.", this.taskId); + log.info("Actual task {} started. [{} {}]", this.taskId, this.applicationName, this.instanceName); try (Closer closer = Closer.create()) { closer.register(MDC.putCloseable(ConfigurationKeys.JOB_NAME_KEY, this.jobName)); closer.register(MDC.putCloseable(ConfigurationKeys.JOB_KEY_KEY, this.jobKey)); this.task.run(); - log.info("Actual task {} finished.", this.taskId); + log.info("Actual task {} completed.", this.taskId); return new TaskResult(TaskResult.Status.COMPLETED, ""); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); + log.error("Actual task {} interrupted.", this.taskId); return new TaskResult(TaskResult.Status.CANCELED, ""); } catch (Throwable t) { - log.error("GobblinHelixTask " + taskId + " failed due to " + t.getMessage(), t); + log.error("Actual task {} failed due to {}", this.taskId, t.getMessage()); return new TaskResult(TaskResult.Status.FAILED, Throwables.getStackTraceAsString(t)); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java index 2fa845c..14d22b4 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixTaskFactory.java @@ -19,7 +19,6 @@ package org.apache.gobblin.cluster; import java.io.IOException; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; import org.apache.helix.task.Task; @@ -53,6 +52,7 @@ public class GobblinHelixTaskFactory implements TaskFactory { private final Optional<ContainerMetrics> containerMetrics; private final HelixManager helixManager; + private TaskRunnerSuiteBase.Builder builder; /** * A {@link Counter} to count the number of new {@link GobblinHelixTask}s that are created. @@ -60,15 +60,17 @@ public class GobblinHelixTaskFactory implements TaskFactory { private final Optional<Counter> newTasksCounter; private final TaskExecutor taskExecutor; private final TaskStateTracker taskStateTracker; - private final FileSystem fs; private final Path appWorkDir; private final StateStores stateStores; private final TaskAttemptBuilder taskAttemptBuilder; - public GobblinHelixTaskFactory(Optional<ContainerMetrics> containerMetrics, TaskExecutor taskExecutor, - TaskStateTracker taskStateTracker, FileSystem fs, Path appWorkDir, Config config, HelixManager helixManager) { - this.containerMetrics = containerMetrics; - this.helixManager = helixManager; + public GobblinHelixTaskFactory(TaskRunnerSuiteBase.Builder builder, + TaskExecutor taskExecutor, + TaskStateTracker taskStateTracker, + Config stateStoreConfig) { + this.builder = builder; + this.containerMetrics = builder.getContainerMetrics(); + this.helixManager = builder.getJobHelixManager(); if (this.containerMetrics.isPresent()) { this.newTasksCounter = Optional.of(this.containerMetrics.get().getCounter(GOBBLIN_CLUSTER_NEW_HELIX_TASK_COUNTER)); } else { @@ 
-76,10 +78,12 @@ public class GobblinHelixTaskFactory implements TaskFactory { } this.taskExecutor = taskExecutor; this.taskStateTracker = taskStateTracker; - this.fs = fs; - this.appWorkDir = appWorkDir; - this.stateStores = new StateStores(config, appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, - appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME, appWorkDir, + + this.appWorkDir = builder.getAppWorkPath(); + this.stateStores = new StateStores(stateStoreConfig, + appWorkDir, GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, + appWorkDir, GobblinClusterConfigurationKeys.INPUT_WORK_UNIT_DIR_NAME, + appWorkDir, GobblinClusterConfigurationKeys.JOB_STATE_DIR_NAME); this.taskAttemptBuilder = createTaskAttemptBuilder(); } @@ -98,7 +102,7 @@ public class GobblinHelixTaskFactory implements TaskFactory { if (this.newTasksCounter.isPresent()) { this.newTasksCounter.get().inc(); } - return new GobblinHelixTask(context, this.fs, this.appWorkDir, this.taskAttemptBuilder, this.stateStores); + return new GobblinHelixTask(builder, context, this.taskAttemptBuilder, this.stateStores); } catch (IOException ioe) { LOGGER.error("Failed to create a new GobblinHelixTask", ioe); throw Throwables.propagate(ioe); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index 63e7fd2..9f353ed 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -121,7 +121,9 @@ public class GobblinTaskRunner implements StandardMetricsBridge { private final String clusterName; - private 
HelixManager helixManager; + private HelixManager jobHelixManager; + + private Optional<HelixManager> taskDriverHelixManager = Optional.absent(); private final ServiceManager serviceManager; @@ -144,6 +146,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge { private final String applicationName; private final String applicationId; private final Path appWorkPath; + private boolean isTaskDriver; + private boolean dedicatedTaskDriverCluster; private final Collection<StandardMetricsBridge.StandardMetrics> metricsCollection; @@ -153,12 +157,13 @@ public class GobblinTaskRunner implements StandardMetricsBridge { String taskRunnerId, Config config, Optional<Path> appWorkDirOptional) throws Exception { - + this.isTaskDriver = ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.TASK_DRIVER_ENABLED,false); this.helixInstanceName = helixInstanceName; this.taskRunnerId = taskRunnerId; this.applicationName = applicationName; this.applicationId = applicationId; - + this.dedicatedTaskDriverCluster = ConfigUtils.getBoolean(config, + GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_ENABLED, false); Configuration conf = HadoopUtils.newConfiguration(); this.fs = buildFileSystem(config, conf); this.appWorkPath = initAppWorkDir(config, appWorkDirOptional); @@ -180,9 +185,10 @@ public class GobblinTaskRunner implements StandardMetricsBridge { TaskRunnerSuiteBase suite = builder.setAppWorkPath(this.appWorkPath) .setContainerMetrics(this.containerMetrics) .setFileSystem(this.fs) - .setHelixManager(this.helixManager) + .setJobHelixManager(this.jobHelixManager) .setApplicationId(applicationId) .setApplicationName(applicationName) + .setInstanceName(helixInstanceName) .build(); this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactoryMap()); @@ -196,7 +202,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge { this.serviceManager = new ServiceManager(services); } - logger.debug("GobblinTaskRunner: applicationName 
{}, helixInstanceName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}", + logger.info("GobblinTaskRunner({}): applicationName {}, helixInstanceName {}, applicationId {}, taskRunnerId {}, config {}, appWorkDir {}", + this.isTaskDriver?"taskDriver" : "worker", applicationName, helixInstanceName, applicationId, @@ -215,14 +222,37 @@ public class GobblinTaskRunner implements StandardMetricsBridge { this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); logger.info("Using ZooKeeper connection string: " + zkConnectionString); - this.helixManager = HelixManagerFactory.getZKHelixManager( - this.clusterName, this.helixInstanceName, InstanceType.PARTICIPANT, zkConnectionString); + if (this.isTaskDriver && this.dedicatedTaskDriverCluster) { + // This will create a Helix manager to receive the planning job + this.taskDriverHelixManager = Optional.of(HelixManagerFactory.getZKHelixManager( + ConfigUtils.getString(this.config, GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, ""), + this.helixInstanceName, + InstanceType.PARTICIPANT, + zkConnectionString)); + this.jobHelixManager = HelixManagerFactory.getZKHelixManager( + this.clusterName, + this.helixInstanceName, + InstanceType.ADMINISTRATOR, + zkConnectionString); + } else { + this.jobHelixManager = HelixManagerFactory.getZKHelixManager( + this.clusterName, + this.helixInstanceName, + InstanceType.PARTICIPANT, + zkConnectionString); + } + } + + private HelixManager getReceiverManager() { + return taskDriverHelixManager.isPresent()?taskDriverHelixManager.get() + : this.jobHelixManager; } private TaskStateModelFactory createTaskStateModelFactory(Map<String, TaskFactory> taskFactoryMap) { + HelixManager receiverManager = getReceiverManager(); TaskStateModelFactory taskStateModelFactory = - new TaskStateModelFactory(this.helixManager, taskFactoryMap); - this.helixManager.getStateMachineEngine() + new TaskStateModelFactory(receiverManager, taskFactoryMap); + 
receiverManager.getStateMachineEngine() .registerStateModelFactory("Task", taskStateModelFactory); return taskStateModelFactory; } @@ -316,13 +346,16 @@ public class GobblinTaskRunner implements StandardMetricsBridge { @VisibleForTesting void connectHelixManager() { try { - this.helixManager.connect(); - this.helixManager.getMessagingService() + this.jobHelixManager.connect(); + this.jobHelixManager.getMessagingService() .registerMessageHandlerFactory(GobblinHelixConstants.SHUTDOWN_MESSAGE_TYPE, new ParticipantShutdownMessageHandlerFactory()); - this.helixManager.getMessagingService() + this.jobHelixManager.getMessagingService() .registerMessageHandlerFactory(Message.MessageType.USER_DEFINE_MSG.toString(), getUserDefinedMessageHandlerFactory()); + if (this.taskDriverHelixManager.isPresent()) { + this.taskDriverHelixManager.get().connect(); + } } catch (Exception e) { logger.error("HelixManager failed to connect", e); throw Throwables.propagate(e); @@ -335,12 +368,16 @@ public class GobblinTaskRunner implements StandardMetricsBridge { * the job with EXAMPLE_INSTANCE_TAG will remain in the ZK until an instance with EXAMPLE_INSTANCE_TAG was found. 
*/ private void addInstanceTags() { - if (this.helixManager.isConnected()) { - List<String> tags = ConfigUtils.getStringList(this.config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY); - logger.info("Adding tags binding " + tags); - tags.forEach(tag -> helixManager.getClusterManagmentTool().addInstanceTag(this.clusterName, this.helixInstanceName, tag)); - logger.info("Actual tags binding " + helixManager.getClusterManagmentTool() - .getInstanceConfig(this.clusterName, this.helixInstanceName).getTags()); + List<String> tags = ConfigUtils.getStringList(this.config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY); + HelixManager receiverManager = getReceiverManager(); + if (receiverManager.isConnected()) { + if (!tags.isEmpty()) { + logger.info("Adding tags binding " + tags); + tags.forEach(tag -> receiverManager.getClusterManagmentTool() + .addInstanceTag(this.clusterName, this.helixInstanceName, tag)); + logger.info("Actual tags binding " + receiverManager.getClusterManagmentTool() + .getInstanceConfig(this.clusterName, this.helixInstanceName).getTags()); + } } } @@ -356,8 +393,12 @@ public class GobblinTaskRunner implements StandardMetricsBridge { @VisibleForTesting void disconnectHelixManager() { - if (this.helixManager.isConnected()) { - this.helixManager.disconnect(); + if (this.jobHelixManager.isConnected()) { + this.jobHelixManager.disconnect(); + } + + if (this.taskDriverHelixManager.isPresent()) { + this.taskDriverHelixManager.get().disconnect(); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java index f9c70f7..4cda2ed 100644 --- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java @@ -20,6 +20,7 @@ package org.apache.gobblin.cluster; import java.util.Optional; import java.util.Properties; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; @@ -30,6 +31,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.runtime.JobException; import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.api.JobExecutionMonitor; @@ -84,8 +86,10 @@ class HelixRetriggeringJobCallable implements Callable { private final Properties sysProps; private final Properties jobProps; private final JobListener jobListener; + private final GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics; private final Path appWorkDir; - private final HelixManager helixManager; + private final HelixManager jobHelixManager; + private final Optional<HelixManager> taskDriverHelixManager; protected HelixJobsMapping jobsMapping; private GobblinHelixJobLauncher currentJobLauncher = null; private JobExecutionMonitor currentJobMonitor = null; @@ -96,14 +100,18 @@ class HelixRetriggeringJobCallable implements Callable { Properties sysProps, Properties jobProps, JobListener jobListener, + GobblinHelixPlanningJobLauncherMetrics planningJobLauncherMetrics, Path appWorkDir, - HelixManager helixManager) { + HelixManager jobHelixManager, + Optional<HelixManager> taskDriverHelixManager) { this.jobScheduler = jobScheduler; this.sysProps = sysProps; this.jobProps = jobProps; this.jobListener = jobListener; + this.planningJobLauncherMetrics = planningJobLauncherMetrics; this.appWorkDir = appWorkDir; - this.helixManager = 
helixManager; + this.jobHelixManager = jobHelixManager; + this.taskDriverHelixManager = taskDriverHelixManager; this.isDistributeJobEnabled = isDistributeJobEnabled(); this.jobsMapping = new HelixJobsMapping(ConfigUtils.propertiesToConfig(sysProps), PathUtils.getRootPath(appWorkDir).toUri(), @@ -169,6 +177,7 @@ class HelixRetriggeringJobCallable implements Callable { * @see {@link GobblinHelixJobTask#run()} for the task driver logic. */ private void runJobExecutionLauncher() throws JobException { + long startTime = 0; try { String builderStr = jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName()); @@ -179,7 +188,10 @@ class HelixRetriggeringJobCallable implements Callable { if (planningJobIdFromStore.isPresent()) { String previousPlanningJobId = planningJobIdFromStore.get(); - if (HelixUtils.isJobFinished(previousPlanningJobId, previousPlanningJobId, this.helixManager)) { + HelixManager planningJobManager = this.taskDriverHelixManager.isPresent()? + this.taskDriverHelixManager.get() : this.jobHelixManager; + + if (HelixUtils.isJobFinished(previousPlanningJobId, previousPlanningJobId, planningJobManager)) { log.info("Previous planning job {} has reached to the final state. Start a new one.", previousPlanningJobId); } else { log.info("Previous planning job {} has not finished yet. 
Skip it.", previousPlanningJobId); @@ -205,22 +217,34 @@ class HelixRetriggeringJobCallable implements Callable { builder.setSysProps(this.sysProps); builder.setJobPlanningProps(jobPlanningProps); - builder.setManager(this.helixManager); + builder.setJobHelixManager(this.jobHelixManager); + builder.setTaskDriverHelixManager(this.taskDriverHelixManager); builder.setAppWorkDir(this.appWorkDir); + builder.setPlanningJobLauncherMetrics(this.planningJobLauncherMetrics); try (Closer closer = Closer.create()) { + log.info("Planning job {} started.", planningId); GobblinHelixDistributeJobExecutionLauncher launcher = builder.build(); closer.register(launcher); this.jobsMapping.setPlanningJobId(jobName, planningId); + startTime = System.currentTimeMillis(); this.currentJobMonitor = launcher.launchJob(null); this.currentJobMonitor.get(); this.currentJobMonitor = null; + log.info("Planning job {} finished.", planningId); + this.planningJobLauncherMetrics.updateTimeForCompletedPlanningJobs(startTime); } catch (Throwable t) { - throw new JobException("Failed to launch and run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), t); + if (startTime != 0) { + this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(startTime); + } + throw new JobException("Failed to launch and run planning job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), t); } } catch (Exception e) { - log.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); - throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); + if (startTime != 0) { + this.planningJobLauncherMetrics.updateTimeForFailedPlanningJobs(startTime); + } + log.error("Failed to run planning job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); + throw new JobException("Failed to run planning job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); } } 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java index 182cd07..1ae8639 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java @@ -85,19 +85,20 @@ public abstract class TaskRunnerSuiteBase { @Getter public static class Builder { private Config config; - private HelixManager helixManager; + private HelixManager jobHelixManager; private Optional<ContainerMetrics> containerMetrics; private FileSystem fs; private Path appWorkPath; private String applicationId; private String applicationName; + private String instanceName; public Builder(Config config) { this.config = config; } - public Builder setHelixManager(HelixManager manager) { - this.helixManager = manager; + public Builder setJobHelixManager(HelixManager jobHelixManager) { + this.jobHelixManager = jobHelixManager; return this; } @@ -106,6 +107,11 @@ public abstract class TaskRunnerSuiteBase { return this; } + public Builder setInstanceName(String instanceName) { + this.instanceName = instanceName; + return this; + } + public Builder setApplicationId(String applicationId) { this.applicationId = applicationId; return this; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java 
index 6080e1f..92d3427 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java @@ -89,12 +89,9 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase { services.add(new JMXReportingService( ImmutableMap.of("task.executor", taskExecutor.getTaskExecutorQueueMetricSet()))); - return new GobblinHelixTaskFactory(builder.getContainerMetrics(), - taskExecutor, - taskStateTracker, - builder.getFs(), - builder.getAppWorkPath(), - stateStoreJobConfig, - builder.getHelixManager()); + return new GobblinHelixTaskFactory(builder, + taskExecutor, + taskStateTracker, + stateStoreJobConfig); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java index 99f225a..a2f0d7b 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java @@ -24,6 +24,7 @@ import org.testng.annotations.Test; import org.apache.gobblin.cluster.suite.IntegrationBasicSuite; import org.apache.gobblin.cluster.suite.IntegrationDedicatedManagerClusterSuite; +import org.apache.gobblin.cluster.suite.IntegrationDedicatedTaskDriverClusterSuite; import org.apache.gobblin.cluster.suite.IntegrationJobFactorySuite; import org.apache.gobblin.cluster.suite.IntegrationJobTagSuite; import org.apache.gobblin.cluster.suite.IntegrationSeparateProcessSuite; @@ -34,7 +35,8 @@ public class ClusterIntegrationTest { private IntegrationBasicSuite suite; @Test - public void testJobShouldComplete() throws 
Exception { + public void testJobShouldComplete() + throws Exception { this.suite = new IntegrationBasicSuite(); runAndVerify(); } @@ -54,6 +56,13 @@ public class ClusterIntegrationTest { } @Test(enabled = false) + public void testDedicatedTaskDriverCluster() + throws Exception { + this.suite = new IntegrationDedicatedTaskDriverClusterSuite(); + runAndVerify(); + } + + @Test(enabled = false) public void testJobWithTag() throws Exception { this.suite = new IntegrationJobTagSuite(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java index a104b7d..e91c984 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinHelixTaskTest.java @@ -48,8 +48,10 @@ import org.apache.gobblin.runtime.AbstractJobLauncher; import org.apache.gobblin.runtime.JobState; import org.apache.gobblin.runtime.TaskExecutor; import org.apache.gobblin.source.workunit.WorkUnit; +import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.Id; import org.apache.gobblin.util.SerializationUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; import org.apache.gobblin.writer.AvroDataWriterBuilder; import org.apache.gobblin.writer.Destination; import org.apache.gobblin.writer.WriterOutputFormat; @@ -140,9 +142,23 @@ public class GobblinHelixTaskTest { Mockito.when(taskCallbackContext.getTaskConfig()).thenReturn(taskConfig); Mockito.when(taskCallbackContext.getManager()).thenReturn(this.helixManager); + + TaskRunnerSuiteBase.Builder builder = new TaskRunnerSuiteBase.Builder(ConfigFactory.empty()); + 
builder.setInstanceName("TestInstance") + .setApplicationName("TestApplication") + .setAppWorkPath(appWorkDir) + .setContainerMetrics(Optional.absent()) + .setFileSystem(localFs) + .setJobHelixManager(this.helixManager) + .setApplicationId("TestApplication-1") + .build(); + GobblinHelixTaskFactory gobblinHelixTaskFactory = - new GobblinHelixTaskFactory(Optional.<ContainerMetrics>absent(), this.taskExecutor, this.taskStateTracker, - this.localFs, this.appWorkDir, ConfigFactory.empty(), this.helixManager); + new GobblinHelixTaskFactory(builder, + this.taskExecutor, + this.taskStateTracker, + ConfigFactory.empty()); + this.gobblinHelixTask = (GobblinHelixTask) gobblinHelixTaskFactory.createNewTask(taskCallbackContext); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java index 6f758b8..5a4a977 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java @@ -30,6 +30,7 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -39,6 +40,7 @@ import org.apache.curator.test.TestingServer; import com.google.common.base.Charsets; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.io.Resources; @@ -69,11 +71,14 @@ import org.apache.gobblin.testing.AssertWithBackoff; 
public class IntegrationBasicSuite { public static final String JOB_CONF_NAME = "HelloWorldJob.conf"; public static final String WORKER_INSTANCE_0 = "WorkerInstance_0"; + public static final String TEST_INSTANCE_NAME_KEY = "worker.instance.name"; // manager and workers protected Config managerConfig; + protected Collection<Config> taskDriverConfigs = Lists.newArrayList(); protected Collection<Config> workerConfigs = Lists.newArrayList(); protected Collection<GobblinTaskRunner> workers = Lists.newArrayList(); + protected Collection<GobblinTaskRunner> taskDrivers = Lists.newArrayList(); protected GobblinClusterManager manager; protected Path workPath; @@ -96,6 +101,7 @@ public class IntegrationBasicSuite { private void initConfig() { this.managerConfig = this.getManagerConfig(); + this.taskDriverConfigs = this.getTaskDriverConfigs(); this.workerConfigs = this.getWorkerConfigs(); } @@ -164,7 +170,7 @@ public class IntegrationBasicSuite { } } - private Config getClusterConfig() { + protected Config getClusterConfig() { URL url = Resources.getResource("BasicCluster.conf"); Config config = ConfigFactory.parseURL(url); @@ -185,6 +191,10 @@ public class IntegrationBasicSuite { return managerConfig.resolve(); } + protected Collection<Config> getTaskDriverConfigs() { + return new ArrayList<>(); + } + protected Collection<Config> getWorkerConfigs() { // worker config initialization URL url = Resources.getResource("BasicWorker.conf"); @@ -193,6 +203,13 @@ public class IntegrationBasicSuite { return Lists.newArrayList(workerConfig.resolve()); } + protected Config addInstanceName(Config baseConfig, String instanceName) { + Map<String, String> configMap = new HashMap<>(); + configMap.put(IntegrationBasicSuite.TEST_INSTANCE_NAME_KEY, instanceName); + Config instanceConfig = ConfigFactory.parseMap(configMap); + return instanceConfig.withFallback(baseConfig).resolve(); + } + public void waitForAndVerifyOutputFiles() throws Exception { AssertWithBackoff asserter = 
AssertWithBackoff.create().logger(log).timeoutMs(120_000) .maxSleepMs(100).backoffFactor(1.5); @@ -215,6 +232,7 @@ public class IntegrationBasicSuite { this.testingZKServer.start(); createHelixCluster(); startWorker(); + startTaskDriver(); startManager(); } @@ -226,19 +244,51 @@ public class IntegrationBasicSuite { this.manager.start(); } - protected void startWorker() throws Exception { - this.workers.add(new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, WORKER_INSTANCE_0, - TestHelper.TEST_APPLICATION_ID, "1", - this.workerConfigs.iterator().next(), Optional.absent())); + private void startTaskDriver() throws Exception { + for (Config taskDriverConfig: this.taskDriverConfigs) { + GobblinTaskRunner runner = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, + taskDriverConfig.getString(TEST_INSTANCE_NAME_KEY), + TestHelper.TEST_APPLICATION_ID, "1", + taskDriverConfig, Optional.absent()); + this.taskDrivers.add(runner); + + // Need to run in another thread since the start call will not return until the stop method + // is called. + Thread workerThread = new Thread(runner::start); + workerThread.start(); + } + } - // Need to run in another thread since the start call will not return until the stop method - // is called. - Thread workerThread = new Thread(this.workers.iterator().next()::start); - workerThread.start(); + private void startWorker() throws Exception { + if (workerConfigs.size() == 1) { + this.workers.add(new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, WORKER_INSTANCE_0, + TestHelper.TEST_APPLICATION_ID, "1", + this.workerConfigs.iterator().next(), Optional.absent())); + + // Need to run in another thread since the start call will not return until the stop method + // is called. 
+ Thread workerThread = new Thread(this.workers.iterator().next()::start); + workerThread.start(); + } else { + // Each workerConfig corresponds to a worker instance + for (Config workerConfig: this.workerConfigs) { + GobblinTaskRunner runner = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, + workerConfig.getString(TEST_INSTANCE_NAME_KEY), + TestHelper.TEST_APPLICATION_ID, "1", + workerConfig, Optional.absent()); + this.workers.add(runner); + + // Need to run in another thread since the start call will not return until the stop method + // is called. + Thread workerThread = new Thread(runner::start); + workerThread.start(); + } + } } public void shutdownCluster() throws InterruptedException, IOException { - workers.forEach(runner->runner.stop()); + this.workers.forEach(runner->runner.stop()); + this.taskDrivers.forEach(runner->runner.stop()); this.manager.stop(); this.testingZKServer.close(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java new file mode 100644 index 0000000..1a14451 --- /dev/null +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedTaskDriverClusterSuite.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster.suite; + +import java.net.URL; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.io.Resources; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.cluster.ClusterIntegrationTest; +import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; +import org.apache.gobblin.cluster.HelixUtils; + +/** + * <p> A test suite used for {@link ClusterIntegrationTest#testDedicatedTaskDriverCluster()} ()} + * + * <p> We will have two separate clusters, one for planning job, one for actual job. + * + * <p> Each planning job is submitted by manager instance and reaches to task driver + * instance via 'task driver cluster (or planning job cluster)'. + * + * <p> Each actual job is submitted by task driver instance and reaches to the worker + * instance via 'job cluster'. 
+ */ +public class IntegrationDedicatedTaskDriverClusterSuite extends IntegrationBasicSuite { + + @Override + public void createHelixCluster() throws Exception { + super.createHelixCluster(); + String zkConnectionString = managerConfig + .getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); + String managerClusterName = managerConfig + .getString(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY); + HelixUtils.createGobblinHelixCluster(zkConnectionString, managerClusterName); + String taskDriverClusterName = taskDriverConfigs.iterator().next() + .getString(GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY); + HelixUtils.createGobblinHelixCluster(zkConnectionString, taskDriverClusterName); + } + + @Override + protected Config getManagerConfig() { + Map<String, String> configMap = new HashMap<>(); + configMap.put(GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED, "true"); + configMap.put(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY, "ManagerCluster"); + Config config = ConfigFactory.parseMap(configMap); + return config.withFallback(super.getManagerConfig()).resolve(); + } + + @Override + protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) { + Config newConfig = ConfigFactory.parseMap(ImmutableMap.of( + GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED, true)) + .withFallback(rawJobConfig); + return ImmutableMap.of("HelloWorldJob", newConfig); + } + + @Override + protected Collection<Config> getTaskDriverConfigs() { + // task driver config initialization + URL url = Resources.getResource("BasicTaskDriver.conf"); + Config taskDriverConfig = ConfigFactory.parseURL(url); + taskDriverConfig = taskDriverConfig.withFallback(getClusterConfig()); + Config taskDriver1 = addInstanceName(taskDriverConfig, "TaskDriver1"); + return ImmutableList.of(taskDriver1); + } + + @Override + protected Config getClusterConfig() { + Map<String, String> configMap = new HashMap<>(); + 
configMap.put(GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_ENABLED, "true"); + configMap.put(GobblinClusterConfigurationKeys.TASK_DRIVER_CLUSTER_NAME_KEY, "TaskDriverCluster"); + Config config = ConfigFactory.parseMap(configMap); + return config.withFallback(super.getClusterConfig()).resolve(); + } + + @Override + protected Collection<Config> getWorkerConfigs() { + Config baseConfig = super.getWorkerConfigs().iterator().next(); + Config workerConfig1 = addInstanceName(baseConfig, "Worker1"); + return ImmutableList.of(workerConfig1); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java index 4424cae..905db6d 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java @@ -54,7 +54,6 @@ import org.apache.gobblin.testing.AssertWithBackoff; */ @Slf4j public class IntegrationJobTagSuite extends IntegrationBasicSuite { - private static final String WORKER_INSTANCE_NAME_KEY = "worker.instance.name"; private static final String WORKER_INSTANCE_1 = "WorkerInstance_1"; private static final String WORKER_INSTANCE_2 = "WorkerInstance_2"; private static final String WORKER_INSTANCE_3 = "WorkerInstance_3"; @@ -84,7 +83,7 @@ public class IntegrationJobTagSuite extends IntegrationBasicSuite { Map<String, String> configMap = new HashMap<>(); if (tags!= null && tags.size() > 0) { configMap.put(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, Joiner.on(',').join(tags)); - configMap.put(WORKER_INSTANCE_NAME_KEY, instanceName); + 
configMap.put(IntegrationBasicSuite.TEST_INSTANCE_NAME_KEY, instanceName); } return ConfigFactory.parseMap(configMap).withFallback(workerConfig); } @@ -105,22 +104,6 @@ public class IntegrationJobTagSuite extends IntegrationBasicSuite { return ConfigFactory.parseMap(ImmutableMap.of(GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER, "JobTagTaskRunnerSuiteBuilder")).withFallback(workerConfig); } - @Override - protected void startWorker() throws Exception { - // Each workerConfig corresponds to a worker instance - for (Config workerConfig: this.workerConfigs) { - GobblinTaskRunner runner = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, workerConfig.getString(WORKER_INSTANCE_NAME_KEY), - TestHelper.TEST_APPLICATION_ID, "1", - workerConfig, Optional.absent()); - this.workers.add(runner); - - // Need to run in another thread since the start call will not return until the stop method - // is called. - Thread workerThread = new Thread(runner::start); - workerThread.start(); - } - } - /** * Create different jobs with different tags */ @@ -163,7 +146,7 @@ public class IntegrationJobTagSuite extends IntegrationBasicSuite { private String instanceName; public JobTagTaskRunnerSuiteBuilder(Config config) { super(config); - this.instanceName = config.getString(IntegrationJobTagSuite.WORKER_INSTANCE_NAME_KEY); + this.instanceName = config.getString(IntegrationJobTagSuite.TEST_INSTANCE_NAME_KEY); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/d10bae88/gobblin-cluster/src/test/resources/BasicTaskDriver.conf ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/resources/BasicTaskDriver.conf b/gobblin-cluster/src/test/resources/BasicTaskDriver.conf new file mode 100644 index 0000000..84bc5b2 --- /dev/null +++ b/gobblin-cluster/src/test/resources/BasicTaskDriver.conf @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license 
agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Cluster / Helix configuration properties
+gobblin.cluster.taskDriver.enabled=true
\ No newline at end of file
