http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/StopCommand.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/StopCommand.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/StopCommand.java new file mode 100644 index 0000000..95f7bf3 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/StopCommand.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.client; + +import org.apache.drill.yarn.core.DoYUtil; +import org.apache.drill.yarn.core.DrillOnYarnConfig; +import org.apache.drill.yarn.core.YarnClientException; +import org.apache.drill.yarn.core.YarnRMClient; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; + +import com.typesafe.config.Config; + +/** + * Perform a semi-graceful shutdown of the Drill-on-YARN AM. We send a message + * to the AM to request shutdown because the YARN-provided message just kills + * the AM. (There seems to be no way to get YARN to call its own + * AMRMClientAsync.CallbackHandler.onShutdownRequest message.) 
The AM, however, + * cannot gracefully shut down the drill-bits because Drill itself has no + * graceful shutdown. But, at least this technique gives the AM a fighting + * chance to do graceful shutdown in the future. + */ + +public class StopCommand extends ClientCommand { + /** + * Poll the YARN RM to check the stop status of the AM. Periodically poll, + * waiting to get an app state that indicates app completion. + */ + + private static class StopMonitor { + StatusCommand.Reporter reporter; + private YarnApplicationState state; + private int pollWaitSec; + private int shutdownWaitSec; + + StopMonitor(Config config, StatusCommand.Reporter reporter) { + this.reporter = reporter; + pollWaitSec = config.getInt(DrillOnYarnConfig.CLIENT_POLL_SEC); + if (pollWaitSec < 1) { + pollWaitSec = 1; + } + shutdownWaitSec = config.getInt(DrillOnYarnConfig.CLIENT_STOP_WAIT_SEC); + } + + boolean run(boolean verbose) throws ClientException { + System.out.print("Stopping..."); + try { + int attemptCount = shutdownWaitSec / pollWaitSec; + for (int attempt = 0; attempt < attemptCount; attempt++) { + if (!poll()) { + break; + } + } + } catch (ClientException e) { + System.out.println(); + throw e; + } + if (reporter.isStopped()) { + System.out.println(" Stopped."); + reporter.showFinalStatus(); + return true; + } else { + System.out.println(); + System.out.println( + "Application Master is slow to stop, use YARN to check status."); + return false; + } + } + + private boolean poll() throws ClientException { + try { + Thread.sleep(pollWaitSec * 1000); + } catch (InterruptedException e) { + return false; + } + reporter.getReport(); + if (reporter.isStopped()) { + return false; + } + YarnApplicationState newState = reporter.getState(); + if (newState == state) { + System.out.print("."); + return true; + } + updateState(newState); + return true; + } + + private void updateState(YarnApplicationState newState) { + YarnApplicationState oldState = state; + state = newState; + if (oldState == 
null) { + return; + } + System.out.println(); + System.out.print("Application State: "); + System.out.println(state.toString()); + System.out.print("Stopping..."); + } + } + + private Config config; + private YarnRMClient client; + + @Override + public void run() throws ClientException { + config = DrillOnYarnConfig.config(); + client = getClient(); + System.out + .println("Stopping Application ID: " + client.getAppId().toString()); + + // First get an application report to ensure that the AM is, + // in fact, running, and to get the HTTP endpoint. + + StatusCommand.Reporter reporter = new StatusCommand.Reporter(client); + try { + reporter.getReport(); + } catch (ClientException e) { + reporter = null; + } + + // Handle the case of an already stopped app. + + boolean stopped = true; + if (reporter == null || reporter.isStopped()) { + System.out.println("Application is not running."); + } else { + // Try to stop the server by sending a STOP REST request. + + if (opts.force) { + System.out.println("Forcing shutdown"); + } else { + stopped = gracefulStop(reporter.getAmUrl()); + } + + // If that did not work, then forcibly kill the AM. + // YARN will forcibly kill the AM's containers. + // Not pretty, but it works. + + if (opts.force || !stopped) { + forcefulStop(); + } + + // Wait for the AM to stop. The AM may refuse to stop in + // the time allowed to wait. + + stopped = new StopMonitor(config, reporter).run(opts.verbose); + } + + // If the AM is gone because it started out dead or + // we killed it, then forget its App Id. + + if (stopped) { + removeAppIdFile(); + } + } + + /** + * Do a graceful shutdown by using the AM's REST API call to request stop. + * Include the master key with the request to differentiate this request from + * accidental uses of the stop REST API. 
+ * + * @param report + * @return + */ + + private boolean gracefulStop(String baseUrl) { + try { + if (DoYUtil.isBlank(baseUrl)) { + return false; + } + SimpleRestClient restClient = new SimpleRestClient(); + String tail = "rest/stop"; + String masterKey = config.getString(DrillOnYarnConfig.HTTP_REST_KEY); + if (!DoYUtil.isBlank(masterKey)) { + tail += "?key=" + masterKey; + } + if (opts.verbose) { + System.out.println("Stopping with POST " + baseUrl + "/" + tail); + } + String result = restClient.send(baseUrl, tail, true); + if (result.contains("\"ok\"")) { + return true; + } + System.err.println( + "Failed to stop the application master. Response = " + result); + return false; + } catch (ClientException e) { + System.err.println(e.getMessage()); + System.out.println("Resorting to forced kill"); + return false; + } + } + + /** + * If the graceful approach did not work, resort to a forceful request. This + * asks the AM's NM to kill the AM process. + * + * @throws ClientException + */ + + private void forcefulStop() throws ClientException { + try { + client.killApplication(); + } catch (YarnClientException e) { + throw new ClientException("Failed to stop application master", e); + } + } +}
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/client/package-info.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/client/package-info.java b/drill-yarn/src/main/java/org/apache/drill/yarn/client/package-info.java new file mode 100644 index 0000000..c03c2fa --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/client/package-info.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Implements a "YARN client" for Drill-on-YARN. The client uploads files to + * DFS, then requests that YARN start the Application Master. Much fiddling + * about is required to support this, such as zipping up the user's configuration, + * creating a local file with the app id so we can get app status and shut down + * the app, etc. + * <p> + * Divided into a main program ({@link DrillOnYarn}) and a series of commands. + * Some commands are further divided into tasks. Builds on the + * YARN and DFS facades defined in the core module. 
+ */ + +package org.apache.drill.yarn.client; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/AppSpec.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/AppSpec.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/AppSpec.java new file mode 100644 index 0000000..34d4ad1 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/AppSpec.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.yarn.core; + +import java.io.IOException; +import java.io.PrintStream; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.Records; + +/** + * Abstract description of a remote process launch that describes the many + * details needed to launch a process on a remote node. The YARN launch + * specification is a mess to work with; this class provides a simpler facade to + * gather the information, then turns around and builds the required YARN + * object. + * <p> + * Based on <a href="https://github.com/hortonworks/simple-yarn-app">Simple YARN + * App</a>. + */ + +public class AppSpec extends LaunchSpec { + + static final private Log LOG = LogFactory.getLog(LaunchSpec.class); + + /** + * The memory required in the allocated container, in MB. + */ + + public int memoryMb; + + /** + * The number of YARN "vcores" (roughly equivalent to CPUs) to allocate to the + * process. + */ + + public int vCores = 1; + + /** + * The number of disk resources (that is, disk channels) used by the process. + * Available only on some YARN distributions. Fractional values allowed. + */ + + public double disks; + + /** + * The name of the application given to YARN. Appears in the YARN admin UI. + */ + + public String appName; + + /** + * The YARN queue in which to place the application launch request. + */ + + public String queueName = "default"; + + public int priority = 1; + + /** + * Whether to run the AM in unmanaged mode. Leave this false for production + * code. 
+ */ + + public boolean unmanaged; + + /** + * Optional node label expression for the launch. Selects the nodes on which + * the task can run. + */ + + public String nodeLabelExpr; + + /** + * Given this generic description of an application, create the detailed YARN + * application submission context required to launch the application. + * + * @param conf + * the YARN configuration obtained by reading the Hadoop + * configuration files + * @param app + * the YARN definition of the client application to be populated from + * this generic description + * @return the completed application launch context for the given application + * @throws IOException + * if localized resources are not found in the distributed file + * system (such as HDFS) + */ + + public ApplicationSubmissionContext createAppLaunchContext( + YarnConfiguration conf, YarnClientApplication app) throws IOException { + ContainerLaunchContext amContainer = createLaunchContext(conf); + + // Finally, set-up ApplicationSubmissionContext for the application + ApplicationSubmissionContext appContext = app + .getApplicationSubmissionContext(); + appContext.setApplicationName(appName); // application name + appContext.setAMContainerSpec(amContainer); + appContext.setResource(getCapability()); + appContext.setQueue(queueName); // queue + appContext.setPriority(Priority.newInstance(priority)); + if (!DoYUtil.isBlank(nodeLabelExpr)) { + LOG.info( + "Requesting to run the AM using node expression: " + nodeLabelExpr); + appContext.setNodeLabelExpression(nodeLabelExpr); + } + + appContext.setUnmanagedAM(unmanaged); + + // Only try the AM once. It will fail if things are misconfigured. Retrying + // is unlikely + // to fix the configuration problem. 
+ + appContext.setMaxAppAttempts(1); + + // TODO: Security tokens + + return appContext; + } + + public Resource getCapability() { + + // Set up resource type requirements for ApplicationMaster + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(memoryMb); + capability.setVirtualCores(vCores); + DoYUtil.callSetDiskIfExists(capability, disks); + return capability; + } + + @Override + public void dump(PrintStream out) { + out.print("Memory (MB): "); + out.println(memoryMb); + out.print("Vcores: "); + out.println(vCores); + out.print("Disks: "); + out.println(disks); + out.print("Application Name: "); + out.println(appName); + out.print("Queue: "); + out.println(queueName); + out.print("Priority: "); + out.println(priority); + super.dump(out); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/ClusterDef.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/ClusterDef.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/ClusterDef.java new file mode 100644 index 0000000..223b606 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/ClusterDef.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.core; + +import java.io.PrintStream; +import java.util.List; +import java.util.Map; + +import org.apache.drill.yarn.appMaster.TaskSpec; +import org.mortbay.log.Log; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigList; +import com.typesafe.config.ConfigValue; + +public class ClusterDef { + // The following keys are relative to the cluster group definition + + public static final String GROUP_NAME = "name"; + public static final String GROUP_TYPE = "type"; + public static final String GROUP_SIZE = "count"; + + // For the labeled pool + + public static final String DRILLBIT_LABEL = "drillbit-label-expr"; + public static final String AM_LABEL = "am-label-expr"; + + /** + * Defined cluster tier types. The value of the type appears as the value of + * the {@link $CLUSTER_TYPE} parameter in the config file. + */ + + public enum GroupType { + BASIC, + LABELED; + + public static GroupType toEnum(String value) { + return GroupType.valueOf( value.toUpperCase() ); + } + + public String toValue() { + return name().toLowerCase(); + } + } + + public static class ClusterGroup { + private final String name; + private final int count; + private final GroupType type; + + public ClusterGroup( Map<String, Object> group, int index, GroupType type ) { + this.type = type; + + // Config system has already parsed the value. We insist that the value, + // when parsed, was interpreted as an integer. That is, the value had + // to be, say 10. Not "10", not 10.0, but just a plain integer. 
+ + try { + count = (Integer) group.get(GROUP_SIZE); + } catch (ClassCastException e) { + throw new IllegalArgumentException( + "Expected an integer for " + GROUP_SIZE + " for tier " + index); + } + Object nameValue = group.get(GROUP_NAME); + String theName = null; + if (nameValue != null) { + theName = nameValue.toString(); + } + if (DoYUtil.isBlank(theName)) { + theName = "tier-" + Integer.toString(index); + } + name = theName; + } + + + public String getName( ) { return name; } + public int getCount( ) { return count; } + public GroupType getType( ) { return type; } + + public void getPairs(int index, List<NameValuePair> pairs) { + String key = DrillOnYarnConfig.append(DrillOnYarnConfig.CLUSTERS, + Integer.toString(index)); + addPairs(pairs, key); + } + + protected void addPairs(List<NameValuePair> pairs, String key) { + pairs.add( + new NameValuePair(DrillOnYarnConfig.append(key, GROUP_NAME), name)); + pairs.add( + new NameValuePair(DrillOnYarnConfig.append(key, GROUP_TYPE), type)); + pairs.add( + new NameValuePair(DrillOnYarnConfig.append(key, GROUP_SIZE), count)); + } + + public void dump(String prefix, PrintStream out) { + out.print(prefix); + out.print("name = "); + out.println(name); + out.print(prefix); + out.print("type = "); + out.println(type.toValue()); + out.print(prefix); + out.print("count = "); + out.println(count); + } + + public void modifyTaskSpec(TaskSpec taskSpec) { + } + } + + public static class BasicGroup extends ClusterGroup { + + public BasicGroup(Map<String, Object> pool, int index) { + super(pool, index, GroupType.BASIC); + } + + } + + public static class LabeledGroup extends ClusterGroup { + + private final String drillbitLabelExpr; + + public LabeledGroup(Map<String, Object> pool, int index) { + super(pool, index, GroupType.LABELED); + drillbitLabelExpr = (String) pool.get(DRILLBIT_LABEL); + if (drillbitLabelExpr == null) { + Log.warn("Labeled pool is missing the drillbit label expression (" + + DRILLBIT_LABEL + "), will treat pool 
as basic."); + } + } + + public String getLabelExpr( ) { return drillbitLabelExpr; } + + @Override + public void dump(String prefix, PrintStream out) { + out.print(prefix); + out.print("Drillbit label expr = "); + out.println((drillbitLabelExpr == null) ? "<none>" : drillbitLabelExpr); + } + + @Override + protected void addPairs(List<NameValuePair> pairs, String key) { + super.addPairs(pairs, key); + pairs.add(new NameValuePair(DrillOnYarnConfig.append(key, DRILLBIT_LABEL), + drillbitLabelExpr)); + } + + @Override + public void modifyTaskSpec(TaskSpec taskSpec) { + taskSpec.containerSpec.nodeLabelExpr = drillbitLabelExpr; + } + } + + /** + * Deserialize a node tier from the configuration file. + * + * @param n + * @return + */ + + public static ClusterGroup getCluster(Config config, int n) { + int index = n + 1; + ConfigList tiers = config.getList(DrillOnYarnConfig.CLUSTERS); + ConfigValue value = tiers.get(n); + if ( value == null ) { + throw new IllegalArgumentException( "If cluster group is provided, it cannot be null: group " + index ); + } + @SuppressWarnings("unchecked") + Map<String, Object> tier = (Map<String, Object>) value.unwrapped(); + String type; + try { + type = tier.get(GROUP_TYPE).toString(); + } catch (NullPointerException e) { + throw new IllegalArgumentException( + "Pool type is required for cluster group " + index); + } + GroupType groupType = GroupType.toEnum(type); + if (groupType == null) { + throw new IllegalArgumentException( + "Undefined type for cluster group " + index + ": " + type); + } + ClusterGroup tierDef; + switch (groupType) { + case BASIC: + tierDef = new BasicGroup( tier, index ); + break; + case LABELED: + tierDef = new LabeledGroup( tier, index ); + break; + default: + assert false; + throw new IllegalStateException( + "Undefined cluster group type: " + groupType); + } + return tierDef; + } +} 
http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/ContainerRequestSpec.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/ContainerRequestSpec.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/ContainerRequestSpec.java new file mode 100644 index 0000000..99a22d7 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/ContainerRequestSpec.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.core; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.drill.yarn.appMaster.Scheduler; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest; +import org.apache.hadoop.yarn.util.Records; + +/** + * Describes a container request in terms of priority, memory, cores and + * placement preference. This is a simplified version of the YARN + * ContainerRequest structure. 
This structure is easier to use within the app, + * then is translated to the YARN structure when needed. + */ + +public class ContainerRequestSpec { + static final Log LOG = LogFactory.getLog(ContainerRequestSpec.class); + + /** + * Application-specific priority. Drill-on-Yarn uses the priority to associate + * YARN requests with a {@link Scheduler}. When the resource allocation + * arrives, we use the priority to trace back to the scheduler that requested + * it, and from there to the task to be run in the allocation. + * <p> + * For this reason, the priority is set by the Drill-on-YARN application; it + * is not a user-adjustable value. + */ + + public int priority = 0; + + /** + * Memory, in MB, required by the container. + */ + + public int memoryMb; + + /** + * Number of "virtual cores" required by the task. YARN allocates whole CPU + * cores and does not support fractional allocations. + */ + + public int vCores = 1; + + /** + * Number of virtual disks (channels, spindles) to request. Not supported in + * Apache YARN, is supported in selected distributions. + */ + + public double disks; + + /** + * Node label expression to apply to this request. + */ + + public String nodeLabelExpr; + + public List<String> racks = new ArrayList<>(); + public List<String> hosts = new ArrayList<>(); + + /** + * Create a YARN ContainerRequest object from the information in this object. 
+ * + * @return + */ + public ContainerRequest makeRequest() { + assert memoryMb != 0; + + Priority priorityRec = Records.newRecord(Priority.class); + priorityRec.setPriority(priority); + + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(memoryMb); + capability.setVirtualCores(vCores); + DoYUtil.callSetDiskIfExists(capability, disks); + + boolean relaxLocality = true; + String nodeArr[] = null; + if (!hosts.isEmpty()) { + nodeArr = new String[hosts.size()]; + hosts.toArray(nodeArr); + relaxLocality = false; + } + String rackArr[] = null; + if (!racks.isEmpty()) { + nodeArr = new String[racks.size()]; + racks.toArray(rackArr); + relaxLocality = false; + } + String nodeExpr = null; + if (!DoYUtil.isBlank(nodeLabelExpr)) { + nodeExpr = nodeLabelExpr; + LOG.info("Requesting a container using node expression: " + nodeExpr); + } + + // YARN is fragile. To (potentially) pass a node expression, we must use the + // 5-argument constructor. The fourth argument (relax locality) MUST be set + // to true if we omit the rack and node specs. (Else we get a runtime + // error. + + return new ContainerRequest(capability, nodeArr, rackArr, priorityRec, + relaxLocality, nodeExpr); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/DfsFacade.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/DfsFacade.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DfsFacade.java new file mode 100644 index 0000000..09e88ae --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DfsFacade.java @@ -0,0 +1,345 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.core; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import com.typesafe.config.Config; + +/** + * Facade to the distributed file system (DFS) system that implements + * Drill-on-YARN related operations. Some operations are used by both the client + * and AM applications. 
+ */ + +public class DfsFacade { + public static class DfsFacadeException extends Exception { + private static final long serialVersionUID = 1L; + + public DfsFacadeException(String msg) { + super(msg); + } + + public DfsFacadeException(String msg, Exception e) { + super(msg, e); + } + } + + private FileSystem fs; + private Configuration yarnConf; + private Config config; + private boolean localize; + + public DfsFacade(Config config) { + this.config = config; + localize = config.getBoolean(DrillOnYarnConfig.LOCALIZE_DRILL); + } + + public boolean isLocalized() { + return localize; + } + + public void connect() throws DfsFacadeException { + loadYarnConfig(); + String dfsConnection = config.getString(DrillOnYarnConfig.DFS_CONNECTION); + try { + if (DoYUtil.isBlank(dfsConnection)) { + fs = FileSystem.get(yarnConf); + } else { + URI uri; + try { + uri = new URI(dfsConnection); + } catch (URISyntaxException e) { + throw new DfsFacadeException( + "Illformed DFS connection: " + dfsConnection, e); + } + fs = FileSystem.get(uri, yarnConf); + } + } catch (IOException e) { + throw new DfsFacadeException("Failed to create the DFS", e); + } + } + + /** + * Lazy loading of YARN configuration since it takes a long time to load. + * (YARN provides no caching, sadly.) + */ + + private void loadYarnConfig() { + if (yarnConf == null) { + yarnConf = new YarnConfiguration(); + // On some distributions, lack of proper configuration causes + // DFS to default to the local file system. So, a local file + // system generally means that the config is wrong, or running + // the wrong build of Drill for the user's environment. 
+ URI fsUri = FileSystem.getDefaultUri( yarnConf ); + if(fsUri.toString().startsWith("file:/")) { + System.err.println("Warning: Default DFS URI is for a local file system: " + fsUri); + } + } + } + + public static class Localizer { + private final DfsFacade dfs; + protected File localArchivePath; + protected Path dfsArchivePath; + FileStatus fileStatus; + private String label; + + /** + * Resources to be localized (downloaded) to each AM or drillbit node. + */ + + public Localizer(DfsFacade dfs, File archivePath, String label) { + this(dfs, archivePath, dfs.getUploadPath(archivePath), label); + } + + public Localizer(DfsFacade dfs, File archivePath, String destName, + String label) { + this(dfs, archivePath, dfs.getUploadPath(destName), label); + } + + public Localizer(DfsFacade dfs, String destPath) { + this( dfs, null, new Path(destPath), null ); + } + + public Localizer(DfsFacade dfs, File archivePath, Path destPath, String label) { + this.dfs = dfs; + dfsArchivePath = destPath; + this.label = label; + localArchivePath = archivePath; + } + + public String getBaseName() { + return localArchivePath.getName(); + } + + public String getDestPath() { + return dfsArchivePath.toString(); + } + + public void upload() throws DfsFacadeException { + dfs.uploadArchive(localArchivePath, dfsArchivePath, label); + fileStatus = null; + } + + /** + * The client may check file status multiple times. Cache it here so we + * only retrieve the status once. Cache it here so that the client + * doen't have to do the caching. + * + * @return + * @throws DfsFacadeException + */ + + private FileStatus getStatus() throws DfsFacadeException { + if (fileStatus == null) { + fileStatus = dfs.getFileStatus(dfsArchivePath); + } + return fileStatus; + } + + public void defineResources(Map<String, LocalResource> resources, + String key) throws DfsFacadeException { + // Put the application archive, visible to only the application. 
+ // Because it is an archive, it will be expanded by YARN prior to launch + // of the AM. + + LocalResource drillResource = dfs.makeResource(dfsArchivePath, + getStatus(), LocalResourceType.ARCHIVE, + LocalResourceVisibility.APPLICATION); + resources.put(key, drillResource); + } + + public boolean filesMatch() { + FileStatus status; + try { + status = getStatus(); + } catch (DfsFacadeException e) { + + // An exception is DFS's way of tell us the file does + // not exist. + + return false; + } + return status.getLen() == localArchivePath.length(); + } + + public String getLabel() { + return label; + } + + public boolean destExists() throws IOException { + return dfs.exists(dfsArchivePath); + } + } + + public boolean exists(Path path) throws IOException { + return fs.exists(path); + } + + public Path getUploadPath(File localArchiveFile) { + return getUploadPath(localArchiveFile.getName()); + } + + public Path getUploadPath(String baseName) { + String dfsDirStr = config.getString(DrillOnYarnConfig.DFS_APP_DIR); + + Path appDir; + if (dfsDirStr.startsWith("/")) { + appDir = new Path(dfsDirStr); + } else { + Path home = fs.getHomeDirectory(); + appDir = new Path(home, dfsDirStr); + } + return new Path(appDir, baseName); + } + + public void uploadArchive(File localArchiveFile, Path destPath, String label) + throws DfsFacadeException { + // Create the application upload directory if it does not yet exist. + + String dfsDirStr = config.getString(DrillOnYarnConfig.DFS_APP_DIR); + Path appDir = new Path(dfsDirStr); + try { + // If the directory does not exist, create it, giving this user + // (only) read and write access. + + if (!fs.isDirectory(appDir)) { + fs.mkdirs(appDir, new FsPermission(FsAction.READ_WRITE, FsAction.NONE, FsAction.NONE)); + } + } catch (IOException e) { + throw new DfsFacadeException( + "Failed to create DFS directory: " + dfsDirStr, e); + } + + // The file must be an archive type so YARN knows to extract its contents. 
+ + String baseName = localArchiveFile.getName(); + if (DrillOnYarnConfig.findSuffix(baseName) == null) { + throw new DfsFacadeException( + label + " archive must be .tar.gz, .tgz or .zip: " + baseName); + } + + Path srcPath = new Path(localArchiveFile.getAbsolutePath()); + + // Do the upload, replacing the old archive. + + try { + // TODO: Specify file permissions and owner. + + fs.copyFromLocalFile(false, true, srcPath, destPath); + } catch (IOException e) { + throw new DfsFacadeException( + "Failed to upload " + label + " archive to DFS: " + + localArchiveFile.getAbsolutePath() + " --> " + destPath, + e); + } + } + + private FileStatus getFileStatus(Path dfsPath) throws DfsFacadeException { + try { + return fs.getFileStatus(dfsPath); + } catch (IOException e) { + throw new DfsFacadeException( + "Failed to get DFS status for file: " + dfsPath, e); + } + } + + /** + * Create a local resource definition for YARN. A local resource is one that + * must be localized onto the remote node prior to running a command on that + * node. + * <p> + * YARN uses the size and timestamp are used to check if the file has changed + * on HDFS to check if YARN can use an existing copy, if any. + * <p> + * Resources are made public. + * + * @param conf + * Configuration created from the Hadoop config files, in this case, + * identifies the target file system. + * @param resourcePath + * the path (relative or absolute) to the file on the configured file + * system (usually HDFS). + * @return a YARN local resource records that contains information about path, + * size, type, resource and so on that YARN requires. 
+ * @throws IOException + * if the resource does not exist on the configured file system + */ + + public LocalResource makeResource(Path dfsPath, FileStatus dfsFileStatus, + LocalResourceType type, LocalResourceVisibility visibility) + throws DfsFacadeException { + URL destUrl; + try { + destUrl = ConverterUtils.getYarnUrlFromPath( + FileContext.getFileContext().makeQualified(dfsPath)); + } catch (UnsupportedFileSystemException e) { + throw new DfsFacadeException( + "Unable to convert dfs file to a URL: " + dfsPath.toString(), e); + } + LocalResource resource = LocalResource.newInstance(destUrl, type, + visibility, dfsFileStatus.getLen(), + dfsFileStatus.getModificationTime()); + return resource; + } + + public void removeDrillFile(String fileName) throws DfsFacadeException { + Path destPath = getUploadPath(fileName); + try { + fs.delete(destPath, false); + } catch (IOException e) { + throw new DfsFacadeException( + "Failed to delete file: " + destPath.toString(), e); + } + + // Remove the Drill directory, but only if it is now empty. + + Path dir = destPath.getParent(); + try { + RemoteIterator<FileStatus> iter = fs.listStatusIterator(dir); + if (!iter.hasNext()) { + fs.delete(dir, false); + } + } catch (IOException e) { + throw new DfsFacadeException( + "Failed to delete directory: " + dir.toString(), e); + } + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoYUtil.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoYUtil.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoYUtil.java new file mode 100644 index 0000000..3c1d17d --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoYUtil.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.yarn.core; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.security.CodeSource; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.TimeZone; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Container; + +public class DoYUtil { + static final private Log LOG = LogFactory.getLog(DoYUtil.class); + + private DoYUtil() { + } + + public static String join(String separator, List<String> list) { + StringBuilder buf = new StringBuilder(); + String sep = ""; + for (String item : list) { + buf.append(sep); + buf.append(item); + sep = separator; + } + return buf.toString(); + } + + public static void addNonEmpty(List<String> list, String value) { + if ( ! isBlank( value ) ) { + list.add(value.trim( )); + } + } + + public static boolean isBlank(String str) { + return str == null || str.trim().isEmpty(); + } + + public static String toIsoTime(long timestamp) { + + // Uses old-style dates rather than java.time because + // the code still must compile for JDK 7. 
+ + DateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + fmt.setTimeZone(TimeZone.getDefault()); + return fmt.format(new Date(timestamp)); + } + + public static String labelContainer(Container container) { + StringBuilder buf = new StringBuilder() + .append("[id: ") + .append(container.getId()) + .append(", host: ") + .append(container.getNodeId().getHost()) + .append(", priority: ") + .append(container.getPriority()) + .append("]"); + return buf.toString(); + } + + /** + * Utility method to display YARN container information in a useful way for + * log messages. + * + * @param container + * @return + */ + + public static String describeContainer(Container container) { + StringBuilder buf = new StringBuilder() + .append("[id: ") + .append(container.getId()) + .append(", host: ") + .append(container.getNodeId().getHost()) + .append(", priority: ") + .append(container.getPriority()) + .append(", memory: ") + .append(container.getResource().getMemory()) + .append(" MB, vcores: ") + .append(container.getResource().getVirtualCores()) + .append("]"); + return buf.toString(); + } + + /** + * The tracking URL given to YARN is a redirect URL. When giving the URL to + * the user, "unwrap" that redirect URL to get the actual site URL. + * + * @param trackingUrl + * @return + */ + + public static String unwrapAmUrl(String trackingUrl) { + return trackingUrl.replace("/redirect", "/"); + } + + public static Object dynamicCall(Object target, String fnName, Object args[], + Class<?> types[]) { + + // First, look for the method using the names and types provided. + + final String methodLabel = target.getClass().getName() + "." + fnName; + Method m; + try { + m = target.getClass().getMethod(fnName, types); + } catch (NoSuchMethodException e) { + + // Ignore, but log: the method does not exist in this distribution. 
+ + StringBuilder buf = new StringBuilder(); + if (types != null) { + String sep = ""; + for (Class<?> type : types) { + buf.append(sep); + buf.append(type.getName()); + sep = ","; + } + } + LOG.trace("Not supported in this YARN distribution: " + methodLabel + "(" + + buf.toString() + ")"); + CodeSource src = target.getClass().getProtectionDomain().getCodeSource(); + if (src != null) { + java.net.URL jar = src.getLocation(); + LOG.trace("Class found in URL: " + jar.toString()); + } + return null; + } catch (SecurityException e) { + LOG.error("Security prevents dynamic method calls", e); + return null; + } + + // Next, call the method with the arguments provided. + + Object ret = null; + try { + ret = m.invoke(target, args); + } catch (IllegalAccessException | IllegalArgumentException + | InvocationTargetException e) { + LOG.error("Failed to dynamically call " + methodLabel, e); + return null; + } + StringBuilder buf = new StringBuilder(); + if (args != null) { + String sep = ""; + for (Object arg : args) { + buf.append(sep); + buf.append(arg == null ? "null" : arg.toString()); + sep = ","; + } + } + LOG.trace( + "Successfully called " + methodLabel + "( " + buf.toString() + ")"); + + // Return any return value. Will be null if the method is returns void. + + return ret; + } + + public static void callSetDiskIfExists(Object target, double arg) { + dynamicCall(target, "setDisks", new Object[] { arg }, + new Class<?>[] { Double.TYPE }); + } + + public static double callGetDiskIfExists(Object target) { + Object ret = dynamicCall(target, "getDisks", null, null); + return (ret == null) ? 
0.0 : (Double) ret; + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoyConfigException.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoyConfigException.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoyConfigException.java new file mode 100644 index 0000000..422b89b --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/DoyConfigException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Checked exception for Drill-on-YARN configuration problems. Carries a
 * message and, optionally, the exception that caused the problem.
 */
public class DoyConfigException extends Exception {

  private static final long serialVersionUID = 1L;

  /**
   * Creates an exception with a message and no cause.
   *
   * @param message
   *          description of the configuration problem
   */
  public DoyConfigException(String message) {
    super(message);
  }

  /**
   * Creates an exception with a message and the underlying cause.
   *
   * @param message
   *          description of the configuration problem
   * @param cause
   *          the exception that triggered this one
   */
  public DoyConfigException(String message, Exception cause) {
    super(message, cause);
  }
}
+ */ +package org.apache.drill.yarn.core; + +import java.io.File; +import java.io.PrintStream; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; + +import org.apache.drill.common.config.CommonConstants; +import org.apache.drill.common.config.DrillConfig; +import org.apache.drill.common.scanner.ClassPathScanner; +import org.apache.drill.common.scanner.persistence.ScanResult; +import org.apache.drill.exec.ExecConstants; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigException; +import com.typesafe.config.ConfigFactory; + +/** + * Configuration used within the Drill-on-YARN code. Configuration comes from four + * sources (in order of precedence): + * <ol> + * <li>System properties</li> + * <li>$SITE_DIR/drill-on-yarn.conf</li> + * <li>Distribution-specific properties in $SITE_HOME/conf/doy-distrib.conf</li> + * <li>Drill-on-YARN defaults in drill-on-yarn-defaults.conf. (Which should be + * disjoint from the Drill settings.)</li> + * <li>Drill properties (via the Drill override system)</li> + * </ol> + * <p> + * Defines constants for each property, including some defined in Drill. This provides + * a uniform property access interface even if some properties migrate between DoY and + * Drill proper. 
+ */ + +public class DrillOnYarnConfig { + public static final String DEFAULTS_FILE_NAME = "drill-on-yarn-defaults.conf"; + public static final String DISTRIB_FILE_NAME = "doy-distrib.conf"; + public static final String CONFIG_FILE_NAME = "drill-on-yarn.conf"; + + public static final String DRILL_ON_YARN_PARENT = "drill.yarn"; + public static final String DOY_CLIENT_PARENT = append(DRILL_ON_YARN_PARENT, "client"); + public static final String DOY_AM_PARENT = append(DRILL_ON_YARN_PARENT, "am"); + public static final String DOY_DRILLBIT_PARENT = append(DRILL_ON_YARN_PARENT, "drillbit"); + public static final String FILES_PARENT = append(DRILL_ON_YARN_PARENT, "drill-install"); + public static final String DFS_PARENT = append(DRILL_ON_YARN_PARENT, "dfs"); + public static final String HTTP_PARENT = append(DRILL_ON_YARN_PARENT, "http"); + public static final String YARN_PARENT = append(DRILL_ON_YARN_PARENT, "yarn"); + public static final String HADOOP_PARENT = append(DRILL_ON_YARN_PARENT, "hadoop"); + public static final String CLIENT_PARENT = append(DRILL_ON_YARN_PARENT, "client"); + + public static final String APP_NAME = append(DRILL_ON_YARN_PARENT, "app-name"); + public static final String CLUSTER_ID = ExecConstants.SERVICE_NAME; + + public static final String DFS_CONNECTION = append(DFS_PARENT, "connection"); + public static final String DFS_APP_DIR = append(DFS_PARENT, "app-dir"); + + public static final String YARN_QUEUE = append(YARN_PARENT, "queue"); + public static final String YARN_PRIORITY = append(YARN_PARENT, "priority"); + + public static final String DRILL_ARCHIVE_PATH = append(FILES_PARENT, "client-path"); + public static final String DRILL_DIR_NAME = append(FILES_PARENT, "dir-name"); + + /** + * Key used for the Drill archive file in the AM launch config. 
+ */ + + public static final String DRILL_ARCHIVE_KEY = append(FILES_PARENT, "drill-key"); + public static final String SITE_ARCHIVE_KEY = append(FILES_PARENT, "site-key"); + public static final String LOCALIZE_DRILL = append(FILES_PARENT, "localize"); + public static final String CONF_AS_SITE = append(FILES_PARENT, "conf-as-site"); + public static final String DRILL_HOME = append(FILES_PARENT, "drill-home"); + public static final String SITE_DIR = append(FILES_PARENT, "site-dir"); + public static final String JAVA_LIB_PATH = append(FILES_PARENT, "library-path"); + + public static final String HADOOP_HOME = append(HADOOP_PARENT, "home"); + public static final String HADOOP_CLASSPATH = append(HADOOP_PARENT, "class-path"); + public static final String HBASE_CLASSPATH = append(HADOOP_PARENT, "hbase-class-path"); + + public static final String MEMORY_KEY = "memory-mb"; + public static final String VCORES_KEY = "vcores"; + public static final String DISKS_KEY = "disks"; + public static final String VM_ARGS_KEY = "vm-args"; + public static final String HEAP_KEY = "heap"; + + public static final String AM_MEMORY = append(DOY_AM_PARENT, MEMORY_KEY); + public static final String AM_VCORES = append(DOY_AM_PARENT, VCORES_KEY); + public static final String AM_DISKS = append(DOY_AM_PARENT, DISKS_KEY); + public static final String AM_NODE_LABEL_EXPR = append(DOY_AM_PARENT, "node-label-expr"); + public static final String AM_HEAP = append(DOY_AM_PARENT, HEAP_KEY); + public static final String AM_VM_ARGS = append(DOY_AM_PARENT, VM_ARGS_KEY); + public static final String AM_POLL_PERIOD_MS = append(DOY_AM_PARENT, "poll-ms"); + public static final String AM_TICK_PERIOD_MS = append(DOY_AM_PARENT, "tick-ms"); + public static final String AM_PREFIX_CLASSPATH = append(DOY_AM_PARENT, "prefix-class-path"); + public static final String AM_CLASSPATH = append(DOY_AM_PARENT, "class-path"); + public static final String AM_DEBUG_LAUNCH = append(DOY_AM_PARENT, "debug-launch"); + public static 
final String AM_ENABLE_AUTO_SHUTDOWN = append(DOY_AM_PARENT, "auto-shutdown"); + + public static final String DRILLBIT_MEMORY = append(DOY_DRILLBIT_PARENT, MEMORY_KEY); + public static final String DRILLBIT_VCORES = append(DOY_DRILLBIT_PARENT, VCORES_KEY); + public static final String DRILLBIT_DISKS = append(DOY_DRILLBIT_PARENT, DISKS_KEY); + public static final String DRILLBIT_VM_ARGS = append(DOY_DRILLBIT_PARENT, VM_ARGS_KEY); + public static final String DRILLBIT_HEAP = append(DOY_DRILLBIT_PARENT, HEAP_KEY); + public static final String DRILLBIT_DIRECT_MEM = append(DOY_DRILLBIT_PARENT, "max-direct-memory"); + public static final String DRILLBIT_CODE_CACHE = append(DOY_DRILLBIT_PARENT, "code-cache"); + public static final String DRILLBIT_LOG_GC = append(DOY_DRILLBIT_PARENT, "log-gc"); + public static final String DRILLBIT_PREFIX_CLASSPATH = append( DOY_DRILLBIT_PARENT, "prefix-class-path"); + public static final String DRILLBIT_EXTN_CLASSPATH = append( DOY_DRILLBIT_PARENT, "extn-class-path"); + public static final String DRILLBIT_CLASSPATH = append(DOY_DRILLBIT_PARENT, "class-path"); + public static final String DRILLBIT_MAX_RETRIES = append(DOY_DRILLBIT_PARENT, "max-retries"); + public static final String DRILLBIT_DEBUG_LAUNCH = append(DOY_DRILLBIT_PARENT, "debug-launch"); + public static final String DRILLBIT_HTTP_PORT = ExecConstants.HTTP_PORT; + public static final String DISABLE_YARN_LOGS = append(DOY_DRILLBIT_PARENT, "disable-yarn-logs"); + public static final String DRILLBIT_USER_PORT = ExecConstants.INITIAL_USER_PORT; + public static final String DRILLBIT_BIT_PORT = ExecConstants.INITIAL_BIT_PORT; + public static final String DRILLBIT_USE_HTTPS = ExecConstants.HTTP_ENABLE_SSL; + public static final String DRILLBIT_MAX_EXTRA_NODES = append(DOY_DRILLBIT_PARENT, "max-extra-nodes"); + public static final String DRILLBIT_REQUEST_TIMEOUT_SEC = append(DOY_DRILLBIT_PARENT, "request-timeout-secs"); + + public static final String ZK_CONNECT = 
ExecConstants.ZK_CONNECTION; + public static final String ZK_ROOT = ExecConstants.ZK_ROOT; + public static final String ZK_FAILURE_TIMEOUT_MS = ExecConstants.ZK_TIMEOUT; + public static final String ZK_RETRY_COUNT = ExecConstants.ZK_RETRY_TIMES; + public static final String ZK_RETRY_DELAY_MS = ExecConstants.ZK_RETRY_DELAY; + + // Names selected to be parallel to Drillbit HTTP config. + + public static final String HTTP_ENABLED = append(HTTP_PARENT, "enabled"); + public static final String HTTP_ENABLE_SSL = append(HTTP_PARENT, "ssl-enabled"); + public static final String HTTP_PORT = append(HTTP_PARENT, "port"); + public static final String HTTP_AUTH_TYPE = append(HTTP_PARENT, "auth-type"); + public static final String HTTP_REST_KEY = append(HTTP_PARENT, "rest-key"); + public static final String HTTP_SESSION_MAX_IDLE_SECS = append(HTTP_PARENT, "session-max-idle-secs"); + public static final String HTTP_DOCS_LINK = append(HTTP_PARENT, "docs-link"); + public static final String HTTP_REFRESH_SECS = append(HTTP_PARENT, "refresh-secs"); + public static final String HTTP_USER_NAME = append(HTTP_PARENT, "user-name"); + public static final String HTTP_PASSWORD = append(HTTP_PARENT, "password"); + + public static final String AUTH_TYPE_NONE = "none"; + public static final String AUTH_TYPE_DRILL = "drill"; + public static final String AUTH_TYPE_SIMPLE = "simple"; + + public static final String CLIENT_POLL_SEC = append(CLIENT_PARENT, "poll-sec"); + public static final String CLIENT_START_WAIT_SEC = append(CLIENT_PARENT, "start-wait-sec"); + public static final String CLIENT_STOP_WAIT_SEC = append(CLIENT_PARENT, "stop-wait-sec"); + + public static final String CLUSTERS = append(DRILL_ON_YARN_PARENT, "cluster"); + + /** + * Name of the subdirectory of the container directory that will hold + * localized Drill distribution files. This name must be consistent between AM + * launch request and AM launch, and between Drillbit launch request and + * Drillbit launch. 
This name is fixed; there is no reason for the user to + * change it as it is visible only in the YARN container environment. + */ + + public static String LOCAL_DIR_NAME = "drill"; + + // Environment variables used to pass information from the Drill-on-YARN + // Client to the AM, or from the AM to the Drillbit launch script. + + public static final String APP_ID_ENV_VAR = "DRILL_AM_APP_ID"; + public static final String DRILL_ARCHIVE_ENV_VAR = "DRILL_ARCHIVE"; + public static final String SITE_ARCHIVE_ENV_VAR = "SITE_ARCHIVE"; + public static final String DRILL_HOME_ENV_VAR = "DRILL_HOME"; + public static final String DRILL_SITE_ENV_VAR = "DRILL_CONF_DIR"; + public static final String AM_HEAP_ENV_VAR = "DRILL_AM_HEAP"; + public static final String AM_JAVA_OPTS_ENV_VAR = "DRILL_AM_JAVA_OPTS"; + public static final String DRILL_CLASSPATH_ENV_VAR = "DRILL_CLASSPATH"; + public static final String DRILL_CLASSPATH_PREFIX_ENV_VAR = "DRILL_CLASSPATH_PREFIX"; + public static final String DOY_LIBPATH_ENV_VAR = "DOY_JAVA_LIB_PATH"; + public static final String DRILL_DEBUG_ENV_VAR = "DRILL_DEBUG"; + + /** + * Special value for the DRILL_DIR_NAME parameter to indicate to use the base + * name of the archive as the Drill home path. + */ + + private static final Object BASE_NAME_MARKER = "<base>"; + + /** + * The name of the Drill site archive stored in dfs. Since the archive is + * created by the client as a temp file, it's local name has no meaning; we + * use this standard name on dfs. + */ + + public static final String SITE_ARCHIVE_NAME = "site.tar.gz"; + + protected static DrillOnYarnConfig instance; + private File drillSite; + private File drillHome; + private static DrillConfig drillConfig; + private Config config; + private ScanResult classPathScan; + + public static String append(String parent, String key) { + return parent + "." + key; + } + + // Protected only to allow creating a test version of this class. 
+ + protected DrillOnYarnConfig( ) { + } + + public static DrillOnYarnConfig load() throws DoyConfigException { + instance = new DrillOnYarnConfig(); + instance.doLoad(Thread.currentThread().getContextClassLoader()); + return instance; + } + + /** + * Load the config. + * @param cl class loader to use for resource searches (except defaults). + * Allows test to specify a specialized version. + * <p> + * Implemented in a way that allows unit testing. The parseUrl( ) methods + * let us mock the files; the load( ) methods seem to not actually use the + * provided class loader. + * + * @throws DoyConfigException + */ + protected void doLoad(ClassLoader cl) throws DoyConfigException { + Config drillConfig = loadDrillConfig(); + + // Resolution order, larger numbers take precedence. + // 1. Drill-on-YARN defaults. + // File is at root of the package tree. + + URL url = DrillOnYarnConfig.class.getResource(DEFAULTS_FILE_NAME); + if (url == null) { + throw new IllegalStateException( + "Drill-on-YARN defaults file is required: " + DEFAULTS_FILE_NAME); + } + config = ConfigFactory.parseURL(url).withFallback(drillConfig); + + // 2. Optional distribution-specific configuration-file. + // (Lets a vendor, for example, specify the default DFS upload location + // without tinkering with the user's own settings. + + url = cl.getResource(DISTRIB_FILE_NAME); + if (url != null) { + config = ConfigFactory.parseURL(url).withFallback(config); + } + + // 3. User's Drill-on-YARN configuration. + // Optional since defaults are fine & ZK comes from drill-override.conf. + + url = cl.getResource(CONFIG_FILE_NAME); + if (url != null) { + config = ConfigFactory.parseURL(url).withFallback(config); + } + + // 4. System properties + // Allows -Dfoo=bar on the command line. + // But, note that substitutions are NOT allowed in system properties! 
+ + config = ConfigFactory.systemProperties().withFallback(config); + + // Resolution allows ${foo.bar} syntax in values, but only for values + // from config files, not from system properties. + + config = config.resolve(); + } + + private static Config loadDrillConfig() { + drillConfig = DrillConfig + .create(CommonConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME); + return drillConfig.resolve(); + } + + public DrillConfig getDrillConfig() { + return drillConfig; + } + + /** + * Return Drill's class path scan. This is used only in the main thread during + * initialization. Not needed by the client, so done in an unsynchronized, + * lazy fashion. + * + * @return + */ + + public ScanResult getClassPathScan() { + if (classPathScan == null) { + classPathScan = ClassPathScanner.fromPrescan(drillConfig); + } + return classPathScan; + } + + /** + * Obtain Drill home from the DRILL_HOME environment variable set by + * drill-config.sh, which is called from drill-on-yarn.sh. When debugging, + * DRILL_HOME must be set in the environment. + * <p> + * This information is required only by the client to prepare for uploads to + * DFS. + * + * @throws DoyConfigException + */ + + public void setClientPaths() throws DoyConfigException { + setClientDrillHome(); + setSiteDir(); + } + + private void setClientDrillHome() throws DoyConfigException { + // Try the environment variable that should have been + // set in drill-on-yarn.sh (for the client) or in the + // launch environment (for the AM.) + + String homeDir = getEnv(DRILL_HOME_ENV_VAR); + + // For ease in debugging, allow setting the Drill home in + // drill-on-yarn.conf. + // This setting is also used for a non-localized run. 
+ + if (DoYUtil.isBlank(homeDir)) { + homeDir = config.getString(DRILL_HOME); + } + if (DoYUtil.isBlank(homeDir)) { + throw new DoyConfigException( + "The DRILL_HOME environment variable must point to your Drill install."); + } + drillHome = new File(homeDir); + } + + /** + * All environment variable access goes through this function to allow unit + * tests to replace this function to set test values. (The Java environment is + * immutable, so it is not possible for unit tests to change the actual + * environment.) + * + * @param key + * @return + */ + + protected String getEnv(String key) { + return System.getenv(key); + } + + /** + * On both the client and the AM, the site directory is optional. If provided, + * it was set with the --config (or --site) option to the script that launched + * the client or AM. In both cases, the script sets the drill.yarn.siteDir + * system property (and leaks the DRILL_HOME environment variable.) + * <p> + * For ease of debugging, if neither of those are set, this method uses the + * location of the drill-on-yarn configuration file to infer the site + * directory. + * <p> + * On the client, the site directory will be the "original" directory that + * contains the user's "master" files. On the AM, the site directory is a + * localized version of the client directory. Because of the way tar works, + * both the client and AM site directories have the same name; though the path + * to that name obviously differs. + * + * @throws DoyConfigException + */ + + private void setSiteDir() throws DoyConfigException { + // The site directory is the one where the config file lives. + // This should have been set in an environment variable by the launch + // script. + + String sitePath = getEnv("DRILL_CONF_DIR"); + if (!DoYUtil.isBlank(sitePath)) { + drillSite = new File(sitePath); + } else { + + // Otherwise, let's guess it from the config file. This version assists + // in debugging as it reduces setup steps. 
+ + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + if (classLoader == null) { + classLoader = DrillOnYarnConfig.class.getClassLoader(); + } + + URL url = classLoader.getResource(CommonConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME); + if (url == null) { + throw new DoyConfigException( + "Drill configuration file is missing: " + CommonConstants.CONFIG_OVERRIDE_RESOURCE_PATHNAME); + } + File confFile; + try { + java.nio.file.Path confPath = Paths.get(url.toURI()); + confFile = confPath.toFile(); + } catch (URISyntaxException e) { + throw new DoyConfigException( + "Invalid path to Drill-on-YARN configuration file: " + + url.toString(), + e); + } + drillSite = confFile.getParentFile(); + } + + // Verify that the site directory is not just $DRILL_HOME/conf. + // Since the calling script does not differentiate between the two cases. + // But, treat $DRILL_HOME/conf as the site directory if: + // 1. The conf-as-site property is true, or + // 2. The Drill archive resides within $DRILL_HOME. + // + // The above situations occur in certain distributions that + // ship the archive inside the site directory and don't use a + // site directory. + + if (drillHome.equals(drillSite.getParentFile()) + && !config.getBoolean(CONF_AS_SITE)) { + String drillArchivePath = config + .getString(DrillOnYarnConfig.DRILL_ARCHIVE_PATH); + if (!DoYUtil.isBlank(drillArchivePath)) { + File archiveFile = new File(drillArchivePath); + if (!archiveFile.isAbsolute() && !archiveFile.getAbsolutePath() + .startsWith(drillHome.getAbsolutePath())) { + drillSite = null; + } + } + } + } + + /** + * Retrieve the AM Drill home location from the DRILL_HOME variable set in the + * drill-am.sh launch script. 
+ * + * @throws DoyConfigException + */ + + public void setAmDrillHome() throws DoyConfigException { + String drillHomeStr = getEnv(DRILL_HOME_ENV_VAR); + drillHome = new File(drillHomeStr); + setSiteDir(); + } + + public Config getConfig() { + return instance.config; + } + + public static DrillOnYarnConfig instance() { + assert instance != null; + return instance; + } + + public static Config config() { + return instance().getConfig(); + } + + /** + * Return the Drill home on this machine as inferred from the config file + * contents or location. + * + * @return + */ + + public File getLocalDrillHome() { + return drillHome; + } + + public void dump() { + dump(System.out); + } + + private static final String keys[] = { + // drill.yarn + + APP_NAME, + CLUSTER_ID, + + // drill.yarn.dfs + + DFS_CONNECTION, + DFS_APP_DIR, + + // drill.yarn.hadoop + + HADOOP_HOME, + HADOOP_CLASSPATH, + HBASE_CLASSPATH, + + // drill.yarn.yarn + + YARN_QUEUE, + YARN_PRIORITY, + + // drill.yarn.drill-install + + DRILL_ARCHIVE_PATH, + DRILL_DIR_NAME, + LOCALIZE_DRILL, + CONF_AS_SITE, + DRILL_HOME, + DRILL_ARCHIVE_KEY, + SITE_ARCHIVE_KEY, + JAVA_LIB_PATH, + + // drill.yarn.client + + CLIENT_POLL_SEC, + CLIENT_START_WAIT_SEC, + CLIENT_STOP_WAIT_SEC, + + // drill.yarn.am + + AM_MEMORY, + AM_VCORES, + AM_DISKS, + AM_NODE_LABEL_EXPR, + AM_VM_ARGS, + AM_HEAP, + AM_POLL_PERIOD_MS, + AM_TICK_PERIOD_MS, + AM_PREFIX_CLASSPATH, + AM_CLASSPATH, + AM_DEBUG_LAUNCH, + AM_ENABLE_AUTO_SHUTDOWN, + + // drill.yarn.zk + + ZK_CONNECT, + ZK_ROOT, + ZK_RETRY_COUNT, + ZK_RETRY_DELAY_MS, + ZK_FAILURE_TIMEOUT_MS, + + // drill.yarn.drillbit + + DRILLBIT_MEMORY, + DRILLBIT_VCORES, + DRILLBIT_DISKS, + DRILLBIT_VM_ARGS, + DRILLBIT_HEAP, + DRILLBIT_DIRECT_MEM, + DRILLBIT_CODE_CACHE, + DRILLBIT_PREFIX_CLASSPATH, + DRILLBIT_EXTN_CLASSPATH, + DRILLBIT_CLASSPATH, + DRILLBIT_MAX_RETRIES, + DRILLBIT_DEBUG_LAUNCH, + DRILLBIT_MAX_EXTRA_NODES, + DRILLBIT_REQUEST_TIMEOUT_SEC, + DISABLE_YARN_LOGS, + DRILLBIT_HTTP_PORT, + 
DRILLBIT_USER_PORT, + DRILLBIT_BIT_PORT, + DRILLBIT_USE_HTTPS, + + // drill.yarn.http + + HTTP_ENABLED, + HTTP_ENABLE_SSL, + HTTP_PORT, + HTTP_AUTH_TYPE, + HTTP_SESSION_MAX_IDLE_SECS, + HTTP_DOCS_LINK, + HTTP_REFRESH_SECS, + // Do not include AM_REST_KEY: it is supposed to be secret. + // Same is true of HTTP_USER_NAME and HTTP_PASSWORD + }; + + private static String envVars[] = { + APP_ID_ENV_VAR, + DRILL_HOME_ENV_VAR, + DRILL_SITE_ENV_VAR, + AM_HEAP_ENV_VAR, + AM_JAVA_OPTS_ENV_VAR, + DRILL_CLASSPATH_PREFIX_ENV_VAR, + DRILL_CLASSPATH_ENV_VAR, + DRILL_ARCHIVE_ENV_VAR, + SITE_ARCHIVE_ENV_VAR, + DRILL_DEBUG_ENV_VAR + }; + + private void dump(PrintStream out) { + for (String key : keys) { + out.print(key); + out.print(" = "); + try { + out.println(config.getString(key)); + } catch (ConfigException.Missing e) { + out.println("<missing>"); + } + } + out.print(CLUSTERS); + out.println("["); + for (int i = 0; i < clusterGroupCount(); i++) { + ClusterDef.ClusterGroup cluster = ClusterDef.getCluster(config, i); + out.print(i); + out.println(" = {"); + cluster.dump(" ", out); + out.println(" }"); + } + out.println("]"); + } + + public void dumpEnv(PrintStream out) { + out.print("environment"); + out.println("["); + for (String envVar : envVars) { + String value = getEnv(envVar); + out.print(envVar); + out.print(" = "); + if (value == null) { + out.print("<unset>"); + } else { + out.print("\""); + out.print(value); + out.print("\""); + } + out.println(); + } + out.println("]"); + } + + public List<NameValuePair> getPairs() { + List<NameValuePair> pairs = new ArrayList<>(); + for (String key : keys) { + pairs.add(new NameValuePair(key, config.getString(key))); + } + for (int i = 0; i < clusterGroupCount(); i++) { + ClusterDef.ClusterGroup pool = ClusterDef.getCluster(config, i); + pool.getPairs(i, pairs); + } + + // Add environment variables as "pseudo" properties, + // prefixed with "envt.". + + for (String envVar : envVars) { + pairs.add(new NameValuePair("envt." 
+ envVar, getEnv(envVar))); + } + return pairs; + } + + public static String clusterGroupKey(int index, String key) { + return CLUSTERS + "." + index + "." + key; + } + + public int clusterGroupCount() { + return config.getList(CLUSTERS).size(); + } + + private static String suffixes[] = { ".tar.gz", ".tgz", ".zip" }; + + public static String findSuffix(String baseName) { + baseName = baseName.toLowerCase(); + for (String extn : suffixes) { + if (baseName.endsWith(extn)) { + return extn; + } + } + return null; + } + + /** + * Get the location of Drill home on a remote machine, relative to the + * container working directory. Used when constructing a launch context. + * Assumes either the absolute path from the config file, or a constructed + * path to the localized Drill on the remote node. YARN examples use "./foo" + * to refer to container resources. But, since we cannot be sure when such a + * path is evaluated, we explicitly use YARN's PWD environment variable to get + * the absolute path. + * + * @return the remote path, with the "$PWD" environment variable. + * @throws DoyConfigException + */ + + public String getRemoteDrillHome() throws DoyConfigException { + // If the application is not localized, then the user can tell us the remote + // path in the config file. Otherwise, we assume that the remote path is the + // same as the local path. + + if (!config.getBoolean(LOCALIZE_DRILL)) { + String drillHomePath = config.getString(DRILL_HOME); + if (DoYUtil.isBlank(drillHomePath)) { + drillHomePath = drillHome.getAbsolutePath(); + } + return drillHomePath; + } + + // The application is localized. Work out the location within the container + // directory. The path starts with the "key" we specify when uploading the + // Drill archive; YARN expands the archive into a folder of that name. 
+ + String drillHome = "$PWD/" + config.getString(DRILL_ARCHIVE_KEY); + + String home = config.getString(DRILL_DIR_NAME); + if (DoYUtil.isBlank(home)) { + // Assume the archive expands without a subdirectory. + } + + // If the special "<base>" marker is used, assume that the path depends + // on the name of the archive, which we know from the config file. + + else if (home.equals(BASE_NAME_MARKER)) { + + // Otherwise, assume that the archive expands to a directory with the + // same name as the archive itself (minus the archive suffix.) + + String drillArchivePath = config + .getString(DrillOnYarnConfig.DRILL_ARCHIVE_PATH); + if (DoYUtil.isBlank(drillArchivePath)) { + throw new DoyConfigException("Required config property not set: " + + DrillOnYarnConfig.DRILL_ARCHIVE_PATH); + } + File localArchiveFile = new File(drillArchivePath); + home = localArchiveFile.getName(); + String suffix = findSuffix(home); + if (suffix == null) { + throw new DoyConfigException(DrillOnYarnConfig.DRILL_ARCHIVE_PATH + + " does not name a valid archive: " + drillArchivePath); + } + drillHome += "/" + home.substring(0, home.length() - suffix.length()); + } else { + // If the user told us the name of the directory within the archive, + // use it. + + drillHome += "/" + home; + } + return drillHome; + } + + /** + * Get the optional remote site directory name. This name will include the + * absolute path for a non-localized application. It will return the path + * relative to the container for a localized application. In the localized + * case, the site archive is tar'ed relative to the site directory so that its + * contents are unarchived directly into the YARN-provided folder (with the + * name of the archive) key. That is, if the site directory on the client is + * /var/drill/my-site, the contents of the tar file will be + * "./drill-override.conf", etc., and the remote location is + * $PWD/site-key/drill-override.conf, where site-key is the key name used to + * localize the site archive. 
+ * + * @return + */ + + public String getRemoteSiteDir() { + // If the application does not use a site directory, then return null. + + if (!hasSiteDir()) { + return null; + } + + // If the application is not localized, then use the remote site path + // provided in the config file. Otherwise, assume that the remote path + // is the same as the local path. + + if (!config.getBoolean(LOCALIZE_DRILL)) { + String drillSitePath = config.getString(SITE_DIR); + if (DoYUtil.isBlank(drillSitePath)) { + drillSitePath = drillSite.getAbsolutePath(); + } + return drillSitePath; + } + + // Work out the site directory name as above for the Drill directory. + // The caller must include a archive subdirectory name if required. + + return "$PWD/" + config.getString(SITE_ARCHIVE_KEY); + } + + /** + * Return the app ID file to use for this client run. The file is in the + * directory that holds the site dir (if a site dir is used), else the + * directory that holds Drill home (otherwise.) Not that the file does NOT go + * into the site dir or Drill home as we upload these directories (via + * archives) to DFS so we don't want to change them by adding a file. + * <p> + * It turns out that Drill allows two distinct clusters to share the same ZK + * root and/or cluster ID (just not the same combination), so the file name + * contains both parts. 
+ * + * @param clusterId + * @return + */ + + public File getLocalAppIdFile() { + String rootDir = config.getString(DrillOnYarnConfig.ZK_ROOT); + String clusterId = config.getString(DrillOnYarnConfig.CLUSTER_ID); + String key = rootDir + "-" + clusterId; + String appIdFileName = key + ".appid"; + File appIdDir; + if (hasSiteDir()) { + appIdDir = drillSite.getParentFile(); + } else { + appIdDir = drillHome.getParentFile(); + } + return new File(appIdDir, appIdFileName); + } + + public boolean hasSiteDir() { + return drillSite != null; + } + + public File getLocalSiteDir() { + return drillSite; + } + + /** + * Returns the DFS path to the localized Drill archive. This is an AM-only + * method as it relies on an environment variable set by the client. It is set + * only if the application is localized, it is not set for a non-localized + * run. + * + * @return + */ + + public String getDrillArchiveDfsPath() { + return getEnv(DrillOnYarnConfig.DRILL_ARCHIVE_ENV_VAR); + } + + /** + * Returns the DFS path to the localized site archive. This is an AM-only + * method as it relies on an environment variable set by the client. This + * variable is optional; if not set then the AM can infer that the application + * does not use a site archive (configuration files reside in + * $DRILL_HOME/conf), or the application is not localized. 
+ * + * @return the value of the site archive environment variable as returned by getEnv + */ + + public String getSiteArchiveDfsPath() { + return getEnv(DrillOnYarnConfig.SITE_ARCHIVE_ENV_VAR); + } +} http://git-wip-us.apache.org/repos/asf/drill/blob/f2ac8749/drill-yarn/src/main/java/org/apache/drill/yarn/core/LaunchSpec.java ---------------------------------------------------------------------- diff --git a/drill-yarn/src/main/java/org/apache/drill/yarn/core/LaunchSpec.java b/drill-yarn/src/main/java/org/apache/drill/yarn/core/LaunchSpec.java new file mode 100644 index 0000000..6c22874 --- /dev/null +++ b/drill-yarn/src/main/java/org/apache/drill/yarn/core/LaunchSpec.java @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.drill.yarn.core; + +import java.io.IOException; +import java.io.PrintStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.Records; + +/** + * Abstract description of a remote process launch that describes the many + * details needed to launch a process on a remote node. + * <p> + * Based on <a href="https://github.com/hortonworks/simple-yarn-app">Simple YARN + * App</a>. + */ + +public class LaunchSpec { + /** + * List of (key, file) pairs to be localized to the node before running the + * command. The file must exist in a distributed file system (such as HDFS) + * visible to both the client and remote node. Typically, the path is relative + * or absolute within the file system defined by the fs.defaultFS parameter in + * core-site.xml. + * <p> + * TODO: Can the value also be a URL such as + * <p> + * <code>hdfs://somehost:1234//path/to/file + * <p> + * The key is used as (what?). + */ + + public Map<String, LocalResource> resources = new HashMap<>(); + + /** + * Defines environment variables to be set on the remote host before launching + * the remote app. Note: do not set CLASSPATH here; use {@link #classPath} + * instead. + */ + + public Map<String, String> env = new HashMap<>(); + + /** + * Set to the name of the OS command to run when we wish to run a non-Java + * command. + */ + + public String command; + + /** + * Set to the name of the Java main class (the one with the main method) when + * we wish to run a Java command. 
+ */ + + public String mainClass; + + /** + * Set to the application-specific class path for the Java application. These + * values are added to the Hadoop-provided values. These items are relative to + * (what?), use (what variables) to refer to the localized application + * directory. + */ + + public List<String> classPath = new ArrayList<>(); + + /** + * Optional VM arguments to pass to the JVM when running a Java class; ignored + * when running an OS command. + */ + + public List<String> vmArgs = new ArrayList<>(); + + /** + * Arguments to the remote command. + */ + + public List<String> cmdArgs = new ArrayList<>(); + + public LaunchSpec() { + } + + /** + * Create the command line to run on the remote node. The command can either + * be a simple OS command (if the {@link #command} member is set) or can be a + * Java class (if the {@link #mainClass} member is set. If the command is + * Java, then we pass along optional Java VM arguments. + * <p> + * In all cases we append arguments to the command itself, and redirect stdout + * and stderr to log files. + * + * @return the complete command string + */ + + public String getCommand() { + List<String> cmd = new ArrayList<>(); + if (command != null) { + cmd.add(command); + } else { + assert mainClass != null; + + // JAVA_HOME is provided by YARN. + + cmd.add(Environment.JAVA_HOME.$$() + "/bin/java"); + cmd.addAll(vmArgs); + if (!classPath.isEmpty()) { + cmd.add("-cp"); + cmd.add(DoYUtil.join(":", classPath)); + } + cmd.add(mainClass); + } + cmd.addAll(cmdArgs); + cmd.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); + cmd.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); + + // Java 8 + // return String.join( " ", cmd ); + return DoYUtil.join(" ", cmd); + } + + /** + * Given this generic description of an application, create the detailed YARN + * application submission context required to launch the application. 
+ * + * @param conf + * the YARN configuration obtained by reading the Hadoop + * configuration files + * @return the completed application launch context for the given application + * @throws IOException + * if localized resources are not found in the distributed file + * system (such as HDFS) + */ + + public ContainerLaunchContext createLaunchContext(YarnConfiguration conf) + throws IOException { + // Set up the container launch context + ContainerLaunchContext container = Records + .newRecord(ContainerLaunchContext.class); + + // Set up the list of commands to run. Here, we assume that we run only + // one command. + + container.setCommands(Collections.singletonList(getCommand())); + + // Add localized resources + + container.setLocalResources(resources); + + // Environment. + + container.setEnvironment(env); + + return container; + } + + public void dump(PrintStream out) { + if (command != null) { + out.print("Command: "); + out.println(command); + } + if (mainClass != null) { + out.print("Main Class: "); + out.println(mainClass); + out.println("VM Args:"); + if (vmArgs.isEmpty()) { + out.println(" None"); + } else { + for (String item : vmArgs) { + out.print(" "); + out.println(item); + } + } + out.println("Class Path:"); + if (classPath.isEmpty()) { + out.println(" None"); + } else { + for (String item : classPath) { + out.print(" "); + out.println(item); + } + } + } + out.println("Program Args:"); + if (cmdArgs.isEmpty()) { + out.println(" None"); + } else { + for (String item : cmdArgs) { + out.print(" "); + out.println(item); + } + } + out.println("Environment:"); + if (env.isEmpty()) { + out.println(" None"); + } else { + for (String key : env.keySet()) { + out.print(" "); + out.print(key); + out.print("="); + out.println(env.get(key)); + } + } + out.println("Resources: "); + if (resources.isEmpty()) { + out.println(" None"); + } else { + for (String key : resources.keySet()) { + out.print(" Key: "); + out.println(key); + LocalResource resource = 
resources.get(key); + out.print(" URL: "); + out.println(resource.getResource().toString()); + out.print(" Size: "); + out.println(resource.getSize()); + out.print(" Timestamp: "); + out.println(DoYUtil.toIsoTime(resource.getTimestamp())); + out.print(" Type: "); + out.println(resource.getType().toString()); + out.print(" Visiblity: "); + out.println(resource.getVisibility().toString()); + } + } + } +}
