Repository: incubator-gobblin Updated Branches: refs/heads/master 31986790e -> c1e9cf250
[GOBBLIN-490] Allow jobs to be re-distributed to worker nodes and launch there Closes #2360 from yukuai518/jobDistribute Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c1e9cf25 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c1e9cf25 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c1e9cf25 Branch: refs/heads/master Commit: c1e9cf250c3e844dcf649b730fb9a6b464b740a9 Parents: 3198679 Author: Kuai Yu <[email protected]> Authored: Fri Jun 15 16:07:37 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Fri Jun 15 16:07:37 2018 -0700 ---------------------------------------------------------------------- .../GobblinClusterConfigurationKeys.java | 10 + ...blinHelixDistributeJobExecutionLauncher.java | 312 +++++++++++++++++++ .../gobblin/cluster/GobblinHelixJobFactory.java | 63 ++++ .../cluster/GobblinHelixJobLauncher.java | 79 ++--- .../cluster/GobblinHelixJobScheduler.java | 63 +--- .../gobblin/cluster/GobblinHelixJobTask.java | 107 +++++++ .../gobblin/cluster/GobblinTaskRunner.java | 10 +- .../cluster/HelixRetriggeringJobCallable.java | 150 +++++++++ .../org/apache/gobblin/cluster/HelixUtils.java | 109 ++++++- .../gobblin/cluster/TaskRunnerSuiteBase.java | 6 +- .../cluster/TaskRunnerSuiteProcessModel.java | 11 +- .../cluster/TaskRunnerSuiteThreadModel.java | 10 +- .../gobblin/cluster/ClusterIntegrationTest.java | 8 + .../TaskRunnerSuiteForJobFactoryTest.java | 113 +++++++ .../cluster/TaskRunnerSuiteForJobTagTest.java | 8 +- .../cluster/suite/IntegrationBasicSuite.java | 39 ++- .../suite/IntegrationJobFactorySuite.java | 81 +++++ .../cluster/suite/IntegrationJobTagSuite.java | 46 +-- .../apache/gobblin/util/PropertiesUtils.java | 4 + 19 files changed, 1061 insertions(+), 168 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index 492edb8..648b5bc 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -47,6 +47,11 @@ public class GobblinClusterConfigurationKeys { public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false; public static final String CLUSTER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + "workDir"; + public static final String DISTRIBUTED_JOB_LAUNCHER_ENABLED = GOBBLIN_CLUSTER_PREFIX + "distributedJobLauncherEnabled"; + public static final boolean DEFAULT_DISTRIBUTED_JOB_LAUNCHER_ENABLED = false; + public static final String DISTRIBUTED_JOB_LAUNCHER_BUILDER = GOBBLIN_CLUSTER_PREFIX + "distributedJobLauncherBuilder"; + + // Helix configuration properties. 
public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "helix.cluster.name"; public static final String MANAGER_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "manager.cluster.name"; @@ -66,6 +71,11 @@ public class GobblinClusterConfigurationKeys { public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobTag"; public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX + "helixInstanceTags"; + // Planning job properties + public static final String PLANNING_JOB_NAME_PREFIX = "PlanningJob"; + public static final String PLANNING_CONF_PREFIX = GOBBLIN_CLUSTER_PREFIX + "planning."; + public static final String PLANNING_ID_KEY = PLANNING_CONF_PREFIX + "idKey"; + /** * A path pointing to a directory that contains job execution files to be executed by Gobblin. This directory can * have a nested structure. http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java new file mode 100644 index 0000000..5800e8d --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixDistributeJobExecutionLauncher.java @@ -0,0 +1,312 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.FutureTask; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.fs.Path; +import org.apache.helix.HelixManager; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.TaskConfig; +import org.apache.helix.task.TaskDriver; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; + +import javax.annotation.Nonnull; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.runtime.api.ExecutionResult; +import org.apache.gobblin.runtime.api.JobExecutionLauncher; +import org.apache.gobblin.runtime.api.JobExecutionMonitor; +import org.apache.gobblin.runtime.api.JobSpec; +import 
org.apache.gobblin.runtime.api.MonitoredObject; +import org.apache.gobblin.runtime.util.StateStores; +import org.apache.gobblin.source.extractor.partition.Partitioner; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.JobLauncherUtils; +import org.apache.gobblin.util.PropertiesUtils; + + +/** + * To avoid all the task driver logic ({@link GobblinHelixJobLauncher}) runs on the same instance (node), this + * {@link JobExecutionLauncher} can distribute the original job (called planning job) to Helix. Helix will + * assign this job to one participant. The participant can parse the original job properties and run the task driver. + * + * <p> + * For job submission, the Helix workflow name will be the original job name with prefix + * {@link GobblinClusterConfigurationKeys#PLANNING_JOB_NAME_PREFIX}. The Helix job name will be the auto-generated planning + * job ID with prefix {@link GobblinClusterConfigurationKeys#PLANNING_ID_KEY}. + * </p> + * + * <p> + * We will associate this job to Helix's {@link org.apache.helix.task.TaskFactory} + * by specifying {@link GobblinTaskRunner#GOBBLIN_JOB_FACTORY_NAME} in the {@link JobConfig.Builder}. + * This job will only contain a single task, which is the same as planningID. 
+ * </p> + */ +@Alpha +@Slf4j +class GobblinHelixDistributeJobExecutionLauncher implements JobExecutionLauncher { + protected HelixManager helixManager; + protected TaskDriver helixTaskDriver; + protected Properties sysProperties; + protected Properties jobProperties; + protected StateStores stateStores; + + protected static final String PLANNING_WORK_UNIT_DIR_NAME = "_plan_workunits"; + protected static final String PLANNING_TASK_STATE_DIR_NAME = "_plan_taskstates"; + protected static final String PLANNING_JOB_STATE_DIR_NAME = "_plan_jobstates"; + + protected static final String JOB_PROPS_PREFIX = "gobblin.jobProps."; + + private final long jobQueueDeleteTimeoutSeconds; + + public GobblinHelixDistributeJobExecutionLauncher(Builder builder) throws Exception { + this.helixManager = builder.manager; + this.helixTaskDriver = new TaskDriver(this.helixManager); + this.sysProperties = builder.sysProperties; + this.jobProperties = builder.jobProperties; + + Config combined = ConfigUtils.propertiesToConfig(jobProperties) + .withFallback(ConfigUtils.propertiesToConfig(sysProperties)); + + Config stateStoreJobConfig = combined + .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, ConfigValueFactory.fromAnyRef( + new URI(builder.appWorkDir.toUri().getScheme(), null, builder.appWorkDir.toUri().getHost(), + builder.appWorkDir.toUri().getPort(), null, null, null).toString())); + + this.stateStores = new StateStores(stateStoreJobConfig, + builder.appWorkDir, PLANNING_TASK_STATE_DIR_NAME, + builder.appWorkDir, PLANNING_WORK_UNIT_DIR_NAME, + builder.appWorkDir, PLANNING_JOB_STATE_DIR_NAME); + + this.jobQueueDeleteTimeoutSeconds = ConfigUtils.getLong(combined, + GobblinClusterConfigurationKeys.HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS, + GobblinClusterConfigurationKeys.DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS); + } + + @Setter + public static class Builder { + Properties sysProperties; + Properties jobProperties; + HelixManager manager; + Path appWorkDir; + public 
GobblinHelixDistributeJobExecutionLauncher build() throws Exception { + return new GobblinHelixDistributeJobExecutionLauncher(this); + } + } + + private String getPlanningJobName (Properties jobProps) { + String jobName = JobState.getJobNameFromProps(jobProps); + return GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX + jobName; + } + + protected String getPlanningJobId (Properties jobProps) { + if (jobProps.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY)) { + return jobProps.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY); + } + String planningId = JobLauncherUtils.newJobId(getPlanningJobName(jobProps)); + jobProps.setProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY, planningId); + return planningId; + } + + /** + * Create a job config builder which has a single task that wraps the original jobProps (including values inherited from default Properties). + */ + private JobConfig.Builder createPlanningJob (Properties jobProps) { + // Create a single task for job planning + String planningId = getPlanningJobId(jobProps); + Map<String, TaskConfig> taskConfigMap = Maps.newHashMap(); + Map<String, String> rawConfigMap = Maps.newHashMap(); + for (String key : jobProps.stringPropertyNames()) { + rawConfigMap.put(JOB_PROPS_PREFIX + key, jobProps.getProperty(key)); + } + rawConfigMap.put(GobblinClusterConfigurationKeys.TASK_SUCCESS_OPTIONAL_KEY, "true"); + + // Create a single Job which only contains a single task + taskConfigMap.put(planningId, TaskConfig.Builder.from(rawConfigMap)); + JobConfig.Builder jobConfigBuilder = new JobConfig.Builder(); + + jobConfigBuilder.setTimeoutPerTask(PropertiesUtils.getPropAsLong( + jobProps, + ConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS, + ConfigurationKeys.DEFAULT_HELIX_TASK_TIMEOUT_SECONDS) * 1000); + + jobConfigBuilder.setFailureThreshold(1); + jobConfigBuilder.addTaskConfigMap(taskConfigMap).setCommand(GobblinTaskRunner.GOBBLIN_JOB_FACTORY_NAME); + + return jobConfigBuilder; + } + + /** + * Submit job to helix so that it can be re-assigned to
one of its participants. + * @param jobName A planning job name which has prefix {@link GobblinClusterConfigurationKeys#PLANNING_JOB_NAME_PREFIX}. + * @param jobId A planning job id created by {@link GobblinHelixDistributeJobExecutionLauncher#getPlanningJobId}. + * @param jobConfigBuilder A job config builder which contains a single task (submitted through the launcher's shared {@link TaskDriver}). + */ + private void submitJobToHelix(String jobName, String jobId, JobConfig.Builder jobConfigBuilder) throws Exception { + TaskDriver taskDriver = this.helixTaskDriver; + HelixUtils.submitJobToQueue(jobConfigBuilder, + jobName, + jobId, + taskDriver, + this.helixManager, + this.jobQueueDeleteTimeoutSeconds); + } + + @Override + public DistributeJobMonitor launchJob(JobSpec jobSpec) { + return new DistributeJobMonitor(new DistributeJobCallable(this.jobProperties)); + } + + @AllArgsConstructor + private class DistributeJobCallable implements Callable<ExecutionResult> { + Properties jobProps; + @Override + public DistributeJobResult call() + throws Exception { + String planningName = getPlanningJobName(this.jobProps); + String planningId = getPlanningJobId(this.jobProps); + JobConfig.Builder builder = createPlanningJob(this.jobProps); + try { + submitJobToHelix(planningName, planningId, builder); + return waitForJobCompletion(planningName, planningId); + } catch (Exception e) { + log.error(planningName + " is not able to submit.", e); + return new DistributeJobResult(Optional.empty(), Optional.of(e)); + } + } + } + + private DistributeJobResult waitForJobCompletion(String planningName, String planningId) throws InterruptedException { + boolean timeoutEnabled = Boolean.parseBoolean(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_ENABLED_KEY, + ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED)); + long timeoutInSeconds = Long.parseLong(this.jobProperties.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, + ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS)); + + try { +
HelixUtils.waitJobCompletion( + GobblinHelixDistributeJobExecutionLauncher.this.helixManager, + planningName, + planningId, + timeoutEnabled? Optional.of(timeoutInSeconds) : Optional.empty()); + return getResultFromUserContent(); + } catch (TimeoutException te) { + HelixUtils.helixTaskDriverWaitToStop(helixManager, helixTaskDriver, planningName, 10L); + this.helixTaskDriver.delete(planningName); + this.helixTaskDriver.resume(planningName); + log.info("stopped the queue, deleted the job"); + return new DistributeJobResult(Optional.empty(), Optional.of(te)); + } + } + + //TODO: change below to Helix UserContentStore; a missing task state is reported as an IOException instead of an NPE + @VisibleForTesting + protected DistributeJobResult getResultFromUserContent() { + String planningId = getPlanningJobId(this.jobProperties); + try { + TaskState taskState = this.stateStores.getTaskStateStore().get(planningId, planningId, planningId); + return taskState == null ? new DistributeJobResult(Optional.empty(), Optional.of(new IOException("No task state found for planning job " + planningId))) : new DistributeJobResult(Optional.of(taskState.getProperties()), Optional.empty()); + } catch (IOException e) { + return new DistributeJobResult(Optional.empty(), Optional.of(e)); + } + } + + @Getter + @AllArgsConstructor + static class DistributeJobResult implements ExecutionResult { + boolean isEarlyStopped = false; + Optional<Properties> properties; + Optional<Throwable> throwable; + public DistributeJobResult(Optional<Properties> properties, Optional<Throwable> throwable) { + this.properties = properties; + this.throwable = throwable; + if (properties.isPresent()) { + isEarlyStopped = PropertiesUtils.getPropAsBoolean(this.properties.get(), Partitioner.IS_EARLY_STOPPED, "false"); + } + } + } + + static class DistributeJobMonitor extends FutureTask<ExecutionResult> implements JobExecutionMonitor { + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + public DistributeJobMonitor (Callable<ExecutionResult> c) { + super(c); + this.executor.execute(this); + this.executor.shutdown(); // single task already queued; let the worker thread exit when it finishes + } + @Override + public MonitoredObject getRunningState() { + throw new UnsupportedOperationException(); + } + } +
@Override + public StandardMetrics getMetrics() { + throw new UnsupportedOperationException(); + } + + @Nonnull + @Override + public MetricContext getMetricContext() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isInstrumentationEnabled() { + return false; + } + + @Override + public List<Tag<?>> generateTags(State state) { + return Lists.newArrayList(); + } + + @Override + public void switchMetricContext(List<Tag<?>> tags) { + throw new UnsupportedOperationException(); + } + + @Override + public void switchMetricContext(MetricContext context) { + throw new UnsupportedOperationException(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java new file mode 100644 index 0000000..2f7ced2 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import java.net.URI; + +import org.apache.hadoop.fs.Path; +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskFactory; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.util.StateStores; +import org.apache.gobblin.util.PathUtils; + + +/** + * An implementation of Helix's {@link TaskFactory} for {@link GobblinHelixJobTask}s. + */ +@Slf4j +public class GobblinHelixJobFactory implements TaskFactory { + protected Config sysConfig; + protected StateStores stateStores; + + public GobblinHelixJobFactory(TaskRunnerSuiteBase.Builder builder) { + this.sysConfig = builder.getConfig(); + Path appWorkDir = builder.getAppWorkPath(); + URI rootPathUri = PathUtils.getRootPath(appWorkDir).toUri(); + Config stateStoreJobConfig = sysConfig + .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, + ConfigValueFactory.fromAnyRef(rootPathUri.toString())); + + this.stateStores = new StateStores(stateStoreJobConfig, + appWorkDir, GobblinHelixDistributeJobExecutionLauncher.PLANNING_TASK_STATE_DIR_NAME, + appWorkDir, GobblinHelixDistributeJobExecutionLauncher.PLANNING_WORK_UNIT_DIR_NAME, + appWorkDir, GobblinHelixDistributeJobExecutionLauncher.PLANNING_JOB_STATE_DIR_NAME); + } + + @Override + public Task createNewTask(TaskCallbackContext context) { + return new GobblinHelixJobTask(context, this.sysConfig, stateStores); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 6b86c5c..231575e 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -21,9 +21,11 @@ import java.io.IOException; import java.net.URI; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeoutException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -31,12 +33,9 @@ import org.apache.hadoop.fs.Path; import org.apache.helix.HelixManager; import org.apache.helix.task.JobConfig; import org.apache.helix.task.JobQueue; -import org.apache.helix.task.TargetState; import org.apache.helix.task.TaskConfig; import org.apache.helix.task.TaskDriver; import org.apache.helix.task.TaskUtil; -import org.apache.helix.task.WorkflowConfig; -import org.apache.helix.task.WorkflowContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -279,27 +278,8 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { * Submit a job to run. 
*/ private void submitJobToHelix(JobConfig.Builder jobConfigBuilder) throws Exception { - WorkflowConfig workflowConfig = this.helixTaskDriver.getWorkflowConfig(this.helixManager, this.helixQueueName); - - // If the queue is present, but in delete state then wait for cleanup before recreating the queue - if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) { - GobblinHelixTaskDriver gobblinHelixTaskDriver = new GobblinHelixTaskDriver(this.helixManager); - gobblinHelixTaskDriver.deleteWorkflow(this.helixQueueName, this.jobQueueDeleteTimeoutSeconds); - // if we get here then the workflow was successfully deleted - workflowConfig = null; - } - - // Create one queue for each job with the job name being the queue name - if (workflowConfig == null) { - JobQueue jobQueue = new JobQueue.Builder(this.helixQueueName).build(); - this.helixTaskDriver.createQueue(jobQueue); - LOGGER.info("Created job queue {}", this.helixQueueName); - } else { - LOGGER.info("Job queue {} already exists", this.helixQueueName); - } - - // Put the job into the queue - this.helixTaskDriver.enqueueJob(this.jobContext.getJobName(), this.jobContext.getJobId(), jobConfigBuilder); + HelixUtils.submitJobToQueue(jobConfigBuilder, this.helixQueueName, this.jobContext.getJobId(), + this.helixTaskDriver, this.helixManager, this.jobQueueDeleteTimeoutSeconds); } public void launchJob(@Nullable JobListener jobListener) @@ -380,45 +360,22 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_ENABLED)); long timeoutInSeconds = Long.parseLong(this.jobProps.getProperty(ConfigurationKeys.HELIX_JOB_TIMEOUT_SECONDS, ConfigurationKeys.DEFAULT_HELIX_JOB_TIMEOUT_SECONDS)); - long endTime = System.currentTimeMillis() + timeoutInSeconds*1000; - while (!timeoutEnabled || System.currentTimeMillis() <= endTime) { - WorkflowContext workflowContext = TaskDriver.getWorkflowContext(this.helixManager, this.helixQueueName); - if 
(workflowContext != null) { - org.apache.helix.task.TaskState helixJobState = workflowContext.getJobState(this.jobResourceName); - if (helixJobState == org.apache.helix.task.TaskState.COMPLETED || - helixJobState == org.apache.helix.task.TaskState.FAILED || - helixJobState == org.apache.helix.task.TaskState.STOPPED) { - return; - } - } - Thread.sleep(1000); - } - helixTaskDriverWaitToStop(this.helixQueueName, 10L); - try { - cancelJob(this.jobListener); - } catch (JobException e) { - throw new RuntimeException("Unable to cancel job " + jobContext.getJobName() + ": ", e); - } - this.helixTaskDriver.resume(this.helixQueueName); - LOGGER.info("stopped the queue, deleted the job"); - } - /** - * Because fix https://github.com/apache/helix/commit/ae8e8e2ef37f48d782fc12f85ca97728cf2b70c4 - * is not available in currently used version 0.6.9 - */ - private void helixTaskDriverWaitToStop(String workflow, long timeoutInSeconds) throws InterruptedException { - this.helixTaskDriver.stop(workflow); - long endTime = System.currentTimeMillis() + timeoutInSeconds*1000; - while (System.currentTimeMillis() <= endTime) { - WorkflowContext workflowContext = TaskDriver.getWorkflowContext(this.helixManager, this.helixQueueName); - if (workflowContext == null || workflowContext.getWorkflowState() - .equals(org.apache.helix.task.TaskState.IN_PROGRESS)) { - Thread.sleep(1000); - } else { - LOGGER.info("Successfully stopped the queue"); - return; + try { + HelixUtils.waitJobCompletion( + this.helixManager, + this.helixQueueName, + this.jobContext.getJobId(), + timeoutEnabled? 
Optional.of(timeoutInSeconds) : Optional.empty()); + } catch (TimeoutException te) { + HelixUtils.helixTaskDriverWaitToStop(helixManager, helixTaskDriver, helixQueueName, 10L); + try { + cancelJob(this.jobListener); + } catch (JobException e) { + throw new RuntimeException("Unable to cancel job " + jobContext.getJobName() + ": ", e); } + this.helixTaskDriver.resume(this.helixQueueName); + LOGGER.info("stopped the queue, deleted the job"); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java index fac7242..b991406 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobScheduler.java @@ -21,7 +21,6 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.List; import java.util.Properties; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -41,7 +40,6 @@ import com.google.common.eventbus.EventBus; import com.google.common.eventbus.Subscribe; import javax.annotation.Nonnull; -import lombok.Getter; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.cluster.event.DeleteJobConfigArrivalEvent; @@ -56,7 +54,6 @@ import org.apache.gobblin.metrics.MetricContext; import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.JobContext; import org.apache.gobblin.runtime.JobException; -import org.apache.gobblin.runtime.JobLauncher; import org.apache.gobblin.runtime.JobState; import 
org.apache.gobblin.runtime.api.JobExecutionLauncher; import org.apache.gobblin.runtime.api.MutableJobCatalog; @@ -65,7 +62,6 @@ import org.apache.gobblin.runtime.listeners.JobListener; import org.apache.gobblin.scheduler.JobScheduler; import org.apache.gobblin.scheduler.SchedulerService; import org.apache.gobblin.util.ConfigUtils; -import org.apache.gobblin.util.PropertiesUtils; /** @@ -277,56 +273,27 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe @Override public void runJob(Properties jobProps, JobListener jobListener) throws JobException { - new RetriggeringJobCallable(jobProps, jobListener).call(); + new HelixRetriggeringJobCallable(this, this.properties, jobProps, jobListener, this.appWorkDir, this.helixManager).call(); } @Override public GobblinHelixJobLauncher buildJobLauncher(Properties jobProps) throws Exception { - return new GobblinHelixJobLauncher(jobProps, this.helixManager, this.appWorkDir, this.metadataTags, this.jobRunningMap); - } - - private class RetriggeringJobCallable implements Callable { - Properties jobProps; - JobListener jobListener; - - public RetriggeringJobCallable(Properties jobProps, JobListener jobListener) { - this.jobProps = jobProps; - this.jobListener = jobListener; - } + Properties combinedProps = new Properties(); + combinedProps.putAll(properties); + combinedProps.putAll(jobProps); - private boolean isRetriggeringEnabled() { - return PropertiesUtils.getPropAsBoolean(jobProps, ConfigurationKeys.JOB_RETRIGGERING_ENABLED, ConfigurationKeys.DEFAULT_JOB_RETRIGGERING_ENABLED); - } - - @Getter - JobLauncher currentJobLauncher = null; - - @Override - public Void call() throws JobException { - try { - while (true) { - currentJobLauncher = buildJobLauncher(jobProps); - boolean isEarlyStopped = runJob(jobProps, jobListener, currentJobLauncher); - boolean isRetriggerEnabled = this.isRetriggeringEnabled(); - if (isEarlyStopped && isRetriggerEnabled) { - LOGGER.info("Job {} will be re-triggered.", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)); - } else { - break; - } - currentJobLauncher = null; - } - } catch (Exception e) { - LOGGER.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); - throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); - } - - return null; - } + return new GobblinHelixJobLauncher(combinedProps, this.helixManager, this.appWorkDir, this.metadataTags, this.jobRunningMap); } public Future<?> scheduleJobImmediately(Properties jobProps, JobListener jobListener) { - RetriggeringJobCallable retriggeringJob = new RetriggeringJobCallable(jobProps, jobListener); + HelixRetriggeringJobCallable retriggeringJob = new HelixRetriggeringJobCallable(this, + this.properties, + jobProps, + jobListener, + this.appWorkDir, + this.helixManager); + final Future<?> future = this.jobExecutor.submit(retriggeringJob); return new Future() { @Override @@ -336,10 +303,7 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe } boolean result = true; try { - JobLauncher jobLauncher = retriggeringJob.getCurrentJobLauncher(); - if (jobLauncher != null) { - jobLauncher.cancelJob(jobListener); - } + retriggeringJob.cancel(); } catch (JobException e) { LOGGER.error("Failed to cancel job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); result = false; @@ -377,7 +341,6 @@ public class GobblinHelixJobScheduler extends JobScheduler implements StandardMe LOGGER.info("Received new job configuration of job " + newJobArrival.getJobName()); try { Properties jobConfig = new Properties(); - jobConfig.putAll(this.properties); jobConfig.putAll(newJobArrival.getJobConfig()); metrics.updateTimeBeforeJobScheduling(jobConfig); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java ---------------------------------------------------------------------- diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java new file mode 100644 index 0000000..f60852e --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobTask.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.cluster; + +import java.io.IOException; +import java.util.Map; +import java.util.Properties; + +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskConfig; +import org.apache.helix.task.TaskResult; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.runtime.TaskState; +import org.apache.gobblin.runtime.util.StateStores; +import org.apache.gobblin.source.extractor.partition.Partitioner; +import org.apache.gobblin.util.ConfigUtils; + +/** + * An implementation of Helix's {@link org.apache.helix.task.Task} that runs original {@link GobblinHelixJobLauncher} + */ +@Slf4j +public class GobblinHelixJobTask implements Task { + + private final TaskConfig taskConfig; + private Config sysConfig; + private Properties jobConfig; + private StateStores stateStores; + private String planningJobId; + + public GobblinHelixJobTask(TaskCallbackContext context, + Config sysConfig, + StateStores stateStores) { + this.taskConfig = context.getTaskConfig(); + this.sysConfig = sysConfig; + this.jobConfig = ConfigUtils.configToProperties(sysConfig); + Map<String, String> configMap = this.taskConfig.getConfigMap(); + for (Map.Entry<String, String> entry: configMap.entrySet()) { + if (entry.getKey().startsWith(GobblinHelixDistributeJobExecutionLauncher.JOB_PROPS_PREFIX)) { + String key = entry.getKey().replaceFirst(GobblinHelixDistributeJobExecutionLauncher.JOB_PROPS_PREFIX, ""); + jobConfig.put(key, entry.getValue()); + } + } + + if (!jobConfig.containsKey(GobblinClusterConfigurationKeys.PLANNING_ID_KEY)) { + throw new RuntimeException("Job doesn't have plannning ID"); + } + + this.planningJobId = 
jobConfig.getProperty(GobblinClusterConfigurationKeys.PLANNING_ID_KEY); + this.stateStores = stateStores; + } + + @Override + public TaskResult run() { + log.info("We will run planning job " + this.planningJobId); + + // TODO: We should run GobblinHelixJobLauncher#launchJob() here + + try { + setResultToUserContent(ImmutableMap.of(Partitioner.IS_EARLY_STOPPED, "false")); + } catch (IOException e) { + return new TaskResult(TaskResult.Status.FAILED, "State store cannot be persisted for job " + planningJobId); + } + return new TaskResult(TaskResult.Status.COMPLETED, ""); + } + + //TODO: change below to Helix UserConentStore + @VisibleForTesting + protected void setResultToUserContent(Map<String, String> keyValues) throws IOException { + WorkUnitState wus = new WorkUnitState(); + wus.setProp(ConfigurationKeys.JOB_ID_KEY, this.planningJobId); + wus.setProp(ConfigurationKeys.TASK_ID_KEY, this.planningJobId); + wus.setProp(ConfigurationKeys.TASK_KEY_KEY, this.planningJobId); + keyValues.forEach((key, value)->wus.setProp(key, value)); + TaskState taskState = new TaskState(wus); + + this.stateStores.getTaskStateStore().put(this.planningJobId, this.planningJobId, taskState); + } + + @Override + public void cancel() { + // TODO: We should delete the real job. 
+ } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index b6f04f1..fb7136d 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -59,7 +59,6 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Service; @@ -117,6 +116,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge { static final String GOBBLIN_TASK_FACTORY_NAME = "GobblinTaskFactory"; + static final String GOBBLIN_JOB_FACTORY_NAME = "GobblinJobFactory"; + private final String helixInstanceName; private final String clusterName; @@ -175,7 +176,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge { .setFileSystem(this.fs) .setHelixManager(this.helixManager).build(); - this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactory()); + this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactoryMap()); this.metrics = suite.getTaskMetrics(); this.metricContext = suite.getMetricContext(); this.services.addAll(suite.getServices()); @@ -205,10 +206,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge { this.clusterName, this.helixInstanceName, InstanceType.PARTICIPANT, zkConnectionString); } - private TaskStateModelFactory createTaskStateModelFactory(TaskFactory factory) { 
- Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap(); - - taskFactoryMap.put(GOBBLIN_TASK_FACTORY_NAME, factory); + private TaskStateModelFactory createTaskStateModelFactory(Map<String, TaskFactory> taskFactoryMap) { TaskStateModelFactory taskStateModelFactory = new TaskStateModelFactory(this.helixManager, taskFactoryMap); this.helixManager.getStateMachineEngine() http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java new file mode 100644 index 0000000..ce3619b --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixRetriggeringJobCallable.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.cluster; + +import java.util.Properties; +import java.util.concurrent.Callable; + +import org.apache.hadoop.fs.Path; +import org.apache.helix.HelixManager; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.JobException; +import org.apache.gobblin.runtime.JobLauncher; +import org.apache.gobblin.runtime.api.ExecutionResult; +import org.apache.gobblin.runtime.api.JobExecutionMonitor; +import org.apache.gobblin.runtime.listeners.JobListener; +import org.apache.gobblin.util.ClassAliasResolver; +import org.apache.gobblin.util.PropertiesUtils; +import org.apache.gobblin.util.reflection.GobblinConstructorUtils; + + +/** + * A {@link Callable} that runs {@link JobLauncher} multiple times iff re-triggering is enabled and job stops early. + */ +@Slf4j +@Alpha +class HelixRetriggeringJobCallable implements Callable { + private GobblinHelixJobScheduler jobScheduler; + private Properties sysProps; + private Properties jobProps; + private JobListener jobListener; + private JobLauncher currentJobLauncher = null; + private JobExecutionMonitor currentJobMonitor = null; + private Path appWorkDir; + private HelixManager helixManager; + + public HelixRetriggeringJobCallable( + GobblinHelixJobScheduler jobScheduler, + Properties sysProps, + Properties jobProps, + JobListener jobListener, + Path appWorkDir, + HelixManager helixManager) { + this.jobScheduler = jobScheduler; + this.sysProps = sysProps; + this.jobProps = jobProps; + this.jobListener = jobListener; + this.appWorkDir = appWorkDir; + this.helixManager = helixManager; + } + + private boolean isRetriggeringEnabled() { + return PropertiesUtils.getPropAsBoolean(jobProps, ConfigurationKeys.JOB_RETRIGGERING_ENABLED, + ConfigurationKeys.DEFAULT_JOB_RETRIGGERING_ENABLED); + } + + private boolean isDistributeJobEnabled() { + Properties combinedProps = new Properties(); + 
combinedProps.putAll(sysProps); + combinedProps.putAll(jobProps); + return (PropertiesUtils.getPropAsBoolean(combinedProps, + GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED, + Boolean.toString(GobblinClusterConfigurationKeys.DEFAULT_DISTRIBUTED_JOB_LAUNCHER_ENABLED))); + } + + @Override + public Void call() throws JobException { + if (isDistributeJobEnabled()) { + launchJobExecutionLauncherLoop(); + } else { + launchJobLauncherLoop(); + } + + return null; + } + + private void launchJobLauncherLoop() throws JobException { + try { + while (true) { + currentJobLauncher = this.jobScheduler.buildJobLauncher(jobProps); + boolean isEarlyStopped = this.jobScheduler.runJob(jobProps, jobListener, currentJobLauncher); + boolean isRetriggerEnabled = this.isRetriggeringEnabled(); + if (isEarlyStopped && isRetriggerEnabled) { + log.info("Job {} will be re-triggered.", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)); + } else { + break; + } + currentJobLauncher = null; + } + } catch (Exception e) { + log.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); + throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); + } + } + + private void launchJobExecutionLauncherLoop() throws JobException { + try { + while (true) { + String builderStr = jobProps.getProperty(GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, GobblinHelixDistributeJobExecutionLauncher.Builder.class.getName()); + GobblinHelixDistributeJobExecutionLauncher.Builder builder = GobblinConstructorUtils.<GobblinHelixDistributeJobExecutionLauncher.Builder>invokeLongestConstructor( + new ClassAliasResolver(GobblinHelixDistributeJobExecutionLauncher.Builder.class).resolveClass(builderStr)); + + builder.setSysProperties(this.sysProps); + builder.setJobProperties(this.jobProps); + builder.setManager(this.helixManager); + builder.setAppWorkDir(this.appWorkDir); + + this.currentJobMonitor = 
builder.build().launchJob(null); + ExecutionResult result = this.currentJobMonitor.get(); + boolean isEarlyStopped = ((GobblinHelixDistributeJobExecutionLauncher.DistributeJobResult) result).isEarlyStopped(); + boolean isRetriggerEnabled = this.isRetriggeringEnabled(); + if (isEarlyStopped && isRetriggerEnabled) { + log.info("DistributeJob {} will be re-triggered.", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)); + } else { + break; + } + currentJobMonitor = null; + } + } catch (Exception e) { + log.error("Failed to run job {}", jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); + throw new JobException("Failed to run job " + jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY), e); + } + } + + public void cancel() throws JobException { + if (currentJobLauncher != null) { + currentJobLauncher.cancelJob(this.jobListener); + } else if (currentJobMonitor != null) { + currentJobMonitor.cancel(false); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java index 8be0621..0c5dbec 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/HelixUtils.java @@ -17,12 +17,24 @@ package org.apache.gobblin.cluster; +import java.util.Optional; +import java.util.concurrent.TimeoutException; + +import org.apache.helix.HelixManager; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.model.HelixConfigScope; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobQueue; +import org.apache.helix.task.TargetState; +import org.apache.helix.task.TaskDriver; +import 
org.apache.helix.task.TaskUtil; +import org.apache.helix.task.WorkflowConfig; +import org.apache.helix.task.WorkflowContext; import org.apache.helix.tools.ClusterSetup; -import org.apache.gobblin.annotation.Alpha; +import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.annotation.Alpha; /** * A utility class for working with Gobblin on Helix. @@ -30,6 +42,7 @@ import org.apache.gobblin.annotation.Alpha; * @author Yinan Li */ @Alpha +@Slf4j public class HelixUtils { /** @@ -38,7 +51,9 @@ public class HelixUtils { * @param zkConnectionString the ZooKeeper connection string * @param clusterName the Helix cluster name */ - public static void createGobblinHelixCluster(String zkConnectionString, String clusterName) { + public static void createGobblinHelixCluster( + String zkConnectionString, + String clusterName) { createGobblinHelixCluster(zkConnectionString, clusterName, true); } @@ -49,7 +64,10 @@ public class HelixUtils { * @param clusterName the Helix cluster name * @param overwrite true to overwrite exiting cluster, false to reuse existing cluster */ - public static void createGobblinHelixCluster(String zkConnectionString, String clusterName, boolean overwrite) { + public static void createGobblinHelixCluster( + String zkConnectionString, + String clusterName, + boolean overwrite) { ClusterSetup clusterSetup = new ClusterSetup(zkConnectionString); // Create the cluster and overwrite if it already exists clusterSetup.addCluster(clusterName, overwrite); @@ -65,7 +83,90 @@ public class HelixUtils { * @param instanceId an integer instance ID * @return a Helix instance name that is a concatenation of the given prefix and instance ID */ - public static String getHelixInstanceName(String namePrefix, int instanceId) { + public static String getHelixInstanceName( + String namePrefix, + int instanceId) { return namePrefix + "_" + instanceId; } + + public static void submitJobToQueue( + JobConfig.Builder jobConfigBuilder, + String queueName, + String jobName, + 
TaskDriver helixTaskDriver, + HelixManager helixManager, + long jobQueueDeleteTimeoutSeconds) throws Exception { + + WorkflowConfig workflowConfig = helixTaskDriver.getWorkflowConfig(helixManager, queueName); + + // If the queue is present, but in delete state then wait for cleanup before recreating the queue + if (workflowConfig != null && workflowConfig.getTargetState() == TargetState.DELETE) { + GobblinHelixTaskDriver gobblinHelixTaskDriver = new GobblinHelixTaskDriver(helixManager); + gobblinHelixTaskDriver.deleteWorkflow(queueName, jobQueueDeleteTimeoutSeconds); + // if we get here then the workflow was successfully deleted + workflowConfig = null; + } + + // Create one queue for each job with the job name being the queue name + if (workflowConfig == null) { + JobQueue jobQueue = new JobQueue.Builder(queueName).build(); + helixTaskDriver.createQueue(jobQueue); + log.info("Created job queue {}", queueName); + } else { + log.info("Job queue {} already exists", queueName); + } + + // Put the job into the queue + helixTaskDriver.enqueueJob(queueName, jobName, jobConfigBuilder); + } + + public static void waitJobCompletion( + HelixManager helixManager, + String queueName, + String jobName, + Optional<Long> timeoutInSeconds) throws InterruptedException, TimeoutException { + + log.info("Waiting for job to complete..."); + long endTime = 0; + if (timeoutInSeconds.isPresent()) { + endTime = System.currentTimeMillis() + timeoutInSeconds.get() * 1000; + } + + while (!timeoutInSeconds.isPresent() || System.currentTimeMillis() <= endTime) { + WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, queueName); + if (workflowContext != null) { + org.apache.helix.task.TaskState helixJobState = workflowContext.getJobState(TaskUtil.getNamespacedJobName(queueName, jobName)); + if (helixJobState == org.apache.helix.task.TaskState.COMPLETED || + helixJobState == org.apache.helix.task.TaskState.FAILED || + helixJobState == 
org.apache.helix.task.TaskState.STOPPED) { + return; + } + } + Thread.sleep(1000); + } + + throw new TimeoutException("task driver wait time [" + timeoutInSeconds + " sec] is expired."); + } + + /** + * Because fix https://github.com/apache/helix/commit/ae8e8e2ef37f48d782fc12f85ca97728cf2b70c4 + * is not available in currently used version 0.6.9 + */ + public static void helixTaskDriverWaitToStop( + HelixManager helixManager, + TaskDriver helixTaskDriver, + String queueName, + long timeoutInSeconds) throws InterruptedException { + helixTaskDriver.stop(queueName); + long endTime = System.currentTimeMillis() + timeoutInSeconds*1000; + while (System.currentTimeMillis() <= endTime) { + WorkflowContext workflowContext = TaskDriver.getWorkflowContext(helixManager, queueName); + if (workflowContext == null || workflowContext.getWorkflowState() + .equals(org.apache.helix.task.TaskState.IN_PROGRESS)) { + Thread.sleep(1000); + } else { + log.info("Successfully stopped the queue"); + } + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java index 080adb0..03d4d42 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java @@ -18,6 +18,7 @@ package org.apache.gobblin.cluster; import java.util.List; +import java.util.Map; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,6 +33,7 @@ import com.typesafe.config.Config; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.instrumented.Instrumented; import 
org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.MetricContext; @@ -46,8 +48,10 @@ import org.apache.gobblin.util.ConfigUtils; * A list of {@link Service} : register any runtime services necessary to run the tasks. */ @Slf4j +@Alpha public abstract class TaskRunnerSuiteBase { protected TaskFactory taskFactory; + protected TaskFactory jobFactory; protected MetricContext metricContext; protected StandardMetricsBridge.StandardMetrics taskMetrics; protected List<Service> services = Lists.newArrayList(); @@ -62,7 +66,7 @@ public abstract class TaskRunnerSuiteBase { protected abstract StandardMetricsBridge.StandardMetrics getTaskMetrics(); - protected abstract TaskFactory getTaskFactory(); + protected abstract Map<String, TaskFactory> getTaskFactoryMap(); protected abstract List<Service> getServices(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java index f54223f..bf21a4a 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java @@ -18,10 +18,12 @@ package org.apache.gobblin.cluster; import java.util.List; +import java.util.Map; import org.apache.helix.task.TaskCallbackContext; import org.apache.helix.task.TaskFactory; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.Service; import lombok.extern.slf4j.Slf4j; @@ -51,8 +53,13 @@ class TaskRunnerSuiteProcessModel extends TaskRunnerSuiteBase { } @Override - protected TaskFactory getTaskFactory() { - return this.taskFactory; + protected 
Map<String, TaskFactory> getTaskFactoryMap() { + Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap(); + + taskFactoryMap.put(GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME, taskFactory); + + //TODO: taskFactoryMap.put(GOBBLIN_JOB_FACTORY_NAME, jobFactory); + return taskFactoryMap; } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java index fefa5b6..4f3a1e0 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java @@ -19,11 +19,13 @@ package org.apache.gobblin.cluster; import java.net.URI; import java.util.List; +import java.util.Map; import java.util.Properties; import org.apache.helix.task.TaskFactory; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import com.google.common.util.concurrent.Service; import com.typesafe.config.Config; import com.typesafe.config.ConfigValueFactory; @@ -46,6 +48,7 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase { super(builder); this.taskExecutor = new TaskExecutor(ConfigUtils.configToProperties(builder.getConfig())); this.taskFactory = getInProcessTaskFactory(taskExecutor, builder); + this.jobFactory = new GobblinHelixJobFactory(builder); this.taskMetrics = new GobblinTaskRunnerMetrics.InProcessTaskRunnerMetrics(taskExecutor, metricContext); } @@ -55,8 +58,11 @@ class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase { } @Override - protected TaskFactory getTaskFactory() { - return this.taskFactory; + protected Map<String, TaskFactory> getTaskFactoryMap() { + 
Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap(); + taskFactoryMap.put(GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME, taskFactory); + taskFactoryMap.put(GobblinTaskRunner.GOBBLIN_JOB_FACTORY_NAME, jobFactory); + return taskFactoryMap; } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java index 408ebe2..6baaa42 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java @@ -24,6 +24,7 @@ import org.testng.annotations.Test; import org.apache.gobblin.cluster.suite.IntegrationBasicSuite; import org.apache.gobblin.cluster.suite.IntegrationDedicatedManagerClusterSuite; +import org.apache.gobblin.cluster.suite.IntegrationJobFactorySuite; import org.apache.gobblin.cluster.suite.IntegrationJobTagSuite; import org.apache.gobblin.cluster.suite.IntegrationSeparateProcessSuite; @@ -59,6 +60,13 @@ public class ClusterIntegrationTest { runAndVerify(); } + @Test + public void testPlanningJobFactory() + throws Exception { + this.suite = new IntegrationJobFactorySuite(); + runAndVerify(); + } + private void runAndVerify() throws Exception { suite.startCluster(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java new file mode 100644 index 0000000..1fdcda5 --- /dev/null +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobFactoryTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.cluster; + +import java.io.IOException; +import java.util.Map; +import java.util.Properties; + +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskFactory; +import org.testng.Assert; + +import com.google.common.collect.Maps; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alias; +import org.apache.gobblin.cluster.suite.IntegrationJobFactorySuite; +import org.apache.gobblin.runtime.util.StateStores; +import org.apache.gobblin.source.extractor.partition.Partitioner; +import org.apache.gobblin.util.PropertiesUtils; + + +public class TaskRunnerSuiteForJobFactoryTest extends TaskRunnerSuiteThreadModel { + private TaskFactory testJobFactory; + public TaskRunnerSuiteForJobFactoryTest(IntegrationJobFactorySuite.TestJobFactorySuiteBuilder builder) { + super(builder); + this.testJobFactory = new TestJobFactory(builder); + } + + @Override + protected Map<String, TaskFactory> getTaskFactoryMap() { + Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap(); + taskFactoryMap.put(GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME, taskFactory); + taskFactoryMap.put(GobblinTaskRunner.GOBBLIN_JOB_FACTORY_NAME, testJobFactory); + return taskFactoryMap; + } + + public class TestJobFactory extends GobblinHelixJobFactory { + public TestJobFactory(IntegrationJobFactorySuite.TestJobFactorySuiteBuilder builder) { + super (builder); + } + + @Override + public Task createNewTask(TaskCallbackContext context) { + return new TestHelixJobTask(context, this.sysConfig, stateStores); + } + } + + public class TestHelixJobTask extends GobblinHelixJobTask { + public TestHelixJobTask(TaskCallbackContext context, + Config sysConfig, + StateStores stateStores) { + super(context, sysConfig, stateStores); + } + + //TODO: change below to Helix UserConentStore + protected void setResultToUserContent(Map<String, String> keyValues) throws 
IOException { + Map<String, String> customizedKVs = Maps.newHashMap(keyValues); + customizedKVs.put("customizedKey_1", "customizedVal_1"); + customizedKVs.put("customizedKey_2", "customizedVal_2"); + customizedKVs.put("customizedKey_3", "customizedVal_3"); + super.setResultToUserContent(customizedKVs); + } + } + + @Slf4j + public static class TestDistributedExecutionLauncher extends GobblinHelixDistributeJobExecutionLauncher { + + public TestDistributedExecutionLauncher(GobblinHelixDistributeJobExecutionLauncher.Builder builder) throws Exception { + super(builder); + } + + //TODO: change below to Helix UserConentStore + protected DistributeJobResult getResultFromUserContent() { + DistributeJobResult rst = super.getResultFromUserContent(); + Properties properties = rst.getProperties().get(); + Assert.assertTrue(properties.containsKey(Partitioner.IS_EARLY_STOPPED)); + Assert.assertFalse(PropertiesUtils.getPropAsBoolean(properties, Partitioner.IS_EARLY_STOPPED, "false")); + + Assert.assertTrue(properties.getProperty("customizedKey_1").equals("customizedVal_1")); + Assert.assertTrue(properties.getProperty("customizedKey_2").equals("customizedVal_2")); + Assert.assertTrue(properties.getProperty("customizedKey_3").equals("customizedVal_3")); + IntegrationJobFactorySuite.completed.set(true); + return rst; + } + + + @Alias("TestDistributedExecutionLauncherBuilder") + public static class Builder extends GobblinHelixDistributeJobExecutionLauncher.Builder { + public TestDistributedExecutionLauncher build() throws Exception { + return new TestDistributedExecutionLauncher(this); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java index 5f3b2fe..751c314 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java @@ -25,6 +25,8 @@ import org.apache.helix.task.TaskCallbackContext; import org.apache.helix.task.TaskFactory; import org.testng.Assert; +import com.google.common.collect.Maps; + import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.cluster.suite.IntegrationJobTagSuite; @@ -45,8 +47,10 @@ public class TaskRunnerSuiteForJobTagTest extends TaskRunnerSuiteThreadModel { } @Override - protected TaskFactory getTaskFactory() { - return this.jobTagTestFactory; + protected Map<String, TaskFactory> getTaskFactoryMap() { + Map<String, TaskFactory> taskFactoryMap = Maps.newHashMap(); + taskFactoryMap.put(GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME, jobTagTestFactory); + return taskFactoryMap; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java index bab8b30..eff11c8 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java @@ -17,9 +17,15 @@ package org.apache.gobblin.cluster.suite; +import java.io.DataOutputStream; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Reader; +import java.io.Writer; import 
java.net.URL; import java.nio.file.Files; import java.nio.file.Path; @@ -32,10 +38,15 @@ import org.apache.commons.io.FileUtils; import org.apache.curator.test.TestingServer; import org.assertj.core.util.Lists; +import com.google.common.base.Charsets; import com.google.common.base.Optional; +import com.google.common.collect.ImmutableMap; import com.google.common.io.Resources; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import com.typesafe.config.ConfigRenderOptions; +import com.typesafe.config.ConfigSyntax; import lombok.extern.slf4j.Slf4j; @@ -124,10 +135,32 @@ public class IntegrationBasicSuite { } } - protected void copyJobConfFromResource() throws IOException { + private void copyJobConfFromResource() throws IOException { + Map<String, Config> jobConfigs; try (InputStream resourceStream = this.jobConfResourceUrl.openStream()) { - File targetFile = new File(this.jobConfigPath + "/" + JOB_CONF_NAME); - FileUtils.copyInputStreamToFile(resourceStream, targetFile); + Reader reader = new InputStreamReader(resourceStream, Charsets.UTF_8); /* decode explicitly as UTF-8 (matches writeJobConf); the one-arg constructor uses the platform default charset */ + Config rawJobConfig = ConfigFactory.parseReader(reader, ConfigParseOptions.defaults().setSyntax(ConfigSyntax.CONF)); + jobConfigs = overrideJobConfigs(rawJobConfig); + jobConfigs.forEach((jobName, jobConfig)-> { + try { + writeJobConf(jobName, jobConfig); + } catch (IOException e) { + log.error("Job " + jobName + " config cannot be written.", e); /* pass the exception so the failure cause is not lost */ + } + }); + } + } + + protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) { + return ImmutableMap.of("HelloWorldJob", rawJobConfig); + } + + private void writeJobConf(String jobName, Config jobConfig) throws IOException { + String targetPath = this.jobConfigPath + "/" + jobName + ".conf"; + String renderedConfig = jobConfig.root().render(ConfigRenderOptions.defaults()); + try (DataOutputStream os = 
new DataOutputStream(new FileOutputStream(targetPath)); + Writer writer = new OutputStreamWriter(os, 
Charsets.UTF_8)) { + writer.write(renderedConfig); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java new file mode 100644 index 0000000..0487c9c --- /dev/null +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobFactorySuite.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.cluster.suite; + +import java.util.Collection; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.testng.collections.Lists; + +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alias; +import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; +import org.apache.gobblin.cluster.TaskRunnerSuiteBase; +import org.apache.gobblin.cluster.TaskRunnerSuiteForJobFactoryTest; + +@Slf4j +public class IntegrationJobFactorySuite extends IntegrationBasicSuite { + + public static AtomicBoolean completed = new AtomicBoolean(false); + + @Override + protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) { + Config newConfig = ConfigFactory.parseMap(ImmutableMap.of( + GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_ENABLED, true, + GobblinClusterConfigurationKeys.DISTRIBUTED_JOB_LAUNCHER_BUILDER, "TestDistributedExecutionLauncherBuilder")).withFallback(rawJobConfig); /* layer the launcher overrides on top of the resource job config instead of discarding it */ + return ImmutableMap.of("HelloWorldJob", newConfig); + } + + @Override + public Collection<Config> getWorkerConfigs() { + Config rawConfig = super.getWorkerConfigs().iterator().next(); + Config workerConfig = ConfigFactory.parseMap(ImmutableMap.of(GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER, "TestJobFactorySuiteBuilder")) + .withFallback(rawConfig); + + return Lists.newArrayList(workerConfig); + } + + public void waitForAndVerifyOutputFiles() throws Exception { + while (true) { + Thread.sleep(1000); + if (completed.get()) { + break; + } else { + log.info("Waiting for job to be finished"); + } + } + } + + @Alias("TestJobFactorySuiteBuilder") + public static class TestJobFactorySuiteBuilder extends TaskRunnerSuiteBase.Builder { + public TestJobFactorySuiteBuilder(Config config) { + super(config); + } + + @Override + public TaskRunnerSuiteBase build() { + return new 
TaskRunnerSuiteForJobFactoryTest(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java index adaf702..4424cae 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java @@ -17,31 +17,20 @@ package org.apache.gobblin.cluster.suite; -import java.io.DataOutputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.io.Reader; -import java.io.Writer; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import org.testng.collections.Lists; +import org.testng.collections.Maps; -import com.google.common.base.Charsets; import com.google.common.base.Joiner; import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; -import com.typesafe.config.ConfigParseOptions; -import com.typesafe.config.ConfigRenderOptions; -import com.typesafe.config.ConfigSyntax; import lombok.Getter; import lombok.extern.slf4j.Slf4j; @@ -136,41 +125,24 @@ public class IntegrationJobTagSuite extends IntegrationBasicSuite { * Create different jobs with different tags */ @Override - protected void copyJobConfFromResource() throws IOException { - try (InputStream resourceStream = this.jobConfResourceUrl.openStream()) { - Reader reader = new 
InputStreamReader(resourceStream); - Config jobConfig = ConfigFactory.parseReader(reader, ConfigParseOptions.defaults().setSyntax(ConfigSyntax.CONF)); - for(Map.Entry<String, String> assoc: JOB_TAG_ASSOCIATION.entrySet()) { - generateJobConf(jobConfig,assoc.getKey(),assoc.getValue()); - } + protected Map<String, Config> overrideJobConfigs(Config rawJobConfig) { + Map<String, Config> jobConfigs = Maps.newHashMap(); + for(Map.Entry<String, String> assoc: JOB_TAG_ASSOCIATION.entrySet()) { + Config newConfig = getConfigOverride(rawJobConfig, assoc.getKey(), assoc.getValue()); + jobConfigs.put(assoc.getKey(), newConfig); } + return jobConfigs; } - private void generateJobConf(Config jobConfig, String jobName, String tag) throws IOException { - Config newConfig = addJobTag(jobConfig, tag); - newConfig = getConfigOverride(newConfig, jobName); - - String targetPath = this.jobConfigPath + "/" + jobName + ".conf"; - String renderedConfig = newConfig.root().render(ConfigRenderOptions.defaults()); - try (DataOutputStream os = new DataOutputStream(new FileOutputStream(targetPath)); - Writer writer = new OutputStreamWriter(os, Charsets.UTF_8)) { - writer.write(renderedConfig); - } - } - - private Config getConfigOverride(Config config, String jobName) { + private Config getConfigOverride(Config config, String jobName, String jobTag) { Config newConfig = ConfigFactory.parseMap(ImmutableMap.of( + GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY, jobTag, ConfigurationKeys.JOB_NAME_KEY, jobName, ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, this.jobOutputBasePath + "/" + jobName)) .withFallback(config); return newConfig; } - private Config addJobTag(Config jobConfig, String jobTag) { - return ConfigFactory.parseMap(ImmutableMap.of(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY, jobTag)) - .withFallback(jobConfig); - } - @Override public void waitForAndVerifyOutputFiles() throws Exception { AssertWithBackoff asserter = AssertWithBackoff.create().logger(log).timeoutMs(60_000) 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c1e9cf25/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java index fb1e590..4ab6db8 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/PropertiesUtils.java @@ -61,6 +61,10 @@ public class PropertiesUtils { return Boolean.valueOf(properties.getProperty(key, defaultValue)); } + public static long getPropAsLong(Properties properties, String key, long defaultValue) { + return Long.parseLong(properties.getProperty(key, Long.toString(defaultValue))); /* parseLong yields the primitive directly; Long.valueOf would box and then unbox */ + } + /** * Extract all the keys that start with a <code>prefix</code> in {@link Properties} to a new {@link Properties} * instance.
