http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/StopCommand.java
----------------------------------------------------------------------
diff --git 
a/drill-yarn/src/main/java/org/apache/drill/yarn/client/StopCommand.java 
b/drill-yarn/src/main/java/org/apache/drill/yarn/client/StopCommand.java
new file mode 100644
index 0000000..95f7bf3
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/StopCommand.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.client;
+
+import org.apache.drill.yarn.core.DoYUtil;
+import org.apache.drill.yarn.core.DrillOnYarnConfig;
+import org.apache.drill.yarn.core.YarnClientException;
+import org.apache.drill.yarn.core.YarnRMClient;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+
+import com.typesafe.config.Config;
+
+/**
+ * Perform a semi-graceful shutdown of the Drill-on-YARN AM. We send a message
+ * to the AM to request shutdown because the YARN-provided kill request just
+ * kills the AM. (There seems to be no way to get YARN to call its own
+ * AMRMClientAsync.CallbackHandler.onShutdownRequest method.) The AM, however,
+ * cannot gracefully shut down the Drillbits because Drill itself has no
+ * graceful shutdown. But, at least this technique gives the AM a fighting
+ * chance to do graceful shutdown in the future.
+ */
+
+public class StopCommand extends ClientCommand {
+  /**
+   * Poll the YARN RM to check the stop status of the AM. Periodically poll,
+   * waiting to get an app state that indicates app completion.
+   */
+
+  private static class StopMonitor {
+    StatusCommand.Reporter reporter;
+    private YarnApplicationState state;
+    private int pollWaitSec;
+    private int shutdownWaitSec;
+
+    /**
+     * @param config
+     *          Drill-on-YARN configuration supplying the poll interval and the
+     *          total time to wait for the AM to stop
+     * @param reporter
+     *          status reporter used to fetch application reports
+     */
+    StopMonitor(Config config, StatusCommand.Reporter reporter) {
+      this.reporter = reporter;
+      // Never poll more often than once per second.
+      pollWaitSec = Math.max(1, config.getInt(DrillOnYarnConfig.CLIENT_POLL_SEC));
+      shutdownWaitSec = config.getInt(DrillOnYarnConfig.CLIENT_STOP_WAIT_SEC);
+    }
+
+    /**
+     * Wait for the application to reach a stopped state, printing progress to
+     * the console.
+     *
+     * @param verbose
+     *          currently unused
+     * @return true if the application stopped within the configured wait
+     *         time, false otherwise
+     * @throws ClientException
+     *           if a status report cannot be obtained
+     */
+    boolean run(boolean verbose) throws ClientException {
+      System.out.print("Stopping...");
+      try {
+        // Poll at least once, even when the configured stop wait is shorter
+        // than the poll interval. Otherwise we would judge the outcome from
+        // the report taken before the stop request was sent.
+        int attemptCount = Math.max(1, shutdownWaitSec / pollWaitSec);
+        for (int attempt = 0; attempt < attemptCount; attempt++) {
+          if (!poll()) {
+            break;
+          }
+        }
+      } catch (ClientException e) {
+        System.out.println();
+        throw e;
+      }
+      if (reporter.isStopped()) {
+        System.out.println(" Stopped.");
+        reporter.showFinalStatus();
+        return true;
+      } else {
+        System.out.println();
+        System.out.println(
+            "Application Master is slow to stop, use YARN to check status.");
+        return false;
+      }
+    }
+
+    /**
+     * Sleep for one poll interval, then fetch a fresh application report.
+     *
+     * @return true to keep polling; false if the application has stopped or
+     *         the wait was interrupted
+     * @throws ClientException
+     *           if the report cannot be obtained
+     */
+    private boolean poll() throws ClientException {
+      try {
+        Thread.sleep(pollWaitSec * 1000L);
+      } catch (InterruptedException e) {
+        // Stop polling, but preserve the interrupt for code further up.
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      reporter.getReport();
+      if (reporter.isStopped()) {
+        return false;
+      }
+      YarnApplicationState newState = reporter.getState();
+      if (newState == state) {
+        System.out.print(".");
+      } else {
+        updateState(newState);
+      }
+      return true;
+    }
+
+    /**
+     * Record a new application state and announce it on the console. The
+     * first observed state is recorded silently.
+     */
+    private void updateState(YarnApplicationState newState) {
+      YarnApplicationState oldState = state;
+      state = newState;
+      if (oldState == null) {
+        return;
+      }
+      System.out.println();
+      System.out.print("Application State: ");
+      System.out.println(state.toString());
+      System.out.print("Stopping...");
+    }
+  }
+
+  private Config config;
+  private YarnRMClient client;
+
+  /**
+   * Stop the application. Unless forced, first request a graceful stop via the
+   * AM's REST API. If that is not possible (or force was requested), ask YARN
+   * to kill the application. Then wait for it to reach a stopped state. The
+   * app ID file is removed once the application is known to be stopped.
+   */
+  @Override
+  public void run() throws ClientException {
+    config = DrillOnYarnConfig.config();
+    client = getClient();
+    System.out
+        .println("Stopping Application ID: " + client.getAppId().toString());
+
+    // First get an application report to ensure that the AM is,
+    // in fact, running, and to get the HTTP endpoint.
+
+    StatusCommand.Reporter reporter = new StatusCommand.Reporter(client);
+    try {
+      reporter.getReport();
+    } catch (ClientException e) {
+      // Treat an unobtainable report as "not running", but let the user see
+      // why when verbose output is requested.
+      if (opts.verbose) {
+        System.err.println(e.getMessage());
+      }
+      reporter = null;
+    }
+
+    // Handle the case of an already stopped app.
+
+    boolean stopped = true;
+    if (reporter == null || reporter.isStopped()) {
+      System.out.println("Application is not running.");
+    } else {
+      // Try to stop the server by sending a STOP REST request.
+
+      if (opts.force) {
+        System.out.println("Forcing shutdown");
+      } else {
+        stopped = gracefulStop(reporter.getAmUrl());
+      }
+
+      // If that did not work, then forcibly kill the AM.
+      // YARN will forcibly kill the AM's containers.
+      // Not pretty, but it works.
+
+      if (opts.force || !stopped) {
+        forcefulStop();
+      }
+
+      // Wait for the AM to stop. The AM may refuse to stop in
+      // the time allowed to wait.
+
+      stopped = new StopMonitor(config, reporter).run(opts.verbose);
+    }
+
+    // If the AM is gone because it started out dead or
+    // we killed it, then forget its App Id.
+
+    if (stopped) {
+      removeAppIdFile();
+    }
+  }
+
+  /**
+   * Do a graceful shutdown by using the AM's REST API call to request stop.
+   * Include the master key with the request to differentiate this request from
+   * accidental uses of the stop REST API.
+   *
+   * @param baseUrl
+   *          base URL of the AM's web endpoint; if blank, no request is sent
+   * @return true if the AM acknowledged the stop request; false if no request
+   *         was sent, the request failed, or the AM did not reply "ok"
+   */
+
+  private boolean gracefulStop(String baseUrl) {
+    try {
+      if (DoYUtil.isBlank(baseUrl)) {
+        return false;
+      }
+      SimpleRestClient restClient = new SimpleRestClient();
+      String tail = "rest/stop";
+      String masterKey = config.getString(DrillOnYarnConfig.HTTP_REST_KEY);
+      if (!DoYUtil.isBlank(masterKey)) {
+        tail += "?key=" + masterKey;
+      }
+      if (opts.verbose) {
+        System.out.println("Stopping with POST " + baseUrl + "/" + tail);
+      }
+      String result = restClient.send(baseUrl, tail, true);
+      if (result != null && result.contains("\"ok\"")) {
+        return true;
+      }
+      System.err.println(
+          "Failed to stop the application master. Response = " + result);
+      return false;
+    } catch (ClientException e) {
+      System.err.println(e.getMessage());
+      System.out.println("Resorting to forced kill");
+      return false;
+    }
+  }
+
+  /**
+   * If the graceful approach did not work, resort to a forceful request. This
+   * asks YARN, through the RM client, to kill the application.
+   *
+   * @throws ClientException
+   *           if the kill request fails
+   */
+
+  private void forcefulStop() throws ClientException {
+    try {
+      client.killApplication();
+    } catch (YarnClientException e) {
+      throw new ClientException("Failed to stop application master", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/package-info.java
----------------------------------------------------------------------
diff --git 
a/drill-yarn/src/main/java/org/apache/drill/yarn/client/package-info.java 
b/drill-yarn/src/main/java/org/apache/drill/yarn/client/package-info.java
new file mode 100644
index 0000000..c03c2fa
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/package-info.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implements a "YARN client" for Drill-on-YARN. The client uploads files to
+ * DFS, then requests that YARN start the Application Master. Much fiddling
+ * about is required to support this, such as zipping up the user's configuration,
+ * creating a local file with the app id so we can get app status and shut down
+ * the app, etc.
+ * <p>
+ * Divided into a main program ({@link DrillOnYarn}) and a series of commands.
+ * Some commands are further divided into tasks. Builds on the
+ * YARN and DFS facades defined in the core module.
+ */
+
+package org.apache.drill.yarn.client;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/AppSpec.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/AppSpec.java 
b/drill-yarn/src/main/java/org/apache/drill/yarn/core/AppSpec.java
new file mode 100644
index 0000000..34d4ad1
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/AppSpec.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.core;
+
+import java.io.IOException;
+import java.io.PrintStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Abstract description of a remote process launch that describes the many
+ * details needed to launch a process on a remote node. The YARN launch
+ * specification is a mess to work with; this class provides a simpler facade to
+ * gather the information, then turns around and builds the required YARN
+ * object.
+ * <p>
+ * Based on <a href="https://github.com/hortonworks/simple-yarn-app">Simple YARN
+ * App</a>.
+ */
+
+public class AppSpec extends LaunchSpec {
+
+  // Log under this class's own name (was mistakenly LaunchSpec.class).
+  private static final Log LOG = LogFactory.getLog(AppSpec.class);
+
+  /**
+   * The memory required in the allocated container, in MB.
+   */
+
+  public int memoryMb;
+
+  /**
+   * The number of YARN "vcores" (roughly equivalent to CPUs) to allocate to the
+   * process.
+   */
+
+  public int vCores = 1;
+
+  /**
+   * The number of disk resources (that is, disk channels) used by the process.
+   * Available only on some YARN distributions. Fractional values allowed.
+   */
+
+  public double disks;
+
+  /**
+   * The name of the application given to YARN. Appears in the YARN admin UI.
+   */
+
+  public String appName;
+
+  /**
+   * The YARN queue in which to place the application launch request.
+   */
+
+  public String queueName = "default";
+
+  /**
+   * The priority passed to YARN with the application submission.
+   */
+
+  public int priority = 1;
+
+  /**
+   * Whether to run the AM in unmanaged mode. Leave this false for production
+   * code.
+   */
+
+  public boolean unmanaged;
+
+  /**
+   * Optional node label expression for the launch. Selects the nodes on which
+   * the task can run.
+   */
+
+  public String nodeLabelExpr;
+
+  /**
+   * Given this generic description of an application, create the detailed YARN
+   * application submission context required to launch the application.
+   *
+   * @param conf
+   *          the YARN configuration obtained by reading the Hadoop
+   *          configuration files
+   * @param app
+   *          the YARN definition of the client application to be populated from
+   *          this generic description
+   * @return the completed application launch context for the given application
+   * @throws IOException
+   *           if localized resources are not found in the distributed file
+   *           system (such as HDFS)
+   */
+
+  public ApplicationSubmissionContext createAppLaunchContext(
+      YarnConfiguration conf, YarnClientApplication app) throws IOException {
+    ContainerLaunchContext amContainer = createLaunchContext(conf);
+
+    // Finally, set-up ApplicationSubmissionContext for the application
+    ApplicationSubmissionContext appContext = app
+        .getApplicationSubmissionContext();
+    appContext.setApplicationName(appName); // application name
+    appContext.setAMContainerSpec(amContainer);
+    appContext.setResource(getCapability());
+    appContext.setQueue(queueName); // queue
+    appContext.setPriority(Priority.newInstance(priority));
+    if (!DoYUtil.isBlank(nodeLabelExpr)) {
+      LOG.info(
+          "Requesting to run the AM using node expression: " + nodeLabelExpr);
+      appContext.setNodeLabelExpression(nodeLabelExpr);
+    }
+
+    appContext.setUnmanagedAM(unmanaged);
+
+    // Only try the AM once. It will fail if things are misconfigured. Retrying
+    // is unlikely to fix the configuration problem.
+
+    appContext.setMaxAppAttempts(1);
+
+    // TODO: Security tokens
+
+    return appContext;
+  }
+
+  /**
+   * Build the YARN resource capability (memory, vcores and, where the YARN
+   * distribution supports it, disks) for the application's container.
+   *
+   * @return the resource capability describing this application's needs
+   */
+
+  public Resource getCapability() {
+
+    // Set up resource type requirements for ApplicationMaster
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(memoryMb);
+    capability.setVirtualCores(vCores);
+    DoYUtil.callSetDiskIfExists(capability, disks);
+    return capability;
+  }
+
+  /**
+   * Write a human-readable description of this spec, followed by the base
+   * launch spec details, to the given stream.
+   */
+
+  @Override
+  public void dump(PrintStream out) {
+    out.print("Memory (MB): ");
+    out.println(memoryMb);
+    out.print("Vcores: ");
+    out.println(vCores);
+    out.print("Disks: ");
+    out.println(disks);
+    out.print("Application Name: ");
+    out.println(appName);
+    out.print("Queue: ");
+    out.println(queueName);
+    out.print("Priority: ");
+    out.println(priority);
+    super.dump(out);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/ClusterDef.java
----------------------------------------------------------------------
diff --git 
a/drill-yarn/src/main/java/org/apache/drill/yarn/core/ClusterDef.java 
b/drill-yarn/src/main/java/org/apache/drill/yarn/core/ClusterDef.java
new file mode 100644
index 0000000..223b606
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/ClusterDef.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.core;
+
+import java.io.PrintStream;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.yarn.appMaster.TaskSpec;
+import org.mortbay.log.Log;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigList;
+import com.typesafe.config.ConfigValue;
+
+public class ClusterDef {
+  // The following keys are relative to the cluster group definition
+
+  public static final String GROUP_NAME = "name";
+  public static final String GROUP_TYPE = "type";
+  public static final String GROUP_SIZE = "count";
+
+  // For the labeled pool
+
+  public static final String DRILLBIT_LABEL = "drillbit-label-expr";
+  public static final String AM_LABEL = "am-label-expr";
+
+  /**
+   * Defined cluster group types. The value of the type appears as the value of
+   * the {@link #GROUP_TYPE} key of a cluster group in the config file.
+   */
+
+  public enum GroupType {
+    BASIC,
+    LABELED;
+
+    /**
+     * Convert a type name from the config file (in any case) to an enum value.
+     * Case conversion is locale-independent, so "basic" still maps to
+     * {@code BASIC} under, for example, a Turkish default locale.
+     *
+     * @param value
+     *          the type name
+     * @return the matching group type
+     * @throws IllegalArgumentException
+     *           if the name is not a defined type
+     */
+    public static GroupType toEnum(String value) {
+      return GroupType.valueOf(value.toUpperCase(java.util.Locale.ROOT));
+    }
+
+    /**
+     * @return the lower-case type name, as written in the config file
+     */
+    public String toValue() {
+      return name().toLowerCase(java.util.Locale.ROOT);
+    }
+  }
+
+  /**
+   * One group of the cluster, as described by one entry of the cluster group
+   * list in the config file.
+   */
+
+  public static class ClusterGroup {
+    private final String name;
+    private final int count;
+    private final GroupType type;
+
+    /**
+     * @param group
+     *          the unwrapped config map for this group
+     * @param index
+     *          position of the group (one-based when built by
+     *          {@link ClusterDef#getCluster}); used for the default name and in
+     *          error messages
+     * @param type
+     *          the group type
+     * @throws IllegalArgumentException
+     *           if the count is missing or is not an integer
+     */
+    public ClusterGroup(Map<String, Object> group, int index, GroupType type) {
+      this.type = type;
+
+      // Config system has already parsed the value. We insist that the value,
+      // when parsed, was interpreted as an integer. That is, the value had
+      // to be, say 10. Not "10", not 10.0, but just a plain integer.
+      // A missing value is reported the same way rather than as an NPE.
+
+      Object countValue = group.get(GROUP_SIZE);
+      if (!(countValue instanceof Integer)) {
+        throw new IllegalArgumentException(
+            "Expected an integer for " + GROUP_SIZE + " for tier " + index);
+      }
+      count = (Integer) countValue;
+      Object nameValue = group.get(GROUP_NAME);
+      String theName = null;
+      if (nameValue != null) {
+        theName = nameValue.toString();
+      }
+      if (DoYUtil.isBlank(theName)) {
+        theName = "tier-" + Integer.toString(index);
+      }
+      name = theName;
+    }
+
+    public String getName() { return name; }
+    public int getCount() { return count; }
+    public GroupType getType() { return type; }
+
+    /**
+     * Add this group's settings, keyed under the cluster group list entry for
+     * the given index, to the list of name/value pairs.
+     */
+    public void getPairs(int index, List<NameValuePair> pairs) {
+      String key = DrillOnYarnConfig.append(DrillOnYarnConfig.CLUSTERS,
+          Integer.toString(index));
+      addPairs(pairs, key);
+    }
+
+    protected void addPairs(List<NameValuePair> pairs, String key) {
+      pairs.add(
+          new NameValuePair(DrillOnYarnConfig.append(key, GROUP_NAME), name));
+      pairs.add(
+          new NameValuePair(DrillOnYarnConfig.append(key, GROUP_TYPE), type));
+      pairs.add(
+          new NameValuePair(DrillOnYarnConfig.append(key, GROUP_SIZE), count));
+    }
+
+    public void dump(String prefix, PrintStream out) {
+      out.print(prefix);
+      out.print("name = ");
+      out.println(name);
+      out.print(prefix);
+      out.print("type = ");
+      out.println(type.toValue());
+      out.print(prefix);
+      out.print("count = ");
+      out.println(count);
+    }
+
+    /**
+     * Hook for subclasses to adjust the task spec for this group. The base
+     * group makes no changes.
+     */
+    public void modifyTaskSpec(TaskSpec taskSpec) {
+    }
+  }
+
+  public static class BasicGroup extends ClusterGroup {
+
+    public BasicGroup(Map<String, Object> pool, int index) {
+      super(pool, index, GroupType.BASIC);
+    }
+
+  }
+
+  /**
+   * A group whose Drillbits run only on nodes matching a YARN node label
+   * expression.
+   */
+
+  public static class LabeledGroup extends ClusterGroup {
+
+    private final String drillbitLabelExpr;
+
+    public LabeledGroup(Map<String, Object> pool, int index) {
+      super(pool, index, GroupType.LABELED);
+      Object labelValue = pool.get(DRILLBIT_LABEL);
+      drillbitLabelExpr = (labelValue == null) ? null : labelValue.toString();
+      if (drillbitLabelExpr == null) {
+        Log.warn("Labeled pool is missing the drillbit label expression ("
+            + DRILLBIT_LABEL + "), will treat pool as basic.");
+      }
+    }
+
+    public String getLabelExpr() { return drillbitLabelExpr; }
+
+    @Override
+    public void dump(String prefix, PrintStream out) {
+      // Print the common name/type/count fields before the label expression.
+      super.dump(prefix, out);
+      out.print(prefix);
+      out.print("Drillbit label expr = ");
+      out.println((drillbitLabelExpr == null) ? "<none>" : drillbitLabelExpr);
+    }
+
+    @Override
+    protected void addPairs(List<NameValuePair> pairs, String key) {
+      super.addPairs(pairs, key);
+      pairs.add(new NameValuePair(DrillOnYarnConfig.append(key, DRILLBIT_LABEL),
+          drillbitLabelExpr));
+    }
+
+    @Override
+    public void modifyTaskSpec(TaskSpec taskSpec) {
+      taskSpec.containerSpec.nodeLabelExpr = drillbitLabelExpr;
+    }
+  }
+
+  /**
+   * Deserialize a cluster group from the configuration file.
+   *
+   * @param config
+   *          the Drill-on-YARN configuration
+   * @param n
+   *          zero-based position of the group within the cluster group list
+   * @return the group definition, typed according to its {@link #GROUP_TYPE}
+   * @throws IllegalArgumentException
+   *           if the entry is null or not an object, or its type is missing or
+   *           undefined, or its count is invalid
+   */
+
+  public static ClusterGroup getCluster(Config config, int n) {
+    int index = n + 1;
+    ConfigList tiers = config.getList(DrillOnYarnConfig.CLUSTERS);
+    ConfigValue value = tiers.get(n);
+    Object raw = (value == null) ? null : value.unwrapped();
+    if (raw == null) {
+      throw new IllegalArgumentException(
+          "If cluster group is provided, it cannot be null: group " + index);
+    }
+    if (!(raw instanceof Map)) {
+      throw new IllegalArgumentException(
+          "Cluster group " + index + " must be an object, not: " + raw);
+    }
+    @SuppressWarnings("unchecked")
+    Map<String, Object> tier = (Map<String, Object>) raw;
+    Object typeValue = tier.get(GROUP_TYPE);
+    if (typeValue == null) {
+      throw new IllegalArgumentException(
+          "Pool type is required for cluster group " + index);
+    }
+    String type = typeValue.toString();
+    GroupType groupType;
+    try {
+      groupType = GroupType.toEnum(type);
+    } catch (IllegalArgumentException e) {
+      // valueOf() rejects unknown names; report them in config terms.
+      throw new IllegalArgumentException(
+          "Undefined type for cluster group " + index + ": " + type, e);
+    }
+    ClusterGroup tierDef;
+    switch (groupType) {
+    case BASIC:
+      tierDef = new BasicGroup(tier, index);
+      break;
+    case LABELED:
+      tierDef = new LabeledGroup(tier, index);
+      break;
+    default:
+      assert false;
+      throw new IllegalStateException(
+          "Undefined cluster group type: " + groupType);
+    }
+    return tierDef;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/ContainerRequestSpec.java
----------------------------------------------------------------------
diff --git 
a/drill-yarn/src/main/java/org/apache/drill/yarn/core/ContainerRequestSpec.java 
b/drill-yarn/src/main/java/org/apache/drill/yarn/core/ContainerRequestSpec.java
new file mode 100644
index 0000000..99a22d7
--- /dev/null
+++ 
b/drill-yarn/src/main/java/org/apache/drill/yarn/core/ContainerRequestSpec.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.core;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.drill.yarn.appMaster.Scheduler;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Describes a container request in terms of priority, memory, cores and
+ * placement preference. This is a simplified version of the YARN
+ * ContainerRequest structure. This structure is easier to use within the app,
+ * then is translated to the YARN structure when needed.
+ */
+
+public class ContainerRequestSpec {
+  static final Log LOG = LogFactory.getLog(ContainerRequestSpec.class);
+
+  /**
+   * Application-specific priority. Drill-on-Yarn uses the priority to associate
+   * YARN requests with a {@link Scheduler}. When the resource allocation
+   * arrives, we use the priority to trace back to the scheduler that requested
+   * it, and from there to the task to be run in the allocation.
+   * <p>
+   * For this reason, the priority is set by the Drill-on-YARN application; it
+   * is not a user-adjustable value.
+   */
+
+  public int priority = 0;
+
+  /**
+   * Memory, in MB, required by the container.
+   */
+
+  public int memoryMb;
+
+  /**
+   * Number of "virtual cores" required by the task. YARN allocates whole CPU
+   * cores and does not support fractional allocations.
+   */
+
+  public int vCores = 1;
+
+  /**
+   * Number of virtual disks (channels, spindles) to request. Not supported in
+   * Apache YARN, is supported in selected distributions.
+   */
+
+  public double disks;
+
+  /**
+   * Node label expression to apply to this request.
+   */
+
+  public String nodeLabelExpr;
+
+  /**
+   * Optional rack and host placement preferences. If either is non-empty, the
+   * request disables locality relaxation.
+   */
+
+  public List<String> racks = new ArrayList<>();
+  public List<String> hosts = new ArrayList<>();
+
+  /**
+   * Create a YARN ContainerRequest object from the information in this object.
+   *
+   * @return the YARN container request carrying this spec's resources,
+   *         priority, placement preferences and node label expression
+   */
+  public ContainerRequest makeRequest() {
+    assert memoryMb != 0;
+
+    Priority priorityRec = Records.newRecord(Priority.class);
+    priorityRec.setPriority(priority);
+
+    Resource capability = Records.newRecord(Resource.class);
+    capability.setMemory(memoryMb);
+    capability.setVirtualCores(vCores);
+    DoYUtil.callSetDiskIfExists(capability, disks);
+
+    // Locality is relaxed only when neither hosts nor racks are given.
+    boolean relaxLocality = true;
+    String[] nodeArr = null;
+    if (hosts != null && !hosts.isEmpty()) {
+      nodeArr = hosts.toArray(new String[hosts.size()]);
+      relaxLocality = false;
+    }
+    String[] rackArr = null;
+    if (racks != null && !racks.isEmpty()) {
+      // Fill the rack array (previously the node array was overwritten here,
+      // and toArray() was then called with a null array).
+      rackArr = racks.toArray(new String[racks.size()]);
+      relaxLocality = false;
+    }
+    String nodeExpr = null;
+    if (!DoYUtil.isBlank(nodeLabelExpr)) {
+      nodeExpr = nodeLabelExpr;
+      LOG.info("Requesting a container using node expression: " + nodeExpr);
+    }
+
+    // YARN is fragile. To (potentially) pass a node expression, we must use the
+    // 6-argument constructor. The fifth argument (relax locality) MUST be set
+    // to true if we omit the rack and node specs. (Else we get a runtime
+    // error.)
+
+    return new ContainerRequest(capability, nodeArr, rackArr, priorityRec,
+        relaxLocality, nodeExpr);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/DfsFacade.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/DfsFacade.java 
b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DfsFacade.java
new file mode 100644
index 0000000..09e88ae
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DfsFacade.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.core;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.URL;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.typesafe.config.Config;
+
+/**
+ * Facade to the distributed file system (DFS) system that implements
+ * Drill-on-YARN related operations. Some operations are used by both the client
+ * and AM applications.
+ * <p>
+ * {@link #connect()} must be called before any operation that accesses the
+ * file system.
+ */
+
+public class DfsFacade {
+  /**
+   * Checked exception wrapping the file system and URI errors raised by this
+   * facade.
+   */
+  public static class DfsFacadeException extends Exception {
+    private static final long serialVersionUID = 1L;
+
+    public DfsFacadeException(String msg) {
+      super(msg);
+    }
+
+    public DfsFacadeException(String msg, Exception e) {
+      super(msg, e);
+    }
+  }
+
+  private FileSystem fs;
+  private Configuration yarnConf;
+  private final Config config;
+  private final boolean localize;
+
+  /**
+   * @param config
+   *          Drill-on-YARN configuration; supplies the localize flag, the DFS
+   *          connection and the DFS application directory
+   */
+  public DfsFacade(Config config) {
+    this.config = config;
+    localize = config.getBoolean(DrillOnYarnConfig.LOCALIZE_DRILL);
+  }
+
+  /**
+   * @return the {@code LOCALIZE_DRILL} setting read at construction time
+   */
+  public boolean isLocalized() {
+    return localize;
+  }
+
+  /**
+   * Connect to the DFS. Uses the configured DFS connection URI when one is
+   * given, else the default file system of the YARN configuration.
+   *
+   * @throws DfsFacadeException
+   *           if the connection URI is ill-formed or the file system cannot be
+   *           created
+   */
+  public void connect() throws DfsFacadeException {
+    loadYarnConfig();
+    String dfsConnection = config.getString(DrillOnYarnConfig.DFS_CONNECTION);
+    try {
+      if (DoYUtil.isBlank(dfsConnection)) {
+        fs = FileSystem.get(yarnConf);
+      } else {
+        URI uri;
+        try {
+          uri = new URI(dfsConnection);
+        } catch (URISyntaxException e) {
+          throw new DfsFacadeException(
+              "Illformed DFS connection: " + dfsConnection, e);
+        }
+        fs = FileSystem.get(uri, yarnConf);
+      }
+    } catch (IOException e) {
+      throw new DfsFacadeException("Failed to create the DFS", e);
+    }
+  }
+
+  /**
+   * Lazy loading of YARN configuration since it takes a long time to load.
+   * (YARN provides no caching, sadly.) Prints a warning to stderr if the
+   * default file system is a local one.
+   */
+
+  private void loadYarnConfig() {
+    if (yarnConf == null) {
+      yarnConf = new YarnConfiguration();
+      // On some distributions, lack of proper configuration causes
+      // DFS to default to the local file system. So, a local file
+      // system generally means that the config is wrong, or running
+      // the wrong build of Drill for the user's environment.
+      URI fsUri = FileSystem.getDefaultUri(yarnConf);
+      if (fsUri.toString().startsWith("file:/")) {
+        System.err.println(
+            "Warning: Default DFS URI is for a local file system: " + fsUri);
+      }
+    }
+  }
+
+  /**
+   * Resources to be localized (downloaded) to each AM or drillbit node. Pairs
+   * a local archive file with its DFS destination and caches the DFS file
+   * status of that destination.
+   */
+  public static class Localizer {
+    private final DfsFacade dfs;
+    protected File localArchivePath;
+    protected Path dfsArchivePath;
+    FileStatus fileStatus;
+    private String label;
+
+    public Localizer(DfsFacade dfs, File archivePath, String label) {
+      this(dfs, archivePath, dfs.getUploadPath(archivePath), label);
+    }
+
+    public Localizer(DfsFacade dfs, File archivePath, String destName,
+        String label) {
+      this(dfs, archivePath, dfs.getUploadPath(destName), label);
+    }
+
+    /**
+     * Refer to an existing DFS file with no local counterpart. The local
+     * archive path and label are left null, so {@link #getBaseName()},
+     * {@link #upload()} and {@link #filesMatch()} must not be used on this
+     * instance (they dereference the local path).
+     */
+    public Localizer(DfsFacade dfs, String destPath) {
+      this(dfs, null, new Path(destPath), null);
+    }
+
+    public Localizer(DfsFacade dfs, File archivePath, Path destPath,
+        String label) {
+      this.dfs = dfs;
+      dfsArchivePath = destPath;
+      this.label = label;
+      localArchivePath = archivePath;
+    }
+
+    public String getBaseName() {
+      return localArchivePath.getName();
+    }
+
+    public String getDestPath() {
+      return dfsArchivePath.toString();
+    }
+
+    /**
+     * Upload the local archive to its DFS destination and discard any cached
+     * file status.
+     */
+    public void upload() throws DfsFacadeException {
+      dfs.uploadArchive(localArchivePath, dfsArchivePath, label);
+      fileStatus = null;
+    }
+
+    /**
+     * The client may check file status multiple times. Cache the status here
+     * so that it is retrieved from the DFS only once and the client doesn't
+     * have to do the caching. {@link #upload()} clears the cache.
+     *
+     * @return the (possibly cached) DFS status of the destination file
+     * @throws DfsFacadeException
+     *           if the status cannot be retrieved, for example because the
+     *           file does not exist
+     */
+
+    private FileStatus getStatus() throws DfsFacadeException {
+      if (fileStatus == null) {
+        fileStatus = dfs.getFileStatus(dfsArchivePath);
+      }
+      return fileStatus;
+    }
+
+    /**
+     * Add this archive to the YARN local resources map under {@code key}.
+     */
+    public void defineResources(Map<String, LocalResource> resources,
+        String key) throws DfsFacadeException {
+      // Put the application archive, visible to only the application.
+      // Because it is an archive, it will be expanded by YARN prior to launch
+      // of the AM.
+
+      LocalResource drillResource = dfs.makeResource(dfsArchivePath,
+          getStatus(), LocalResourceType.ARCHIVE,
+          LocalResourceVisibility.APPLICATION);
+      resources.put(key, drillResource);
+    }
+
+    /**
+     * @return true if the DFS file exists and has the same length as the local
+     *         archive (only the length is compared)
+     */
+    public boolean filesMatch() {
+      FileStatus status;
+      try {
+        status = getStatus();
+      } catch (DfsFacadeException e) {
+
+        // An exception is DFS's way of telling us the file does
+        // not exist.
+
+        return false;
+      }
+      return status.getLen() == localArchivePath.length();
+    }
+
+    public String getLabel() {
+      return label;
+    }
+
+    public boolean destExists() throws IOException {
+      return dfs.exists(dfsArchivePath);
+    }
+  }
+
+  public boolean exists(Path path) throws IOException {
+    return fs.exists(path);
+  }
+
+  /**
+   * @return the DFS upload path for a local file: its base name within the
+   *         configured application directory
+   */
+  public Path getUploadPath(File localArchiveFile) {
+    return getUploadPath(localArchiveFile.getName());
+  }
+
+  /**
+   * Resolve a base name within the configured DFS application directory. An
+   * app-dir that does not start with "/" is taken relative to the user's DFS
+   * home directory.
+   */
+  public Path getUploadPath(String baseName) {
+    String dfsDirStr = config.getString(DrillOnYarnConfig.DFS_APP_DIR);
+
+    Path appDir;
+    if (dfsDirStr.startsWith("/")) {
+      appDir = new Path(dfsDirStr);
+    } else {
+      Path home = fs.getHomeDirectory();
+      appDir = new Path(home, dfsDirStr);
+    }
+    return new Path(appDir, baseName);
+  }
+
+  /**
+   * Upload a local archive to the DFS, replacing any existing file at the
+   * destination. If the destination directory does not exist, it is created
+   * with owner-only read/write/execute permission requested.
+   *
+   * @param localArchiveFile
+   *          local archive; must have an archive suffix recognized by
+   *          {@code DrillOnYarnConfig.findSuffix()}
+   * @param destPath
+   *          full DFS path of the uploaded file
+   * @param label
+   *          human-readable name of the archive, used in error messages
+   * @throws DfsFacadeException
+   *           if the file is not an archive, or the directory cannot be
+   *           created, or the upload fails
+   */
+  public void uploadArchive(File localArchiveFile, Path destPath, String label)
+      throws DfsFacadeException {
+    // The file must be an archive type so YARN knows to extract its contents.
+    // Check before touching the DFS so a bad name leaves nothing behind.
+
+    String baseName = localArchiveFile.getName();
+    if (DrillOnYarnConfig.findSuffix(baseName) == null) {
+      throw new DfsFacadeException(
+          label + " archive must be .tar.gz, .tgz or .zip: " + baseName);
+    }
+
+    // Create the upload directory if it does not yet exist. Use the parent
+    // of the real destination so that a relative app-dir resolves the same
+    // way here as in getUploadPath().
+
+    Path appDir = destPath.getParent();
+    if (appDir != null) {
+      try {
+        // Give this user (only) full access. Execute (search) permission on a
+        // directory is required to create or read the files inside it.
+
+        if (!fs.isDirectory(appDir)) {
+          fs.mkdirs(appDir,
+              new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
+        }
+      } catch (IOException e) {
+        throw new DfsFacadeException(
+            "Failed to create DFS directory: " + appDir, e);
+      }
+    }
+
+    Path srcPath = new Path(localArchiveFile.getAbsolutePath());
+
+    // Do the upload, replacing the old archive.
+
+    try {
+      // TODO: Specify file permissions and owner.
+
+      fs.copyFromLocalFile(false, true, srcPath, destPath);
+    } catch (IOException e) {
+      throw new DfsFacadeException(
+          "Failed to upload " + label + " archive to DFS: "
+              + localArchiveFile.getAbsolutePath() + " --> " + destPath,
+          e);
+    }
+  }
+
+  private FileStatus getFileStatus(Path dfsPath) throws DfsFacadeException {
+    try {
+      return fs.getFileStatus(dfsPath);
+    } catch (IOException e) {
+      throw new DfsFacadeException(
+          "Failed to get DFS status for file: " + dfsPath, e);
+    }
+  }
+
+  /**
+   * Create a local resource definition for YARN. A local resource is one that
+   * must be localized onto the remote node prior to running a command on that
+   * node.
+   * <p>
+   * YARN uses the size and timestamp to check if the file has changed on the
+   * DFS, and so whether it can use an existing localized copy, if any.
+   *
+   * @param dfsPath
+   *          the path (relative or absolute) to the file on the connected
+   *          file system (usually HDFS). Paths lacking a scheme or authority
+   *          are qualified against the connected file system.
+   * @param dfsFileStatus
+   *          status of the file; supplies its length and modification time
+   * @param type
+   *          the YARN resource type (for example, an archive to be expanded)
+   * @param visibility
+   *          the YARN visibility of the localized resource
+   * @return a YARN local resource record that contains the information about
+   *         path, size, type, visibility and so on that YARN requires
+   * @throws DfsFacadeException
+   *           if the path cannot be converted to a URL
+   */
+
+  public LocalResource makeResource(Path dfsPath, FileStatus dfsFileStatus,
+      LocalResourceType type, LocalResourceVisibility visibility)
+      throws DfsFacadeException {
+    URL destUrl;
+    try {
+      // Qualify against the file system we actually connected to (which may
+      // come from the DFS connection setting), not the Hadoop default FS.
+
+      FileContext fileContext = FileContext.getFileContext(fs.getUri(),
+          yarnConf);
+      destUrl = ConverterUtils
+          .getYarnUrlFromPath(fileContext.makeQualified(dfsPath));
+    } catch (UnsupportedFileSystemException e) {
+      throw new DfsFacadeException(
+          "Unable to convert dfs file to a URL: " + dfsPath.toString(), e);
+    }
+    return LocalResource.newInstance(destUrl, type, visibility,
+        dfsFileStatus.getLen(), dfsFileStatus.getModificationTime());
+  }
+
+  /**
+   * Delete a file from the DFS application directory, then delete that
+   * directory as well if it is left empty.
+   *
+   * @param fileName
+   *          base name of the file within the application directory
+   * @throws DfsFacadeException
+   *           if the file system reports an error while deleting the file or
+   *           while checking or deleting the directory
+   */
+  public void removeDrillFile(String fileName) throws DfsFacadeException {
+    Path destPath = getUploadPath(fileName);
+    try {
+      fs.delete(destPath, false);
+    } catch (IOException e) {
+      throw new DfsFacadeException(
+          "Failed to delete file: " + destPath.toString(), e);
+    }
+
+    // Remove the Drill directory, but only if it is now empty.
+
+    Path dir = destPath.getParent();
+    try {
+      RemoteIterator<FileStatus> iter = fs.listStatusIterator(dir);
+      if (!iter.hasNext()) {
+        fs.delete(dir, false);
+      }
+    } catch (IOException e) {
+      throw new DfsFacadeException(
+          "Failed to delete directory: " + dir.toString(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoYUtil.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoYUtil.java 
b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoYUtil.java
new file mode 100644
index 0000000..3c1d17d
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoYUtil.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.core;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.CodeSource;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Container;
+
+public class DoYUtil {
+  static final private Log LOG = LogFactory.getLog(DoYUtil.class);
+
+  private DoYUtil() {
+  }
+
+  public static String join(String separator, List<String> list) {
+    StringBuilder buf = new StringBuilder();
+    String sep = "";
+    for (String item : list) {
+      buf.append(sep);
+      buf.append(item);
+      sep = separator;
+    }
+    return buf.toString();
+  }
+
+  public static void addNonEmpty(List<String> list, String value) {
+    if ( ! isBlank( value ) ) {
+      list.add(value.trim( ));
+    }
+  }
+
+  public static boolean isBlank(String str) {
+    return str == null || str.trim().isEmpty();
+  }
+
+  public static String toIsoTime(long timestamp) {
+
+    // Uses old-style dates rather than java.time because
+    // the code still must compile for JDK 7.
+
+    DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    fmt.setTimeZone(TimeZone.getDefault());
+    return fmt.format(new Date(timestamp));
+  }
+
+  public static String labelContainer(Container container) {
+    StringBuilder buf = new StringBuilder()
+        .append("[id: ")
+        .append(container.getId())
+        .append(", host: ")
+        .append(container.getNodeId().getHost())
+        .append(", priority: ")
+        .append(container.getPriority())
+        .append("]");
+    return buf.toString();
+  }
+
+  /**
+   * Utility method to display YARN container information in a useful way for
+   * log messages.
+   *
+   * @param container
+   * @return
+   */
+
+  public static String describeContainer(Container container) {
+    StringBuilder buf = new StringBuilder()
+        .append("[id: ")
+        .append(container.getId())
+        .append(", host: ")
+        .append(container.getNodeId().getHost())
+        .append(", priority: ")
+        .append(container.getPriority())
+        .append(", memory: ")
+        .append(container.getResource().getMemory())
+        .append(" MB, vcores: ")
+        .append(container.getResource().getVirtualCores())
+        .append("]");
+    return buf.toString();
+  }
+
+  /**
+   * The tracking URL given to YARN is a redirect URL. When giving the URL to
+   * the user, "unwrap" that redirect URL to get the actual site URL.
+   *
+   * @param trackingUrl
+   * @return
+   */
+
+  public static String unwrapAmUrl(String trackingUrl) {
+    return trackingUrl.replace("/redirect", "/");
+  }
+
+  public static Object dynamicCall(Object target, String fnName, Object args[],
+      Class<?> types[]) {
+
+    // First, look for the method using the names and types provided.
+
+    final String methodLabel = target.getClass().getName() + "." + fnName;
+    Method m;
+    try {
+      m = target.getClass().getMethod(fnName, types);
+    } catch (NoSuchMethodException e) {
+
+      // Ignore, but log: the method does not exist in this distribution.
+
+      StringBuilder buf = new StringBuilder();
+      if (types != null) {
+        String sep = "";
+        for (Class<?> type : types) {
+          buf.append(sep);
+          buf.append(type.getName());
+          sep = ",";
+        }
+      }
+      LOG.trace("Not supported in this YARN distribution: " + methodLabel + "("
+          + buf.toString() + ")");
+      CodeSource src = target.getClass().getProtectionDomain().getCodeSource();
+      if (src != null) {
+        java.net.URL jar = src.getLocation();
+        LOG.trace("Class found in URL: " + jar.toString());
+      }
+      return null;
+    } catch (SecurityException e) {
+      LOG.error("Security prevents dynamic method calls", e);
+      return null;
+    }
+
+    // Next, call the method with the arguments provided.
+
+    Object ret = null;
+    try {
+      ret = m.invoke(target, args);
+    } catch (IllegalAccessException | IllegalArgumentException
+        | InvocationTargetException e) {
+      LOG.error("Failed to dynamically call " + methodLabel, e);
+      return null;
+    }
+    StringBuilder buf = new StringBuilder();
+    if (args != null) {
+      String sep = "";
+      for (Object arg : args) {
+        buf.append(sep);
+        buf.append(arg == null ? "null" : arg.toString());
+        sep = ",";
+      }
+    }
+    LOG.trace(
+        "Successfully called " + methodLabel + "( " + buf.toString() + ")");
+
+    // Return any return value. Will be null if the method is returns void.
+
+    return ret;
+  }
+
+  public static void callSetDiskIfExists(Object target, double arg) {
+    dynamicCall(target, "setDisks", new Object[] { arg },
+        new Class<?>[] { Double.TYPE });
+  }
+
+  public static double callGetDiskIfExists(Object target) {
+    Object ret = dynamicCall(target, "getDisks", null, null);
+    return (ret == null) ? 0.0 : (Double) ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoyConfigException.java
----------------------------------------------------------------------
diff --git 
a/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoyConfigException.java 
b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoyConfigException.java
new file mode 100644
index 0000000..422b89b
--- /dev/null
+++ 
b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoyConfigException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.core;
+
+/**
+ * Checked exception for Drill-on-YARN configuration errors; declared, for
+ * example, by {@code DrillOnYarnConfig.load()}.
+ */
+public class DoyConfigException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * @param msg
+   *          description of the configuration problem
+   * @param e
+   *          the underlying cause
+   */
+  public DoyConfigException(String msg, Exception e) {
+    super(msg, e);
+  }
+
+  /**
+   * @param msg
+   *          description of the configuration problem
+   */
+  public DoyConfigException(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/DrillOnYarnConfig.java
----------------------------------------------------------------------
diff --git 
a/drill-yarn/src/main/java/org/apache/drill/yarn/core/DrillOnYarnConfig.java 
b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DrillOnYarnConfig.java
new file mode 100644
index 0000000..38ecd1c
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DrillOnYarnConfig.java
@@ -0,0 +1,841 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.core;
+
+import java.io.File;
+import java.io.PrintStream;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.drill.common.config.CommonConstants;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.scanner.ClassPathScanner;
+import org.apache.drill.common.scanner.persistence.ScanResult;
+import org.apache.drill.exec.ExecConstants;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigException;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Configuration used within the Drill-on-YARN code. Configuration comes from
+ * five sources (in order of precedence):
+ * <ol>
+ * <li>System properties</li>
+ * <li>$SITE_DIR/drill-on-yarn.conf</li>
+ * <li>Distribution-specific properties in $SITE_HOME/conf/doy-distrib.conf</li>
+ * <li>Drill-on-YARN defaults in drill-on-yarn-defaults.conf. (Which should be
+ * disjoint from the Drill settings.)</li>
+ * <li>Drill properties (via the Drill override system)</li>
+ * </ol>
+ * <p>
+ * Defines constants for each property, including some defined in Drill. This
+ * provides a uniform property access interface even if some properties migrate
+ * between DoY and Drill proper.
+ */
+
+public class DrillOnYarnConfig {
+  public static final String DEFAULTS_FILE_NAME = 
"drill-on-yarn-defaults.conf";
+  public static final String DISTRIB_FILE_NAME = "doy-distrib.conf";
+  public static final String CONFIG_FILE_NAME = "drill-on-yarn.conf";
+
+  public static final String DRILL_ON_YARN_PARENT = "drill.yarn";
+  public static final String DOY_CLIENT_PARENT = append(DRILL_ON_YARN_PARENT, 
"client");
+  public static final String DOY_AM_PARENT = append(DRILL_ON_YARN_PARENT, 
"am");
+  public static final String DOY_DRILLBIT_PARENT = 
append(DRILL_ON_YARN_PARENT, "drillbit");
+  public static final String FILES_PARENT = append(DRILL_ON_YARN_PARENT, 
"drill-install");
+  public static final String DFS_PARENT = append(DRILL_ON_YARN_PARENT, "dfs");
+  public static final String HTTP_PARENT = append(DRILL_ON_YARN_PARENT, 
"http");
+  public static final String YARN_PARENT = append(DRILL_ON_YARN_PARENT, 
"yarn");
+  public static final String HADOOP_PARENT = append(DRILL_ON_YARN_PARENT, 
"hadoop");
+  public static final String CLIENT_PARENT = append(DRILL_ON_YARN_PARENT, 
"client");
+
+  public static final String APP_NAME = append(DRILL_ON_YARN_PARENT, 
"app-name");
+  public static final String CLUSTER_ID = ExecConstants.SERVICE_NAME;
+
+  public static final String DFS_CONNECTION = append(DFS_PARENT, "connection");
+  public static final String DFS_APP_DIR = append(DFS_PARENT, "app-dir");
+
+  public static final String YARN_QUEUE = append(YARN_PARENT, "queue");
+  public static final String YARN_PRIORITY = append(YARN_PARENT, "priority");
+
+  public static final String DRILL_ARCHIVE_PATH = append(FILES_PARENT, 
"client-path");
+  public static final String DRILL_DIR_NAME = append(FILES_PARENT, "dir-name");
+
+  /**
+   * Key used for the Drill archive file in the AM launch config.
+   */
+
+  public static final String DRILL_ARCHIVE_KEY = append(FILES_PARENT, 
"drill-key");
+  public static final String SITE_ARCHIVE_KEY = append(FILES_PARENT, 
"site-key");
+  public static final String LOCALIZE_DRILL = append(FILES_PARENT, "localize");
+  public static final String CONF_AS_SITE = append(FILES_PARENT, 
"conf-as-site");
+  public static final String DRILL_HOME = append(FILES_PARENT, "drill-home");
+  public static final String SITE_DIR = append(FILES_PARENT, "site-dir");
+  public static final String JAVA_LIB_PATH = append(FILES_PARENT, 
"library-path");
+
+  public static final String HADOOP_HOME = append(HADOOP_PARENT, "home");
+  public static final String HADOOP_CLASSPATH = append(HADOOP_PARENT, 
"class-path");
+  public static final String HBASE_CLASSPATH = append(HADOOP_PARENT, 
"hbase-class-path");
+
+  public static final String MEMORY_KEY = "memory-mb";
+  public static final String VCORES_KEY = "vcores";
+  public static final String DISKS_KEY = "disks";
+  public static final String VM_ARGS_KEY = "vm-args";
+  public static final String HEAP_KEY = "heap";
+
+  public static final String AM_MEMORY = append(DOY_AM_PARENT, MEMORY_KEY);
+  public static final String AM_VCORES = append(DOY_AM_PARENT, VCORES_KEY);
+  public static final String AM_DISKS = append(DOY_AM_PARENT, DISKS_KEY);
+  public static final String AM_NODE_LABEL_EXPR = append(DOY_AM_PARENT, 
"node-label-expr");
+  public static final String AM_HEAP = append(DOY_AM_PARENT, HEAP_KEY);
+  public static final String AM_VM_ARGS = append(DOY_AM_PARENT, VM_ARGS_KEY);
+  public static final String AM_POLL_PERIOD_MS = append(DOY_AM_PARENT, 
"poll-ms");
+  public static final String AM_TICK_PERIOD_MS = append(DOY_AM_PARENT, 
"tick-ms");
+  public static final String AM_PREFIX_CLASSPATH = append(DOY_AM_PARENT, 
"prefix-class-path");
+  public static final String AM_CLASSPATH = append(DOY_AM_PARENT, 
"class-path");
+  public static final String AM_DEBUG_LAUNCH = append(DOY_AM_PARENT, 
"debug-launch");
+  public static final String AM_ENABLE_AUTO_SHUTDOWN = append(DOY_AM_PARENT, 
"auto-shutdown");
+
+  public static final String DRILLBIT_MEMORY = append(DOY_DRILLBIT_PARENT, 
MEMORY_KEY);
+  public static final String DRILLBIT_VCORES = append(DOY_DRILLBIT_PARENT, 
VCORES_KEY);
+  public static final String DRILLBIT_DISKS = append(DOY_DRILLBIT_PARENT, 
DISKS_KEY);
+  public static final String DRILLBIT_VM_ARGS = append(DOY_DRILLBIT_PARENT, 
VM_ARGS_KEY);
+  public static final String DRILLBIT_HEAP = append(DOY_DRILLBIT_PARENT, 
HEAP_KEY);
+  public static final String DRILLBIT_DIRECT_MEM = append(DOY_DRILLBIT_PARENT, 
"max-direct-memory");
+  public static final String DRILLBIT_CODE_CACHE = append(DOY_DRILLBIT_PARENT, 
"code-cache");
+  public static final String DRILLBIT_LOG_GC = append(DOY_DRILLBIT_PARENT, 
"log-gc");
+  public static final String DRILLBIT_PREFIX_CLASSPATH = append( 
DOY_DRILLBIT_PARENT, "prefix-class-path");
+  public static final String DRILLBIT_EXTN_CLASSPATH = append( 
DOY_DRILLBIT_PARENT, "extn-class-path");
+  public static final String DRILLBIT_CLASSPATH = append(DOY_DRILLBIT_PARENT, 
"class-path");
+  public static final String DRILLBIT_MAX_RETRIES = 
append(DOY_DRILLBIT_PARENT, "max-retries");
+  public static final String DRILLBIT_DEBUG_LAUNCH = 
append(DOY_DRILLBIT_PARENT, "debug-launch");
+  public static final String DRILLBIT_HTTP_PORT = ExecConstants.HTTP_PORT;
+  public static final String DISABLE_YARN_LOGS = append(DOY_DRILLBIT_PARENT, 
"disable-yarn-logs");
+  public static final String DRILLBIT_USER_PORT = 
ExecConstants.INITIAL_USER_PORT;
+  public static final String DRILLBIT_BIT_PORT = 
ExecConstants.INITIAL_BIT_PORT;
+  public static final String DRILLBIT_USE_HTTPS = 
ExecConstants.HTTP_ENABLE_SSL;
+  public static final String DRILLBIT_MAX_EXTRA_NODES = 
append(DOY_DRILLBIT_PARENT, "max-extra-nodes");
+  public static final String DRILLBIT_REQUEST_TIMEOUT_SEC = 
append(DOY_DRILLBIT_PARENT, "request-timeout-secs");
+
+  public static final String ZK_CONNECT = ExecConstants.ZK_CONNECTION;
+  public static final String ZK_ROOT = ExecConstants.ZK_ROOT;
+  public static final String ZK_FAILURE_TIMEOUT_MS = ExecConstants.ZK_TIMEOUT;
+  public static final String ZK_RETRY_COUNT = ExecConstants.ZK_RETRY_TIMES;
+  public static final String ZK_RETRY_DELAY_MS = ExecConstants.ZK_RETRY_DELAY;
+
+  // Names selected to be parallel to Drillbit HTTP config.
+
+  public static final String HTTP_ENABLED = append(HTTP_PARENT, "enabled");
+  public static final String HTTP_ENABLE_SSL = append(HTTP_PARENT, 
"ssl-enabled");
+  public static final String HTTP_PORT = append(HTTP_PARENT, "port");
+  public static final String HTTP_AUTH_TYPE = append(HTTP_PARENT, "auth-type");
+  public static final String HTTP_REST_KEY = append(HTTP_PARENT, "rest-key");
+  public static final String HTTP_SESSION_MAX_IDLE_SECS = append(HTTP_PARENT, 
"session-max-idle-secs");
+  public static final String HTTP_DOCS_LINK = append(HTTP_PARENT, "docs-link");
+  public static final String HTTP_REFRESH_SECS = append(HTTP_PARENT, 
"refresh-secs");
+  public static final String HTTP_USER_NAME = append(HTTP_PARENT, "user-name");
+  public static final String HTTP_PASSWORD = append(HTTP_PARENT, "password");
+
+  public static final String AUTH_TYPE_NONE = "none";
+  public static final String AUTH_TYPE_DRILL = "drill";
+  public static final String AUTH_TYPE_SIMPLE = "simple";
+
+  public static final String CLIENT_POLL_SEC = append(CLIENT_PARENT, 
"poll-sec");
+  public static final String CLIENT_START_WAIT_SEC = append(CLIENT_PARENT, 
"start-wait-sec");
+  public static final String CLIENT_STOP_WAIT_SEC = append(CLIENT_PARENT, 
"stop-wait-sec");
+
+  public static final String CLUSTERS = append(DRILL_ON_YARN_PARENT, 
"cluster");
+
+  /**
+   * Name of the subdirectory of the container directory that will hold
+   * localized Drill distribution files. This name must be consistent between
+   * AM launch request and AM launch, and between Drillbit launch request and
+   * Drillbit launch. This name is fixed; there is no reason for the user to
+   * change it as it is visible only in the YARN container environment.
+   */
+
+  public static String LOCAL_DIR_NAME = "drill";
+
+  // Environment variables used to pass information from the Drill-on-YARN
+  // Client to the AM, or from the AM to the Drillbit launch script.
+
+  public static final String APP_ID_ENV_VAR = "DRILL_AM_APP_ID";
+  public static final String DRILL_ARCHIVE_ENV_VAR = "DRILL_ARCHIVE";
+  public static final String SITE_ARCHIVE_ENV_VAR = "SITE_ARCHIVE";
+  public static final String DRILL_HOME_ENV_VAR = "DRILL_HOME";
+  public static final String DRILL_SITE_ENV_VAR = "DRILL_CONF_DIR";
+  public static final String AM_HEAP_ENV_VAR = "DRILL_AM_HEAP";
+  public static final String AM_JAVA_OPTS_ENV_VAR = "DRILL_AM_JAVA_OPTS";
+  public static final String DRILL_CLASSPATH_ENV_VAR = "DRILL_CLASSPATH";
+  public static final String DRILL_CLASSPATH_PREFIX_ENV_VAR = 
"DRILL_CLASSPATH_PREFIX";
+  public static final String DOY_LIBPATH_ENV_VAR = "DOY_JAVA_LIB_PATH";
+  public static final String DRILL_DEBUG_ENV_VAR = "DRILL_DEBUG";
+
+  /**
+   * Special value for the DRILL_DIR_NAME parameter to indicate to use the base
+   * name of the archive as the Drill home path.
+   */
+
+  private static final Object BASE_NAME_MARKER = "<base>";
+
+  /**
+   * The name of the Drill site archive stored in DFS. Since the archive is
+   * created by the client as a temp file, its local name has no meaning; we
+   * use this standard name on DFS instead.
+   */
+
+  public static final String SITE_ARCHIVE_NAME = "site.tar.gz";
+
+  protected static DrillOnYarnConfig instance;
+  private File drillSite;
+  private File drillHome;
+  private static DrillConfig drillConfig;
+  private Config config;
+  private ScanResult classPathScan;
+
+  public static String append(String parent, String key) {
+    return parent + "." + key;
+  }
+
+  /**
+   * Instances are created by {@link #load()}. The constructor is protected
+   * only so that tests can create a specialized version of this class.
+   */
+  protected DrillOnYarnConfig() {
+  }
+
+  public static DrillOnYarnConfig load() throws DoyConfigException {
+    instance = new DrillOnYarnConfig();
+    instance.doLoad(Thread.currentThread().getContextClassLoader());
+    return instance;
+  }
+
+  /**
+   * Load the config by layering, from lowest to highest precedence: Drill's
+   * own configuration, the Drill-on-YARN defaults, an optional
+   * distribution-specific file, the user's Drill-on-YARN file, and finally
+   * Java system properties. The result is resolved and stored in
+   * {@code config}.
+   * <p>
+   * Implemented in a way that allows unit testing. The parseUrl( ) methods
+   * let us mock the files; the load( ) methods seem to not actually use the
+   * provided class loader.
+   *
+   * @param cl class loader to use for resource searches (except defaults).
+   *          Allows tests to specify a specialized version. If null, the
+   *          class loader of this class is used instead.
+   * @throws IllegalStateException if the Drill-on-YARN defaults file is not
+   *           found
+   * @throws DoyConfigException declared for subclasses; not thrown directly
+   *           by this implementation
+   */
+  protected void doLoad(ClassLoader cl) throws DoyConfigException {
+    // Thread.getContextClassLoader() may return null; fall back the same way
+    // setSiteDir() does rather than failing with a NullPointerException.
+    ClassLoader loader = (cl != null) ? cl
+        : DrillOnYarnConfig.class.getClassLoader();
+
+    // Named so as not to shadow the static drillConfig field (a DrillConfig).
+    Config drillBase = loadDrillConfig();
+
+    // Resolution order, larger numbers take precedence.
+    // 1. Drill-on-YARN defaults.
+    // File is at root of the package tree.
+
+    URL url = DrillOnYarnConfig.class.getResource(DEFAULTS_FILE_NAME);
+    if (url == null) {
+      throw new IllegalStateException(
+          "Drill-on-YARN defaults file is required: " + DEFAULTS_FILE_NAME);
+    }
+    config = ConfigFactory.parseURL(url).withFallback(drillBase);
+
+    // 2. Optional distribution-specific configuration file.
+    // (Lets a vendor, for example, specify the default DFS upload location
+    // without tinkering with the user's own settings.)
+
+    url = loader.getResource(DISTRIB_FILE_NAME);
+    if (url != null) {
+      config = ConfigFactory.parseURL(url).withFallback(config);
+    }
+
+    // 3. User's Drill-on-YARN configuration.
+    // Optional since defaults are fine & ZK comes from drill-override.conf.
+
+    url = loader.getResource(CONFIG_FILE_NAME);
+    if (url != null) {
+      config = ConfigFactory.parseURL(url).withFallback(config);
+    }
+
+    // 4. System properties
+    // Allows -Dfoo=bar on the command line.
+    // But, note that substitutions are NOT allowed in system properties!
+
+    config = ConfigFactory.systemProperties().withFallback(config);
+
+    // Resolution allows ${foo.bar} syntax in values, but only for values
+    // from config files, not from system properties.
+
+    config = config.resolve();
+  }
+
+  private static Config loadDrillConfig() {
+    drillConfig = DrillConfig
+        .create(CommonConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME);
+    return drillConfig.resolve();
+  }
+
+  /**
+   * @return Drill's configuration as last set by loadDrillConfig(); the
+   *         backing field is static, so all instances share it
+   */
+  public DrillConfig getDrillConfig() {
+    return DrillOnYarnConfig.drillConfig;
+  }
+
+  /**
+   * Return Drill's class path scan, computing it from the Drill configuration
+   * on first use and caching it. This is used only in the main thread during
+   * initialization and is not needed by the client, so it is computed
+   * lazily without synchronization.
+   *
+   * @return the cached class path scan result
+   */
+
+  public ScanResult getClassPathScan() {
+    if (classPathScan != null) {
+      return classPathScan;
+    }
+    classPathScan = ClassPathScanner.fromPrescan(drillConfig);
+    return classPathScan;
+  }
+
+  /**
+   * Establish the client's local paths: first Drill home (from the
+   * DRILL_HOME environment variable, or else from the Drill home setting in
+   * drill-on-yarn.conf), then the optional site directory.
+   * <p>
+   * This information is required only by the client to prepare for uploads to
+   * DFS.
+   *
+   * @throws DoyConfigException if Drill home cannot be determined or the
+   *           site directory cannot be located
+   */
+
+  public void setClientPaths() throws DoyConfigException {
+    // Order matters: the site-directory check compares against Drill home.
+    setClientDrillHome();
+    setSiteDir();
+  }
+
+  private void setClientDrillHome() throws DoyConfigException {
+    // First choice: DRILL_HOME from the environment, which should have been
+    // set in drill-on-yarn.sh (for the client) or in the launch environment
+    // (for the AM).
+    String homeDir = getEnv(DRILL_HOME_ENV_VAR);
+
+    if (DoYUtil.isBlank(homeDir)) {
+      // Fallback, handy for debugging and used for non-localized runs: the
+      // Drill home setting in drill-on-yarn.conf.
+      homeDir = config.getString(DRILL_HOME);
+      if (DoYUtil.isBlank(homeDir)) {
+        throw new DoyConfigException(
+            "The DRILL_HOME environment variable must point to your Drill install.");
+      }
+    }
+    drillHome = new File(homeDir);
+  }
+
+  /**
+   * All environment variable access goes through this function to allow unit
+   * tests to replace this function to set test values. (The Java environment 
is
+   * immutable, so it is not possible for unit tests to change the actual
+   * environment.)
+   *
+   * @param key
+   * @return
+   */
+
+  protected String getEnv(String key) {
+    return System.getenv(key);
+  }
+
+  /**
+   * Locate the optional site directory, on both the client and the AM.
+   * <p>
+   * The site directory is read from the DRILL_CONF_DIR environment variable
+   * ({@link #DRILL_SITE_ENV_VAR}), which the launch script is expected to set
+   * from its --config (or --site) option.
+   * <p>
+   * For ease of debugging, if that variable is not set, this method infers
+   * the site directory from the location of the Drill override configuration
+   * file on the class path.
+   * <p>
+   * On the client, the site directory will be the "original" directory that
+   * contains the user's "master" files. On the AM, the site directory is a
+   * localized version of the client directory. Because of the way tar works,
+   * both the client and AM site directories have the same name; though the
+   * path to that name obviously differs.
+   * <p>
+   * Requires {@code drillHome} to be set first. May set {@code drillSite} to
+   * null (no site directory) when the directory found is just
+   * $DRILL_HOME/conf.
+   *
+   * @throws DoyConfigException if the site directory must be inferred but the
+   *           Drill override configuration file is not on the class path or
+   *           does not map to a local file
+   */
+
+  private void setSiteDir() throws DoyConfigException {
+    // The site directory is the one where the config file lives.
+    // This should have been set in an environment variable by the launch
+    // script.
+
+    String sitePath = getEnv(DRILL_SITE_ENV_VAR);
+    if (!DoYUtil.isBlank(sitePath)) {
+      drillSite = new File(sitePath);
+    } else {
+
+      // Otherwise, let's guess it from the config file. This version assists
+      // in debugging as it reduces setup steps.
+
+      ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+      if (classLoader == null) {
+        classLoader = DrillOnYarnConfig.class.getClassLoader();
+      }
+
+      URL url = classLoader
+          .getResource(CommonConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME);
+      if (url == null) {
+        throw new DoyConfigException(
+            "Drill configuration file is missing: "
+                + CommonConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME);
+      }
+      File confFile;
+      try {
+        // Paths.get(URI) and Path.toFile() only work for the default file
+        // system. A resource URL such as "jar:file:...!/drill-override.conf"
+        // makes them throw unchecked exceptions; report those as a
+        // configuration error rather than letting them escape.
+        java.nio.file.Path confPath = Paths.get(url.toURI());
+        confFile = confPath.toFile();
+      } catch (URISyntaxException | IllegalArgumentException
+          | java.nio.file.FileSystemNotFoundException
+          | UnsupportedOperationException e) {
+        throw new DoyConfigException(
+            "Invalid path to Drill-on-YARN configuration file: "
+                + url.toString(),
+            e);
+      }
+      drillSite = confFile.getParentFile();
+    }
+
+    // Verify that the site directory is not just $DRILL_HOME/conf,
+    // since the calling script does not differentiate between the two cases.
+    // But, treat $DRILL_HOME/conf as the site directory if:
+    // 1. The conf-as-site property is true, or
+    // 2. The Drill archive resides within $DRILL_HOME.
+    //
+    // The above situations occur in certain distributions that
+    // ship the archive inside the site directory and don't use a
+    // site directory.
+
+    if (drillHome.equals(drillSite.getParentFile())
+        && !config.getBoolean(CONF_AS_SITE)) {
+      String drillArchivePath = config
+          .getString(DrillOnYarnConfig.DRILL_ARCHIVE_PATH);
+      if (!DoYUtil.isBlank(drillArchivePath)) {
+        File archiveFile = new File(drillArchivePath);
+        // NOTE(review): only a relative archive path can clear the site
+        // directory here, and File.getAbsolutePath() resolves it against the
+        // JVM working directory, not $DRILL_HOME. This does not obviously
+        // implement rule 2 above; confirm the intended condition.
+        if (!archiveFile.isAbsolute() && !archiveFile.getAbsolutePath()
+            .startsWith(drillHome.getAbsolutePath())) {
+          drillSite = null;
+        }
+      }
+    }
+  }
+
+  /**
+   * Retrieve the AM Drill home location from the DRILL_HOME variable set in
+   * the drill-am.sh launch script, then locate the site directory.
+   *
+   * @throws DoyConfigException if DRILL_HOME is unset or blank, or if the
+   *           site directory cannot be located
+   */
+
+  public void setAmDrillHome() throws DoyConfigException {
+    String drillHomeStr = getEnv(DRILL_HOME_ENV_VAR);
+    // new File(null) would fail with a bare NullPointerException; report the
+    // actual problem, consistent with the client-side check.
+    if (DoYUtil.isBlank(drillHomeStr)) {
+      throw new DoyConfigException(
+          "The DRILL_HOME environment variable must be set in the AM launch environment.");
+    }
+    drillHome = new File(drillHomeStr);
+    setSiteDir();
+  }
+
+  /**
+   * @return this object's resolved Drill-on-YARN configuration
+   */
+  public Config getConfig() {
+    // Return this object's own config, not the singleton's: otherwise an
+    // instance other than the published singleton (e.g. a test version)
+    // reports the wrong settings, or fails if no singleton is loaded yet.
+    return config;
+  }
+
+  /**
+   * @return the current singleton, normally set by {@link #load()}. Only an
+   *         assertion checks that it is set, so this may return null when
+   *         assertions are disabled.
+   */
+  public static DrillOnYarnConfig instance() {
+    DrillOnYarnConfig current = instance;
+    assert current != null;
+    return current;
+  }
+
+  public static Config config() {
+    return instance().getConfig();
+  }
+
+  /**
+   * Return the Drill home on this machine, as established by
+   * setClientPaths() (client) or setAmDrillHome() (AM).
+   *
+   * @return the local Drill home directory, or null if not yet set
+   */
+
+  public File getLocalDrillHome() {
+    final File home = drillHome;
+    return home;
+  }
+
+  public void dump() {
+    dump(System.out);
+  }
+
+  private static final String keys[] = {
+    // drill.yarn
+
+    APP_NAME,
+    CLUSTER_ID,
+
+    // drill.yarn.dfs
+
+    DFS_CONNECTION,
+    DFS_APP_DIR,
+
+    // drill.yarn.hadoop
+
+    HADOOP_HOME,
+    HADOOP_CLASSPATH,
+    HBASE_CLASSPATH,
+
+    // drill.yarn.yarn
+
+    YARN_QUEUE,
+    YARN_PRIORITY,
+
+    // drill.yarn.drill-install
+
+    DRILL_ARCHIVE_PATH,
+    DRILL_DIR_NAME,
+    LOCALIZE_DRILL,
+    CONF_AS_SITE,
+    DRILL_HOME,
+    DRILL_ARCHIVE_KEY,
+    SITE_ARCHIVE_KEY,
+    JAVA_LIB_PATH,
+
+    // drill.yarn.client
+
+    CLIENT_POLL_SEC,
+    CLIENT_START_WAIT_SEC,
+    CLIENT_STOP_WAIT_SEC,
+
+    // drill.yarn.am
+
+    AM_MEMORY,
+    AM_VCORES,
+    AM_DISKS,
+    AM_NODE_LABEL_EXPR,
+    AM_VM_ARGS,
+    AM_HEAP,
+    AM_POLL_PERIOD_MS,
+    AM_TICK_PERIOD_MS,
+    AM_PREFIX_CLASSPATH,
+    AM_CLASSPATH,
+    AM_DEBUG_LAUNCH,
+    AM_ENABLE_AUTO_SHUTDOWN,
+
+    // drill.yarn.zk
+
+    ZK_CONNECT,
+    ZK_ROOT,
+    ZK_RETRY_COUNT,
+    ZK_RETRY_DELAY_MS,
+    ZK_FAILURE_TIMEOUT_MS,
+
+    // drill.yarn.drillbit
+
+    DRILLBIT_MEMORY,
+    DRILLBIT_VCORES,
+    DRILLBIT_DISKS,
+    DRILLBIT_VM_ARGS,
+    DRILLBIT_HEAP,
+    DRILLBIT_DIRECT_MEM,
+    DRILLBIT_CODE_CACHE,
+    DRILLBIT_PREFIX_CLASSPATH,
+    DRILLBIT_EXTN_CLASSPATH,
+    DRILLBIT_CLASSPATH,
+    DRILLBIT_MAX_RETRIES,
+    DRILLBIT_DEBUG_LAUNCH,
+    DRILLBIT_MAX_EXTRA_NODES,
+    DRILLBIT_REQUEST_TIMEOUT_SEC,
+    DISABLE_YARN_LOGS,
+    DRILLBIT_HTTP_PORT,
+    DRILLBIT_USER_PORT,
+    DRILLBIT_BIT_PORT,
+    DRILLBIT_USE_HTTPS,
+
+    // drill.yarn.http
+
+    HTTP_ENABLED,
+    HTTP_ENABLE_SSL,
+    HTTP_PORT,
+    HTTP_AUTH_TYPE,
+    HTTP_SESSION_MAX_IDLE_SECS,
+    HTTP_DOCS_LINK,
+    HTTP_REFRESH_SECS,
+    // Do not include AM_REST_KEY: it is supposed to be secret.
+    // Same is true of HTTP_USER_NAME and HTTP_PASSWORD
+  };
+
+  private static String envVars[] = {
+      APP_ID_ENV_VAR,
+      DRILL_HOME_ENV_VAR,
+      DRILL_SITE_ENV_VAR,
+      AM_HEAP_ENV_VAR,
+      AM_JAVA_OPTS_ENV_VAR,
+      DRILL_CLASSPATH_PREFIX_ENV_VAR,
+      DRILL_CLASSPATH_ENV_VAR,
+      DRILL_ARCHIVE_ENV_VAR,
+      SITE_ARCHIVE_ENV_VAR,
+      DRILL_DEBUG_ENV_VAR
+  };
+
+  /**
+   * Print each reported config key as "key = value" (or "<missing>"), then
+   * the cluster groups as an indexed list.
+   */
+  private void dump(PrintStream out) {
+    for (String key : keys) {
+      out.print(key + " = ");
+      String value;
+      try {
+        value = config.getString(key);
+      } catch (ConfigException.Missing e) {
+        value = "<missing>";
+      }
+      out.println(value);
+    }
+
+    out.println(CLUSTERS + "[");
+    int groupCount = clusterGroupCount();
+    for (int i = 0; i < groupCount; i++) {
+      ClusterDef.ClusterGroup group = ClusterDef.getCluster(config, i);
+      out.println(i + " = {");
+      group.dump("  ", out);
+      out.println("  }");
+    }
+    out.println("]");
+  }
+
+  /**
+   * Print the reported environment variables, quoting set values and
+   * showing "<unset>" for missing ones.
+   */
+  public void dumpEnv(PrintStream out) {
+    out.println("environment[");
+    for (String name : envVars) {
+      String value = getEnv(name);
+      String shown = (value == null) ? "<unset>" : "\"" + value + "\"";
+      out.println(name + " = " + shown);
+    }
+    out.println("]");
+  }
+
+  /**
+   * Return the configuration as a flat list of name/value pairs: the
+   * reported config keys, then the cluster-group settings, then the reported
+   * environment variables as pseudo-properties prefixed with "envt.".
+   * <p>
+   * As in dump(), a config key that is missing does not abort the listing;
+   * it is reported with a null value, as an unset environment variable is.
+   *
+   * @return a new, mutable list of pairs
+   */
+  public List<NameValuePair> getPairs() {
+    List<NameValuePair> pairs = new ArrayList<>();
+    for (String key : keys) {
+      String value;
+      try {
+        value = config.getString(key);
+      } catch (ConfigException.Missing e) {
+        // Not present in any configuration layer; mirror dump()'s tolerance.
+        value = null;
+      }
+      pairs.add(new NameValuePair(key, value));
+    }
+    for (int i = 0; i < clusterGroupCount(); i++) {
+      ClusterDef.ClusterGroup pool = ClusterDef.getCluster(config, i);
+      pool.getPairs(i, pairs);
+    }
+
+    // Add environment variables as "pseudo" properties,
+    // prefixed with "envt.".
+
+    for (String envVar : envVars) {
+      pairs.add(new NameValuePair("envt." + envVar, getEnv(envVar)));
+    }
+    return pairs;
+  }
+
+  public static String clusterGroupKey(int index, String key) {
+    return CLUSTERS + "." + index + "." + key;
+  }
+
+  public int clusterGroupCount() {
+    return config.getList(CLUSTERS).size();
+  }
+
+  private static String suffixes[] = { ".tar.gz", ".tgz", ".zip" };
+
+  public static String findSuffix(String baseName) {
+    baseName = baseName.toLowerCase();
+    for (String extn : suffixes) {
+      if (baseName.endsWith(extn)) {
+        return extn;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Get the location of Drill home on a remote machine, relative to the
+   * container working directory. Used when constructing a launch context.
+   * Assumes either the absolute path from the config file, or a constructed
+   * path to the localized Drill on the remote node. YARN examples use "./foo"
+   * to refer to container resources. But, since we cannot be sure when such a
+   * path is evaluated, we explicitly use YARN's PWD environment variable to 
get
+   * the absolute path.
+   *
+   * @return the remote path, with the "$PWD" environment variable.
+   * @throws DoyConfigException
+   */
+
+  public String getRemoteDrillHome() throws DoyConfigException {
+    // If the application is not localized, then the user can tell us the 
remote
+    // path in the config file. Otherwise, we assume that the remote path is 
the
+    // same as the local path.
+
+    if (!config.getBoolean(LOCALIZE_DRILL)) {
+      String drillHomePath = config.getString(DRILL_HOME);
+      if (DoYUtil.isBlank(drillHomePath)) {
+        drillHomePath = drillHome.getAbsolutePath();
+      }
+      return drillHomePath;
+    }
+
+    // The application is localized. Work out the location within the container
+    // directory. The path starts with the "key" we specify when uploading the
+    // Drill archive; YARN expands the archive into a folder of that name.
+
+    String drillHome = "$PWD/" + config.getString(DRILL_ARCHIVE_KEY);
+
+    String home = config.getString(DRILL_DIR_NAME);
+    if (DoYUtil.isBlank(home)) {
+      // Assume the archive expands without a subdirectory.
+    }
+
+    // If the special "<base>" marker is used, assume that the path depends
+    // on the name of the archive, which we know from the config file.
+
+    else if (home.equals(BASE_NAME_MARKER)) {
+
+      // Otherwise, assume that the archive expands to a directory with the
+      // same name as the archive itself (minus the archive suffix.)
+
+      String drillArchivePath = config
+          .getString(DrillOnYarnConfig.DRILL_ARCHIVE_PATH);
+      if (DoYUtil.isBlank(drillArchivePath)) {
+        throw new DoyConfigException("Required config property not set: "
+            + DrillOnYarnConfig.DRILL_ARCHIVE_PATH);
+      }
+      File localArchiveFile = new File(drillArchivePath);
+      home = localArchiveFile.getName();
+      String suffix = findSuffix(home);
+      if (suffix == null) {
+        throw new DoyConfigException(DrillOnYarnConfig.DRILL_ARCHIVE_PATH
+            + " does not name a valid archive: " + drillArchivePath);
+      }
+      drillHome += "/" + home.substring(0, home.length() - suffix.length());
+    } else {
+      // If the user told us the name of the directory within the archive,
+      // use it.
+
+      drillHome += "/" + home;
+    }
+    return drillHome;
+  }
+
+  /**
+   * Get the optional remote site directory name. This is the absolute path
+   * for a non-localized application, or a path relative to the container
+   * (starting with "$PWD/") for a localized one. In the localized case, the
+   * site archive is tar'ed relative to the site directory so that its
+   * contents are unarchived directly into the YARN-provided folder named by
+   * the archive key. That is, if the site directory on the client is
+   * /var/drill/my-site, the contents of the tar file will be
+   * "./drill-override.conf", etc., and the remote location is
+   * $PWD/site-key/drill-override.conf, where site-key is the key name used to
+   * localize the site archive.
+   *
+   * @return the remote site directory, or null if no site directory is used
+   */
+
+  public String getRemoteSiteDir() {
+    if (!hasSiteDir()) {
+      // The application does not use a site directory.
+      return null;
+    }
+    if (config.getBoolean(LOCALIZE_DRILL)) {
+      // Localized: the site archive expands into a folder named by its key.
+      // The caller must include an archive subdirectory name if required.
+      return "$PWD/" + config.getString(SITE_ARCHIVE_KEY);
+    }
+    // Not localized: use the remote site path from the config file, falling
+    // back to the local site path.
+    String configured = config.getString(SITE_DIR);
+    return DoYUtil.isBlank(configured) ? drillSite.getAbsolutePath()
+        : configured;
+  }
+
+  /**
+   * Return the app ID file to use for this client run. The file lives in the
+   * directory that holds the site dir (if a site dir is used), else in the
+   * directory that holds Drill home. Note that the file does NOT go into the
+   * site dir or Drill home, as we upload these directories (via archives) to
+   * DFS and don't want to change them by adding a file.
+   * <p>
+   * Drill allows two distinct clusters to share the same ZK root and/or
+   * cluster ID (just not the same combination), so the file name contains
+   * both parts: {@code <zk-root>-<cluster-id>.appid}.
+   *
+   * @return the location of the app ID file
+   */
+
+  public File getLocalAppIdFile() {
+    String zkRoot = config.getString(DrillOnYarnConfig.ZK_ROOT);
+    String clusterId = config.getString(DrillOnYarnConfig.CLUSTER_ID);
+    File parent = hasSiteDir() ? drillSite.getParentFile()
+        : drillHome.getParentFile();
+    return new File(parent, zkRoot + "-" + clusterId + ".appid");
+  }
+
+  public boolean hasSiteDir() {
+    return drillSite != null;
+  }
+
+  /**
+   * @return the local site directory, or null if none is in use
+   */
+  public File getLocalSiteDir() {
+    return this.drillSite;
+  }
+
+  /**
+   * AM-only: return the DFS path of the localized Drill archive, which the
+   * client passes in the DRILL_ARCHIVE environment variable. It is set only
+   * for a localized application; it is not set for a non-localized run.
+   *
+   * @return the DFS path, or null if the variable is not set
+   */
+
+  public String getDrillArchiveDfsPath() {
+    return getEnv(DRILL_ARCHIVE_ENV_VAR);
+  }
+
+  /**
+   * AM-only: return the DFS path of the localized site archive, which the
+   * client passes in the SITE_ARCHIVE environment variable. The variable is
+   * optional; if it is not set, the AM can infer that the application does
+   * not use a site archive (configuration files reside in $DRILL_HOME/conf),
+   * or that the application is not localized.
+   *
+   * @return the DFS path, or null if the variable is not set
+   */
+
+  public String getSiteArchiveDfsPath() {
+    return getEnv(SITE_ARCHIVE_ENV_VAR);
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/LaunchSpec.java
----------------------------------------------------------------------
diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/LaunchSpec.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/LaunchSpec.java
new file mode 100644
index 0000000..6c22874
--- /dev/null
+++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/LaunchSpec.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.yarn.core;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Abstract description of a remote process launch that describes the many
+ * details needed to launch a process on a remote node.
+ * <p>
+ * Based on <a href="https://github.com/hortonworks/simple-yarn-app">Simple
+ * YARN App</a>.
+ */
+
+public class LaunchSpec {
+  /**
+   * Map of (key, resource) pairs to be localized to the node before running
+   * the command. The file must exist in a distributed file system (such as
+   * HDFS) visible to both the client and remote node. Typically, the path is
+   * relative or absolute within the file system defined by the fs.defaultFS
+   * parameter in core-site.xml.
+   * <p>
+   * TODO: Can the value also be a URL such as
+   * {@code hdfs://somehost:1234//path/to/file}?
+   * <p>
+   * The key appears to name the resource within the container's working
+   * directory ($PWD); for an archive, YARN expands it into a folder of that
+   * name. TODO confirm.
+   */
+
+  public Map<String, LocalResource> resources = new HashMap<>();
+
+  /**
+   * Defines environment variables to be set on the remote host before
+   * launching the remote app. Note: do not set CLASSPATH here; use
+   * {@link #classPath} instead.
+   */
+
+  public Map<String, String> env = new HashMap<>();
+
+  /**
+   * Set to the name of the OS command to run when we wish to run a non-Java
+   * command. If set, it takes precedence over {@link #mainClass}.
+   */
+
+  public String command;
+
+  /**
+   * Set to the name of the Java main class (the one with the main method) when
+   * we wish to run a Java command.
+   */
+
+  public String mainClass;
+
+  /**
+   * Set to the application-specific class path for the Java application. The
+   * entries are joined with ":" and passed to the JVM with -cp.
+   * <p>
+   * NOTE(review): earlier notes said these values are added to the
+   * Hadoop-provided values, but {@link #getCommand()} passes only these
+   * entries. TODO: document what the entries are relative to and which
+   * variables refer to the localized application directory.
+   */
+
+  public List<String> classPath = new ArrayList<>();
+
+  /**
+   * Optional VM arguments to pass to the JVM when running a Java class;
+   * ignored when running an OS command.
+   */
+
+  public List<String> vmArgs = new ArrayList<>();
+
+  /**
+   * Arguments to the remote command.
+   */
+
+  public List<String> cmdArgs = new ArrayList<>();
+
+  public LaunchSpec() {
+  }
+
+  /**
+   * Create the command line to run on the remote node. The command can either
+   * be a simple OS command (if the {@link #command} member is set) or can be a
+   * Java class (if the {@link #mainClass} member is set). If the command is
+   * Java, then we pass along optional Java VM arguments and the class path.
+   * <p>
+   * In all cases we append arguments to the command itself, and redirect
+   * stdout and stderr to files in the YARN log directory.
+   *
+   * @return the complete command string
+   * @throws IllegalStateException if neither {@link #command} nor
+   *           {@link #mainClass} is set
+   */
+
+  public String getCommand() {
+    List<String> cmd = new ArrayList<>();
+    if (command != null) {
+      cmd.add(command);
+    } else {
+      // An assert alone lets a null main class slip into the command line
+      // when assertions are disabled; fail clearly instead.
+      if (mainClass == null) {
+        throw new IllegalStateException(
+            "LaunchSpec requires either a command or a main class");
+      }
+
+      // JAVA_HOME is provided by YARN.
+
+      cmd.add(Environment.JAVA_HOME.$$() + "/bin/java");
+      cmd.addAll(vmArgs);
+      if (!classPath.isEmpty()) {
+        cmd.add("-cp");
+        cmd.add(DoYUtil.join(":", classPath));
+      }
+      cmd.add(mainClass);
+    }
+    cmd.addAll(cmdArgs);
+    cmd.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
+    cmd.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
+    return DoYUtil.join(" ", cmd);
+  }
+
+  /**
+   * Given this generic description of an application, create the detailed YARN
+   * container launch context required to launch the application. The
+   * {@link #resources} and {@link #env} maps are passed by reference, not
+   * copied.
+   *
+   * @param conf
+   *          the YARN configuration obtained by reading the Hadoop
+   *          configuration files (currently unused)
+   * @return the completed container launch context for the given application
+   * @throws IOException
+   *           declared for failures locating localized resources in the
+   *           distributed file system; this implementation performs no I/O
+   */
+
+  public ContainerLaunchContext createLaunchContext(YarnConfiguration conf)
+      throws IOException {
+    // Set up the container launch context
+    ContainerLaunchContext container = Records
+        .newRecord(ContainerLaunchContext.class);
+
+    // Set up the list of commands to run. Here, we assume that we run only
+    // one command.
+
+    container.setCommands(Collections.singletonList(getCommand()));
+
+    // Add localized resources
+
+    container.setLocalResources(resources);
+
+    // Environment.
+
+    container.setEnvironment(env);
+
+    return container;
+  }
+
+  /**
+   * Print a human-readable description of this launch specification.
+   *
+   * @param out destination stream
+   */
+  public void dump(PrintStream out) {
+    if (command != null) {
+      out.print("Command: ");
+      out.println(command);
+    }
+    if (mainClass != null) {
+      out.print("Main Class: ");
+      out.println(mainClass);
+      out.println("VM Args:");
+      dumpList(out, vmArgs);
+      out.println("Class Path:");
+      dumpList(out, classPath);
+    }
+    out.println("Program Args:");
+    dumpList(out, cmdArgs);
+    out.println("Environment:");
+    if (env.isEmpty()) {
+      out.println("  None");
+    } else {
+      for (Map.Entry<String, String> entry : env.entrySet()) {
+        out.print("  ");
+        out.print(entry.getKey());
+        out.print("=");
+        out.println(entry.getValue());
+      }
+    }
+    out.println("Resources: ");
+    if (resources.isEmpty()) {
+      out.println("  None");
+    } else {
+      for (Map.Entry<String, LocalResource> entry : resources.entrySet()) {
+        out.print("  Key: ");
+        out.println(entry.getKey());
+        LocalResource resource = entry.getValue();
+        out.print("   URL: ");
+        out.println(resource.getResource().toString());
+        out.print("   Size: ");
+        out.println(resource.getSize());
+        out.print("   Timestamp: ");
+        out.println(DoYUtil.toIsoTime(resource.getTimestamp()));
+        out.print("   Type: ");
+        out.println(resource.getType().toString());
+        out.print("   Visibility: ");
+        out.println(resource.getVisibility().toString());
+      }
+    }
+  }
+
+  /** Print each item indented on its own line, or "  None" if empty. */
+  private static void dumpList(PrintStream out, List<String> items) {
+    if (items.isEmpty()) {
+      out.println("  None");
+      return;
+    }
+    for (String item : items) {
+      out.print("  ");
+      out.println(item);
+    }
+  }
+}

Reply via email to