Repository: incubator-gobblin Updated Branches: refs/heads/master 291b40bf9 -> e7bb4c40f
[GOBBLIN-506] Add job tags support for gobblin cluster Closes #2376 from yukuai518/jobta Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/e7bb4c40 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/e7bb4c40 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/e7bb4c40 Branch: refs/heads/master Commit: e7bb4c40fc7171e561f5ab02f10a9b6fddeed4b5 Parents: 291b40b Author: Kuai Yu <[email protected]> Authored: Mon Jun 4 15:02:52 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Mon Jun 4 15:02:52 2018 -0700 ---------------------------------------------------------------------- .../GobblinClusterConfigurationKeys.java | 4 + .../cluster/GobblinHelixJobLauncher.java | 6 +- .../gobblin/cluster/GobblinTaskRunner.java | 115 +++------- .../gobblin/cluster/TaskRunnerSuiteBase.java | 113 +++++++++ .../cluster/TaskRunnerSuiteProcessModel.java | 62 +++++ .../cluster/TaskRunnerSuiteThreadModel.java | 91 ++++++++ .../gobblin/cluster/ClusterIntegrationTest.java | 228 ++----------------- .../cluster/TaskRunnerSuiteForJobTagTest.java | 72 ++++++ .../cluster/suite/IntegrationBasicSuite.java | 220 ++++++++++++++++++ ...IntegrationDedicatedManagerClusterSuite.java | 54 +++++ .../cluster/suite/IntegrationJobTagSuite.java | 202 ++++++++++++++++ .../suite/IntegrationSeparateProcessSuite.java | 45 ++++ .../src/test/resources/BasicCluster.conf | 4 - .../src/test/resources/BasicManager.conf | 22 ++ .../src/test/resources/BasicWorker.conf | 19 ++ .../gobblin/runtime/ForkThrowableHolder.java | 2 +- 16 files changed, 971 insertions(+), 288 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff 
--git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index 1ad7445..492edb8 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -63,6 +63,8 @@ public class GobblinClusterConfigurationKeys { // Should job be executed in the scheduler thread? public static final String JOB_EXECUTE_IN_SCHEDULING_THREAD = GOBBLIN_CLUSTER_PREFIX + "job.executeInSchedulingThread"; public static final boolean JOB_EXECUTE_IN_SCHEDULING_THREAD_DEFAULT = true; + public static final String HELIX_JOB_TAG_KEY = GOBBLIN_CLUSTER_PREFIX + "helixJobTag"; + public static final String HELIX_INSTANCE_TAGS_KEY = GOBBLIN_CLUSTER_PREFIX + "helixInstanceTags"; /** * A path pointing to a directory that contains job execution files to be executed by Gobblin. 
This directory can @@ -98,4 +100,6 @@ public class GobblinClusterConfigurationKeys { public static final String HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "jobQueueDeleteTimeoutSeconds"; public static final long DEFAULT_HELIX_JOB_QUEUE_DELETE_TIMEOUT_SECONDS = 300; + + public static final String TASK_RUNNER_SUITE_BUILDER = GOBBLIN_CLUSTER_PREFIX + "taskRunnerSuite.builder"; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java index 8c7bbe1..6b86c5c 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java @@ -257,13 +257,17 @@ public class GobblinHelixJobLauncher extends AbstractJobLauncher { jobConfigBuilder.setTimeoutPerTask(this.jobContext.getJobState().getPropAsLong( ConfigurationKeys.HELIX_TASK_TIMEOUT_SECONDS, ConfigurationKeys.DEFAULT_HELIX_TASK_TIMEOUT_SECONDS) * 1000); - + jobConfigBuilder.setFailureThreshold(workUnits.size()); jobConfigBuilder.addTaskConfigMap(taskConfigMap).setCommand(GobblinTaskRunner.GOBBLIN_TASK_FACTORY_NAME); jobConfigBuilder.setNumConcurrentTasksPerInstance(ConfigUtils.getInt(jobConfig, GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY, GobblinClusterConfigurationKeys.HELIX_CLUSTER_TASK_CONCURRENCY_DEFAULT)); + if (this.jobConfig.hasPath(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY)) { + jobConfigBuilder.setInstanceGroupTag(this.jobConfig.getString(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY)); + } + if (Task.getExecutionModel(ConfigUtils.configToState(jobConfig)).equals(ExecutionModel.STREAMING)) { 
jobConfigBuilder.setRebalanceRunningTask(true); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index e68774d..b6f04f1 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -58,7 +58,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; @@ -70,22 +69,18 @@ import com.typesafe.config.ConfigFactory; import com.typesafe.config.ConfigValueFactory; import javax.annotation.Nonnull; -import lombok.Getter; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.instrumented.StandardMetricsBridge; import org.apache.gobblin.metrics.GobblinMetrics; import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.runtime.TaskExecutor; -import org.apache.gobblin.runtime.TaskStateTracker; -import org.apache.gobblin.runtime.services.JMXReportingService; +import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.FileUtils; import org.apache.gobblin.util.HadoopUtils; import org.apache.gobblin.util.JvmUtils; -import org.apache.gobblin.util.PathUtils; +import 
org.apache.gobblin.util.reflection.GobblinConstructorUtils; import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR; @@ -124,6 +119,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge { private final String helixInstanceName; + private final String clusterName; + private HelixManager helixManager; private final ServiceManager serviceManager; @@ -147,6 +144,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge { private final String applicationName; private final String applicationId; private final Path appWorkPath; + private final MetricContext metricContext; private final StandardMetricsBridge.StandardMetrics metrics; @@ -160,21 +158,30 @@ public class GobblinTaskRunner implements StandardMetricsBridge { Configuration conf = HadoopUtils.newConfiguration(); this.fs = buildFileSystem(config, conf); - this.appWorkPath = initAppWorkDir(config, appWorkDirOptional); - this.config = saveConfigToFile(config); + this.clusterName = this.config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY); initHelixManager(); this.containerMetrics = buildContainerMetrics(); - TaskFactoryBuilder builder = new TaskFactoryBuilder(this.config); - this.taskStateModelFactory = createTaskStateModelFactory(builder.build()); - this.metrics = builder.getTaskMetrics(); - this.metricContext = builder.getMetricContext(); - services.addAll(getServices()); - if (services.isEmpty()) { + String builderStr = ConfigUtils.getString(this.config, GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER, TaskRunnerSuiteBase.Builder.class.getName()); + TaskRunnerSuiteBase.Builder builder = GobblinConstructorUtils.<TaskRunnerSuiteBase.Builder>invokeLongestConstructor( + new ClassAliasResolver(TaskRunnerSuiteBase.Builder.class).resolveClass(builderStr), this.config); + + TaskRunnerSuiteBase suite = builder.setAppWorkPath(this.appWorkPath) + .setContainerMetrics(this.containerMetrics) + .setFileSystem(this.fs) + 
.setHelixManager(this.helixManager).build(); + + this.taskStateModelFactory = createTaskStateModelFactory(suite.getTaskFactory()); + this.metrics = suite.getTaskMetrics(); + this.metricContext = suite.getMetricContext(); + this.services.addAll(suite.getServices()); + + this.services.addAll(getServices()); + if (this.services.isEmpty()) { this.serviceManager = null; } else { this.serviceManager = new ServiceManager(services); @@ -184,38 +191,6 @@ public class GobblinTaskRunner implements StandardMetricsBridge { applicationName, helixInstanceName, applicationId, taskRunnerId, config, appWorkDirOptional); } - private class TaskFactoryBuilder { - private final boolean isRunTaskInSeparateProcessEnabled; - private final TaskFactory taskFactory; - @Getter - private final MetricContext metricContext; - @Getter - private StandardMetricsBridge.StandardMetrics taskMetrics; - - public TaskFactoryBuilder(Config config) { - isRunTaskInSeparateProcessEnabled = getIsRunTaskInSeparateProcessEnabled(config); - metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), this.getClass()); - if (isRunTaskInSeparateProcessEnabled) { - logger.info("Running a task in a separate process is enabled."); - taskFactory = new HelixTaskFactory(GobblinTaskRunner.this.containerMetrics, CLUSTER_CONF_PATH, config); - taskMetrics = new GobblinTaskRunnerMetrics.JvmTaskRunnerMetrics(); - } else { - Properties properties = ConfigUtils.configToProperties(config); - TaskExecutor taskExecutor = new TaskExecutor(properties); - taskFactory = getInProcessTaskFactory(taskExecutor); - taskMetrics = new GobblinTaskRunnerMetrics.InProcessTaskRunnerMetrics(taskExecutor, metricContext); - } - } - - public TaskFactory build(){ - return taskFactory; - } - - private Boolean getIsRunTaskInSeparateProcessEnabled(Config config) { - return ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS, false); - } - } - private Path initAppWorkDir(Config config, 
Optional<Path> appWorkDirOptional) { return appWorkDirOptional.isPresent() ? appWorkDirOptional.get() : GobblinClusterUtils .getAppWorkDirPathFromConfig(config, this.fs, this.applicationName, this.applicationId); @@ -227,8 +202,7 @@ public class GobblinTaskRunner implements StandardMetricsBridge { logger.info("Using ZooKeeper connection string: " + zkConnectionString); this.helixManager = HelixManagerFactory.getZKHelixManager( - this.config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY), - this.helixInstanceName, InstanceType.PARTICIPANT, zkConnectionString); + this.clusterName, this.helixInstanceName, InstanceType.PARTICIPANT, zkConnectionString); } private TaskStateModelFactory createTaskStateModelFactory(TaskFactory factory) { @@ -242,35 +216,6 @@ public class GobblinTaskRunner implements StandardMetricsBridge { return taskStateModelFactory; } - private TaskFactory getInProcessTaskFactory(TaskExecutor taskExecutor) { - Properties properties = ConfigUtils.configToProperties(this.config); - URI rootPathUri = PathUtils.getRootPath(this.appWorkPath).toUri(); - Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties) - .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, - ConfigValueFactory.fromAnyRef(rootPathUri.toString())); - - TaskStateTracker taskStateTracker = new GobblinHelixTaskStateTracker(properties); - - services.add(taskExecutor); - services.add(taskStateTracker); - services.add(new JMXReportingService( - ImmutableMap.of("task.executor", taskExecutor.getTaskExecutorQueueMetricSet()))); - - TaskFactory taskFactory = - new GobblinHelixTaskFactory(this.containerMetrics, taskExecutor, taskStateTracker, this.fs, - this.appWorkPath, stateStoreJobConfig, this.helixManager); - return taskFactory; - } - - private Boolean getIsRunTaskInSeparateProcessEnabled() { - Boolean enabled = false; - if (this.config.hasPath(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS)) { - enabled = - 
this.config.getBoolean(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS); - } - return enabled; - } - private Config saveConfigToFile(Config config) throws IOException { Config newConf = config @@ -292,6 +237,8 @@ public class GobblinTaskRunner implements StandardMetricsBridge { connectHelixManager(); + addInstanceTags(); + // Start metric reporting if (this.containerMetrics.isPresent()) { this.containerMetrics.get() @@ -373,6 +320,18 @@ public class GobblinTaskRunner implements StandardMetricsBridge { } /** + * Helix participant cannot pre-configure tags before it connects to ZK. So this method can only be invoked after + * {@link HelixManager#connect()}. However, this will still work because tagged jobs won't be sent to a non-tagged instance. Hence + * the job with EXAMPLE_INSTANCE_TAG will remain in ZK until an instance with EXAMPLE_INSTANCE_TAG is found. + */ + private void addInstanceTags() { + if (this.helixManager.isConnected()) { + List<String> tags = ConfigUtils.getStringList(this.config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY); + tags.forEach(tag -> helixManager.getClusterManagmentTool().addInstanceTag(this.clusterName, this.helixInstanceName, tag)); + } + } + + /** * Creates and returns a {@link MessageHandlerFactory} for handling of Helix * {@link org.apache.helix.model.Message.MessageType#USER_DEFINE_MSG}s. 
* http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java new file mode 100644 index 0000000..080adb0 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteBase.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.cluster; + +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.helix.HelixManager; +import org.apache.helix.task.TaskFactory; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Service; +import com.typesafe.config.Config; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.util.ConfigUtils; + +/** + * This suite class contains multiple components used by {@link GobblinTaskRunner}. + * Here is the list of components it contains: + * A {@link TaskFactory} : register Helix task state model. + * A {@link MetricContext} : create task related metrics. + * A {@link StandardMetricsBridge.StandardMetrics} : report task metrics. + * A list of {@link Service} : register any runtime services necessary to run the tasks. 
+ */ +@Slf4j +public abstract class TaskRunnerSuiteBase { + protected TaskFactory taskFactory; + protected MetricContext metricContext; + protected StandardMetricsBridge.StandardMetrics taskMetrics; + protected List<Service> services = Lists.newArrayList(); + + protected TaskRunnerSuiteBase(Builder builder) { + this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(builder.config), this.getClass()); + } + + protected MetricContext getMetricContext() { + return this.metricContext; + } + + protected abstract StandardMetricsBridge.StandardMetrics getTaskMetrics(); + + protected abstract TaskFactory getTaskFactory(); + + protected abstract List<Service> getServices(); + + @Getter + public static class Builder { + private Config config; + private HelixManager helixManager; + private Optional<ContainerMetrics> containerMetrics; + private FileSystem fs; + private Path appWorkPath; + + public Builder(Config config) { + this.config = config; + } + + public Builder setHelixManager(HelixManager manager) { + this.helixManager = manager; + return this; + } + + public Builder setContainerMetrics(Optional<ContainerMetrics> containerMetrics) { + this.containerMetrics = containerMetrics; + return this; + } + + public Builder setFileSystem(FileSystem fs) { + this.fs = fs; + return this; + } + + public Builder setAppWorkPath(Path appWorkPath) { + this.appWorkPath = appWorkPath; + return this; + } + + public TaskRunnerSuiteBase build() { + if (getIsRunTaskInSeparateProcessEnabled(config)) { + return new TaskRunnerSuiteProcessModel(this); + } else { + return new TaskRunnerSuiteThreadModel(this); + } + } + + private Boolean getIsRunTaskInSeparateProcessEnabled(Config config) { + return ConfigUtils.getBoolean(config, GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS, false); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java 
---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java new file mode 100644 index 0000000..f54223f --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteProcessModel.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import java.util.List; + +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskFactory; + +import com.google.common.util.concurrent.Service; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.instrumented.StandardMetricsBridge; + +/** + * A sub-type of {@link TaskRunnerSuiteBase} suite which runs all tasks in separate JVMs. + * + * Please refer to {@link HelixTaskFactory#createNewTask(TaskCallbackContext)}. 
+ */ +@Slf4j +class TaskRunnerSuiteProcessModel extends TaskRunnerSuiteBase { + + TaskRunnerSuiteProcessModel(TaskRunnerSuiteBase.Builder builder) { + super(builder); + log.info("Running a task in a separate process is enabled."); + taskFactory = new HelixTaskFactory(builder.getContainerMetrics(), + GobblinTaskRunner.CLUSTER_CONF_PATH, + builder.getConfig()); + taskMetrics = new GobblinTaskRunnerMetrics.JvmTaskRunnerMetrics(); + } + + @Override + protected StandardMetricsBridge.StandardMetrics getTaskMetrics() { + return this.taskMetrics; + } + + @Override + protected TaskFactory getTaskFactory() { + return this.taskFactory; + } + + @Override + protected List<Service> getServices() { + return this.services; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java new file mode 100644 index 0000000..fefa5b6 --- /dev/null +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/TaskRunnerSuiteThreadModel.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import java.net.URI; +import java.util.List; +import java.util.Properties; + +import org.apache.helix.task.TaskFactory; + +import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.Service; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.instrumented.StandardMetricsBridge; +import org.apache.gobblin.runtime.TaskExecutor; +import org.apache.gobblin.runtime.TaskStateTracker; +import org.apache.gobblin.runtime.services.JMXReportingService; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.PathUtils; + +/** + * A sub-type of {@link TaskRunnerSuiteBase} suite which runs all tasks in a thread pool. 
+ */ +class TaskRunnerSuiteThreadModel extends TaskRunnerSuiteBase { + private final TaskExecutor taskExecutor; + + TaskRunnerSuiteThreadModel(TaskRunnerSuiteBase.Builder builder) { + super(builder); + this.taskExecutor = new TaskExecutor(ConfigUtils.configToProperties(builder.getConfig())); + this.taskFactory = getInProcessTaskFactory(taskExecutor, builder); + this.taskMetrics = new GobblinTaskRunnerMetrics.InProcessTaskRunnerMetrics(taskExecutor, metricContext); + } + + @Override + protected StandardMetricsBridge.StandardMetrics getTaskMetrics() { + return this.taskMetrics; + } + + @Override + protected TaskFactory getTaskFactory() { + return this.taskFactory; + } + + @Override + protected List<Service> getServices() { + return this.services; + } + + private TaskFactory getInProcessTaskFactory(TaskExecutor taskExecutor, Builder builder) { + Properties properties = ConfigUtils.configToProperties(builder.getConfig()); + URI rootPathUri = PathUtils.getRootPath(builder.getAppWorkPath()).toUri(); + Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(properties) + .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, + ConfigValueFactory.fromAnyRef(rootPathUri.toString())); + + TaskStateTracker taskStateTracker = new GobblinHelixTaskStateTracker(properties); + + services.add(taskExecutor); + services.add(taskStateTracker); + services.add(new JMXReportingService( + ImmutableMap.of("task.executor", taskExecutor.getTaskExecutorQueueMetricSet()))); + + TaskFactory taskFactory = + new GobblinHelixTaskFactory(builder.getContainerMetrics(), + taskExecutor, + taskStateTracker, + builder.getFs(), + builder.getAppWorkPath(), + stateStoreJobConfig, + builder.getHelixManager()); + return taskFactory; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java ---------------------------------------------------------------------- diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java index 58a5210..408ebe2 100644 --- a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/ClusterIntegrationTest.java @@ -15,239 +15,59 @@ * limitations under the License. */ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - package org.apache.gobblin.cluster; -import java.io.File; import java.io.IOException; -import java.io.InputStream; -import java.net.URL; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; -import org.apache.commons.io.FileUtils; -import org.apache.curator.test.TestingServer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; import org.testng.annotations.Test; -import com.google.common.base.Optional; -import com.google.common.io.Resources; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - -import org.apache.gobblin.testing.AssertWithBackoff; +import org.apache.gobblin.cluster.suite.IntegrationBasicSuite; +import org.apache.gobblin.cluster.suite.IntegrationDedicatedManagerClusterSuite; +import org.apache.gobblin.cluster.suite.IntegrationJobTagSuite; +import org.apache.gobblin.cluster.suite.IntegrationSeparateProcessSuite; public class ClusterIntegrationTest { - public final static Logger _logger = LoggerFactory.getLogger(ClusterIntegrationTest.class); - public static final String JOB_CONF_NAME = "HelloWorldJob.conf"; - Config _config; - private Path _workPath; - private Path _jobConfigPath; - private Path _jobOutputBasePath; - private URL _jobConfResourceUrl; - private TestingServer _testingZKServer; - private GobblinTaskRunner _worker; - private GobblinClusterManager _manager; - private boolean _runTaskInSeparateProcess; - private boolean _dedicatedClusterManager = false; + private IntegrationBasicSuite suite; @Test - public void simpleJobShouldComplete() throws Exception { - runSimpleJobAndVerifyResult(); + public void testJobShouldComplete() throws Exception { + this.suite = new IntegrationBasicSuite(); + runAndVerify(); } @Test - public void simpleJobShouldCompleteInTaskIsolationMode() + public void testSeparateProcessMode() throws Exception { - 
_runTaskInSeparateProcess = true; - runSimpleJobAndVerifyResult(); + this.suite = new IntegrationSeparateProcessSuite(); + runAndVerify(); } @Test - public void dedicatedManagerClusterMode() + public void testDedicatedManagerCluster() throws Exception { - _dedicatedClusterManager = true; - runSimpleJobAndVerifyResult(); + this.suite = new IntegrationDedicatedManagerClusterSuite(); + runAndVerify(); } - private void runSimpleJobAndVerifyResult() + @Test + public void testJobWithTag() throws Exception { - init(); - startCluster(); - waitForAndVerifyOutputFiles(); - shutdownCluster(); - } - - private void init() throws Exception { - initWorkDir(); - initZooKeeper(); - initConfig(); - initJobConfDir(); - initJobOutputDir(); - } - - private void initWorkDir() throws IOException { - // Relative to the current directory - _workPath = Paths.get("gobblin-integration-test-work-dir"); - _logger.info("Created a new work directory: " + _workPath.toAbsolutePath()); - - // Delete the working directory in case the previous test fails to delete the directory - // e.g. when the test was killed forcefully under a debugger. 
- deleteWorkDir(); - Files.createDirectory(_workPath); - } - - private void initJobConfDir() throws IOException { - String jobConfigDir = _config.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY); - _jobConfigPath = Paths.get(jobConfigDir); - Files.createDirectories(_jobConfigPath); - _jobConfResourceUrl = Resources.getResource(JOB_CONF_NAME); - copyJobConfFromResource(); - } - - private void initJobOutputDir() throws IOException { - _jobOutputBasePath = Paths.get(_workPath + "/job-output"); - Files.createDirectory(_jobOutputBasePath); - } - - private void copyJobConfFromResource() throws IOException { - try (InputStream resourceStream = _jobConfResourceUrl.openStream()) { - File targetFile = new File(_jobConfigPath + "/" + JOB_CONF_NAME); - FileUtils.copyInputStreamToFile(resourceStream, targetFile); - } + this.suite = new IntegrationJobTagSuite(); + runAndVerify(); } - private void initZooKeeper() throws Exception { - _testingZKServer = new TestingServer(false); - _logger.info( - "Created testing ZK Server. 
Connection string : " + _testingZKServer.getConnectString()); - } - - private void initConfig() { - Config configFromResource = getConfigFromResource(); - Config configOverride = getConfigOverride(); - _config = configOverride.withFallback(configFromResource).resolve(); - } - - private Config getConfigOverride() { - Map<String, String> configMap = new HashMap<>(); - String zkConnectionString = _testingZKServer.getConnectString(); - configMap.put(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY, zkConnectionString); - configMap.put(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR, _workPath.toString()); - if (_runTaskInSeparateProcess) { - configMap.put(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS, "true"); - } - if (_dedicatedClusterManager) { - configMap.put(GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED, "true"); - configMap.put(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY, "ManagerCluster"); - } - Config config = ConfigFactory.parseMap(configMap); - return config; - } - - private Config getConfigFromResource() { - URL url = Resources.getResource("BasicCluster.conf"); - Config config = ConfigFactory.parseURL(url); - return config; + private void runAndVerify() + throws Exception { + suite.startCluster(); + suite.waitForAndVerifyOutputFiles(); + suite.shutdownCluster(); } @AfterMethod public void tearDown() throws IOException { - deleteWorkDir(); - } - - private void deleteWorkDir() throws IOException { - if ((_workPath != null) && Files.exists(_workPath)) { - FileUtils.deleteDirectory(_workPath.toFile()); - } - } - - private void createHelixCluster() throws Exception { - String zkConnectionString = _config - .getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); - String helix_cluster_name = _config - .getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY); - HelixUtils.createGobblinHelixCluster(zkConnectionString, helix_cluster_name); - - if (_dedicatedClusterManager) { - String 
manager_cluster_name = _config - .getString(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY); - HelixUtils.createGobblinHelixCluster(zkConnectionString, manager_cluster_name); - } - } - - private void startCluster() throws Exception { - _testingZKServer.start(); - createHelixCluster(); - startWorker(); - startManager(); - } - - private void startWorker() throws Exception { - _worker = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, "Worker", - TestHelper.TEST_APPLICATION_ID, "1", - _config, Optional.absent()); - - // Need to run in another thread since the start call will not return until the stop method - // is called. - Thread workerThread = new Thread(_worker::start); - workerThread.start(); - } - - private void startManager() throws Exception { - _manager = new GobblinClusterManager(TestHelper.TEST_APPLICATION_NAME, - TestHelper.TEST_APPLICATION_ID, - _config, Optional.absent()); - - _manager.start(); - } - - private void shutdownCluster() throws InterruptedException, IOException { - _worker.stop(); - _manager.stop(); - _testingZKServer.close(); - } - - private void waitForAndVerifyOutputFiles() throws Exception { - - AssertWithBackoff asserter = AssertWithBackoff.create().logger(_logger).timeoutMs(60_000) - .maxSleepMs(100).backoffFactor(1.5); - - asserter.assertTrue(this::hasExpectedFilesBeenCreated, "Waiting for job-completion"); - } - - private boolean hasExpectedFilesBeenCreated(Void input) { - int numOfFiles = getNumOfOutputFiles(_jobOutputBasePath); - return numOfFiles == 1; - } - - private int getNumOfOutputFiles(Path jobOutputDir) { - Collection<File> outputFiles = FileUtils - .listFiles(jobOutputDir.toFile(), new String[]{"txt"}, true); - return outputFiles.size(); + this.suite.deleteWorkDir(); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java ---------------------------------------------------------------------- diff 
--git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java new file mode 100644 index 0000000..5f3b2fe --- /dev/null +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TaskRunnerSuiteForJobTagTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import java.util.List; +import java.util.Map; + +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskFactory; +import org.testng.Assert; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.cluster.suite.IntegrationJobTagSuite; +import org.apache.gobblin.configuration.ConfigurationKeys; + +/** + * A special {@link TaskRunnerSuiteBase} which can verify if the worker gets the correct jobs based on the tag association. 
+ */ +@Slf4j +public class TaskRunnerSuiteForJobTagTest extends TaskRunnerSuiteThreadModel { + private TaskFactory jobTagTestFactory; + private String instanceName; + + public TaskRunnerSuiteForJobTagTest(IntegrationJobTagSuite.JobTagTaskRunnerSuiteBuilder builder) { + super(builder); + this.instanceName = builder.getInstanceName(); + this.jobTagTestFactory = new JobTagTestFactory(this.taskFactory); + } + + @Override + protected TaskFactory getTaskFactory() { + return this.jobTagTestFactory; + } + + + public class JobTagTestFactory implements TaskFactory { + private TaskFactory factory; + public JobTagTestFactory(TaskFactory factory) { + this.factory = factory; + } + + @Override + public Task createNewTask(TaskCallbackContext context) { + Map<String, String> configMap = context.getTaskConfig().getConfigMap(); + String jobName = configMap.get(ConfigurationKeys.JOB_NAME_KEY); + List<String> allowedJobNames = IntegrationJobTagSuite.EXPECTED_JOB_NAMES.get(TaskRunnerSuiteForJobTagTest.this.instanceName); + if (allowedJobNames.contains(jobName)) { + log.info("{} has job name {}", instanceName, jobName); + } else { + Assert.fail(instanceName + " should not receive " + jobName); + } + return this.factory.createNewTask(context); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java new file mode 100644 index 0000000..bab8b30 --- /dev/null +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster.suite; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.io.FileUtils; +import org.apache.curator.test.TestingServer; +import org.assertj.core.util.Lists; + +import com.google.common.base.Optional; +import com.google.common.io.Resources; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.cluster.ClusterIntegrationTest; +import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; +import org.apache.gobblin.cluster.GobblinClusterManager; +import org.apache.gobblin.cluster.GobblinTaskRunner; +import org.apache.gobblin.cluster.HelixUtils; +import org.apache.gobblin.cluster.TestHelper; +import org.apache.gobblin.testing.AssertWithBackoff; + +/** + * A test suite used for {@link ClusterIntegrationTest#testJobShouldComplete()} + * + * This basic suite class provides utilities to launch one manager and multiple workers (participants). 
+ * User can override {@link IntegrationBasicSuite#getWorkerConfigs()} for worker customization. + * User can also override {@link IntegrationBasicSuite#waitForAndVerifyOutputFiles()} to check different successful condition. + */ +@Slf4j +public class IntegrationBasicSuite { + public static final String JOB_CONF_NAME = "HelloWorldJob.conf"; + public static final String WORKER_INSTANCE_0 = "WorkerInstance_0"; + + // manager and workers + protected Config managerConfig; + protected Collection<Config> workerConfigs = Lists.newArrayList(); + protected Collection<GobblinTaskRunner> workers = Lists.newArrayList(); + protected GobblinClusterManager manager; + + protected Path workPath; + protected Path jobConfigPath; + protected Path jobOutputBasePath; + protected URL jobConfResourceUrl; + protected TestingServer testingZKServer; + + public IntegrationBasicSuite() { + try { + initWorkDir(); + initJobOutputDir(); + initZooKeeper(); + initConfig(); + initJobConfDir(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void initConfig() { + this.managerConfig = this.getManagerConfig(); + this.workerConfigs = this.getWorkerConfigs(); + } + + private void initZooKeeper() throws Exception { + this.testingZKServer = new TestingServer(false); + log.info( + "Created testing ZK Server. 
Connection string : " + testingZKServer.getConnectString()); + } + + private void initJobConfDir() throws IOException { + String jobConfigDir = this.managerConfig.getString(GobblinClusterConfigurationKeys.JOB_CONF_PATH_KEY); + this.jobConfigPath = Paths.get(jobConfigDir); + Files.createDirectories(this.jobConfigPath); + this.jobConfResourceUrl = Resources.getResource(JOB_CONF_NAME); + copyJobConfFromResource(); + } + + private void initJobOutputDir() throws IOException { + this.jobOutputBasePath = Paths.get(this.workPath + "/job-output"); + Files.createDirectory(this.jobOutputBasePath); + } + + private void initWorkDir() throws IOException { + // Relative to the current directory + this.workPath = Paths.get("gobblin-integration-test-work-dir"); + log.info("Created a new work directory: " + this.workPath.toAbsolutePath()); + + // Delete the working directory in case the previous test fails to delete the directory + // e.g. when the test was killed forcefully under a debugger. + deleteWorkDir(); + Files.createDirectory(this.workPath); + } + + public void deleteWorkDir() throws IOException { + if ((this.workPath != null) && Files.exists(this.workPath)) { + FileUtils.deleteDirectory(this.workPath.toFile()); + } + } + + protected void copyJobConfFromResource() throws IOException { + try (InputStream resourceStream = this.jobConfResourceUrl.openStream()) { + File targetFile = new File(this.jobConfigPath + "/" + JOB_CONF_NAME); + FileUtils.copyInputStreamToFile(resourceStream, targetFile); + } + } + + private Config getClusterConfig() { + URL url = Resources.getResource("BasicCluster.conf"); + Config config = ConfigFactory.parseURL(url); + + Map<String, String> configMap = new HashMap<>(); + String zkConnectionString = this.testingZKServer.getConnectString(); + configMap.put(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY, zkConnectionString); + configMap.put(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR, this.workPath.toString()); + Config overrideConfig = 
ConfigFactory.parseMap(configMap); + + return overrideConfig.withFallback(config); + } + + protected Config getManagerConfig() { + // manager config initialization + URL url = Resources.getResource("BasicManager.conf"); + Config managerConfig = ConfigFactory.parseURL(url); + managerConfig = managerConfig.withFallback(getClusterConfig()); + return managerConfig.resolve(); + } + + protected Collection<Config> getWorkerConfigs() { + // worker config initialization + URL url = Resources.getResource("BasicWorker.conf"); + Config workerConfig = ConfigFactory.parseURL(url); + workerConfig = workerConfig.withFallback(getClusterConfig()); + return Lists.newArrayList(workerConfig.resolve()); + } + + public void waitForAndVerifyOutputFiles() throws Exception { + AssertWithBackoff asserter = AssertWithBackoff.create().logger(log).timeoutMs(60_000) + .maxSleepMs(100).backoffFactor(1.5); + + asserter.assertTrue(this::hasExpectedFilesBeenCreated, "Waiting for job-completion"); + } + + protected boolean hasExpectedFilesBeenCreated(Void input) { + int numOfFiles = getNumOfOutputFiles(this.jobOutputBasePath); + return numOfFiles == 1; + } + + protected int getNumOfOutputFiles(Path jobOutputDir) { + Collection<File> outputFiles = FileUtils + .listFiles(jobOutputDir.toFile(), new String[]{"txt"}, true); + return outputFiles.size(); + } + + public void startCluster() throws Exception { + this.testingZKServer.start(); + createHelixCluster(); + startWorker(); + startManager(); + } + + private void startManager() throws Exception { + this.manager = new GobblinClusterManager(TestHelper.TEST_APPLICATION_NAME, + TestHelper.TEST_APPLICATION_ID, + this.managerConfig, Optional.absent()); + + this.manager.start(); + } + + protected void startWorker() throws Exception { + this.workers.add(new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, WORKER_INSTANCE_0, + TestHelper.TEST_APPLICATION_ID, "1", + this.workerConfigs.iterator().next(), Optional.absent())); + + // Need to run in another thread 
since the start call will not return until the stop method + // is called. + Thread workerThread = new Thread(this.workers.iterator().next()::start); + workerThread.start(); + } + + public void shutdownCluster() throws InterruptedException, IOException { + workers.forEach(runner->runner.stop()); + this.manager.stop(); + this.testingZKServer.close(); + } + + protected void createHelixCluster() throws Exception { + String zkConnectionString = this.managerConfig + .getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); + String helix_cluster_name = this.managerConfig + .getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY); + HelixUtils.createGobblinHelixCluster(zkConnectionString, helix_cluster_name); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java new file mode 100644 index 0000000..22e605d --- /dev/null +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationDedicatedManagerClusterSuite.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.gobblin.cluster.suite; + +import java.util.HashMap; +import java.util.Map; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.cluster.ClusterIntegrationTest; +import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; +import org.apache.gobblin.cluster.HelixUtils; + +/** + * A test suite used for {@link ClusterIntegrationTest#testDedicatedManagerCluster()} + */ +public class IntegrationDedicatedManagerClusterSuite extends IntegrationBasicSuite { + + @Override + public void createHelixCluster() throws Exception { + super.createHelixCluster(); + String zkConnectionString = managerConfig + .getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY); + String manager_cluster_name = managerConfig + .getString(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY); + HelixUtils.createGobblinHelixCluster(zkConnectionString, manager_cluster_name); + } + + @Override + protected Config getManagerConfig() { + Map<String, String> configMap = new HashMap<>(); + configMap.put(GobblinClusterConfigurationKeys.DEDICATED_MANAGER_CLUSTER_ENABLED, "true"); + configMap.put(GobblinClusterConfigurationKeys.MANAGER_CLUSTER_NAME_KEY, "ManagerCluster"); + Config config = ConfigFactory.parseMap(configMap); + return config.withFallback(super.getManagerConfig()).resolve(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java 
---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java new file mode 100644 index 0000000..adaf702 --- /dev/null +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationJobTagSuite.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.cluster.suite; + +import java.io.DataOutputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.io.Reader; +import java.io.Writer; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.testng.collections.Lists; + +import com.google.common.base.Charsets; +import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigParseOptions; +import com.typesafe.config.ConfigRenderOptions; +import com.typesafe.config.ConfigSyntax; + +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.annotation.Alias; +import org.apache.gobblin.cluster.ClusterIntegrationTest; +import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; +import org.apache.gobblin.cluster.GobblinTaskRunner; +import org.apache.gobblin.cluster.TaskRunnerSuiteBase; +import org.apache.gobblin.cluster.TaskRunnerSuiteForJobTagTest; +import org.apache.gobblin.cluster.TestHelper; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.testing.AssertWithBackoff; + +/** + * A test suite used for {@link ClusterIntegrationTest#testJobWithTag()} + * + * Each worker instance will have the tags it can accept. + * Each job is associated with a specific tag. + * Each job will always go to certain workers as expected due to the tag association. 
+ */ +@Slf4j +public class IntegrationJobTagSuite extends IntegrationBasicSuite { + private static final String WORKER_INSTANCE_NAME_KEY = "worker.instance.name"; + private static final String WORKER_INSTANCE_1 = "WorkerInstance_1"; + private static final String WORKER_INSTANCE_2 = "WorkerInstance_2"; + private static final String WORKER_INSTANCE_3 = "WorkerInstance_3"; + + private static final Map<String, List<String>> WORKER_TAG_ASSOCIATION = ImmutableMap.of( + WORKER_INSTANCE_1, ImmutableList.of("T2", "T7", "T8"), + WORKER_INSTANCE_2, ImmutableList.of("T4", "T5", "T6"), + WORKER_INSTANCE_3, ImmutableList.of("T1", "T3")); + + private static final Map<String, String> JOB_TAG_ASSOCIATION = ImmutableMap.<String, String>builder() + .put("jobHello_1", "T2") + .put("jobHello_2", "T4") + .put("jobHello_3", "T5") + .put("jobHello_4", "T6") + .put("jobHello_5", "T7") + .put("jobHello_6", "T8") + .put("jobHello_7", "T1") + .put("jobHello_8", "T3") + .build(); + + public static final Map<String, List<String>> EXPECTED_JOB_NAMES = ImmutableMap.of( + WORKER_INSTANCE_1, ImmutableList.of("jobHello_1", "jobHello_5", "jobHello_6"), + WORKER_INSTANCE_2, ImmutableList.of("jobHello_2", "jobHello_3", "jobHello_4"), + WORKER_INSTANCE_3, ImmutableList.of("jobHello_7", "jobHello_8")); + + private Config addInstanceTags(Config workerConfig, String instanceName, List<String> tags) { + Map<String, String> configMap = new HashMap<>(); + if (tags!= null && tags.size() > 0) { + configMap.put(GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, Joiner.on(',').join(tags)); + configMap.put(WORKER_INSTANCE_NAME_KEY, instanceName); + } + return ConfigFactory.parseMap(configMap).withFallback(workerConfig); + } + + @Override + public Collection<Config> getWorkerConfigs() { + Config parent = super.getWorkerConfigs().iterator().next(); + Config worker_1 = addInstanceTags(parent, WORKER_INSTANCE_1, WORKER_TAG_ASSOCIATION.get(WORKER_INSTANCE_1)); + Config worker_2 = addInstanceTags(parent, 
WORKER_INSTANCE_2, WORKER_TAG_ASSOCIATION.get(WORKER_INSTANCE_2)); + Config worker_3 = addInstanceTags(parent, WORKER_INSTANCE_3, WORKER_TAG_ASSOCIATION.get(WORKER_INSTANCE_3)); + worker_1 = addTaskRunnerSuiteBuilder(worker_1); + worker_2 = addTaskRunnerSuiteBuilder(worker_2); + worker_3 = addTaskRunnerSuiteBuilder(worker_3); + return Lists.newArrayList(worker_1, worker_2, worker_3); + } + + private Config addTaskRunnerSuiteBuilder(Config workerConfig) { + return ConfigFactory.parseMap(ImmutableMap.of(GobblinClusterConfigurationKeys.TASK_RUNNER_SUITE_BUILDER, "JobTagTaskRunnerSuiteBuilder")).withFallback(workerConfig); + } + + @Override + protected void startWorker() throws Exception { + // Each workerConfig corresponds to a worker instance + for (Config workerConfig: this.workerConfigs) { + GobblinTaskRunner runner = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, workerConfig.getString(WORKER_INSTANCE_NAME_KEY), + TestHelper.TEST_APPLICATION_ID, "1", + workerConfig, Optional.absent()); + this.workers.add(runner); + + // Need to run in another thread since the start call will not return until the stop method + // is called. 
+ Thread workerThread = new Thread(runner::start); + workerThread.start(); + } + } + + /** + * Create different jobs with different tags + */ + @Override + protected void copyJobConfFromResource() throws IOException { + try (InputStream resourceStream = this.jobConfResourceUrl.openStream()) { + Reader reader = new InputStreamReader(resourceStream); + Config jobConfig = ConfigFactory.parseReader(reader, ConfigParseOptions.defaults().setSyntax(ConfigSyntax.CONF)); + for(Map.Entry<String, String> assoc: JOB_TAG_ASSOCIATION.entrySet()) { + generateJobConf(jobConfig,assoc.getKey(),assoc.getValue()); + } + } + } + + private void generateJobConf(Config jobConfig, String jobName, String tag) throws IOException { + Config newConfig = addJobTag(jobConfig, tag); + newConfig = getConfigOverride(newConfig, jobName); + + String targetPath = this.jobConfigPath + "/" + jobName + ".conf"; + String renderedConfig = newConfig.root().render(ConfigRenderOptions.defaults()); + try (DataOutputStream os = new DataOutputStream(new FileOutputStream(targetPath)); + Writer writer = new OutputStreamWriter(os, Charsets.UTF_8)) { + writer.write(renderedConfig); + } + } + + private Config getConfigOverride(Config config, String jobName) { + Config newConfig = ConfigFactory.parseMap(ImmutableMap.of( + ConfigurationKeys.JOB_NAME_KEY, jobName, + ConfigurationKeys.DATA_PUBLISHER_FINAL_DIR, this.jobOutputBasePath + "/" + jobName)) + .withFallback(config); + return newConfig; + } + + private Config addJobTag(Config jobConfig, String jobTag) { + return ConfigFactory.parseMap(ImmutableMap.of(GobblinClusterConfigurationKeys.HELIX_JOB_TAG_KEY, jobTag)) + .withFallback(jobConfig); + } + + @Override + public void waitForAndVerifyOutputFiles() throws Exception { + AssertWithBackoff asserter = AssertWithBackoff.create().logger(log).timeoutMs(60_000) + .maxSleepMs(100).backoffFactor(1.5); + + asserter.assertTrue(this::hasExpectedFilesBeenCreated, "Waiting for job-completion"); + } + + @Override + protected 
boolean hasExpectedFilesBeenCreated(Void input) { + int numOfFiles = getNumOfOutputFiles(this.jobOutputBasePath); + return numOfFiles == JOB_TAG_ASSOCIATION.size(); + } + + @Alias("JobTagTaskRunnerSuiteBuilder") + public static class JobTagTaskRunnerSuiteBuilder extends TaskRunnerSuiteBase.Builder { + @Getter + private String instanceName; + public JobTagTaskRunnerSuiteBuilder(Config config) { + super(config); + this.instanceName = config.getString(IntegrationJobTagSuite.WORKER_INSTANCE_NAME_KEY); + } + + @Override + public TaskRunnerSuiteBase build() { + return new TaskRunnerSuiteForJobTagTest(this); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationSeparateProcessSuite.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationSeparateProcessSuite.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationSeparateProcessSuite.java new file mode 100644 index 0000000..9969304 --- /dev/null +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationSeparateProcessSuite.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.gobblin.cluster.suite; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import com.google.common.collect.Lists; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.cluster.ClusterIntegrationTest; +import org.apache.gobblin.cluster.GobblinClusterConfigurationKeys; + +/** + * A test suite used for {@link ClusterIntegrationTest#testSeparateProcessMode()} + */ +public class IntegrationSeparateProcessSuite extends IntegrationBasicSuite { + + @Override + protected Collection<Config> getWorkerConfigs() { + Map<String, String> configMap = new HashMap<>(); + configMap.put(GobblinClusterConfigurationKeys.ENABLE_TASK_IN_SEPARATE_PROCESS, "true"); + Config config = ConfigFactory.parseMap(configMap); + Config parent = super.getWorkerConfigs().iterator().next(); + return Lists.newArrayList(config.withFallback(parent).resolve()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/resources/BasicCluster.conf ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/resources/BasicCluster.conf b/gobblin-cluster/src/test/resources/BasicCluster.conf index 92016f8..4cd2b11 100644 --- a/gobblin-cluster/src/test/resources/BasicCluster.conf +++ b/gobblin-cluster/src/test/resources/BasicCluster.conf @@ -20,7 +20,3 @@ gobblin.cluster.helix.cluster.name=BasicGobblinCluster gobblin.cluster.workDir=/tmp/gobblinClusterBasicTest/ gobblin.cluster.job.conf.path=${gobblin.cluster.workDir}/jobs gobblin.cluster.standaloneMode=true -gobblin.cluster.job.executeInSchedulingThread=false -gobblin.cluster.enableTaskInSeparateProcess=false -gobblin.cluster.task.jvm.options="-Xms10m -Xmx1g -XX:MinHeapFreeRatio=10 -XX:MaxHeapFreeRatio=20" -jobexecutor.threadpool.size=20 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/resources/BasicManager.conf ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/resources/BasicManager.conf b/gobblin-cluster/src/test/resources/BasicManager.conf new file mode 100644 index 0000000..52601c8 --- /dev/null +++ b/gobblin-cluster/src/test/resources/BasicManager.conf @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# Cluster / Helix configuration properties +gobblin.cluster.job.executeInSchedulingThread=false +gobblin.cluster.enableTaskInSeparateProcess=false +gobblin.cluster.task.jvm.options="-Xms10m -Xmx1g -XX:MinHeapFreeRatio=10 -XX:MaxHeapFreeRatio=20" +jobexecutor.threadpool.size=20 http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-cluster/src/test/resources/BasicWorker.conf ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/resources/BasicWorker.conf b/gobblin-cluster/src/test/resources/BasicWorker.conf new file mode 100644 index 0000000..23d18c0 --- /dev/null +++ b/gobblin-cluster/src/test/resources/BasicWorker.conf @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# Cluster / Helix configuration properties +taskexecutor.threadpool.size=10 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e7bb4c40/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java index 9485f59..cf53d56 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/ForkThrowableHolder.java @@ -29,7 +29,7 @@ import lombok.extern.slf4j.Slf4j; /** - * An object whcih holds all {@link Throwable}s thrown by {@link org.apache.gobblin.runtime.fork.Fork}, so that other + * An object which holds all {@link Throwable}s thrown by {@link org.apache.gobblin.runtime.fork.Fork}, so that other * Gobblin components (like {@link Task}) can have access. */ @Slf4j
