This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c574dff  [GOBBLIN-1076] Make Gobblin cluster working directories 
configurable
c574dff is described below

commit c574dff7d4e66b126986510c08c55aab74b03964
Author: sv2000 <[email protected]>
AuthorDate: Tue Mar 10 17:23:14 2020 -0700

    [GOBBLIN-1076] Make Gobblin cluster working directories configurable
    
    Closes #2916 from sv2000/clusterWorkDir
---
 .../cluster/GobblinClusterConfigurationKeys.java   |  1 +
 .../gobblin/cluster/GobblinClusterManager.java     | 20 +--------
 .../gobblin/cluster/GobblinClusterUtils.java       | 32 +++++++++++++--
 .../apache/gobblin/cluster/GobblinTaskRunner.java  | 25 +++---------
 .../org/apache/gobblin/cluster/SingleTask.java     |  2 -
 .../apache/gobblin/cluster/SingleTaskRunner.java   |  4 +-
 .../gobblin/cluster/GobblinClusterUtilsTest.java   | 47 +++++++++++-----------
 .../org/apache/gobblin/cluster/TestSingleTask.java | 40 +++++++++---------
 .../cluster/suite/IntegrationBasicSuite.java       |  6 +--
 .../test/resources/_workunits/store/workunit.wu    |  0
 gobblin-cluster/src/test/resources/clusterConf     |  1 -
 .../gobblin/yarn/GobblinYarnAppLauncher.java       | 17 ++++----
 .../java/org/apache/gobblin/yarn/YarnService.java  |  2 +-
 13 files changed, 96 insertions(+), 101 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
index 7c27d20..7dbf7b1 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java
@@ -45,6 +45,7 @@ public class GobblinClusterConfigurationKeys {
   public static final String STANDALONE_CLUSTER_MODE = "standalone_cluster";
   public static final String STANDALONE_CLUSTER_MODE_KEY = 
GOBBLIN_CLUSTER_PREFIX + "standaloneMode";
   public static final boolean DEFAULT_STANDALONE_CLUSTER_MODE = false;
+  // Root working directory for Gobblin cluster
   public static final String CLUSTER_WORK_DIR = GOBBLIN_CLUSTER_PREFIX + 
"workDir";
 
   public static final String DISTRIBUTED_JOB_LAUNCHER_ENABLED = 
GOBBLIN_CLUSTER_PREFIX + "distributedJobLauncherEnabled";
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 2d967d6..8eabad1 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -18,7 +18,6 @@
 package org.apache.gobblin.cluster;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -72,7 +71,6 @@ import org.apache.gobblin.runtime.app.ApplicationLauncher;
 import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher;
 import org.apache.gobblin.scheduler.SchedulerService;
 import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.util.JobConfigurationUtils;
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
@@ -156,9 +154,10 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
 
     initializeHelixManager();
 
-    this.fs = buildFileSystem(config);
+    this.fs = GobblinClusterUtils.buildFileSystem(config, new Configuration());
     this.appWorkDir = appWorkDirOptional.isPresent() ? appWorkDirOptional.get()
         : GobblinClusterUtils.getAppWorkDirPathFromConfig(config, this.fs, 
clusterName, applicationId);
+    LOGGER.info("Configured GobblinClusterManager work dir to: {}", 
this.appWorkDir);
 
     initializeAppLauncherAndServices();
   }
@@ -358,21 +357,6 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
   }
 
   /**
-   * Build the {@link FileSystem} for the Application Master.
-   */
-  private FileSystem buildFileSystem(Config config) throws IOException {
-    Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config, 
GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX);
-
-    Configuration conf = new Configuration();
-    //Add any Hadoop-specific overrides into the Configuration object
-    
JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides),
 conf);
-
-    return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
-        .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
-        : FileSystem.get(conf);
-  }
-
-  /**
    * Build the {@link GobblinHelixJobScheduler} for the Application Master.
    */
   private GobblinHelixJobScheduler buildGobblinHelixJobScheduler(Config 
config, Path appWorkDir,
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
index e414074..a7e4ad8 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterUtils.java
@@ -17,9 +17,12 @@
 
 package org.apache.gobblin.cluster;
 
+import java.io.IOException;
 import java.net.InetAddress;
+import java.net.URI;
 import java.net.UnknownHostException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -28,11 +31,13 @@ import com.typesafe.config.Config;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.annotation.Alpha;
+import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.DynamicConfigGenerator;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.DynamicConfigGeneratorFactory;
-
-import static 
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.JobConfigurationUtils;
+import org.apache.gobblin.util.PathUtils;
 
 @Alpha
 @Slf4j
@@ -63,8 +68,9 @@ public class GobblinClusterUtils {
 
   public static Path getAppWorkDirPathFromConfig(Config config, FileSystem fs,
       String applicationName, String applicationId) {
-    if (config.hasPath(CLUSTER_WORK_DIR)) {
-      return new Path(config.getString(CLUSTER_WORK_DIR));
+    if (config.hasPath(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR)) {
+      return new Path(new Path(fs.getUri()), 
PathUtils.combinePaths(config.getString(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR),
+          getAppWorkDirPath(applicationName, applicationId)));
     }
     return new Path(fs.getHomeDirectory(), getAppWorkDirPath(applicationName, 
applicationId));
   }
@@ -129,4 +135,22 @@ public class GobblinClusterUtils {
     return getDynamicConfig(config).withFallback(config);
   }
 
+  /**
+   * A utility method to construct a {@link FileSystem} object with the 
configured Hadoop overrides provided as part of
+   * the cluster configuration.
+   * @param config
+   * @param conf
+   * @return a {@link FileSystem} object that is instantiated with the 
appropriate Hadoop config overrides.
+   * @throws IOException
+   */
+  public static FileSystem buildFileSystem(Config config, Configuration conf)
+      throws IOException {
+    Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config, 
GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX);
+
+    //Add any Hadoop-specific overrides into the Configuration object
+    
JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides),
 conf);
+    return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
+        .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
+        : FileSystem.get(conf);
+  }
 }
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
index d71dbd9..6b3fff7 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinTaskRunner.java
@@ -19,7 +19,6 @@ package org.apache.gobblin.cluster;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -74,20 +73,15 @@ import com.typesafe.config.ConfigValueFactory;
 import lombok.Getter;
 
 import org.apache.gobblin.annotation.Alpha;
-import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.instrumented.StandardMetricsBridge;
 import org.apache.gobblin.metrics.GobblinMetrics;
 import org.apache.gobblin.util.ClassAliasResolver;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.FileUtils;
 import org.apache.gobblin.util.HadoopUtils;
-import org.apache.gobblin.util.JobConfigurationUtils;
 import org.apache.gobblin.util.JvmUtils;
 import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
 
-import static 
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
-
-
 /**
  * The main class running in the containers managing services for running 
Gobblin
  * {@link org.apache.gobblin.source.workunit.WorkUnit}s.
@@ -113,6 +107,9 @@ import static 
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER
  */
 @Alpha
 public class GobblinTaskRunner implements StandardMetricsBridge {
+  // Working directory key for applications. This config is set dynamically.
+  public static final String CLUSTER_APP_WORK_DIR = 
GobblinClusterConfigurationKeys.GOBBLIN_CLUSTER_PREFIX + "appWorkDir";
+
   private static final Logger logger = 
LoggerFactory.getLogger(GobblinTaskRunner.class);
   static final java.nio.file.Path CLUSTER_CONF_PATH = 
Paths.get("generated-gobblin-cluster.conf");
 
@@ -169,10 +166,11 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
     this.dedicatedTaskDriverCluster = ConfigUtils.getBoolean(config,
         GobblinClusterConfigurationKeys.DEDICATED_TASK_DRIVER_CLUSTER_ENABLED, 
false);
     Configuration conf = HadoopUtils.newConfiguration();
-    this.fs = buildFileSystem(config, conf);
+    this.fs = GobblinClusterUtils.buildFileSystem(config, conf);
     this.appWorkPath = initAppWorkDir(config, appWorkDirOptional);
     this.clusterConfig = saveConfigToFile(config);
     this.clusterName = 
this.clusterConfig.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY);
+    logger.info("Configured GobblinTaskRunner work dir to: {}", 
this.appWorkPath.toString());
 
     //Set system properties passed in via application config. As an example, 
Helix uses System#getProperty() for ZK configuration
     // overrides such as sessionTimeout. In this case, the overrides specified
@@ -279,7 +277,7 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
   private Config saveConfigToFile(Config config)
       throws IOException {
     Config newConf = config
-        .withValue(CLUSTER_WORK_DIR, 
ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()));
+        .withValue(CLUSTER_APP_WORK_DIR, 
ConfigValueFactory.fromAnyRef(this.appWorkPath.toString()));
     ConfigUtils configUtils = new ConfigUtils(new FileUtils());
     configUtils.saveConfigToFile(newConf, CLUSTER_CONF_PATH);
     return newConf;
@@ -437,17 +435,6 @@ public class GobblinTaskRunner implements 
StandardMetricsBridge {
     });
   }
 
-  private FileSystem buildFileSystem(Config config, Configuration conf)
-      throws IOException {
-    Config hadoopOverrides = ConfigUtils.getConfigOrEmpty(config, 
GobblinClusterConfigurationKeys.HADOOP_CONFIG_OVERRIDES_PREFIX);
-
-    //Add any Hadoop-specific overrides into the Configuration object
-    
JobConfigurationUtils.putPropertiesIntoConfiguration(ConfigUtils.configToProperties(hadoopOverrides),
 conf);
-    return config.hasPath(ConfigurationKeys.FS_URI_KEY) ? FileSystem
-        .get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), conf)
-        : FileSystem.get(conf);
-  }
-
   private Optional<ContainerMetrics> buildContainerMetrics() {
     Properties properties = ConfigUtils.configToProperties(this.clusterConfig);
     if (GobblinMetrics.isEnabled(properties)) {
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
index be0f0de..4778371 100644
--- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
+++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTask.java
@@ -50,8 +50,6 @@ import org.apache.gobblin.util.SerializationUtils;
 public class SingleTask {
 
   private static final Logger _logger = 
LoggerFactory.getLogger(SingleTask.class);
-  public static final String MAX_RETRY_WAITING_FOR_INIT_KEY = 
"maxRetryBlockedOnTaskAttemptInit";
-  public static final int DEFAULT_MAX_RETRY_WAITING_FOR_INIT = 2;
 
   private GobblinMultiTaskAttempt _taskAttempt;
   private String _jobId;
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
index de97fec..8eabbba 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/SingleTaskRunner.java
@@ -44,8 +44,6 @@ import org.apache.gobblin.runtime.util.StateStores;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.HadoopUtils;
 
-import static 
org.apache.gobblin.cluster.GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR;
-
 
 class SingleTaskRunner {
   private static final Logger logger = 
LoggerFactory.getLogger(SingleTaskRunner.class);
@@ -65,7 +63,7 @@ class SingleTaskRunner {
     this.jobId = jobId;
     this.workUnitFilePath = workUnitFilePath;
     this.clusterConfig = ConfigFactory.parseFile(new 
File(clusterConfigFilePath));
-    final String workDir = this.clusterConfig.getString(CLUSTER_WORK_DIR);
+    final String workDir = 
this.clusterConfig.getString(GobblinTaskRunner.CLUSTER_APP_WORK_DIR);
     this.appWorkPath = new Path(workDir);
   }
 
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
index 9288d42..ee5d7cf 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/GobblinClusterUtilsTest.java
@@ -17,48 +17,47 @@
 
 package org.apache.gobblin.cluster;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.io.IOException;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.testng.annotations.Test;
 
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+
+import org.apache.gobblin.util.PathUtils;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
 public class GobblinClusterUtilsTest {
-
-  FileSystem fs = mock(FileSystem.class);
+  private static final String TEST_APP_NAME = "appName";
+  private static final String TEST_APP_ID = "appId";
+  private static final String TEST_WORK_DIR = "file:///foo/bar";
+  private static final String DEFAULT_HOME_DIR = "file:///home";
 
   @Test
-  public void work_dir_should_get_value_from_config_when_specified() throws 
Exception {
-    Map<String, String> configMap = new HashMap<>();
-    configMap.put("gobblin.cluster.workDir", "/foo/bar");
-
-    Config config = ConfigFactory.parseMap(configMap);
-
-    Path workDirPath = GobblinClusterUtils
-        .getAppWorkDirPathFromConfig(config, fs, "appName", "appid");
+  public void testGetAppWorkDirPathFromConfig() throws IOException {
+    FileSystem localFs = FileSystem.getLocal(new Configuration());
+    FileSystem mockFs = mock(FileSystem.class);
 
-    assertEquals(new Path("/foo/bar"), workDirPath);
-
-  }
-
-  @Test
-  public void 
work_dir_should_get_default_calculated_value_when_not_specified() throws 
Exception {
-    Map<String, String> configMap = new HashMap<>();
-    Config config = ConfigFactory.parseMap(configMap);
+    when(mockFs.getHomeDirectory()).thenReturn(new Path(DEFAULT_HOME_DIR));
+    when(mockFs.getUri()).thenReturn(localFs.getUri());
 
-    when(fs.getHomeDirectory()).thenReturn(new Path("/home/"));
+    //Set gobblin.cluster.workDir config
+    Config config = 
ConfigFactory.empty().withValue(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR,
+        ConfigValueFactory.fromAnyRef(TEST_WORK_DIR));
+    Path workDirPath = GobblinClusterUtils.getAppWorkDirPathFromConfig(config, 
localFs, TEST_APP_NAME, TEST_APP_ID);
 
-    Path workDirPath = GobblinClusterUtils
-        .getAppWorkDirPathFromConfig(config, fs, "appName", "appid");
+    assertEquals(PathUtils.combinePaths(TEST_WORK_DIR, TEST_APP_NAME, 
TEST_APP_ID), workDirPath);
 
-    assertEquals(new Path("/home/appName/appid"), workDirPath);
+    //Get workdir when gobblin.cluster.workDir is not specified
+    workDirPath = GobblinClusterUtils
+        .getAppWorkDirPathFromConfig(ConfigFactory.empty(), mockFs, 
TEST_APP_NAME, TEST_APP_ID);
+    assertEquals(PathUtils.combinePaths(DEFAULT_HOME_DIR, TEST_APP_NAME, 
TEST_APP_ID), workDirPath);
   }
 }
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
index 2b6499c..04a800c 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/TestSingleTask.java
@@ -17,21 +17,21 @@
 
 package org.apache.gobblin.cluster;
 
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 
-import org.apache.gobblin.testing.AssertWithBackoff;
 import org.junit.Assert;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Files;
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
 
-import javax.annotation.Nullable;
-
-import static 
org.apache.gobblin.cluster.SingleTask.MAX_RETRY_WAITING_FOR_INIT_KEY;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.util.FileUtils;
 
 
 /**
@@ -43,14 +43,18 @@ import static 
org.apache.gobblin.cluster.SingleTask.MAX_RETRY_WAITING_FOR_INIT_K
  */
 public class TestSingleTask {
 
-  private InMemorySingleTaskRunner createInMemoryTaskRunner() {
-    final String clusterConfigPath = "clusterConf";
-    final String wuPath = "_workunits/store/workunit.wu";
-    String clusterConfPath = 
this.getClass().getClassLoader().getResource(clusterConfigPath).getPath();
+  private InMemorySingleTaskRunner createInMemoryTaskRunner()
+      throws IOException {
+    final File clusterWorkDirPath = Files.createTempDir();
+    Path clusterConfigPath = Paths.get(clusterWorkDirPath.getAbsolutePath(), 
"clusterConf");
+    Config config = 
ConfigFactory.empty().withValue(GobblinTaskRunner.CLUSTER_APP_WORK_DIR, 
ConfigValueFactory.fromAnyRef(clusterWorkDirPath.toString()));
+    ConfigUtils configUtils = new ConfigUtils(new FileUtils());
+    configUtils.saveConfigToFile(config, clusterConfigPath);
 
-    InMemorySingleTaskRunner inMemorySingleTaskRunner = new 
InMemorySingleTaskRunner(clusterConfPath, "testJob",
-        this.getClass().getClassLoader().getResource(wuPath).getPath());
+    final Path wuPath = Paths.get(clusterWorkDirPath.getAbsolutePath(), 
"_workunits/store/workunit.wu");
 
+    InMemorySingleTaskRunner inMemorySingleTaskRunner =
+        new InMemorySingleTaskRunner(clusterConfigPath.toString(), "testJob", 
wuPath.toString());
     return inMemorySingleTaskRunner;
   }
 
@@ -60,15 +64,13 @@ public class TestSingleTask {
    * re-run it again.
    */
   @Test
-  public void testSingleTaskRerunAfterFailure()
-      throws Exception {
-    SingleTaskRunner inMemorySingleTaskRunner = createInMemoryTaskRunner();
+  public void testSingleTaskRerunAfterFailure() throws Exception {
+    InMemorySingleTaskRunner inMemorySingleTaskRunner = 
createInMemoryTaskRunner();
     try {
       inMemorySingleTaskRunner.run(true);
     } catch (Exception e) {
       inMemorySingleTaskRunner.run();
     }
-
     Assert.assertTrue(true);
   }
 }
diff --git 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
index 7da4915..e6c4587 100644
--- 
a/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
+++ 
b/gobblin-cluster/src/test/java/org/apache/gobblin/cluster/suite/IntegrationBasicSuite.java
@@ -130,8 +130,8 @@ public class IntegrationBasicSuite {
   }
 
   private void initWorkDir() throws IOException {
-    // Relative to the current directory
-    this.workPath = Paths.get("gobblin-integration-test-work-dir");
+    this.workPath = 
Paths.get(ConfigFactory.parseURL(Resources.getResource("BasicCluster.conf"))
+        .getString(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR));
     log.info("Created a new work directory: " + 
this.workPath.toAbsolutePath());
 
     // Delete the working directory in case the previous test fails to delete 
the directory
@@ -182,7 +182,7 @@ public class IntegrationBasicSuite {
     Map<String, String> configMap = new HashMap<>();
     String zkConnectionString = this.testingZKServer.getConnectString();
     configMap.put(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY, 
zkConnectionString);
-    configMap.put(GobblinClusterConfigurationKeys.CLUSTER_WORK_DIR, 
this.workPath.toString());
+    configMap.put(GobblinTaskRunner.CLUSTER_APP_WORK_DIR, 
this.workPath.toString());
     Config overrideConfig = ConfigFactory.parseMap(configMap);
 
     return overrideConfig.withFallback(config);
diff --git a/gobblin-cluster/src/test/resources/_workunits/store/workunit.wu 
b/gobblin-cluster/src/test/resources/_workunits/store/workunit.wu
deleted file mode 100644
index e69de29..0000000
diff --git a/gobblin-cluster/src/test/resources/clusterConf 
b/gobblin-cluster/src/test/resources/clusterConf
deleted file mode 100644
index 468d172..0000000
--- a/gobblin-cluster/src/test/resources/clusterConf
+++ /dev/null
@@ -1 +0,0 @@
-gobblin.cluster.workDir = <FILL IN THE PATH TO THE OWNING DIRECTORY>
\ No newline at end of file
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index 543381a..3367842 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -238,9 +238,7 @@ public class GobblinYarnAppLauncher {
     this.yarnClient = YarnClient.createYarnClient();
     this.yarnClient.init(this.yarnConfiguration);
 
-    this.fs = config.hasPath(ConfigurationKeys.FS_URI_KEY) ?
-        
FileSystem.get(URI.create(config.getString(ConfigurationKeys.FS_URI_KEY)), 
this.yarnConfiguration) :
-        FileSystem.get(this.yarnConfiguration);
+    this.fs = GobblinClusterUtils.buildFileSystem(config, 
this.yarnConfiguration);
     this.closer.register(this.fs);
 
     this.applicationStatusMonitor = Executors.newSingleThreadScheduledExecutor(
@@ -348,7 +346,7 @@ public class GobblinYarnAppLauncher {
         
!this.config.getBoolean(GobblinYarnConfigurationKeys.LOG_COPIER_DISABLE_DRIVER_COPY))
 {
       services.add(buildLogCopier(this.config,
         new Path(this.sinkLogRootDir, this.applicationName + Path.SEPARATOR + 
this.applicationId.get().toString()),
-        GobblinClusterUtils.getAppWorkDirPath(this.fs, this.applicationName, 
this.applicationId.get().toString())));
+        GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, this.applicationId.get().toString())));
     }
     if (config.getBoolean(ConfigurationKeys.JOB_EXECINFO_SERVER_ENABLED_KEY)) {
       LOGGER.info("Starting the job execution info server since it is 
enabled");
@@ -604,8 +602,10 @@ public class GobblinYarnAppLauncher {
   }
 
   private Map<String, LocalResource> addAppMasterLocalResources(ApplicationId 
applicationId) throws IOException {
-    Path appWorkDir = GobblinClusterUtils.getAppWorkDirPath(this.fs, 
this.applicationName, applicationId.toString());
+    Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, applicationId.toString());
+
     Path appMasterWorkDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.APP_MASTER_WORK_DIR_NAME);
+    LOGGER.info("Configured GobblinApplicationMaster work directory to: {}", 
appMasterWorkDir.toString());
 
     Map<String, LocalResource> appMasterResources = Maps.newHashMap();
     FileSystem localFs = FileSystem.getLocal(new Configuration());
@@ -639,8 +639,11 @@ public class GobblinYarnAppLauncher {
   }
 
   private void addContainerLocalResources(ApplicationId applicationId) throws 
IOException {
-    Path appWorkDir = GobblinClusterUtils.getAppWorkDirPath(this.fs, 
this.applicationName, applicationId.toString());
+    Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, applicationId.toString());
+
     Path containerWorkDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
+    LOGGER.info("Configured Container work directory to: {}", 
containerWorkDir.toString());
+
     FileSystem localFs = FileSystem.getLocal(new Configuration());
 
     if (this.config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JARS_KEY)) {
@@ -850,7 +853,7 @@ public class GobblinYarnAppLauncher {
 
   @VisibleForTesting
   void cleanUpAppWorkDirectory(ApplicationId applicationId) throws IOException 
{
-    Path appWorkDir = GobblinClusterUtils.getAppWorkDirPath(this.fs, 
this.applicationName, applicationId.toString());
+    Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, applicationId.toString());
     if (this.fs.exists(appWorkDir)) {
       LOGGER.info("Deleting application working directory " + appWorkDir);
       this.fs.delete(appWorkDir, true);
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
index 0c649ba..027be31 100644
--- a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
+++ b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnService.java
@@ -487,7 +487,7 @@ public class YarnService extends AbstractIdleService {
 
   protected ContainerLaunchContext newContainerLaunchContext(Container 
container, String helixInstanceName)
       throws IOException {
-    Path appWorkDir = GobblinClusterUtils.getAppWorkDirPath(this.fs, 
this.applicationName, this.applicationId);
+    Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, this.fs, 
this.applicationName, this.applicationId);
     Path containerWorkDir = new Path(appWorkDir, 
GobblinYarnConfigurationKeys.CONTAINER_WORK_DIR_NAME);
 
     Map<String, LocalResource> resourceMap = Maps.newHashMap();

Reply via email to