Repository: incubator-gobblin Updated Branches: refs/heads/master 178acbb47 -> 5d343e318
[GOBBLIN-324] Add the cluster working directory config. Currently, the appWorkDir value is passed to the GobblinClusterManager constructor and the GobblinTaskRunner constructor. It's used to determine where the state files will be stored. The default launch scripts call the main methods, which pass in a hardcoded "null" value, so the code falls back to a default value such as file:/Users/username/standalone_cluster/1. It's useful to be able to specify this value via a configuration. When the config is not specified, the behavior is the same as before. Also add some examples to the sample config file. Testing: Add new unit tests. Manually run the cluster with and without this config. Closes #2174 from HappyRay/add-work-dir-config Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5d343e31 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5d343e31 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5d343e31 Branch: refs/heads/master Commit: 5d343e3188c99eb950ca4af0728769220e9caf5c Parents: 178acbb Author: Ray Yang <[email protected]> Authored: Wed Nov 29 13:38:47 2017 -0800 Committer: Abhishek Tiwari <[email protected]> Committed: Wed Nov 29 13:38:47 2017 -0800 ---------------------------------------------------------------------- conf/standalone/application.conf | 16 ++++- .../GobblinClusterConfigurationKeys.java | 4 +- .../gobblin/cluster/GobblinClusterManager.java | 2 +- .../gobblin/cluster/GobblinClusterUtils.java | 15 ++++- .../gobblin/cluster/GobblinTaskRunner.java | 2 +- .../cluster/GobblinClusterUtilsTest.java | 62 ++++++++++++++++++++ 6 files changed, 92 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/conf/standalone/application.conf ---------------------------------------------------------------------- diff --git 
a/conf/standalone/application.conf b/conf/standalone/application.conf index fa601dd..e9b8323 100644 --- a/conf/standalone/application.conf +++ b/conf/standalone/application.conf @@ -16,10 +16,22 @@ # # Sample configuration properties for the Gobblin Standalone cluster +gobblin.cluster.workDir=${gobblin.cluster.work.dir}/GobblinStandaloneCluster + +# default is the JobConfigurationManager +# use this manager to accept jobs from Kafka. It requires some additional Kafka related parameters. +#gobblin.cluster.job.configuration.manager=org.apache.gobblin.cluster.StreamingJobConfigurationManager +#spec.kafka.topics=ruyang_test_kafka_gobblin +#kafka.brokers="hostname:12913/kafka-queuing" +#jobSpecMonitor.kafka.zookeeper.connect="hostname:12913/kafka-queuing" # Cluster configuration properties -gobblin.cluster.helix.cluster.name=GobblinStandaloneCluster -gobblin.cluster.job.conf.path=<path where Gobblin job configuration file are located> +gobblin.cluster.helix.cluster.name=GobblinStandaloneClusterCli + +# used by the JobConfigurationManager +gobblin.cluster.job.conf.path=${gobblin.cluster.work.dir}/jobs + +gobblin.cluster.jobconf.fullyQualifiedPath=${gobblin.cluster.work.dir}/jobs # File system URIs writer.fs.uri=${fs.uri} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index ea75dc3..ab6f8b4 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -35,7 +35,7 @@ public class GobblinClusterConfigurationKeys { public static final String 
STANDALONE_CLUSTER_MODE = "standalone_cluster"; public static final String STANDALONE_CLUSTER_MODE_KEY = GOBBLIN_CLUSTER_PREFIX + "standaloneMode"; public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false; - + public static final String CLUSTRER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + "workDir"; // Helix configuration properties. public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + "helix.cluster.name"; @@ -80,4 +80,4 @@ public class GobblinClusterConfigurationKeys { public static final String STOP_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "stopTimeoutSeconds"; public static final long DEFAULT_STOP_TIMEOUT_SECONDS = 60; -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java index 8ced294..7948a8a 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java @@ -163,7 +163,7 @@ public class GobblinClusterManager implements ApplicationLauncher { this.fs = buildFileSystem(config); this.appWorkDir = appWorkDirOptional.isPresent() ? 
appWorkDirOptional.get() - : GobblinClusterUtils.getAppWorkDirPath(this.fs, clusterName, applicationId); + : GobblinClusterUtils.getAppWorkDirPathFromConfig(config, this.fs, clusterName, applicationId); initializeAppLauncherAndServices(); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java index a8a335a..3082720 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java @@ -17,14 +17,15 @@ package org.apache.gobblin.cluster; +import static org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTRER_WORK_DIR; + +import com.typesafe.config.Config; import java.net.InetAddress; import java.net.UnknownHostException; - +import org.apache.gobblin.annotation.Alpha; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.gobblin.annotation.Alpha; - @Alpha public class GobblinClusterUtils { @@ -51,6 +52,14 @@ public class GobblinClusterUtils { return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName, applicationId)); } + public static Path getAppWorkDirPathFromConfig(Config config, FileSystem fs, + String applicationName, String applicationId) { + if (config.hasPath(CLUSTRER_WORK_DIR)) { + return new Path(config.getString(CLUSTRER_WORK_DIR)); + } + return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName, applicationId)); + } + /** * Get the application working directory {@link String}. 
* http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java index f86874b..1de9bb1 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java @@ -160,7 +160,7 @@ public class GobblinTaskRunner { TaskStateTracker taskStateTracker = new GobblinHelixTaskStateTracker(properties, this.helixManager); Path appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get() : - GobblinClusterUtils.getAppWorkDirPath(this.fs, applicationName, applicationId); + GobblinClusterUtils.getAppWorkDirPathFromConfig(config, this.fs, applicationName, applicationId); List<Service> services = Lists.newArrayList(taskExecutor, taskStateTracker, new JMXReportingService(ImmutableMap.of("task.executor" ,taskExecutor.getTaskExecutorQueueMetricSet()))); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java new file mode 100644 index 0000000..4d83658 --- /dev/null +++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.cluster; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import java.util.HashMap; +import java.util.Map; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.testng.annotations.Test; + +public class GobblinClusterUtilsTest { + + FileSystem fs = mock(FileSystem.class); + + @Test + public void work_dir_should_get_value_from_config_when_specified() throws Exception { + Map<String, String> configMap = new HashMap<>(); + configMap.put("gobblin.cluster.workDir", "/foo/bar"); + + Config config = ConfigFactory.parseMap(configMap); + + Path workDirPath = GobblinClusterUtils + .getAppWorkDirPathFromConfig(config, fs, "appName", "appid"); + + assertEquals(new Path("/foo/bar"), workDirPath); + + } + + @Test + public void work_dir_should_get_default_calculated_value_when_not_specified() throws Exception { + Map<String, String> configMap = new HashMap<>(); + Config config = ConfigFactory.parseMap(configMap); + + when(fs.getHomeDirectory()).thenReturn(new Path("/home/")); + + Path workDirPath = GobblinClusterUtils + .getAppWorkDirPathFromConfig(config, fs, "appName", "appid"); + + assertEquals(new Path("/home/appName/appid"), workDirPath); + } +}
