This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c02c293  [GOBBLIN-1031] Gobblin-on-Yarn locally running Azkaban job 
skeleton
c02c293 is described below

commit c02c29327eae10d1f79efbf5564053d58a280b93
Author: autumnust <le...@linkedin.com>
AuthorDate: Sat Feb 22 11:58:13 2020 -0800

    [GOBBLIN-1031] Gobblin-on-Yarn locally running Azkaban job skeleton
    
    Closes #2873 from autumnust/GobblinAppMasterTest
---
 .../gobblin/cluster/GobblinClusterManager.java     |   4 +-
 .../gobblin/cluster/GobblinHelixJobLauncher.java   |  14 ++-
 gobblin-modules/gobblin-azkaban/build.gradle       |   3 +
 .../AzkabanGobblinLocalYarnAppLauncher.java        |  49 ++++++++
 .../azkaban/AzkabanGobblinYarnAppLauncher.java     |  25 +++-
 .../apache/gobblin/azkaban/AzkabanJobRunner.java   | 117 ++++++++++++++++++
 .../azkaban/EmbeddedGobblinYarnAppLauncher.java    | 135 +++++++++++++++++++++
 .../src/main/resources/conf/gobblin_conf/app.btm   |  32 +++++
 .../conf/gobblin_conf/log4j-yarn.properties        |  24 ++++
 .../gobblin_jobs/kafka-hdfs-streaming-avro.conf    |  88 ++++++++++++++
 .../conf/jobs/kafka-streaming-on-yarn.job          |  53 ++++++++
 .../resources/conf/properties/common.properties    |  63 ++++++++++
 .../resources/conf/properties/local.properties     |  21 ++++
 .../extractor/extract/kafka/KafkaSource.java       |   2 +-
 .../gobblin/runtime/AbstractJobLauncher.java       |   2 +-
 .../org/apache/gobblin/util/logs/LogCopier.java    |   5 +-
 .../yarn/AbstractYarnAppSecurityManager.java       |   2 +-
 .../gobblin/yarn/GobblinYarnAppLauncher.java       |  14 ++-
 .../gobblin/yarn/GobblinYarnConfigurationKeys.java |   1 +
 .../apache/gobblin/yarn/GobblinYarnLogSource.java  |   6 +-
 .../gobblin/yarn/YarnAutoScalingManager.java       |   2 +
 .../gobblin/yarn/GobblinYarnAppLauncherTest.java   |   6 +-
 22 files changed, 642 insertions(+), 26 deletions(-)

diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
index 5900f64..9574a97 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterManager.java
@@ -234,7 +234,9 @@ public class GobblinClusterManager implements 
ApplicationLauncher, StandardMetri
   }
 
   /**
-   * Configure Helix quota-based task scheduling
+   * Configure Helix quota-based task scheduling.
+   * This config controls the number of tasks that are concurrently assigned 
to a single Helix instance.
+   * Reference: https://helix.apache.org/0.9.1-docs/quota_scheduling.html
    */
   @VisibleForTesting
   void configureHelixQuotaBasedTaskScheduling() {
diff --git 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
index cb2b434..418ce62 100644
--- 
a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
+++ 
b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinHelixJobLauncher.java
@@ -93,8 +93,8 @@ import org.apache.gobblin.util.SerializationUtils;
  * </p>
  *
  * <p>
- *   This class runs in the {@link GobblinClusterManager}. The actual task 
execution happens in the in the
- *   {@link GobblinTaskRunner}.
+ *   This class is instantiated by the {@link GobblinHelixJobScheduler} on 
every job submission to launch the Gobblin job.
+ *   The actual task execution happens in the {@link GobblinTaskRunner}, 
usually in a different process.
  * </p>
  *
  * @author Yinan Li
@@ -164,7 +164,7 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     Config stateStoreJobConfig = ConfigUtils.propertiesToConfig(jobProps)
         .withValue(ConfigurationKeys.STATE_STORE_FS_URI_KEY, 
ConfigValueFactory.fromAnyRef(
             new URI(appWorkDir.toUri().getScheme(), null, 
appWorkDir.toUri().getHost(),
-                appWorkDir.toUri().getPort(), null, null, null).toString()));
+                appWorkDir.toUri().getPort(), "/", null, null).toString()));
 
     this.stateStores = new StateStores(stateStoreJobConfig, appWorkDir,
         GobblinClusterConfigurationKeys.OUTPUT_TASK_STATE_DIR_NAME, appWorkDir,
@@ -371,6 +371,8 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
     this.jobListener = jobListener;
     boolean isLaunched = false;
     this.runningMap.putIfAbsent(this.jobContext.getJobName(), false);
+
+    Throwable errorInJobLaunching = null;
     try {
       if (this.runningMap.replace(this.jobContext.getJobName(), false, true)) {
         LOGGER.info ("Job {} will be executed, add into running map.", 
this.jobContext.getJobId());
@@ -379,12 +381,16 @@ public class GobblinHelixJobLauncher extends 
AbstractJobLauncher {
       } else {
         LOGGER.warn ("Job {} will not be executed because other jobs are still 
running.", this.jobContext.getJobId());
       }
+      // TODO: Better error handling
+    } catch (Throwable t) {
+      errorInJobLaunching = t;
     } finally {
       if (isLaunched) {
         if (this.runningMap.replace(this.jobContext.getJobName(), true, 
false)) {
           LOGGER.info ("Job {} is done, remove from running map.", 
this.jobContext.getJobId());
         } else {
-          throw new IllegalStateException("A launched job should have running 
state equal to true in the running map.");
+          throw errorInJobLaunching == null ? new IllegalStateException("A 
launched job should have running state equal to true in the running map.")
+              : new RuntimeException("Failure in launching job:", 
errorInJobLaunching);
         }
       }
     }
diff --git a/gobblin-modules/gobblin-azkaban/build.gradle 
b/gobblin-modules/gobblin-azkaban/build.gradle
index 2f1dde6..c529ace 100644
--- a/gobblin-modules/gobblin-azkaban/build.gradle
+++ b/gobblin-modules/gobblin-azkaban/build.gradle
@@ -28,6 +28,9 @@ dependencies {
   compile project(":gobblin-metrics-libs:gobblin-metrics")
   compile project(":gobblin-utility")
   compile project(":gobblin-yarn")
+  compile externalDependency.curatorFramework
+  compile externalDependency.hadoopYarnMiniCluster
+  compile externalDependency.curatorTest
 
   compile externalDependency.azkaban
   compile externalDependency.log4j
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinLocalYarnAppLauncher.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinLocalYarnAppLauncher.java
new file mode 100644
index 0000000..53ccafa
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinLocalYarnAppLauncher.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.azkaban;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+
+/**
+ * An {@link AzkabanGobblinYarnAppLauncher} intended for locally-running Azkaban instances.
+ *
+ * <p>It overrides {@link #initYarnConf(Properties)} so that the YARN resource-related configuration can be changed
+ * in a way that works with lighter hardware.
+ */
+public class AzkabanGobblinLocalYarnAppLauncher extends AzkabanGobblinYarnAppLauncher {
+  public AzkabanGobblinLocalYarnAppLauncher(String jobId, Properties gobblinProps)
+      throws IOException {
+    super(jobId, gobblinProps);
+  }
+
+  /**
+   * Starts from the parent's YARN configuration. When {@code gobblinProps} contains the key
+   * {@code yarn-site-address}, the file at that path is added as a configuration resource; otherwise a fixed set of
+   * small resource limits is applied.
+   *
+   * @param gobblinProps the job properties handed to the launcher
+   * @return the adjusted YARN configuration
+   */
+  @Override
+  protected YarnConfiguration initYarnConf(Properties gobblinProps) {
+    YarnConfiguration conf = super.initYarnConf(gobblinProps);
+
+    if (!gobblinProps.containsKey("yarn-site-address")) {
+      // No external yarn-site file was supplied: fall back to the hard-coded local limits.
+      conf.set("yarn.resourcemanager.connect.max-wait.ms", "10000");
+      conf.set("yarn.nodemanager.resource.memory-mb", "512");
+      conf.set("yarn.scheduler.maximum-allocation-mb", "1024");
+      return conf;
+    }
+
+    conf.addResource(new Path(gobblinProps.getProperty("yarn-site-address")));
+    return conf;
+  }
+}
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
index 6fa7eab..4747e89 100644
--- 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanGobblinYarnAppLauncher.java
@@ -23,6 +23,9 @@ import java.util.Properties;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.yarn.GobblinYarnAppLauncher;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.log4j.Logger;
 
@@ -31,10 +34,8 @@ import com.google.common.base.Charsets;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigRenderOptions;
 
-import org.apache.gobblin.util.ConfigUtils;
-import org.apache.gobblin.yarn.GobblinYarnAppLauncher;
-
 import azkaban.jobExecutor.AbstractJob;
+import lombok.Getter;
 
 
 /**
@@ -60,13 +61,25 @@ public class AzkabanGobblinYarnAppLauncher extends 
AbstractJob {
 
   private final GobblinYarnAppLauncher gobblinYarnAppLauncher;
 
-  public AzkabanGobblinYarnAppLauncher(String jobId, Properties props) throws 
IOException {
+  @Getter
+  private final YarnConfiguration yarnConfiguration;
+
+  public AzkabanGobblinYarnAppLauncher(String jobId, Properties gobblinProps) 
throws IOException {
     super(jobId, LOGGER);
-    Config gobblinConfig = ConfigUtils.propertiesToConfig(props);
+    Config gobblinConfig = ConfigUtils.propertiesToConfig(gobblinProps);
 
     outputConfigToFile(gobblinConfig);
 
-    this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(gobblinConfig, 
new YarnConfiguration());
+    yarnConfiguration = initYarnConf(gobblinProps);
+
+    this.gobblinYarnAppLauncher = new GobblinYarnAppLauncher(gobblinConfig, 
this.yarnConfiguration);
+  }
+
+  /**
+   * Subclasses can override this method to provide their own YARN configuration.
+   */
+  protected YarnConfiguration initYarnConf(Properties gobblinProps) {
+    return new YarnConfiguration();
   }
 
   @Override
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobRunner.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobRunner.java
new file mode 100644
index 0000000..9ec3e0e
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/AzkabanJobRunner.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.azkaban;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import com.google.common.io.Files;
+
+import azkaban.jobExecutor.AbstractJob;
+import azkaban.utils.Props;
+import azkaban.utils.PropsUtils;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * Runs Azkaban jobs locally.
+ *
+ * Usage:
+ * Extend the class and, in the constructor, pass the list of relative paths to all common properties files, the list
+ * of job files to run, and a map of property overrides.
+ *
+ * Execution:
+ * java -cp ... {@link AzkabanJobRunner} class-name root-directory
+ *
+ * Where class-name is the extension of {@link AzkabanJobRunner} that should be executed, and root-directory is the
+ * root directory of the repository.
+ *
+ * @author Issac Buenrostro
+ */
+@RequiredArgsConstructor
+public class AzkabanJobRunner {
+  // Directory against which all property and job file paths are resolved; defaults to the working directory.
+  private File baseDirectory = new File(".");
+  private final List<String> commonProps;
+  private final List<String> jobProps;
+  private final Map<String, String> overrides;
+
+  /**
+   * Instantiates {@code klazz} through its no-argument constructor and runs it.
+   *
+   * @param klazz the runner subclass to instantiate
+   * @param args command-line arguments; when at least one is present, {@code args[0]} is used as the base directory
+   * @throws Exception if the runner cannot be instantiated or the run fails
+   */
+  static void doMain(Class<? extends AzkabanJobRunner> klazz, String[] args)
+      throws Exception {
+    // Class#newInstance is deprecated; going through the Constructor wraps constructor failures properly.
+    AzkabanJobRunner runner = klazz.getDeclaredConstructor().newInstance();
+    if (args.length >= 1) {
+      runner.setBaseDirectory(new File(args[0]));
+    }
+    runner.run();
+  }
+
+  /**
+   * Creates a temporary directory, registers it for deletion at JVM exit and returns its absolute path.
+   * Note that {@link File#deleteOnExit()} can only remove the directory if it is empty at that point.
+   */
+  public static String getTempDirectory() {
+    File tmpDirectory = Files.createTempDir();
+    tmpDirectory.deleteOnExit();
+    return tmpDirectory.getAbsolutePath();
+  }
+
+  private void setBaseDirectory(File baseDirectory) {
+    this.baseDirectory = baseDirectory;
+  }
+
+  /**
+   * Loads the common property files, then builds and runs one job per configured job file, in order.
+   *
+   * @throws IOException if a property or job file cannot be read
+   * @throws RuntimeException wrapping any failure raised while constructing or running a job
+   */
+  public void run()
+      throws IOException {
+
+    // Layer the common property files one on top of another, in the order given.
+    Props sharedProps = new Props();
+    for (String commonPropsFile : this.commonProps) {
+      sharedProps = new Props(sharedProps, new File(this.baseDirectory, commonPropsFile));
+    }
+
+    for (String jobFile : this.jobProps) {
+      File file = new File(this.baseDirectory, jobFile);
+      Props resolvedProps = PropsUtils.resolveProps(new Props(new Props(sharedProps, file), this.overrides));
+      try {
+        AbstractJob job = constructAbstractJob(file.getName(), resolvedProps);
+        job.run();
+      } catch (Throwable t) {
+        // Keep the original failure as the cause, but say which job file was being run.
+        throw new RuntimeException("Failed to run job " + jobFile, t);
+      }
+    }
+  }
+
+  /**
+   * Reflectively instantiates the class named by the {@code job.class} property. A {@code (String, Props)}
+   * constructor is tried first, then a {@code (String, Properties)} constructor.
+   *
+   * @throws RuntimeException if neither constructor can be used; the cause is the second failure and the first
+   *         failure is attached as a suppressed exception so that it is not lost
+   */
+  private AbstractJob constructAbstractJob(String name, Props jobProps) {
+    try {
+      return (AbstractJob) jobProps.getClass("job.class").getConstructor(String.class, Props.class)
+          .newInstance(name, jobProps);
+    } catch (ReflectiveOperationException propsCtorFailure) {
+      try {
+        return (AbstractJob) jobProps.getClass("job.class").getConstructor(String.class, Properties.class)
+            .newInstance(name, propsToProperties(jobProps));
+      } catch (ReflectiveOperationException propertiesCtorFailure) {
+        RuntimeException failure = new RuntimeException(propertiesCtorFailure);
+        failure.addSuppressed(propsCtorFailure);
+        throw failure;
+      }
+    }
+  }
+
+  /** Copies every key of {@code props} and its string value into a new {@link Properties} object. */
+  private Properties propsToProperties(Props props) {
+    Properties properties = new Properties();
+    for (String key : props.getKeySet()) {
+      properties.setProperty(key, props.getString(key));
+    }
+    return properties;
+  }
+}
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/EmbeddedGobblinYarnAppLauncher.java
 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/EmbeddedGobblinYarnAppLauncher.java
new file mode 100644
index 0000000..0996342
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/azkaban/EmbeddedGobblinYarnAppLauncher.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.azkaban;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.lang.reflect.Field;
+import java.util.Map;
+
+import org.apache.gobblin.testing.AssertWithBackoff;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.testng.collections.Lists;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Closer;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Given a set-up Azkaban job configuration, launches the Gobblin-on-Yarn job in a semi-embedded mode:
+ * - Uses an external Kafka cluster and requires an external ZooKeeper (non-embedded TestingServer) to be set up.
+ * The Kafka cluster is intentionally external because of data availability. The external ZooKeeper is not by
+ * design: the Helix version in use (0.9) cannot finish state transitions against the embedded ZooKeeper.
+ * TODO: Add an embedded Kafka cluster and golden datasets for data validation.
+ * - Uses MiniYARNCluster so YARN components don't have to be installed.
+ *
+ * Usage: the first command-line argument is the ZooKeeper connection string; an optional second argument is the
+ * base directory used to resolve the property and job files (defaults to the working directory).
+ */
+@Slf4j
+public class EmbeddedGobblinYarnAppLauncher extends AzkabanJobRunner {
+  public static final String DYNAMIC_CONF_PATH = "dynamic.conf";
+  public static final String YARN_SITE_XML_PATH = "yarn-site.xml";
+  // Both values are populated by setup() and read by the constructor, so setup() must run before instantiation.
+  private static String zkString = "";
+  private static String fileAddress = "";
+
+  /**
+   * Starts a {@link MiniYARNCluster}, waits for the Resource Manager address to be assigned, and writes the
+   * {@value #DYNAMIC_CONF_PATH} and {@value #YARN_SITE_XML_PATH} files into the working directory.
+   *
+   * @param args command-line arguments; {@code args[0]} is the ZooKeeper connection string
+   */
+  private static void setup(String[] args)
+      throws Exception {
+    // Parsing zk-string
+    Preconditions.checkArgument(args.length >= 1,
+        "Usage: EmbeddedGobblinYarnAppLauncher <zk-connection-string> [base-directory]");
+    zkString = args[0];
+
+    // Initialize necessary external components: Yarn and Helix
+    // NOTE(review): this closer is never closed in this class, so the mini cluster lives until the JVM exits.
+    Closer closer = Closer.create();
+
+    // Set java home in environment since it isn't set on some systems
+    String javaHome = System.getProperty("java.home");
+    setEnv("JAVA_HOME", javaHome);
+
+    final YarnConfiguration clusterConf = new YarnConfiguration();
+    clusterConf.set("yarn.resourcemanager.connect.max-wait.ms", "10000");
+    clusterConf.set("yarn.nodemanager.resource.memory-mb", "512");
+    clusterConf.set("yarn.scheduler.maximum-allocation-mb", "1024");
+
+    MiniYARNCluster miniYARNCluster = closer.register(new MiniYARNCluster("TestCluster", 1, 1, 1));
+    miniYARNCluster.init(clusterConf);
+    miniYARNCluster.start();
+
+    // YARN client should not be started before the Resource Manager is up
+    AssertWithBackoff.create().logger(log).timeoutMs(10000).assertTrue(new Predicate<Void>() {
+      @Override
+      public boolean apply(Void input) {
+        return !clusterConf.get(YarnConfiguration.RM_ADDRESS).contains(":0");
+      }
+    }, "Waiting for RM");
+
+    try (PrintWriter pw = new PrintWriter(DYNAMIC_CONF_PATH, "UTF-8")) {
+      File dir = new File("target/dummydir");
+
+      // dummy directory specified in configuration; an already-existing directory is fine, and missing parent
+      // directories are created as well
+      if (!dir.isDirectory() && !dir.mkdirs()) {
+        log.error("The dummy folder's creation is not successful");
+      }
+      dir.deleteOnExit();
+
+      pw.println("gobblin.cluster.zk.connection.string=\"" + zkString + "\"");
+      pw.println("jobconf.fullyQualifiedPath=\"" + dir.getAbsolutePath() + "\"");
+    }
+
+    // YARN config is dynamic and needs to be passed to other processes
+    try (OutputStream os = new FileOutputStream(new File(YARN_SITE_XML_PATH))) {
+      clusterConf.writeXml(os);
+    }
+
+    // Have to pass the same yarn-site.xml to the GobblinYarnAppLauncher to initialize Yarn Client.
+    fileAddress = new File(YARN_SITE_XML_PATH).getAbsolutePath();
+  }
+
+  /**
+   * Sets an environment variable for the current JVM by reflectively writing into the private field {@code m} of
+   * the map returned by {@link System#getenv()}.
+   *
+   * @throws IllegalStateException if the field is missing or cannot be accessed
+   */
+  static void setEnv(String key, String value) {
+    try {
+      Map<String, String> env = System.getenv();
+      Class<?> cl = env.getClass();
+      Field field = cl.getDeclaredField("m");
+      field.setAccessible(true);
+      @SuppressWarnings("unchecked")
+      Map<String, String> writableEnv = (Map<String, String>) field.get(env);
+      writableEnv.put(key, value);
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to set environment variable", e);
+    }
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    setup(args);
+    // args[0] is the ZooKeeper connection string consumed by setup(). AzkabanJobRunner#doMain interprets its own
+    // args[0] as the base directory, so forward only the remaining arguments.
+    String[] runnerArgs = new String[args.length - 1];
+    System.arraycopy(args, 1, runnerArgs, 0, runnerArgs.length);
+    AzkabanJobRunner.doMain(EmbeddedGobblinYarnAppLauncher.class, runnerArgs);
+  }
+
+  /**
+   * Wires the sample common/local property files and the sample job file of this module, overriding the ZooKeeper
+   * connection string and the yarn-site location with the values computed by {@code setup}.
+   */
+  public EmbeddedGobblinYarnAppLauncher() {
+    super(Lists.newArrayList("gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/common.properties",
+        "gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/local.properties"),
+        Lists.newArrayList("gobblin-modules/gobblin-azkaban/src/main/resources/conf/jobs/kafka-streaming-on-yarn.job"),
+        ImmutableMap.of("yarn.resourcemanager.connect.max-wait.ms", "10000",
+            "gobblin.cluster.zk.connection.string", EmbeddedGobblinYarnAppLauncher.zkString,
+            "gobblin.cluster.job.conf.path", "gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs",
+            "gobblin.yarn.conf.dir", "gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_conf",
+            "yarn-site-address", fileAddress));
+  }
+}
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_conf/app.btm 
b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_conf/app.btm
new file mode 100644
index 0000000..59e6489
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_conf/app.btm
@@ -0,0 +1,32 @@
+RULE trace main entry
+CLASS GobblinYarnTaskRunner
+METHOD main
+AT ENTRY
+IF true
+DO traceln("entering main")
+ENDRULE
+
+RULE trace main exit
+CLASS GobblinYarnTaskRunner
+METHOD main
+AT EXIT
+IF true
+DO traceln("exiting main")
+ENDRULE
+
+RULE create countdown for converter
+CLASS LiKafkaConsumerRecordToGenericRecordConverter
+METHOD <init>
+IF TRUE
+DO createCountDown($0, 100000)
+ENDRULE
+
+RULE trace converter entry
+CLASS LiKafkaConsumerRecordToGenericRecordConverter
+METHOD convertRecordImpl
+AT ENTRY
+IF countDown($0)
+DO THROW new org.apache.gobblin.converter.DataConversionException("Injected 
exception")
+# The following line can be used to kill the JVM
+#DO traceln("killing JVM"), killJVM()
+ENDRULE
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_conf/log4j-yarn.properties
 
b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_conf/log4j-yarn.properties
new file mode 100755
index 0000000..c5a1b7e
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_conf/log4j-yarn.properties
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+log4j.rootLogger=INFO, loggerId
+log4j.appender.loggerId=org.apache.log4j.rolling.RollingFileAppender
+log4j.appender.loggerId.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
+log4j.appender.loggerId.rollingPolicy.ActiveFileName=${gobblin.yarn.app.container.log.dir}/${gobblin.yarn.app.container.log.file}
+log4j.appender.loggerId.rollingPolicy.FileNamePattern=${gobblin.yarn.app.container.log.dir}/${gobblin.yarn.app.container.log.file}.%d{yyyy-MM-dd-HH}
+log4j.appender.loggerId.layout=org.apache.log4j.PatternLayout
+log4j.appender.loggerId.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss z} 
%-5p [%t] %C %X{tableName} - %m%n
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf
 
b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf
new file mode 100644
index 0000000..71c64dc
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/gobblin_jobs/kafka-hdfs-streaming-avro.conf
@@ -0,0 +1,88 @@
+# A sample skeleton that reads from a Kafka topic and writes to the local FS in a streaming manner.
+# This sample job works with embedded Gobblin using LocalJobLauncher instead of going through the YARN approach.
+
+job.name=LocalKafkaStreaming
+job.group=streaming
+job.description=A getting started example for Gobblin streaming to Kafka
+job.lock.enabled=false
+
+# Flag to enable StreamModelTaskRunner
+task.execution.synchronousExecutionModel=false
+gobblin.task.is.single.branch.synchronous=true
+taskexecutor.threadpool.size=1
+fork.record.queue.capacity=1
+
+# Streaming-source specific configurations
+source.class=org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource
+gobblin.source.kafka.extractorType=org.apache.gobblin.prototype.kafka.KafkaAvroBinaryStreamingExtractor
+kafka.workunit.size.estimator.type=CUSTOM
+kafka.workunit.size.estimator.customizedType=org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.UnitKafkaWorkUnitSizeEstimator
+kafka.workunit.packer.type=CUSTOM
+kafka.workunit.packer.customizedType=org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.KafkaTopicGroupingWorkUnitPacker
+extract.namespace=org.apache.gobblin.streaming.test
+
+# Configure watermark storage for streaming, using FS-based for local testing
+streaming.watermarkStateStore.type=fs
+streaming.watermark.commitIntervalMillis=2000
+
+# Converter configs
+# Default Generic Record based pipeline
+recordStreamProcessor.classes="org.apache.gobblin.prototype.kafka.GenericRecordBasedKafkaSchemaChangeInjector,org.apache.gobblin.prototype.kafka.LiKafkaConsumerRecordToGenericRecordConverter"
+
+# Record-metadata decoration into main record
+gobblin.kafka.converter.recordMetadata.enable=true
+
+# Writer configs
+writer.builder.class=org.apache.gobblin.writer.AvroDataWriterBuilder
+writer.partitioner.class=org.apache.gobblin.writer.partitioner.TimeBasedAvroWriterPartitioner
+writer.output.format=AVRO
+writer.partition.columns=header.time
+writer.partition.pattern=yyyy/MM/dd
+writer.destination.type=HDFS
+writer.staging.dir=/tmp/gobblin/streaming/writer-staging
+writer.output.dir=/tmp/gobblin/streaming/writer-output
+writer.closeOnFlush=true
+
+state.store.enabled=false
+
+# Publisher config
+data.publisher.type=org.apache.gobblin.publisher.NoopPublisher
+data.publisher.final.dir=/tmp/gobblin/kafka/publish
+flush.data.publisher.class=org.apache.gobblin.prototype.kafka.TimePartitionedStreamingDataPublisher
+###Config that controls intervals between flushes (and consequently, data 
publish)
+stream.flush.interval.secs=60
+
+### Following are Kafka Upstream related configurations
+# Kafka source configurations
+topic.whitelist=
+bootstrap.with.offset=EARLIEST
+source.kafka.fetchTimeoutMillis=3000
+kafka.consumer.maxPollRecords=100
+
+#Kafka broker/schema registry configs
+kafka.schema.registry.url=
+kafka.schema.registry.class=
+kafka.schemaRegistry.class=
+kafka.schemaRegistry.url=
+kafka.brokers=
+
+#Kafka SSL configs
+security.protocol = SSL
+ssl.protocol = TLS
+ssl.trustmanager.algorithm =
+ssl.keymanager.algorithm =
+ssl.truststore.type =
+ssl.truststore.location =
+ssl.truststore.password =
+ssl.keystore.type =
+ssl.keystore.password =
+ssl.key.password =
+ssl.secure.random.implementation =
+ssl.keystore.location=<path to your kafka certs>
+
+metrics.enabled=false
+
+# Only Required for Local-testing
+kafka.consumer.runtimeIngestionPropsEnabled=false
+# Limit the job to a single mapper for ease of debugging
+mr.job.max.mappers = 1
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/jobs/kafka-streaming-on-yarn.job
 
b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/jobs/kafka-streaming-on-yarn.job
new file mode 100644
index 0000000..05a5fad
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/jobs/kafka-streaming-on-yarn.job
@@ -0,0 +1,53 @@
+# This job starts up a Gobblin on YARN application master
+# that runs jobs specified in the gobblin_jobs directory
+job.name=kafka-streaming-on-yarn
+gobblin.yarn.app.master.memory.mbs=1024
+gobblin.yarn.app.master.cores=1
+gobblin.yarn.app.report.interval.minutes=5
+gobblin.yarn.max.get.app.report.failures=4
+gobblin.yarn.email.notification.on.shutdown=false
+
+#Set the minimum number of containers to 1 for ease of observing.
+gobblin.yarn.initial.containers=1
+gobblin.yarn.autoScaling.minContainers=1
+
+gobblin.yarn.container.memory.mbs=1024
+gobblin.yarn.container.jvmMemoryOverheadMbs=600
+gobblin.yarn.container.cores=1
+gobblin.yarn.container.affinity.enabled=true
+gobblin.yarn.helix.instance.max.retries=2
+
+# Use config set in the Azkaban job for initializing the yarn containers
+gobblin.yarn.akabanConfigOutputDir=./gobblin_config
+gobblin.yarn.akabanConfigOutputPath=${gobblin.yarn.akabanConfigOutputDir}/application.conf
+
+gobblin.yarn.conf.dir=./conf
+gobblin.yarn.lib.jars.dir=<path to where the gobblin libraries sit>
+gobblin.yarn.app.master.files.local=<Path to 
yarn-site.xml>,${gobblin.yarn.conf.dir}/app.btm,${gobblin.yarn.conf.dir}/log4j-yarn.properties,${gobblin.yarn.akabanConfigOutputDir}/application.conf
+gobblin.yarn.container.files.local=${gobblin.yarn.app.master.files.local}
+gobblin.yarn.log.copier.disable.driver.copy=true
+gobblin.yarn.app.master.serviceClasses=org.apache.gobblin.yarn.YarnAutoScalingManager
+
+# Cluster configuration properties
+# gobblin.cluster.helix.cluster.name=${gobblin.yarn.app.name}
+
+# Job Configuration manager properties
+gobblin.cluster.job.configuration.manager=org.apache.gobblin.cluster.FsJobConfigurationManager
+gobblin.cluster.specConsumer.class=org.apache.gobblin.runtime.api.FsSpecConsumer
+
+# This config is to restrict assignment to one task per container
+gobblin.cluster.helixTaskQuotaConfig=DEFAULT:1,UNUSED:39
+
+job.execinfo.server.enabled=false
+admin.server.enabled=false
+
+# File system URIs
+# writer.fs.uri=${fs.uri}
+# state.store.fs.uri=${fs.uri}
+
+# state.store.dir=${gobblin.yarn.work.dir}/state-store
+# qualitychecker.row.err.file=${gobblin.yarn.work.dir}/err
+job.lock.enabled=false
+# job.lock.dir=${gobblin.yarn.work.dir}/locks
+
+writer.include.record.count.in.file.names=true
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/common.properties
 
b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/common.properties
new file mode 100644
index 0000000..bc676ec
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/common.properties
@@ -0,0 +1,63 @@
+type=hadoopJava
+job.class=org.apache.gobblin.azkaban.AzkabanGobblinLocalYarnAppLauncher
+job.name=GobblinKafkaStreaming
+job.group=GobblinKafkaStreaming
+job.lock.enabled=false
+encrypt.key.loc=/jobs/kafkaetl/gobblin/master
+cleanup.staging.data.per.task=false
+user.to.proxy=kafkaetl
+
+# This assumes all dependent jars are put into the 'lib' directory
+job.jars=lib/*,resources/gobblin-site.xml
+classpath=lib/*,/export/apps/hadoop/latest/lib/*,${hive.jars}
+
+# MR Configurations
+jvmArgsMem=-XX:MaxPermSize=128M
+jvmArgsGc=-XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+ScavengeBeforeFullGC -XX:+CMSScavengeBeforeRemark
+jvmArgsGcLog=-XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Xloggc:gc.log
+jvmArgsPerf=-XX:+UseCompressedOops
+jvmArgsError=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/grid/a/mapred/tmp/
+jvm.args=${jvmArgsMem} ${jvmArgsGc} ${jvmArgsGcLog} ${jvmArgsPerf} ${jvmArgsError}
+
+# Common directories
+gobblin.dataset.root.dir=${root.data.location}/tracking/streaming_parallel
+# task.data.root.dir=${gobblin.dataset.root.dir}/_working/${root.project.name}/task-data
+job.work.dir=/tmp/${root.project.name}/${job.name}
+qualitychecker.row.err.file=${home.dir}/gobblin/${root.project.name}/err
+job.lock.dir=${job.work.dir}/locks
+state.store.dir=${home.dir}/gobblin/${root.project.name}/state-store
+mr.job.root.dir=${home.dir}/gobblin/${root.project.name}/working
+mr.job.lock.dir=${home.dir}/gobblin/${root.project.name}/locks
+writer.staging.dir=${job.work.dir}/task-staging
+writer.output.dir=${job.work.dir}/task-output
+
+# Compaction specific directories
+compaction.input.dir=${gobblin.dataset.root.dir}
+compaction.dest.dir=${gobblin.dataset.root.dir}
+compaction.input.subdir=hourly
+compaction.dest.subdir=daily
+compaction.tmp.dest.dir=/tmp/${root.project.name}/${job.name}
+
+# FS URIs
+source.filebased.fs.uri=${fs.uri}
+writer.fs.uri=${fs.uri}
+compaction.file.system.uri=${fs.uri}
+state.store.fs.uri=${fs.uri}
+
+# Helix Configuration
+gobblin.cluster.helix.cluster.name=${gobblin.yarn.app.name}
+gobblin.cluster.helix.overwrite=false
+helix.job.timeout.seconds=9223372036854774
+helix.task.timeout.seconds=9223372036854774
+
+# Yarn Configuration
+gobblin.yarn.app.name=${root.project.name}-${grid.name}
+# No. of ms to wait between sending a SIGTERM and SIGKILL to a container
+yarn.nodemanager.sleep-delay-before-sigkill.ms=30000
+gobblin.yarn.work.dir=/tmp/${root.project.name}/${job.name}
+gobblin.yarn.logs.sink.root.dir=${logs.dir}/gobblin/${root.project.name}/logs
+
+# Gobblin Cluster
+gobblin.cluster.specConsumer.path=${home.dir}/streaming/specs
+gobblin.cluster.workflow.expirySeconds=9223372036854774
+gobblin.cluster.job.conf.path=./jobs
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/local.properties
 
b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/local.properties
new file mode 100644
index 0000000..d07f636
--- /dev/null
+++ 
b/gobblin-modules/gobblin-azkaban/src/main/resources/conf/properties/local.properties
@@ -0,0 +1,21 @@
+# Misc. deployment/cluster specific variables
+user.name=yourname
+logs.user.name=yourname
+root.project.name=gobblin-kafka-streaming-local
+grid.name=cluster-name
+
+
+# Cluster specific directory configurations
+# home.dir and root.data.location should be unique per cluster deployment
+
+home.dir=/jobs/${user.name}
+logs.dir=/jobs/${logs.user.name}
+root.data.location=/data
+
+fs.uri=file:///
+
+# Yarn Configuration
+gobblin.yarn.app.queue=default
+
+# Gobblin Cluster
+gobblin.yarn.app.name=${root.project.name}-${grid.name}-1
\ No newline at end of file
diff --git 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
index f2ecf9d..8a35fa2 100644
--- 
a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
+++ 
b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java
@@ -288,7 +288,7 @@ public abstract class KafkaSource<S, D> extends 
EventBasedSource<S, D> {
           client.close();
         }
       } catch (IOException e) {
-        throw new RuntimeException("Exception closing kafkaConsumerClient");
+        throw new RuntimeException("Exception closing kafkaConsumerClient", e);
       }
     }
   }
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index bd01a51..8ec6fed 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -530,7 +530,7 @@ public abstract class AbstractJobLauncher implements 
JobLauncher {
         }
       } catch (Throwable t) {
         jobState.setState(JobState.RunningState.FAILED);
-        String errMsg = "Failed to launch and run job " + jobId;
+        String errMsg = "Failed to launch and run job " + jobId + " due to" + 
t.getMessage();
         LOG.error(errMsg + ": " + t, t);
         this.jobContext.getJobState().setJobFailureException(t);
       } finally {
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
index 1b72be9..d516c80 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/logs/LogCopier.java
@@ -64,7 +64,7 @@ import org.apache.gobblin.util.FileListUtils;
 
 
 /**
- * A utility class that periodically reads log files in a source log file 
directory for changes
+ * A utility service that periodically reads log files in a source log file 
directory for changes
  * since the last reads and appends the changes to destination log files with 
the same names as
  * the source log files in a destination log directory. The source and 
destination log files
  * can be on different {@link FileSystem}s.
@@ -507,7 +507,7 @@ public class LogCopier extends AbstractScheduledService {
     }
 
     /**
-     * Copy changes for a single log file.
+     * Copy log files that have been rolled over.
      */
     private void copyChangesOfLogFile(Path srcFile, Path destFile) throws 
IOException {
       LOGGER.info("Copying changes from {} to {}", srcFile.toString(), 
destFile.toString());
@@ -516,7 +516,6 @@ public class LogCopier extends AbstractScheduledService {
         return;
       }
 
-      // We need to use fsDataInputStream in the finally clause so it has to 
be defined outside try-catch-finally
       FSDataInputStream fsDataInputStream = null;
 
       try (Closer closer = Closer.create()) {
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
index d6da93e..2a0410e 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/AbstractYarnAppSecurityManager.java
@@ -53,7 +53,7 @@ import org.apache.gobblin.util.ExecutorsUtils;
 /**
  * <p>
  *   The super class for key management
- *   This class uses a scheduled task to do re-login to refetch token on a
+ *   This class uses a scheduled task to do re-login to re-fetch token on a
  *   configurable schedule. It also uses a second scheduled task
  *   to renew the delegation token after each login. Both the re-login 
interval and the token
  *   renewing interval are configurable.
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
index e0a66e8..6770510 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnAppLauncher.java
@@ -656,6 +656,10 @@ public class GobblinYarnAppLauncher {
   private void addLibJars(Path srcLibJarDir, Optional<Map<String, 
LocalResource>> resourceMap, Path destDir)
       throws IOException {
     FileSystem localFs = FileSystem.getLocal(this.yarnConfiguration);
+    if (! this.fs.exists(srcLibJarDir)) {
+      throw new IllegalStateException(String.format("The library directory[%s] 
are not being found, abort the application", srcLibJarDir));
+    }
+
     FileStatus[] libJarFiles = localFs.listStatus(srcLibJarDir);
     if (libJarFiles == null || libJarFiles.length == 0) {
       return;
@@ -687,9 +691,13 @@ public class GobblinYarnAppLauncher {
     for (String localFilePath : SPLITTER.split(localFilePathList)) {
       Path srcFilePath = new Path(localFilePath);
       Path destFilePath = new Path(destDir, srcFilePath.getName());
-      this.fs.copyFromLocalFile(srcFilePath, destFilePath);
-      if (resourceMap.isPresent()) {
-        YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get());
+      if (fs.exists(srcFilePath)) {
+        this.fs.copyFromLocalFile(srcFilePath, destFilePath);
+        if (resourceMap.isPresent()) {
+          YarnHelixUtils.addFileAsLocalResource(this.fs, destFilePath, 
LocalResourceType.FILE, resourceMap.get());
+        }
+      } else {
+        LOGGER.warn(String.format("The request file %s doesn't exist", 
srcFilePath));
       }
     }
   }
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
index 224bad8..16c82a5 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnConfigurationKeys.java
@@ -88,6 +88,7 @@ public class GobblinYarnConfigurationKeys {
   public static final String TOKEN_RENEW_INTERVAL_IN_MINUTES = 
GOBBLIN_YARN_PREFIX + "token.renew.interval.minutes";
   public static final Long DEFAULT_TOKEN_RENEW_INTERVAL_IN_MINUTES = 
Long.MAX_VALUE;
   // Resource/dependencies configuration properties.
+  // Missing this configuration should throw a fatal exception to avoid harder-to-debug situations on the Yarn container side.
   public static final String LIB_JARS_DIR_KEY = GOBBLIN_YARN_PREFIX + 
"lib.jars.dir";
 
   public static final String LIB_JARS_DIR_NAME = "_libjars";
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
index 21858e0..64127ac 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/GobblinYarnLogSource.java
@@ -41,11 +41,6 @@ import org.apache.gobblin.util.logs.LogCopier;
 /**
  * A base class for container processes that are sources of Gobblin Yarn 
application logs.
  *
- * <p>
- *   The source log files are supposed to be on the local {@link FileSystem} 
and will
- *   be copied to a given destination {@link FileSystem}, which is typically 
HDFS.
- * </p>
- *
  * @author Yinan Li
  */
 class GobblinYarnLogSource {
@@ -67,6 +62,7 @@ class GobblinYarnLogSource {
 
   /**
    * Build a {@link LogCopier} instance used to copy the logs out from this 
{@link GobblinYarnLogSource}.
+   * TODO: This duplicates org.apache.gobblin.yarn.GobblinYarnAppLauncher#buildLogCopier(com.typesafe.config.Config, org.apache.hadoop.fs.Path, org.apache.hadoop.fs.Path)
    *
    * @param config the {@link Config} use to create the {@link LogCopier}
    * @param containerId the {@link ContainerId} of the container the {@link 
LogCopier} runs in
diff --git 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
index 5be2a4b..0742f54 100644
--- 
a/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
+++ 
b/gobblin-yarn/src/main/java/org/apache/gobblin/yarn/YarnAutoScalingManager.java
@@ -196,6 +196,8 @@ public class YarnAutoScalingManager extends 
AbstractIdleService {
       // adjust the number of target containers based on the configured min 
and max container values.
       numTargetContainers = Math.max(this.minContainers, 
Math.min(this.maxContainers, numTargetContainers));
 
+      log.info("There are {} containers being requested", numTargetContainers);
+
       this.yarnService.requestTargetNumberOfContainers(numTargetContainers, 
inUseInstances);
     }
   }
diff --git 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
index 9c37748..0d2f3a5 100644
--- 
a/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
+++ 
b/gobblin-yarn/src/test/java/org/apache/gobblin/yarn/GobblinYarnAppLauncherTest.java
@@ -116,7 +116,7 @@ public class GobblinYarnAppLauncherTest implements 
HelixMessageTestBase {
 
   private final Closer closer = Closer.create();
 
-  private static void setEnv(String key, String value) {
+  public static void setEnv(String key, String value) {
     try {
       Map<String, String> env = System.getenv();
       Class<?> cl = env.getClass();
@@ -250,6 +250,10 @@ public class GobblinYarnAppLauncherTest implements 
HelixMessageTestBase {
    */
   @Test(enabled=false, groups = { "disabledOnTravis" }, dependsOnMethods = 
"testCreateHelixCluster")
   public void testSetupAndSubmitApplication() throws Exception {
+    HelixUtils.createGobblinHelixCluster(
+        
this.config.getString(GobblinClusterConfigurationKeys.ZK_CONNECTION_STRING_KEY),
+        
this.config.getString(GobblinClusterConfigurationKeys.HELIX_CLUSTER_NAME_KEY));
+
     this.gobblinYarnAppLauncher.startYarnClient();
     this.applicationId = 
this.gobblinYarnAppLauncher.setupAndSubmitApplication();
 

Reply via email to