Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 178acbb47 -> 5d343e318


[GOBBLIN-324] Add the cluster working directory config

Currently, the appWorkDir value is passed to the GobblinClusterManager
constructor and the GobblinTaskRunner constructor. It is used to determine
where the state files are stored. The default launch scripts call the main
methods, which pass in a hardcoded "null" value, so the code falls back to a
default value such as file:/Users/username/standalone_cluster/1

It's useful to be able to specify this value via configuration.

When the config is not specified, the behavior is the same as before.

Also add some examples to the sample config file.

Testing:

Added new unit tests.
Manually ran the cluster with and without this config.

Closes #2174 from HappyRay/add-work-dir-config


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5d343e31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5d343e31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5d343e31

Branch: refs/heads/master
Commit: 5d343e3188c99eb950ca4af0728769220e9caf5c
Parents: 178acbb
Author: Ray Yang <[email protected]>
Authored: Wed Nov 29 13:38:47 2017 -0800
Committer: Abhishek Tiwari <[email protected]>
Committed: Wed Nov 29 13:38:47 2017 -0800

----------------------------------------------------------------------
 conf/standalone/application.conf                | 16 ++++-
 .../GobblinClusterConfigurationKeys.java        |  4 +-
 .../gobblin/cluster/GobblinClusterManager.java  |  2 +-
 .../gobblin/cluster/GobblinClusterUtils.java    | 15 ++++-
 .../gobblin/cluster/GobblinTaskRunner.java      |  2 +-
 .../cluster/GobblinClusterUtilsTest.java        | 62 ++++++++++++++++++++
 6 files changed, 92 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/conf/standalone/application.conf
----------------------------------------------------------------------
diff --git a/conf/standalone/application.conf b/conf/standalone/application.conf
index fa601dd..e9b8323 100644
--- a/conf/standalone/application.conf
+++ b/conf/standalone/application.conf
@@ -16,10 +16,22 @@
 #
 
 # Sample configuration properties for the Gobblin Standalone cluster
+gobblin.cluster.workDir=${gobblin.cluster.work.dir}/GobblinStandaloneCluster
+
+# default is the JobConfigurationManager
+# use this manager to accept jobs from Kafka. It requires some additional Kafka related parameters.
+#gobblin.cluster.job.configuration.manager=org.apache.gobblin.cluster.StreamingJobConfigurationManager
+#spec.kafka.topics=ruyang_test_kafka_gobblin
+#kafka.brokers="hostname:12913/kafka-queuing"
+#jobSpecMonitor.kafka.zookeeper.connect="hostname:12913/kafka-queuing"
 
 # Cluster configuration properties
-gobblin.cluster.helix.cluster.name=GobblinStandaloneCluster
-gobblin.cluster.job.conf.path=<path where Gobblin job configuration file are located>
+gobblin.cluster.helix.cluster.name=GobblinStandaloneClusterCli
+
+# used by the JobConfigurationManager
+gobblin.cluster.job.conf.path=${gobblin.cluster.work.dir}/jobs
+
+gobblin.cluster.jobconf.fullyQualifiedPath=${gobblin.cluster.work.dir}/jobs
 
 # File system URIs
 writer.fs.uri=${fs.uri}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index ea75dc3..ab6f8b4 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -35,7 +35,7 @@ public class GobblinClusterConfigurationKeys {
   public static final String STANDALONE_CLUSTER_MODE = "standalone_cluster";
   public static final String STANDALONE_CLUSTER_MODE_KEY = 
GOBBLIN_CLUSTER_PREFIX + "standaloneMode";
   public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false;
-
+  public static final String CLUSTRER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + 
"workDir";
 
   // Helix configuration properties.
   public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_CLUSTER_PREFIX + 
"helix.cluster.name";
@@ -80,4 +80,4 @@ public class GobblinClusterConfigurationKeys {
   public static final String STOP_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + 
"stopTimeoutSeconds";
   public static final long DEFAULT_STOP_TIMEOUT_SECONDS = 60;
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 8ced294..7948a8a 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -163,7 +163,7 @@ public class GobblinClusterManager implements ApplicationLauncher {
 
     this.fs = buildFileSystem(config);
     this.appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get()
-        : GobblinClusterUtils.getAppWorkDirPath(this.fs, clusterName, 
applicationId);
+        : GobblinClusterUtils.getAppWorkDirPathFromConfig(config, this.fs, 
clusterName, applicationId);
 
     initializeAppLauncherAndServices();
   }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index a8a335a..3082720 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -17,14 +17,15 @@
 
 package org.apache.gobblin.cluster;
 
+import static 
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTRER_WORK_DIR;
+
+import com.typesafe.config.Config;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-
+import org.apache.gobblin.annotation.Alpha;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import org.apache.gobblin.annotation.Alpha;
-
 @Alpha
 public class GobblinClusterUtils {
 
@@ -51,6 +52,14 @@ public class GobblinClusterUtils {
     return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName, 
applicationId));
   }
 
+  public static Path getAppWorkDirPathFromConfig(Config config, FileSystem fs,
+      String applicationName, String applicationId) {
+    if (config.hasPath(CLUSTRER_WORK_DIR)) {
+      return new Path(config.getString(CLUSTRER_WORK_DIR));
+    }
+    return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName, 
applicationId));
+  }
+
   /**
    * Get the application working directory {@link String}.
    *

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index f86874b..1de9bb1 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -160,7 +160,7 @@ public class GobblinTaskRunner {
     TaskStateTracker taskStateTracker = new 
GobblinHelixTaskStateTracker(properties, this.helixManager);
 
     Path appWorkDir = appWorkDirOptional.isPresent() ? 
appWorkDirOptional.get() :
-        GobblinClusterUtils.getAppWorkDirPath(this.fs, applicationName, 
applicationId);
+        GobblinClusterUtils.getAppWorkDirPathFromConfig(config, this.fs, 
applicationName, applicationId);
 
     List<Service> services = Lists.newArrayList(taskExecutor, taskStateTracker,
         new JMXReportingService(ImmutableMap.of("task.executor" 
,taskExecutor.getTaskExecutorQueueMetricSet())));

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5d343e31/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
new file mode 100644
index 0000000..4d83658
--- /dev/null
+++ b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.cluster;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.annotations.Test;
+
+public class GobblinClusterUtilsTest {
+
+  FileSystem fs = mock(FileSystem.class);
+
+  @Test
+  public void work_dir_should_get_value_from_config_when_specified() throws 
Exception {
+    Map<String, String> configMap = new HashMap<>();
+    configMap.put("gobblin.cluster.workDir", "/foo/bar");
+
+    Config config = ConfigFactory.parseMap(configMap);
+
+    Path workDirPath = GobblinClusterUtils
+        .getAppWorkDirPathFromConfig(config, fs, "appName", "appid");
+
+    assertEquals(new Path("/foo/bar"), workDirPath);
+
+  }
+
+  @Test
+  public void 
work_dir_should_get_default_calculated_value_when_not_specified() throws 
Exception {
+    Map<String, String> configMap = new HashMap<>();
+    Config config = ConfigFactory.parseMap(configMap);
+
+    when(fs.getHomeDirectory()).thenReturn(new Path("/home/"));
+
+    Path workDirPath = GobblinClusterUtils
+        .getAppWorkDirPathFromConfig(config, fs, "appName", "appid");
+
+    assertEquals(new Path("/home/appName/appid"), workDirPath);
+  }
+}

Reply via email to