This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 2385361 HIVE-21149 : Refactor LlapServiceDriver (Miklos Gergely via
Ashutosh Chauhan)
2385361 is described below
commit 2385361ef33ff81760115cfee4de31ec18afe2f3
Author: Miklos Gergely <[email protected]>
AuthorDate: Tue Jan 22 15:16:00 2019 -0800
HIVE-21149 : Refactor LlapServiceDriver (Miklos Gergely via Ashutosh
Chauhan)
Signed-off-by: Ashutosh Chauhan <[email protected]>
---
bin/ext/llap.sh | 4 +-
.../hadoop/hive/llap/cli/LlapOptionsProcessor.java | 382 ----------
.../hadoop/hive/llap/cli/LlapServiceDriver.java | 839 ---------------------
.../hadoop/hive/llap/cli/LlapSliderUtils.java | 88 ---
.../llap/cli/service/AsyncTaskCopyAuxJars.java | 169 +++++
.../llap/cli/service/AsyncTaskCopyConfigs.java | 141 ++++
.../llap/cli/service/AsyncTaskCopyLocalJars.java | 81 ++
.../llap/cli/service/AsyncTaskCreateUdfFile.java | 126 ++++
.../llap/cli/service/AsyncTaskDownloadTezJars.java | 66 ++
.../llap/cli/service/LlapConfigJsonCreator.java | 114 +++
.../llap/cli/service/LlapServiceCommandLine.java | 470 ++++++++++++
.../hive/llap/cli/service/LlapServiceDriver.java | 394 ++++++++++
.../llap/cli/service/LlapTarComponentGatherer.java | 100 +++
.../hadoop/hive/llap/cli/service/package-info.java | 23 +
.../cli/service/TestLlapServiceCommandLine.java | 134 ++++
.../hadoop/hive/llap/cli/service/package-info.java | 23 +
16 files changed, 1843 insertions(+), 1311 deletions(-)
diff --git a/bin/ext/llap.sh b/bin/ext/llap.sh
index 91a54b3..3eb1573 100644
--- a/bin/ext/llap.sh
+++ b/bin/ext/llap.sh
@@ -18,7 +18,7 @@ export SERVICE_LIST="${SERVICE_LIST}${THISSERVICE} "
llap () {
TMPDIR=$(mktemp -d /tmp/staging-yarn-XXXXXX)
- CLASS=org.apache.hadoop.hive.llap.cli.LlapServiceDriver;
+ CLASS=org.apache.hadoop.hive.llap.cli.service.LlapServiceDriver;
if [ ! -f ${HIVE_LIB}/hive-cli-*.jar ]; then
echo "Missing Hive CLI Jar"
exit 3;
@@ -44,7 +44,7 @@ llap () {
}
llap_help () {
- CLASS=org.apache.hadoop.hive.llap.cli.LlapServiceDriver;
+ CLASS=org.apache.hadoop.hive.llap.cli.service.LlapServiceDriver;
execHiveCmd $CLASS "--help"
}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
deleted file mode 100644
index 2445075..0000000
---
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapOptionsProcessor.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.cli;
-
-import com.google.common.base.Preconditions;
-import jline.TerminalFactory;
-
-import java.io.IOException;
-import java.util.Properties;
-
-import javax.annotation.Nonnull;
-
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.llap.log.LogHelpers;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.util.StringUtils;
-
-public class LlapOptionsProcessor {
-
- public static final String OPTION_INSTANCES = "instances"; //forward as arg
- public static final String OPTION_NAME = "name"; // forward as arg
- public static final String OPTION_DIRECTORY = "directory"; // work-dir
- public static final String OPTION_EXECUTORS = "executors"; //
llap-daemon-site
- public static final String OPTION_CACHE = "cache"; // llap-daemon-site
- public static final String OPTION_SIZE = "size"; // forward via config.json
- public static final String OPTION_XMX = "xmx"; // forward as arg
- public static final String OPTION_AUXJARS = "auxjars"; // used to localize
jars
- public static final String OPTION_AUXHIVE = "auxhive"; // used to localize
jars
- public static final String OPTION_AUXHBASE = "auxhbase"; // used to localize
jars
- public static final String OPTION_JAVA_HOME = "javaHome"; // forward via
config.json
- public static final String OPTION_HIVECONF = "hiveconf"; // llap-daemon-site
if relevant parameter
- public static final String OPTION_SERVICE_AM_CONTAINER_MB =
"service-am-container-mb"; // forward as arg
- public static final String OPTION_SERVICE_APPCONFIG_GLOBAL =
"service-appconfig-global"; // forward as arg
- public static final String OPTION_LLAP_QUEUE = "queue"; // forward via
config.json
- public static final String OPTION_IO_THREADS = "iothreads"; //
llap-daemon-site
-
- // Options for the python script that are here because our option parser
cannot ignore the unknown ones
- public static final String OPTION_ARGS = "args"; // forward as arg
- public static final String OPTION_LOGLEVEL = "loglevel"; // forward as arg
- public static final String OPTION_LOGGER = "logger"; // forward as arg
- public static final String OPTION_SERVICE_KEYTAB_DIR = "service-keytab-dir";
- public static final String OPTION_SERVICE_KEYTAB = "service-keytab";
- public static final String OPTION_SERVICE_PRINCIPAL = "service-principal";
- public static final String OPTION_SERVICE_PLACEMENT = "service-placement";
- public static final String OPTION_SERVICE_DEFAULT_KEYTAB =
"service-default-keytab";
- public static final String OPTION_OUTPUT_DIR = "output";
- public static final String OPTION_START = "startImmediately";
- public static final String OPTION_HEALTH_PERCENT = "health-percent";
- public static final String OPTION_HEALTH_TIME_WINDOW_SECS =
"health-time-window-secs";
- public static final String OPTION_HEALTH_INIT_DELAY_SECS =
"health-init-delay-secs";
-
- public static class LlapOptions {
- private final int instances;
- private final String directory;
- private final String name;
- private final int executors;
- private final int ioThreads;
- private final long cache;
- private final long size;
- private final long xmx;
- private final String jars;
- private final boolean isHbase;
- private final Properties conf;
- private final String javaPath;
- private final String llapQueueName;
- private final String logger;
- private final boolean isStarting;
- private final String output;
- private final boolean isHiveAux;
-
- public LlapOptions(String name, int instances, String directory, int
executors, int ioThreads,
- long cache, long size, long xmx, String jars, boolean isHbase,
- @Nonnull Properties hiveconf, String javaPath, String llapQueueName,
String logger,
- boolean isStarting, String output, boolean isHiveAux) throws
ParseException {
- if (instances <= 0) {
- throw new ParseException("Invalid configuration: " + instances
- + " (should be greater than 0)");
- }
- this.instances = instances;
- this.directory = directory;
- this.name = name;
- this.executors = executors;
- this.ioThreads = ioThreads;
- this.cache = cache;
- this.size = size;
- this.xmx = xmx;
- this.jars = jars;
- this.isHbase = isHbase;
- this.isHiveAux = isHiveAux;
- this.conf = hiveconf;
- this.javaPath = javaPath;
- this.llapQueueName = llapQueueName;
- this.logger = logger;
- this.isStarting = isStarting;
- this.output = output;
- }
-
- public String getOutput() {
- return output;
- }
-
- public String getName() {
- return name;
- }
-
- public int getInstances() {
- return instances;
- }
-
- public String getDirectory() {
- return directory;
- }
-
- public int getExecutors() {
- return executors;
- }
-
- public int getIoThreads() {
- return ioThreads;
- }
-
- public long getCache() {
- return cache;
- }
-
- public long getSize() {
- return size;
- }
-
- public long getXmx() {
- return xmx;
- }
-
- public String getAuxJars() {
- return jars;
- }
-
- public boolean getIsHBase() {
- return isHbase;
- }
-
- public boolean getIsHiveAux() {
- return isHiveAux;
- }
-
- public Properties getConfig() {
- return conf;
- }
-
- public String getJavaPath() {
- return javaPath;
- }
-
- public String getLlapQueueName() {
- return llapQueueName;
- }
-
- public String getLogger() {
- return logger;
- }
-
- public boolean isStarting() {
- return isStarting;
- }
- }
-
- protected static final Logger l4j =
LoggerFactory.getLogger(LlapOptionsProcessor.class.getName());
- private final Options options = new Options();
- private org.apache.commons.cli.CommandLine commandLine;
-
- @SuppressWarnings("static-access")
- public LlapOptionsProcessor() {
-
- // set the number of instances on which llap should run
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_INSTANCES).withLongOpt(OPTION_INSTANCES)
- .withDescription("Specify the number of instances to run this
on").create('i'));
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_NAME).withLongOpt(OPTION_NAME)
- .withDescription("Cluster name for YARN registry").create('n'));
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_DIRECTORY).withLongOpt(OPTION_DIRECTORY)
- .withDescription("Temp directory for jars etc.").create('d'));
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_ARGS).withLongOpt(OPTION_ARGS)
- .withDescription("java arguments to the llap instance").create('a'));
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_LOGLEVEL).withLongOpt(OPTION_LOGLEVEL)
- .withDescription("log levels for the llap instance").create('l'));
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_LOGGER).withLongOpt(OPTION_LOGGER)
- .withDescription(
- "logger for llap instance ([" + LogHelpers.LLAP_LOGGER_NAME_RFA +
"], " +
- LogHelpers.LLAP_LOGGER_NAME_QUERY_ROUTING + ", " +
LogHelpers.LLAP_LOGGER_NAME_CONSOLE)
- .create());
-
-
options.addOption(OptionBuilder.hasArg(false).withArgName(OPTION_SERVICE_DEFAULT_KEYTAB).withLongOpt(OPTION_SERVICE_DEFAULT_KEYTAB)
- .withDescription("try to set default settings for Service AM keytab;
mostly for dev testing").create());
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_SERVICE_KEYTAB_DIR).withLongOpt(OPTION_SERVICE_KEYTAB_DIR)
- .withDescription("Service AM keytab directory on HDFS (where the
headless user keytab is stored by Service keytab installation, e.g.
.yarn/keytabs/llap)").create());
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_SERVICE_KEYTAB).withLongOpt(OPTION_SERVICE_KEYTAB)
- .withDescription("Service AM keytab file name inside " +
OPTION_SERVICE_KEYTAB_DIR).create());
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_SERVICE_PRINCIPAL).withLongOpt(OPTION_SERVICE_PRINCIPAL)
- .withDescription("Service AM principal; should be the user running the
cluster, e.g. [email protected]").create());
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_SERVICE_PLACEMENT).withLongOpt(OPTION_SERVICE_PLACEMENT)
- .withDescription("Service placement policy; see YARN documentation at
https://issues.apache.org/jira/browse/YARN-1042."
- + " This is unnecessary if LLAP is going to take more than half of
the YARN capacity of a node.").create());
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_HEALTH_PERCENT).withLongOpt(OPTION_HEALTH_PERCENT)
- .withDescription("Percentage of running containers after which LLAP
application is considered healthy" +
- " (Default: 80)").create());
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_HEALTH_INIT_DELAY_SECS)
- .withLongOpt(OPTION_HEALTH_INIT_DELAY_SECS)
- .withDescription("Delay in seconds after which health percentage is
monitored (Default: 400)").create());
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_HEALTH_TIME_WINDOW_SECS)
- .withLongOpt(OPTION_HEALTH_TIME_WINDOW_SECS)
- .withDescription("Time window in seconds (after initial delay) for which
LLAP application is allowed to be in " +
- "unhealthy state before being killed (Default: 300)").create());
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_EXECUTORS).withLongOpt(OPTION_EXECUTORS)
- .withDescription("executor per instance").create('e'));
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_CACHE).withLongOpt(OPTION_CACHE)
- .withDescription("cache size per instance").create('c'));
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_SIZE).withLongOpt(OPTION_SIZE)
- .withDescription("container size per instance").create('s'));
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_XMX).withLongOpt(OPTION_XMX)
- .withDescription("working memory size").create('w'));
-
- options.addOption(OptionBuilder.hasArg().withArgName(OPTION_LLAP_QUEUE)
- .withLongOpt(OPTION_LLAP_QUEUE)
- .withDescription("The queue within which LLAP will be
started").create('q'));
-
- options.addOption(OptionBuilder.hasArg().withArgName(OPTION_OUTPUT_DIR)
- .withLongOpt(OPTION_OUTPUT_DIR)
- .withDescription("Output directory for the generated
scripts").create());
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_AUXJARS).withLongOpt(OPTION_AUXJARS)
- .withDescription("additional jars to package (by default, JSON SerDe
jar is packaged"
- + " if available)").create('j'));
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_AUXHBASE).withLongOpt(OPTION_AUXHBASE)
- .withDescription("whether to package the HBase jars (true by
default)").create('h'));
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_AUXHIVE).withLongOpt(OPTION_AUXHIVE)
- .withDescription("whether to package the Hive aux jars (true by
default)").create(OPTION_AUXHIVE));
-
-
options.addOption(OptionBuilder.hasArg().withArgName(OPTION_JAVA_HOME).withLongOpt(OPTION_JAVA_HOME)
- .withDescription(
- "Path to the JRE/JDK. This should be installed at the same
location on all cluster nodes ($JAVA_HOME, java.home by default)")
- .create());
-
- // -hiveconf x=y
-
options.addOption(OptionBuilder.withValueSeparator().hasArgs(2).withArgName("property=value")
- .withLongOpt(OPTION_HIVECONF)
- .withDescription("Use value for given property. Overridden by explicit
parameters")
- .create());
-
- options.addOption(OptionBuilder.hasArg().withArgName("b")
- .withLongOpt(OPTION_SERVICE_AM_CONTAINER_MB)
- .withDescription("The size of the service AppMaster container in
MB").create('b'));
-
-
options.addOption(OptionBuilder.withValueSeparator().hasArgs(2).withArgName("property=value")
- .withLongOpt(OPTION_SERVICE_APPCONFIG_GLOBAL)
- .withDescription("Property (key=value) to be set in the global section
of the Service appConfig")
- .create());
-
- options.addOption(OptionBuilder.hasArg().withArgName(OPTION_IO_THREADS)
- .withLongOpt(OPTION_IO_THREADS).withDescription("executor per
instance").create('t'));
-
- options.addOption(OptionBuilder.hasArg(false).withArgName(OPTION_START)
- .withLongOpt(OPTION_START).withDescription("immediately start the
cluster")
- .create('z'));
-
- // [-H|--help]
- options.addOption(new Option("H", "help", false, "Print help
information"));
- }
-
- private static long parseSuffixed(String value) {
- return StringUtils.TraditionalBinaryPrefix.string2long(value);
- }
-
- public LlapOptions processOptions(String argv[]) throws ParseException,
IOException {
- commandLine = new GnuParser().parse(options, argv);
- if (commandLine.hasOption('H') || false ==
commandLine.hasOption("instances")) {
- // needs at least --instances
- printUsage();
- return null;
- }
-
- int instances =
Integer.parseInt(commandLine.getOptionValue(OPTION_INSTANCES));
- String directory = commandLine.getOptionValue(OPTION_DIRECTORY);
- String jars = commandLine.getOptionValue(OPTION_AUXJARS);
-
- String name = commandLine.getOptionValue(OPTION_NAME, null);
-
- final int executors =
Integer.parseInt(commandLine.getOptionValue(OPTION_EXECUTORS, "-1"));
- final int ioThreads = Integer.parseInt(
- commandLine.getOptionValue(OPTION_IO_THREADS,
Integer.toString(executors)));
- final long cache = parseSuffixed(commandLine.getOptionValue(OPTION_CACHE,
"-1"));
- final long size = parseSuffixed(commandLine.getOptionValue(OPTION_SIZE,
"-1"));
- final long xmx = parseSuffixed(commandLine.getOptionValue(OPTION_XMX,
"-1"));
- final boolean isHbase = Boolean.parseBoolean(
- commandLine.getOptionValue(OPTION_AUXHBASE, "true"));
- final boolean isHiveAux = Boolean.parseBoolean(
- commandLine.getOptionValue(OPTION_AUXHIVE, "true"));
- final boolean doStart = commandLine.hasOption(OPTION_START);
- final String output = commandLine.getOptionValue(OPTION_OUTPUT_DIR, null);
-
- final String queueName = commandLine.getOptionValue(OPTION_LLAP_QUEUE,
- HiveConf.ConfVars.LLAP_DAEMON_QUEUE_NAME.getDefaultValue());
-
- final Properties hiveconf;
-
- if (commandLine.hasOption(OPTION_HIVECONF)) {
- hiveconf = commandLine.getOptionProperties(OPTION_HIVECONF);
- } else {
- hiveconf = new Properties();
- }
-
- String javaHome = null;
- if (commandLine.hasOption(OPTION_JAVA_HOME)) {
- javaHome = commandLine.getOptionValue(OPTION_JAVA_HOME);
- }
-
- String logger = null;
- if (commandLine.hasOption(OPTION_LOGGER)) {
- logger = commandLine.getOptionValue(OPTION_LOGGER);
- Preconditions.checkArgument(
- logger.equalsIgnoreCase(LogHelpers.LLAP_LOGGER_NAME_QUERY_ROUTING) ||
- logger.equalsIgnoreCase(LogHelpers.LLAP_LOGGER_NAME_CONSOLE) ||
- logger.equalsIgnoreCase(LogHelpers.LLAP_LOGGER_NAME_RFA));
- }
-
- // loglevel & args are parsed by the python processor
-
- return new LlapOptions(name, instances, directory, executors, ioThreads,
cache, size, xmx,
- jars, isHbase, hiveconf, javaHome, queueName, logger, doStart, output,
isHiveAux);
- }
-
- private void printUsage() {
- HelpFormatter hf = new HelpFormatter();
- try {
- int width = hf.getWidth();
- int jlineWidth = TerminalFactory.get().getWidth();
- width = Math.min(160, Math.max(jlineWidth, width)); // Ignore
potentially incorrect values
- hf.setWidth(width);
- } catch (Throwable t) { // Ignore
- }
- hf.printHelp("llap", options);
- }
-}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
deleted file mode 100644
index 4bc2431..0000000
---
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ /dev/null
@@ -1,839 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hive.llap.cli;
-
-import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hive.common.CompressionUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.llap.LlapUtil;
-import org.apache.hadoop.hive.llap.cli.LlapOptionsProcessor.LlapOptions;
-import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
-import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;
-import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker;
-import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
-import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat;
-import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
-import org.apache.hadoop.hive.metastore.api.Function;
-import org.apache.hadoop.hive.metastore.api.ResourceUri;
-import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.HiveInputFormat;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.ql.util.ResourceDownloader;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.registry.client.binding.RegistryUtils;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.tez.dag.api.TezConfiguration;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.eclipse.jetty.rewrite.handler.Rule;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-public class LlapServiceDriver {
- protected static final Logger LOG =
LoggerFactory.getLogger(LlapServiceDriver.class.getName());
-
- private static final String[]
- DEFAULT_AUX_CLASSES =
- new String[] { "org.apache.hive.hcatalog.data.JsonSerDe",
"org.apache.hadoop.hive.druid.DruidStorageHandler",
- "org.apache.hive.storage.jdbc.JdbcStorageHandler",
"org.apache.commons.dbcp.BasicDataSourceFactory",
- "org.apache.commons.pool.impl.GenericObjectPool",
"org.apache.hadoop.hive.kafka.KafkaStorageHandler" };
- private static final String HBASE_SERDE_CLASS =
"org.apache.hadoop.hive.hbase.HBaseSerDe";
- private static final String[] NEEDED_CONFIGS =
LlapDaemonConfiguration.DAEMON_CONFIGS;
- private static final String[] OPTIONAL_CONFIGS =
LlapDaemonConfiguration.SSL_DAEMON_CONFIGS;
- private static final String OUTPUT_DIR_PREFIX = "llap-yarn-";
-
- // This is not a config that users set in hive-site. It's only use is to
share information
- // between the java component of the service driver and the python component.
- private static final String CONFIG_CLUSTER_NAME =
"private.hive.llap.servicedriver.cluster.name";
-
- /**
- * This is a working configuration for the instance to merge various
variables.
- * It is not written out for llap server usage
- */
- private final HiveConf conf;
-
- public LlapServiceDriver() {
- SessionState ss = SessionState.get();
- conf = (ss != null) ? ss.getConf() : new HiveConf(SessionState.class);
- }
-
- public static void main(String[] args) throws Exception {
- LOG.info("LLAP service driver invoked with arguments={}", args);
- int ret = 0;
- try {
- ret = new LlapServiceDriver().run(args);
- } catch (Throwable t) {
- System.err.println("Failed: " + t.getMessage());
- t.printStackTrace();
- ret = 3;
- } finally {
- LOG.info("LLAP service driver finished");
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Completed processing - exiting with " + ret);
- }
- System.exit(ret);
- }
-
-
- private static Configuration resolve(Configuration configured, Properties
direct,
- Properties hiveconf) {
- Configuration conf = new Configuration(false);
-
- populateConf(configured, conf, hiveconf, "CLI hiveconf");
- populateConf(configured, conf, direct, "CLI direct");
-
- return conf;
- }
-
- private static void populateConf(Configuration configured, Configuration
target,
- Properties properties, String source) {
- for (Entry<Object, Object> entry : properties.entrySet()) {
- String key = (String) entry.getKey();
- String val = configured.get(key);
- if (val != null) {
- target.set(key, val, source);
- }
- }
- }
-
- static void populateConfWithLlapProperties(Configuration conf, Properties
properties) {
- for(Entry<Object, Object> props : properties.entrySet()) {
- String key = (String) props.getKey();
- if (HiveConf.getLlapDaemonConfVars().contains(key)) {
- conf.set(key, (String) props.getValue());
- } else {
- if (key.startsWith(HiveConf.PREFIX_LLAP) ||
key.startsWith(HiveConf.PREFIX_HIVE_LLAP)) {
- LOG.warn("Adding key [{}] even though it is not in the set of known
llap-server keys", key);
- conf.set(key, (String) props.getValue());
- } else {
- LOG.warn("Ignoring unknown llap server parameter: [{}]", key);
- }
- }
- }
- }
-
- private static abstract class NamedCallable<T> implements Callable<T> {
- public final String taskName;
- public NamedCallable (String name) {
- this.taskName = name;
- }
- public String getName() {
- return taskName;
- }
- }
-
- private int run(String[] args) throws Exception {
- LlapOptionsProcessor optionsProcessor = new LlapOptionsProcessor();
- final LlapOptions options = optionsProcessor.processOptions(args);
-
- final Properties propsDirectOptions = new Properties();
-
- if (options == null) {
- // help
- return 1;
- }
-
- // Working directory.
- Path tmpDir = new Path(options.getDirectory());
-
- if (conf == null) {
- throw new Exception("Cannot load any configuration to run command");
- }
-
- final long t0 = System.nanoTime();
-
- final FileSystem fs = FileSystem.get(conf);
- final FileSystem lfs = FileSystem.getLocal(conf).getRawFileSystem();
-
- int threadCount = Math.max(1, Runtime.getRuntime().availableProcessors() /
2);
- final ExecutorService executor = Executors.newFixedThreadPool(threadCount,
- new ThreadFactoryBuilder().setNameFormat("llap-pkg-%d").build());
- final CompletionService<Void> asyncRunner = new
ExecutorCompletionService<Void>(executor);
-
- int rc = 0;
- try {
-
- // needed so that the file is actually loaded into configuration.
- for (String f : NEEDED_CONFIGS) {
- conf.addResource(f);
- if (conf.getResource(f) == null) {
- throw new Exception("Unable to find required config file: " + f);
- }
- }
- for (String f : OPTIONAL_CONFIGS) {
- conf.addResource(f);
- }
-
- conf.reloadConfiguration();
-
- populateConfWithLlapProperties(conf, options.getConfig());
-
- if (options.getName() != null) {
- // update service registry configs - caveat: this has nothing to do
with the actual settings
- // as read by the AM
- // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to
dynamically switch between
- // instances
- conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" +
options.getName());
-
propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname,
- "@" + options.getName());
- }
-
- if (options.getLogger() != null) {
- HiveConf.setVar(conf, ConfVars.LLAP_DAEMON_LOGGER,
options.getLogger());
- propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_LOGGER.varname,
options.getLogger());
- }
- boolean isDirect = HiveConf.getBoolVar(conf,
HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT);
-
- if (options.getSize() != -1) {
- if (options.getCache() != -1) {
- if (HiveConf.getBoolVar(conf,
HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED) == false) {
- // direct heap allocations need to be safer
- Preconditions.checkArgument(options.getCache() <
options.getSize(), "Cache size ("
- + LlapUtil.humanReadableByteCount(options.getCache()) + ") has
to be smaller"
- + " than the container sizing (" +
LlapUtil.humanReadableByteCount(options.getSize()) + ")");
- } else if (options.getCache() < options.getSize()) {
- LOG.warn("Note that this might need YARN physical memory
monitoring to be turned off "
- + "(yarn.nodemanager.pmem-check-enabled=false)");
- }
- }
- if (options.getXmx() != -1) {
- Preconditions.checkArgument(options.getXmx() < options.getSize(),
"Working memory (Xmx="
- + LlapUtil.humanReadableByteCount(options.getXmx()) + ") has to
be"
- + " smaller than the container sizing (" +
LlapUtil.humanReadableByteCount(options.getSize())
- + ")");
- }
- if (isDirect && !HiveConf.getBoolVar(conf,
HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
- // direct and not memory mapped
- Preconditions.checkArgument(options.getXmx() + options.getCache() <=
options.getSize(),
- "Working memory (Xmx=" +
LlapUtil.humanReadableByteCount(options.getXmx()) + ") + cache size ("
- + LlapUtil.humanReadableByteCount(options.getCache()) + ") has
to be smaller than the container sizing ("
- + LlapUtil.humanReadableByteCount(options.getSize()) + ")");
- }
- }
-
-
- if (options.getExecutors() != -1) {
- conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
options.getExecutors());
-
propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
- String.valueOf(options.getExecutors()));
- // TODO: vcpu settings - possibly when DRFA works right
- }
-
- if (options.getIoThreads() != -1) {
- conf.setLong(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname,
options.getIoThreads());
-
propsDirectOptions.setProperty(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname,
- String.valueOf(options.getIoThreads()));
- }
-
- long cache = -1, xmx = -1;
- if (options.getCache() != -1) {
- cache = options.getCache();
- conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
Long.toString(cache));
-
propsDirectOptions.setProperty(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
- Long.toString(cache));
- }
-
- if (options.getXmx() != -1) {
- // Needs more explanation here
- // Xmx is not the max heap value in JDK8. You need to subtract 50% of
the survivor fraction
- // from this, to get actual usable memory before it goes into GC
- xmx = options.getXmx();
- long xmxMb = (xmx / (1024L * 1024L));
- conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
xmxMb);
-
propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
- String.valueOf(xmxMb));
- }
-
- long size = options.getSize();
- if (size == -1) {
- long heapSize = xmx;
- if (!isDirect) {
- heapSize += cache;
- }
- size = Math.min((long)(heapSize * 1.2), heapSize + 1024L*1024*1024);
- if (isDirect) {
- size += cache;
- }
- }
- long containerSize = size / (1024 * 1024);
- final long minAlloc =
conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
- Preconditions.checkArgument(containerSize >= minAlloc, "Container size ("
- + LlapUtil.humanReadableByteCount(options.getSize()) + ") should be
greater"
- + " than minimum allocation(" +
LlapUtil.humanReadableByteCount(minAlloc * 1024L * 1024L) + ")");
- conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
containerSize);
-
propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
- String.valueOf(containerSize));
-
- LOG.info("Memory settings: container memory: {} executor memory: {}
cache memory: {}",
- LlapUtil.humanReadableByteCount(options.getSize()),
- LlapUtil.humanReadableByteCount(options.getXmx()),
- LlapUtil.humanReadableByteCount(options.getCache()));
-
- if (options.getLlapQueueName() != null &&
!options.getLlapQueueName().isEmpty()) {
- conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
options.getLlapQueueName());
- propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
- options.getLlapQueueName());
- }
-
- final URL logger =
conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
-
- if (null == logger) {
- throw new Exception("Unable to find required config file:
llap-daemon-log4j2.properties");
- }
-
- Path home = new Path(System.getenv("HIVE_HOME"));
- Path scriptParent = new Path(new Path(home, "scripts"), "llap");
- Path scripts = new Path(scriptParent, "bin");
-
- if (!lfs.exists(home)) {
- throw new Exception("Unable to find HIVE_HOME:" + home);
- } else if (!lfs.exists(scripts)) {
- LOG.warn("Unable to find llap scripts:" + scripts);
- }
-
- final Path libDir = new Path(tmpDir, "lib");
- final Path tezDir = new Path(libDir, "tez");
- final Path udfDir = new Path(libDir, "udfs");
- final Path confPath = new Path(tmpDir, "conf");
- if (!lfs.mkdirs(confPath)) {
- LOG.warn("mkdirs for " + confPath + " returned false");
- }
- if (!lfs.mkdirs(tezDir)) {
- LOG.warn("mkdirs for " + tezDir + " returned false");
- }
- if (!lfs.mkdirs(udfDir)) {
- LOG.warn("mkdirs for " + udfDir + " returned false");
- }
-
- NamedCallable<Void> downloadTez = new NamedCallable<Void>("downloadTez")
{
- @Override
- public Void call() throws Exception {
- synchronized (fs) {
- String tezLibs = conf.get(TezConfiguration.TEZ_LIB_URIS);
- if (tezLibs == null) {
- LOG.warn("Missing tez.lib.uris in tez-site.xml");
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Copying tez libs from " + tezLibs);
- }
- lfs.mkdirs(tezDir);
- fs.copyToLocalFile(new Path(tezLibs), new Path(libDir,
"tez.tar.gz"));
- CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(),
tezDir.toString(),
- true);
- lfs.delete(new Path(libDir, "tez.tar.gz"), false);
- }
- return null;
- }
- };
-
- NamedCallable<Void> copyLocalJars = new
NamedCallable<Void>("copyLocalJars") {
- @Override
- public Void call() throws Exception {
- Class<?>[] dependencies = new Class<?>[] {
LlapDaemonProtocolProtos.class, // llap-common
- LlapTezUtils.class, // llap-tez
- LlapInputFormat.class, // llap-server
- HiveInputFormat.class, // hive-exec
- SslContextFactory.class, // hive-common (https deps)
- Rule.class, // Jetty rewrite class
- RegistryUtils.ServiceRecordMarshal.class, // ZK registry
- // log4j2
- com.lmax.disruptor.RingBuffer.class, // disruptor
- org.apache.logging.log4j.Logger.class, // log4j-api
- org.apache.logging.log4j.core.Appender.class, // log4j-core
- org.apache.logging.slf4j.Log4jLogger.class, // log4j-slf4j
- // log4j-1.2-API needed for NDC
- org.apache.log4j.config.Log4j1ConfigurationFactory.class,
- io.netty.util.NetUtil.class, // netty4
- org.jboss.netty.util.NetUtil.class, //netty3
- org.apache.arrow.vector.types.pojo.ArrowType.class,
//arrow-vector
- org.apache.arrow.memory.BaseAllocator.class, //arrow-memory
- org.apache.arrow.flatbuf.Schema.class, //arrow-format
- com.google.flatbuffers.Table.class, //flatbuffers
- com.carrotsearch.hppc.ByteArrayDeque.class //hppc
- };
-
- for (Class<?> c : dependencies) {
- Path jarPath = new Path(Utilities.jarFinderGetJar(c));
- lfs.copyFromLocalFile(jarPath, libDir);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Copying " + jarPath + " to " + libDir);
- }
- }
- return null;
- }
- };
-
- // copy default aux classes (json/hbase)
-
- NamedCallable<Void> copyAuxJars = new NamedCallable<Void>("copyAuxJars")
{
- @Override
- public Void call() throws Exception {
- for (String className : DEFAULT_AUX_CLASSES) {
- localizeJarForClass(lfs, libDir, className, false);
- }
- Collection<String> codecs =
conf.getStringCollection("io.compression.codecs");
- if (codecs != null) {
- for (String codecClassName : codecs) {
- localizeJarForClass(lfs, libDir, codecClassName, false);
- }
- }
- for (String className : getDbSpecificJdbcJars()) {
- localizeJarForClass(lfs, libDir, className, false);
- }
- if (options.getIsHBase()) {
- try {
- localizeJarForClass(lfs, libDir, HBASE_SERDE_CLASS, true);
- Job fakeJob = new Job(new JobConf()); // HBase API is convoluted.
- TableMapReduceUtil.addDependencyJars(fakeJob);
- Collection<String> hbaseJars =
- fakeJob.getConfiguration().getStringCollection("tmpjars");
- for (String jarPath : hbaseJars) {
- if (!jarPath.isEmpty()) {
- lfs.copyFromLocalFile(new Path(jarPath), libDir);
- }
- }
- } catch (Throwable t) {
- String err =
- "Failed to add HBase jars. Use --auxhbase=false to avoid
localizing them";
- LOG.error(err);
- System.err.println(err);
- throw new RuntimeException(t);
- }
- }
-
- HashSet<String> auxJars = new HashSet<>();
- // There are many ways to have AUX jars in Hive... sigh
- if (options.getIsHiveAux()) {
- // Note: we don't add ADDED jars, RELOADABLE jars, etc. That is by
design; there are too many ways
- // to add jars in Hive, some of which are session/etc. specific.
Env + conf + arg should be enough.
- addAuxJarsToSet(auxJars, conf.getAuxJars(), ",");
- addAuxJarsToSet(auxJars, System.getenv("HIVE_AUX_JARS_PATH"), ":");
- LOG.info("Adding the following aux jars from the environment and
configs: " + auxJars);
- }
-
- addAuxJarsToSet(auxJars, options.getAuxJars(), ",");
- for (String jarPath : auxJars) {
- lfs.copyFromLocalFile(new Path(jarPath), libDir);
- }
- return null;
- }
-
- private void addAuxJarsToSet(HashSet<String> auxJarSet, String
auxJars, String delimiter) {
- if (auxJars != null && !auxJars.isEmpty()) {
- // TODO: transitive dependencies warning?
- String[] jarPaths = auxJars.split(delimiter);
- for (String jarPath : jarPaths) {
- if (!jarPath.isEmpty()) {
- auxJarSet.add(jarPath);
- }
- }
- }
- }
- };
-
- NamedCallable<Void> copyUdfJars = new NamedCallable<Void>("copyUdfJars")
{
- @Override
- public Void call() throws Exception {
- // UDFs
- final Set<String> allowedUdfs;
-
- if (HiveConf.getBoolVar(conf,
HiveConf.ConfVars.LLAP_ALLOW_PERMANENT_FNS)) {
- synchronized (fs) {
- allowedUdfs = downloadPermanentFunctions(conf, udfDir);
- }
- } else {
- allowedUdfs = Collections.emptySet();
- }
-
- PrintWriter udfStream =
- new PrintWriter(lfs.create(new Path(confPath,
- StaticPermanentFunctionChecker.PERMANENT_FUNCTIONS_LIST)));
- for (String udfClass : allowedUdfs) {
- udfStream.println(udfClass);
- }
-
- udfStream.close();
- return null;
- }
- };
-
- String java_home;
- if (options.getJavaPath() == null || options.getJavaPath().isEmpty()) {
- java_home = System.getenv("JAVA_HOME");
- String jre_home = System.getProperty("java.home");
- if (java_home == null) {
- java_home = jre_home;
- } else if (!java_home.equals(jre_home)) {
- LOG.warn("Java versions might not match : JAVA_HOME=[{}],process
jre=[{}]", java_home,
- jre_home);
- }
- } else {
- java_home = options.getJavaPath();
- }
- if (java_home == null || java_home.isEmpty()) {
- throw new RuntimeException(
- "Could not determine JAVA_HOME from command line parameters,
environment or system properties");
- }
- LOG.info("Using [{}] for JAVA_HOME", java_home);
-
- NamedCallable<Void> copyConfigs = new NamedCallable<Void>("copyConfigs")
{
- @Override
- public Void call() throws Exception {
- // Copy over the mandatory configs for the package.
- for (String f : NEEDED_CONFIGS) {
- copyConfig(lfs, confPath, f);
- }
- for (String f : OPTIONAL_CONFIGS) {
- try {
- copyConfig(lfs, confPath, f);
- } catch (Throwable t) {
- LOG.info("Error getting an optional config " + f + "; ignoring:
" + t.getMessage());
- }
- }
- createLlapDaemonConfig(lfs, confPath, conf, propsDirectOptions,
options.getConfig());
- setUpLogAndMetricConfigs(lfs, logger, confPath);
- return null;
- }
- };
-
- @SuppressWarnings("unchecked")
- final NamedCallable<Void>[] asyncWork =
- new NamedCallable[] {
- downloadTez,
- copyUdfJars,
- copyLocalJars,
- copyAuxJars,
- copyConfigs };
- @SuppressWarnings("unchecked")
- final Future<Void>[] asyncResults = new Future[asyncWork.length];
- for (int i = 0; i < asyncWork.length; i++) {
- asyncResults[i] = asyncRunner.submit(asyncWork[i]);
- }
-
- // TODO: need to move from Python to Java for the rest of the script.
- JSONObject configs = createConfigJson(containerSize, cache, xmx,
java_home);
- writeConfigJson(tmpDir, lfs, configs);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Config generation took " + (System.nanoTime() - t0) + "
ns");
- }
- for (int i = 0; i < asyncWork.length; i++) {
- final long t1 = System.nanoTime();
- asyncResults[i].get();
- final long t2 = System.nanoTime();
- if (LOG.isDebugEnabled()) {
- LOG.debug(asyncWork[i].getName() + " waited for " + (t2 - t1) + "
ns");
- }
- }
- if (options.isStarting()) {
- String version = System.getenv("HIVE_VERSION");
- if (version == null || version.isEmpty()) {
- version = DateTime.now().toString("ddMMMyyyy");
- }
-
- String outputDir = options.getOutput();
- Path packageDir = null;
- if (outputDir == null) {
- outputDir = OUTPUT_DIR_PREFIX + version;
- packageDir = new Path(Paths.get(".").toAbsolutePath().toString(),
- OUTPUT_DIR_PREFIX + version);
- } else {
- packageDir = new Path(outputDir);
- }
- rc = runPackagePy(args, tmpDir, scriptParent, version, outputDir);
- if (rc == 0) {
- LlapSliderUtils.startCluster(conf, options.getName(),
- "llap-" + version + ".tar.gz", packageDir,
- HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME));
- }
- } else {
- rc = 0;
- }
- } finally {
- executor.shutdown();
- lfs.close();
- fs.close();
- }
-
- if (rc == 0) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Exiting successfully");
- }
- } else {
- LOG.info("Exiting with rc = " + rc);
- }
- return rc;
- }
-
- private int runPackagePy(String[] args, Path tmpDir, Path scriptParent,
- String version, String outputDir) throws IOException,
InterruptedException {
- Path scriptPath = new Path(new Path(scriptParent, "yarn"), "package.py");
- List<String> scriptArgs = new ArrayList<>(args.length + 7);
- scriptArgs.add("python");
- scriptArgs.add(scriptPath.toString());
- scriptArgs.add("--input");
- scriptArgs.add(tmpDir.toString());
- scriptArgs.add("--output");
- scriptArgs.add(outputDir);
- scriptArgs.add("--javaChild");
- for (String arg : args) {
- scriptArgs.add(arg);
- }
- LOG.debug("Calling package.py via: " + scriptArgs);
- ProcessBuilder builder = new ProcessBuilder(scriptArgs);
- builder.redirectError(ProcessBuilder.Redirect.INHERIT);
- builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
- builder.environment().put("HIVE_VERSION", version);
- return builder.start().waitFor();
- }
-
- private void writeConfigJson(Path tmpDir, final FileSystem lfs,
- JSONObject configs) throws IOException, JSONException {
- FSDataOutputStream os = lfs.create(new Path(tmpDir, "config.json"));
- OutputStreamWriter w = new OutputStreamWriter(os);
- configs.write(w);
- w.close();
- os.close();
- }
-
- private JSONObject createConfigJson(long containerSize, long cache, long xmx,
- String java_home) throws JSONException {
- // extract configs for processing by the python fragments in YARN Service
- JSONObject configs = new JSONObject();
-
- configs.put("java.home", java_home);
-
- configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
- HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
- configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
-
- configs.put(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
- HiveConf.getSizeVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE));
-
- configs.put(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT.varname,
- HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT));
-
- configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
- HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
-
- configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname,
- HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE));
-
- configs.put(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
- HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS));
-
- // Let YARN pick the queue name, if it isn't provided in hive-site, or via
the command-line
- if (HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME) != null) {
- configs.put(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
- HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME));
- }
-
- // Propagate the cluster name to the script.
- String clusterHosts = HiveConf.getVar(conf,
ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
- if (!StringUtils.isEmpty(clusterHosts) && clusterHosts.startsWith("@")
- && clusterHosts.length() > 1) {
- configs.put(CONFIG_CLUSTER_NAME, clusterHosts.substring(1));
- }
-
- configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
- conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
-
- configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
- conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
-1));
-
- long maxDirect = (xmx > 0 && cache > 0 && xmx < cache * 1.25) ? (long)
(cache * 1.25) : -1;
- configs.put("max_direct_memory", Long.toString(maxDirect));
- return configs;
- }
-
- private Set<String> downloadPermanentFunctions(Configuration conf, Path
udfDir) throws HiveException,
- URISyntaxException, IOException {
- Map<String,String> udfs = new HashMap<String, String>();
- HiveConf hiveConf = new HiveConf();
- // disable expensive operations on the metastore
-
hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_INIT_METADATA_COUNT_ENABLED,
false);
- hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_METRICS, false);
- // performance problem: ObjectStore does its own new HiveConf()
- Hive hive = Hive.getWithFastCheck(hiveConf, false);
- ResourceDownloader resourceDownloader =
- new ResourceDownloader(conf, udfDir.toUri().normalize().getPath());
- List<Function> fns = hive.getAllFunctions();
- Set<URI> srcUris = new HashSet<>();
- for (Function fn : fns) {
- String fqfn = fn.getDbName() + "." + fn.getFunctionName();
- if (udfs.containsKey(fn.getClassName())) {
- LOG.warn("Duplicate function names found for " + fn.getClassName() + "
with " + fqfn
- + " and " + udfs.get(fn.getClassName()));
- }
- udfs.put(fn.getClassName(), fqfn);
- List<ResourceUri> resources = fn.getResourceUris();
- if (resources == null || resources.isEmpty()) {
- LOG.warn("Missing resources for " + fqfn);
- continue;
- }
- for (ResourceUri resource : resources) {
- srcUris.add(ResourceDownloader.createURI(resource.getUri()));
- }
- }
- for (URI srcUri : srcUris) {
- List<URI> localUris = resourceDownloader.downloadExternal(srcUri, null,
false);
- for(URI dst : localUris) {
- LOG.warn("Downloaded " + dst + " from " + srcUri);
- }
- }
- return udfs.keySet();
- }
-
- private void addJarForClassToListIfExists(String cls, List<String> jarList) {
- try {
- Class.forName(cls);
- jarList.add(cls);
- } catch (Exception e) {
- }
- }
- private List<String> getDbSpecificJdbcJars() {
- List<String> jdbcJars = new ArrayList<String>();
- addJarForClassToListIfExists("com.mysql.jdbc.Driver", jdbcJars); // add
mysql jdbc driver
- addJarForClassToListIfExists("org.postgresql.Driver", jdbcJars); // add
postgresql jdbc driver
- addJarForClassToListIfExists("oracle.jdbc.OracleDriver", jdbcJars); // add
oracle jdbc driver
-
addJarForClassToListIfExists("com.microsoft.sqlserver.jdbc.SQLServerDriver",
jdbcJars); // add mssql jdbc driver
- return jdbcJars;
- }
-
- private void localizeJarForClass(FileSystem lfs, Path libDir, String
className, boolean doThrow)
- throws IOException {
- String jarPath = null;
- boolean hasException = false;
- try {
- Class<?> auxClass = Class.forName(className);
- jarPath = Utilities.jarFinderGetJar(auxClass);
- } catch (Throwable t) {
- if (doThrow) {
- throw (t instanceof IOException) ? (IOException)t : new IOException(t);
- }
- hasException = true;
- String err = "Cannot find a jar for [" + className + "] due to an
exception ("
- + t.getMessage() + "); not packaging the jar";
- LOG.error(err);
- System.err.println(err);
- }
- if (jarPath != null) {
- lfs.copyFromLocalFile(new Path(jarPath), libDir);
- } else if (!hasException) {
- String err = "Cannot find a jar for [" + className + "]; not packaging
the jar";
- if (doThrow) {
- throw new IOException(err);
- }
- LOG.error(err);
- System.err.println(err);
- }
- }
-
- /**
- *
- * @param lfs filesystem on which file will be generated
- * @param confPath path wher the config will be generated
- * @param configured the base configuration instances
- * @param direct properties specified directly - i.e. using the properties
exact option
- * @param hiveconf properties specifried via --hiveconf
- * @throws IOException
- */
- private void createLlapDaemonConfig(FileSystem lfs, Path confPath,
Configuration configured,
- Properties direct, Properties hiveconf)
throws IOException {
- FSDataOutputStream confStream =
- lfs.create(new Path(confPath,
LlapDaemonConfiguration.LLAP_DAEMON_SITE));
-
- Configuration llapDaemonConf = resolve(configured, direct, hiveconf);
-
- llapDaemonConf.writeXml(confStream);
- confStream.close();
- }
-
- private void copyConfig(FileSystem lfs, Path confPath, String f) throws
IOException {
- HiveConf.getBoolVar(new Configuration(false),
ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS);
- // they will be file:// URLs
- lfs.copyFromLocalFile(new Path(conf.getResource(f).toString()), confPath);
- }
-
- private void setUpLogAndMetricConfigs(final FileSystem lfs, final URL logger,
- final Path confPath) throws IOException {
- // logger can be a resource stream or a real file (cannot use copy)
- InputStream loggerContent = logger.openStream();
- IOUtils.copyBytes(loggerContent,
- lfs.create(new Path(confPath, "llap-daemon-log4j2.properties"), true),
conf, true);
-
- String metricsFile = LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE;
- URL metrics2 = conf.getResource(metricsFile);
- if (metrics2 == null) {
- LOG.warn(LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " cannot
be found."
- + " Looking for " + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE);
- metricsFile = LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE;
- metrics2 = conf.getResource(metricsFile);
- }
- if (metrics2 != null) {
- InputStream metrics2FileStream = metrics2.openStream();
- IOUtils.copyBytes(metrics2FileStream,
- lfs.create(new Path(confPath, metricsFile), true), conf, true);
- LOG.info("Copied hadoop metrics2 properties file from " + metrics2);
- } else {
- LOG.warn("Cannot find " +
LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " or "
- + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath.");
- }
- }
-}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java
index bdec1c1..a9a216c 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapSliderUtils.java
@@ -18,102 +18,14 @@
package org.apache.hadoop.hive.llap.cli;
-import java.io.File;
-import java.io.IOException;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.client.ServiceClient;
-import org.apache.hadoop.yarn.service.utils.CoreFileSystem;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class LlapSliderUtils {
- private static final Logger LOG =
LoggerFactory.getLogger(LlapSliderUtils.class);
- private static final String LLAP_PACKAGE_DIR = ".yarn/package/LLAP/";
-
public static ServiceClient createServiceClient(Configuration conf) throws
Exception {
ServiceClient serviceClient = new ServiceClient();
serviceClient.init(conf);
serviceClient.start();
return serviceClient;
}
-
- public static Service getService(Configuration conf, String name) {
- LOG.info("Get service details for " + name);
- ServiceClient sc;
- try {
- sc = createServiceClient(conf);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- Service service = null;
- try {
- service = sc.getStatus(name);
- } catch (YarnException | IOException e) {
- // Probably the app does not exist
- LOG.info(e.getLocalizedMessage());
- throw new RuntimeException(e);
- } finally {
- try {
- sc.close();
- } catch (IOException e) {
- LOG.info("Failed to close service client", e);
- }
- }
- return service;
- }
-
- public static void startCluster(Configuration conf, String name, String
packageName, Path packageDir, String queue) {
- LOG.info("Starting cluster with " + name + ", " + packageName + ", " +
queue + ", " + packageDir);
- ServiceClient sc;
- try {
- sc = createServiceClient(conf);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- try {
- try {
- LOG.info("Executing the stop command");
- sc.actionStop(name, true);
- } catch (Exception ex) {
- // Ignore exceptions from stop
- LOG.info(ex.getLocalizedMessage());
- }
- try {
- LOG.info("Executing the destroy command");
- sc.actionDestroy(name);
- } catch (Exception ex) {
- // Ignore exceptions from destroy
- LOG.info(ex.getLocalizedMessage());
- }
- LOG.info("Uploading the app tarball");
- CoreFileSystem fs = new CoreFileSystem(conf);
- fs.createWithPermissions(new Path(LLAP_PACKAGE_DIR),
- FsPermission.getDirDefault());
- fs.copyLocalFileToHdfs(new File(packageDir.toString(), packageName),
- new Path(LLAP_PACKAGE_DIR), new FsPermission("755"));
-
- LOG.info("Executing the launch command");
- File yarnfile = new File(new Path(packageDir, "Yarnfile").toString());
- Long lifetime = null; // unlimited lifetime
- try {
- sc.actionLaunch(yarnfile.getAbsolutePath(), name, lifetime, queue);
- } finally {
- }
- LOG.debug("Started the cluster via service API");
- } catch (YarnException | IOException e) {
- throw new RuntimeException(e);
- } finally {
- try {
- sc.close();
- } catch (IOException e) {
- LOG.info("Failed to close service client", e);
- }
- }
- }
-
}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java
new file mode 100644
index 0000000..7b2e32b
--- /dev/null
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyAuxJars.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Copy auxiliary jars for the tarball. */
+class AsyncTaskCopyAuxJars implements Callable<Void> {
+ private static final Logger LOG =
LoggerFactory.getLogger(AsyncTaskCopyAuxJars.class.getName());
+
+ private static final String[] DEFAULT_AUX_CLASSES =
+ new String[] {"org.apache.hive.hcatalog.data.JsonSerDe",
"org.apache.hadoop.hive.druid.DruidStorageHandler",
+ "org.apache.hive.storage.jdbc.JdbcStorageHandler",
"org.apache.commons.dbcp.BasicDataSourceFactory",
+ "org.apache.commons.pool.impl.GenericObjectPool",
"org.apache.hadoop.hive.kafka.KafkaStorageHandler"};
+ private static final String HBASE_SERDE_CLASS =
"org.apache.hadoop.hive.hbase.HBaseSerDe";
+
+ private final LlapServiceCommandLine cl;
+ private final HiveConf conf;
+ private final FileSystem rawFs;
+ private final Path libDir;
+
+ AsyncTaskCopyAuxJars(LlapServiceCommandLine cl, HiveConf conf, FileSystem
rawFs, Path libDir) {
+ this.cl = cl;
+ this.conf = conf;
+ this.rawFs = rawFs;
+ this.libDir = libDir;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ localizeJarForClass(Arrays.asList(DEFAULT_AUX_CLASSES), false);
+ localizeJarForClass(conf.getStringCollection("io.compression.codecs"),
false);
+ localizeJarForClass(getDbSpecificJdbcJars(), false);
+
+ if (cl.getIsHBase()) {
+ try {
+ localizeJarForClass(Arrays.asList(HBASE_SERDE_CLASS), true);
+ Job fakeJob = Job.getInstance(new JobConf()); // HBase API is
convoluted.
+ TableMapReduceUtil.addDependencyJars(fakeJob);
+ Collection<String> hbaseJars =
fakeJob.getConfiguration().getStringCollection("tmpjars");
+ for (String jarPath : hbaseJars) {
+ if (!jarPath.isEmpty()) {
+ rawFs.copyFromLocalFile(new Path(jarPath), libDir);
+ }
+ }
+ } catch (Throwable t) {
+ String err = "Failed to add HBase jars. Use --auxhbase=false to avoid
localizing them";
+ LOG.error(err);
+ System.err.println(err);
+ throw new RuntimeException(t);
+ }
+ }
+
+ Set<String> auxJars = new HashSet<>();
+ // There are many ways to have AUX jars in Hive... sigh
+ if (cl.getIsHiveAux()) {
+ // Note: we don't add ADDED jars, RELOADABLE jars, etc. That is by
design; there are too many ways
+ // to add jars in Hive, some of which are session/etc. specific. Env +
conf + arg should be enough.
+ addAuxJarsToSet(auxJars, conf.getAuxJars(), ",");
+ addAuxJarsToSet(auxJars, System.getenv("HIVE_AUX_JARS_PATH"), ":");
+ LOG.info("Adding the following aux jars from the environment and
configs: " + auxJars);
+ }
+
+ addAuxJarsToSet(auxJars, cl.getAuxJars(), ",");
+ for (String jarPath : auxJars) {
+ rawFs.copyFromLocalFile(new Path(jarPath), libDir);
+ }
+ return null;
+ }
+
+ private void localizeJarForClass(Collection<String> classNames, boolean
doThrow) throws IOException {
+ if (CollectionUtils.isEmpty(classNames)) {
+ return;
+ }
+
+ for (String className : classNames) {
+ String jarPath = null;
+ boolean hasException = false;
+ try {
+ Class<?> clazz = Class.forName(className);
+ jarPath = Utilities.jarFinderGetJar(clazz);
+ } catch (Throwable t) {
+ if (doThrow) {
+ throw (t instanceof IOException) ? (IOException)t : new
IOException(t);
+ }
+ hasException = true;
+ String err = "Cannot find a jar for [" + className + "] due to an
exception (" +
+ t.getMessage() + "); not packaging the jar";
+ LOG.error(err);
+ System.err.println(err);
+ }
+
+ if (jarPath != null) {
+ rawFs.copyFromLocalFile(new Path(jarPath), libDir);
+ } else if (!hasException) {
+ String err = "Cannot find a jar for [" + className + "]; not packaging
the jar";
+ if (doThrow) {
+ throw new IOException(err);
+ }
+ LOG.error(err);
+ System.err.println(err);
+ }
+ }
+ }
+
+ private List<String> getDbSpecificJdbcJars() {
+ List<String> jdbcJars = new ArrayList<String>();
+ addJarForClassToListIfExists("com.mysql.jdbc.Driver", jdbcJars); // add
mysql jdbc driver
+ addJarForClassToListIfExists("org.postgresql.Driver", jdbcJars); // add
postgresql jdbc driver
+ addJarForClassToListIfExists("oracle.jdbc.OracleDriver", jdbcJars); // add
oracle jdbc driver
+
addJarForClassToListIfExists("com.microsoft.sqlserver.jdbc.SQLServerDriver",
jdbcJars); // add mssql jdbc driver
+ return jdbcJars;
+ }
+
+ private void addJarForClassToListIfExists(String cls, List<String> jarList) {
+ try {
+ Class.forName(cls);
+ jarList.add(cls);
+ } catch (Exception e) {
+ }
+ }
+
+ private void addAuxJarsToSet(Set<String> auxJarSet, String auxJars, String
delimiter) {
+ if (StringUtils.isNotEmpty(auxJars)) {
+ // TODO: transitive dependencies warning?
+ String[] jarPaths = auxJars.split(delimiter);
+ for (String jarPath : jarPaths) {
+ if (!jarPath.isEmpty()) {
+ auxJarSet.add(jarPath);
+ }
+ }
+ }
+ }
+}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyConfigs.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyConfigs.java
new file mode 100644
index 0000000..9d5b385
--- /dev/null
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyConfigs.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.util.Properties;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Copy config files for the tarball. */
+class AsyncTaskCopyConfigs implements Callable<Void> {
+ private static final Logger LOG =
LoggerFactory.getLogger(AsyncTaskCopyConfigs.class.getName());
+
+ private final LlapServiceCommandLine cl;
+ private final HiveConf conf;
+ private final Properties directProperties;
+ private final FileSystem rawFs;
+ private final Path confDir;
+
+ AsyncTaskCopyConfigs(LlapServiceCommandLine cl, HiveConf conf, Properties
directProperties, FileSystem rawFs,
+ Path confDir) {
+ this.cl = cl;
+ this.conf = conf;
+ this.directProperties = directProperties;
+ this.rawFs = rawFs;
+ this.confDir = confDir;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ // Copy over the mandatory configs for the package.
+ for (String f : LlapDaemonConfiguration.DAEMON_CONFIGS) {
+ copyConfig(f);
+ }
+ for (String f : LlapDaemonConfiguration.SSL_DAEMON_CONFIGS) {
+ try {
+ copyConfig(f);
+ } catch (Throwable t) {
+ LOG.info("Error getting an optional config " + f + "; ignoring: " +
t.getMessage());
+ }
+ }
+ createLlapDaemonConfig();
+ setUpLoggerConfig();
+ setUpMetricsConfig();
+ return null;
+ }
+
+ private void copyConfig(String f) throws IOException {
+ HiveConf.getBoolVar(new Configuration(false),
ConfVars.LLAP_CLIENT_CONSISTENT_SPLITS);
+ // they will be file:// URLs
+ rawFs.copyFromLocalFile(new Path(conf.getResource(f).toString()), confDir);
+ }
+
+ private void createLlapDaemonConfig() throws IOException {
+ FSDataOutputStream confStream = rawFs.create(new Path(confDir,
LlapDaemonConfiguration.LLAP_DAEMON_SITE));
+
+ Configuration llapDaemonConf = resolve();
+
+ llapDaemonConf.writeXml(confStream);
+ confStream.close();
+ }
+
+ private Configuration resolve() {
+ Configuration target = new Configuration(false);
+
+ populateConf(target, cl.getConfig(), "CLI hiveconf");
+ populateConf(target, directProperties, "CLI direct");
+
+ return target;
+ }
+
+ private void populateConf(Configuration target, Properties properties,
String source) {
+ for (Entry<Object, Object> entry : properties.entrySet()) {
+ String key = (String) entry.getKey();
+ String val = conf.get(key);
+ if (val != null) {
+ target.set(key, val, source);
+ }
+ }
+ }
+
+ private void setUpLoggerConfig() throws Exception {
+ // logger can be a resource stream or a real file (cannot use copy)
+ URL logger = conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
+ if (null == logger) {
+ throw new Exception("Unable to find required config file:
llap-daemon-log4j2.properties");
+ }
+ InputStream loggerContent = logger.openStream();
+ IOUtils.copyBytes(loggerContent,
+ rawFs.create(new Path(confDir, "llap-daemon-log4j2.properties"),
true), conf, true);
+ }
+
+ private void setUpMetricsConfig() throws IOException {
+ String metricsFile = LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE;
+ URL metrics2 = conf.getResource(metricsFile);
+ if (metrics2 == null) {
+ LOG.warn(LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " cannot
be found." +
+ " Looking for " + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE);
+ metricsFile = LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE;
+ metrics2 = conf.getResource(metricsFile);
+ }
+ if (metrics2 != null) {
+ InputStream metrics2FileStream = metrics2.openStream();
+ IOUtils.copyBytes(metrics2FileStream, rawFs.create(new Path(confDir,
metricsFile), true), conf, true);
+ LOG.info("Copied hadoop metrics2 properties file from " + metrics2);
+ } else {
+ LOG.warn("Cannot find " +
LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " or " +
+ LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath.");
+ }
+ }
+}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyLocalJars.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyLocalJars.java
new file mode 100644
index 0000000..90f9b2c
--- /dev/null
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCopyLocalJars.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
+import org.apache.hadoop.hive.llap.io.api.impl.LlapInputFormat;
+import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.eclipse.jetty.rewrite.handler.Rule;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Copy local jars for the tarball. */
+class AsyncTaskCopyLocalJars implements Callable<Void> {
+ private static final Logger LOG =
LoggerFactory.getLogger(AsyncTaskCopyLocalJars.class.getName());
+
+ private final FileSystem rawFs;
+ private final Path libDir;
+
+ AsyncTaskCopyLocalJars(FileSystem rawFs, Path libDir) {
+ this.rawFs = rawFs;
+ this.libDir = libDir;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ Class<?>[] dependencies = new Class<?>[] {
+ LlapDaemonProtocolProtos.class, // llap-common
+ LlapTezUtils.class, // llap-tez
+ LlapInputFormat.class, // llap-server
+ HiveInputFormat.class, // hive-exec
+ SslContextFactory.class, // hive-common (https deps)
+ Rule.class, // Jetty rewrite class
+ RegistryUtils.ServiceRecordMarshal.class, // ZK registry
+ // log4j2
+ com.lmax.disruptor.RingBuffer.class, // disruptor
+ org.apache.logging.log4j.Logger.class, // log4j-api
+ org.apache.logging.log4j.core.Appender.class, // log4j-core
+ org.apache.logging.slf4j.Log4jLogger.class, // log4j-slf4j
+ // log4j-1.2-API needed for NDC
+ org.apache.log4j.config.Log4j1ConfigurationFactory.class,
+ io.netty.util.NetUtil.class, // netty4
+ org.jboss.netty.util.NetUtil.class, //netty3
+ org.apache.arrow.vector.types.pojo.ArrowType.class, //arrow-vector
+ org.apache.arrow.memory.BaseAllocator.class, //arrow-memory
+ org.apache.arrow.flatbuf.Schema.class, //arrow-format
+ com.google.flatbuffers.Table.class, //flatbuffers
+ com.carrotsearch.hppc.ByteArrayDeque.class //hppc
+ };
+
+ for (Class<?> c : dependencies) {
+ Path jarPath = new Path(Utilities.jarFinderGetJar(c));
+ rawFs.copyFromLocalFile(jarPath, libDir);
+ LOG.debug("Copying " + jarPath + " to " + libDir);
+ }
+ return null;
+ }
+}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCreateUdfFile.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCreateUdfFile.java
new file mode 100644
index 0000000..430471e
--- /dev/null
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskCreateUdfFile.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.Charset;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.daemon.impl.StaticPermanentFunctionChecker;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.util.ResourceDownloader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Create the list of allowed UDFs for the tarball. */
+class AsyncTaskCreateUdfFile implements Callable<Void> {
+ private static final Logger LOG =
LoggerFactory.getLogger(AsyncTaskCreateUdfFile.class.getName());
+
+ private final HiveConf conf;
+ private final FileSystem fs;
+ private final FileSystem rawFs;
+ private final Path udfDir;
+ private final Path confDir;
+
+ AsyncTaskCreateUdfFile(HiveConf conf, FileSystem fs, FileSystem rawFs, Path
udfDir, Path confDir) {
+ this.conf = conf;
+ this.fs = fs;
+ this.rawFs = rawFs;
+ this.udfDir = udfDir;
+ this.confDir = confDir;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ // UDFs
+ final Set<String> allowedUdfs;
+
+ if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOW_PERMANENT_FNS))
{
+ synchronized (fs) {
+ allowedUdfs = downloadPermanentFunctions();
+ }
+ } else {
+ allowedUdfs = Collections.emptySet();
+ }
+
+ OutputStream os = rawFs.create(new Path(confDir,
StaticPermanentFunctionChecker.PERMANENT_FUNCTIONS_LIST));
+ OutputStreamWriter osw = new OutputStreamWriter(os,
Charset.defaultCharset());
+ PrintWriter udfStream = new PrintWriter(osw);
+ for (String udfClass : allowedUdfs) {
+ udfStream.println(udfClass);
+ }
+
+ udfStream.close();
+ return null;
+ }
+
+ private Set<String> downloadPermanentFunctions() throws HiveException,
URISyntaxException, IOException {
+ Map<String, String> udfs = new HashMap<String, String>();
+ HiveConf hiveConf = new HiveConf();
+ // disable expensive operations on the metastore
+
hiveConf.setBoolean(MetastoreConf.ConfVars.INIT_METADATA_COUNT_ENABLED.getVarname(),
false);
+ hiveConf.setBoolean(MetastoreConf.ConfVars.METRICS_ENABLED.getVarname(),
false);
+ // performance problem: ObjectStore does its own new HiveConf()
+ Hive hive = Hive.getWithFastCheck(hiveConf, false);
+ ResourceDownloader resourceDownloader = new ResourceDownloader(conf,
udfDir.toUri().normalize().getPath());
+ List<Function> fns = hive.getAllFunctions();
+ Set<URI> srcUris = new HashSet<>();
+ for (Function fn : fns) {
+ String fqfn = fn.getDbName() + "." + fn.getFunctionName();
+ if (udfs.containsKey(fn.getClassName())) {
+ LOG.warn("Duplicate function names found for " + fn.getClassName() + "
with " + fqfn + " and " +
+ udfs.get(fn.getClassName()));
+ }
+ udfs.put(fn.getClassName(), fqfn);
+ List<ResourceUri> resources = fn.getResourceUris();
+ if (resources == null || resources.isEmpty()) {
+ LOG.warn("Missing resources for " + fqfn);
+ continue;
+ }
+ for (ResourceUri resource : resources) {
+ srcUris.add(ResourceDownloader.createURI(resource.getUri()));
+ }
+ }
+ for (URI srcUri : srcUris) {
+ List<URI> localUris = resourceDownloader.downloadExternal(srcUri, null,
false);
+ for(URI dst : localUris) {
+ LOG.warn("Downloaded " + dst + " from " + srcUri);
+ }
+ }
+ return udfs.keySet();
+ }
+}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskDownloadTezJars.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskDownloadTezJars.java
new file mode 100644
index 0000000..29b05a6
--- /dev/null
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/AsyncTaskDownloadTezJars.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.CompressionUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Download tez related jars for the tarball. */
+class AsyncTaskDownloadTezJars implements Callable<Void> {
+ private static final Logger LOG =
LoggerFactory.getLogger(AsyncTaskDownloadTezJars.class.getName());
+
+ private final HiveConf conf;
+ private final FileSystem fs;
+ private final FileSystem rawFs;
+ private final Path libDir;
+ private final Path tezDir;
+
+ AsyncTaskDownloadTezJars(HiveConf conf, FileSystem fs, FileSystem rawFs,
Path libDir, Path tezDir) {
+ this.conf = conf;
+ this.fs = fs;
+ this.rawFs = rawFs;
+ this.libDir = libDir;
+ this.tezDir = tezDir;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ synchronized (fs) {
+ String tezLibs = conf.get(TezConfiguration.TEZ_LIB_URIS);
+ if (tezLibs == null) {
+ LOG.warn("Missing tez.lib.uris in tez-site.xml");
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Copying tez libs from " + tezLibs);
+ }
+ rawFs.mkdirs(tezDir);
+ fs.copyToLocalFile(new Path(tezLibs), new Path(libDir, "tez.tar.gz"));
+ CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(),
tezDir.toString(), true);
+ rawFs.delete(new Path(libDir, "tez.tar.gz"), false);
+ }
+ return null;
+ }
+}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapConfigJsonCreator.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapConfigJsonCreator.java
new file mode 100644
index 0000000..8e9b939
--- /dev/null
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapConfigJsonCreator.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+/**
+ * Creates the config json for llap start.
+ */
+class LlapConfigJsonCreator {
+ // This is not a config that users set in hive-site. It's only use is to
share information
+ // between the java component of the service driver and the python component.
+ private static final String CONFIG_CLUSTER_NAME =
"private.hive.llap.servicedriver.cluster.name";
+
+ private final HiveConf conf;
+ private final FileSystem fs;
+ private final Path tmpDir;
+
+ private final long cache;
+ private final long xmx;
+ private final String javaHome;
+
+ LlapConfigJsonCreator(HiveConf conf, FileSystem fs, Path tmpDir, long cache,
long xmx, String javaHome) {
+ this.conf = conf;
+ this.fs = fs;
+ this.tmpDir = tmpDir;
+ this.cache = cache;
+ this.xmx = xmx;
+ this.javaHome = javaHome;
+ }
+
+ void createLlapConfigJson() throws Exception {
+ JSONObject configs = createConfigJson();
+ writeConfigJson(configs);
+ }
+
+ private JSONObject createConfigJson() throws JSONException {
+ // extract configs for processing by the python fragments in YARN Service
+ JSONObject configs = new JSONObject();
+
+ configs.put("java.home", javaHome);
+
+ configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
+ conf.getLongVar(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
+
+ configs.put(ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
conf.getSizeVar(ConfVars.LLAP_IO_MEMORY_MAX_SIZE));
+
+ configs.put(ConfVars.LLAP_ALLOCATOR_DIRECT.varname,
conf.getBoolVar(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT));
+
+ configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
+ conf.getIntVar(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
+
+ configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname,
+ conf.getIntVar(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE));
+
+ configs.put(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
conf.getIntVar(ConfVars.LLAP_DAEMON_NUM_EXECUTORS));
+
+ // Let YARN pick the queue name, if it isn't provided in hive-site, or via
the command-line
+ if (conf.getVar(ConfVars.LLAP_DAEMON_QUEUE_NAME) != null) {
+ configs.put(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
conf.getVar(ConfVars.LLAP_DAEMON_QUEUE_NAME));
+ }
+
+ // Propagate the cluster name to the script.
+ String clusterHosts = conf.getVar(ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+ if (!StringUtils.isEmpty(clusterHosts) && clusterHosts.startsWith("@") &&
clusterHosts.length() > 1) {
+ configs.put(CONFIG_CLUSTER_NAME, clusterHosts.substring(1));
+ }
+
+ configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
+
+ configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+ conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
-1));
+
+ long maxDirect = (xmx > 0 && cache > 0 && xmx < cache * 1.25) ? (long)
(cache * 1.25) : -1;
+ configs.put("max_direct_memory", Long.toString(maxDirect));
+
+ return configs;
+ }
+
+ private void writeConfigJson(JSONObject configs) throws Exception {
+ try (FSDataOutputStream fsdos = fs.create(new Path(tmpDir, "config.json"));
+ OutputStreamWriter w = new OutputStreamWriter(fsdos,
Charset.defaultCharset())) {
+ configs.write(w);
+ }
+ }
+}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceCommandLine.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceCommandLine.java
new file mode 100644
index 0000000..5323102
--- /dev/null
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceCommandLine.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+
+import jline.TerminalFactory;
+
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.log.LogHelpers;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
+
/**
 * Parses and stores the command line arguments of the llap service driver.
 *
 * Besides the options consumed by the Java driver, this class also declares the options that
 * are only used by the accompanying python script, because the option parser used here cannot
 * ignore unknown options. The raw argument array is kept so it can be forwarded verbatim.
 */
@SuppressWarnings("static-access")
class LlapServiceCommandLine {
  private static final Logger LOG = LoggerFactory.getLogger(LlapServiceCommandLine.class.getName());

  private static final Option DIRECTORY = OptionBuilder
      .withLongOpt("directory")
      .withDescription("Temp directory for jars etc.")
      .withArgName("directory")
      .hasArg()
      .create('d');

  private static final Option NAME = OptionBuilder
      .withLongOpt("name")
      .withDescription("Cluster name for YARN registry")
      .withArgName("name")
      .hasArg()
      .create('n');

  private static final Option EXECUTORS = OptionBuilder
      .withLongOpt("executors")
      .withDescription("executor per instance")
      .withArgName("executors")
      .hasArg()
      .create('e');

  private static final Option IO_THREADS = OptionBuilder
      .withLongOpt("iothreads")
      .withDescription("iothreads per instance")
      .withArgName("iothreads")
      .hasArg()
      .create('t');

  private static final Option CACHE = OptionBuilder
      .withLongOpt("cache")
      .withDescription("cache size per instance")
      .withArgName("cache")
      .hasArg()
      .create('c');

  private static final Option SIZE = OptionBuilder
      .withLongOpt("size")
      .withDescription("cache size per instance")
      .withArgName("size")
      .hasArg()
      .create('s');

  private static final Option XMX = OptionBuilder
      .withLongOpt("xmx")
      .withDescription("working memory size")
      .withArgName("xmx")
      .hasArg()
      .create('w');

  private static final Option AUXJARS = OptionBuilder
      .withLongOpt("auxjars")
      .withDescription("additional jars to package (by default, JSON SerDe jar is packaged if available)")
      .withArgName("auxjars")
      .hasArg()
      .create('j');

  private static final Option AUXHBASE = OptionBuilder
      .withLongOpt("auxhbase")
      .withDescription("whether to package the HBase jars (true by default)")
      .withArgName("auxhbase")
      .hasArg()
      .create('h');

  // Repeatable key=value option; values are collected into a Properties object.
  private static final Option HIVECONF = OptionBuilder
      .withLongOpt("hiveconf")
      .withDescription("Use value for given property. Overridden by explicit parameters")
      .withArgName("property=value")
      .hasArgs(2)
      .withValueSeparator()
      .create();

  private static final Option JAVAHOME = OptionBuilder
      .withLongOpt("javaHome")
      .withDescription("Path to the JRE/JDK. This should be installed at the same location on all cluster nodes " +
          "($JAVA_HOME, java.home by default)")
      .withArgName("javaHome")
      .hasArg()
      .create();

  private static final Option QUEUE = OptionBuilder
      .withLongOpt("queue")
      .withDescription("The queue within which LLAP will be started")
      .withArgName("queue")
      .hasArg()
      .create('q');

  // Lower-cased logger names accepted by --logger; validated in parseCommandLine.
  private static final Set<String> VALID_LOGGERS = ImmutableSet.of(LogHelpers.LLAP_LOGGER_NAME_RFA.toLowerCase(),
      LogHelpers.LLAP_LOGGER_NAME_QUERY_ROUTING.toLowerCase(), LogHelpers.LLAP_LOGGER_NAME_CONSOLE.toLowerCase());

  private static final Option LOGGER = OptionBuilder
      .withLongOpt("logger")
      .withDescription("logger for llap instance ([" + VALID_LOGGERS + "]")
      .withArgName("logger")
      .hasArg()
      .create();

  private static final Option START = OptionBuilder
      .withLongOpt("startImmediately")
      .withDescription("immediately start the cluster")
      .withArgName("startImmediately")
      .hasArg(false)
      .create('z');

  private static final Option OUTPUT = OptionBuilder
      .withLongOpt("output")
      .withDescription("Output directory for the generated scripts")
      .withArgName("output")
      .hasArg()
      .create();

  private static final Option AUXHIVE = OptionBuilder
      .withLongOpt("auxhive")
      .withDescription("whether to package the Hive aux jars (true by default)")
      .withArgName("auxhive")
      .hasArg()
      .create("auxhive");

  private static final Option HELP = OptionBuilder
      .withLongOpt("help")
      .withDescription("Print help information")
      .withArgName("help")
      .hasArg(false)
      .create('H');

  // Options for the python script that are here because our option parser cannot ignore the unknown ones
  private static final String OPTION_INSTANCES = "instances";
  private static final String OPTION_ARGS = "args";
  private static final String OPTION_LOGLEVEL = "loglevel";
  private static final String OPTION_SERVICE_KEYTAB_DIR = "service-keytab-dir";
  private static final String OPTION_SERVICE_KEYTAB = "service-keytab";
  private static final String OPTION_SERVICE_PRINCIPAL = "service-principal";
  private static final String OPTION_SERVICE_PLACEMENT = "service-placement";
  private static final String OPTION_SERVICE_DEFAULT_KEYTAB = "service-default-keytab";
  private static final String OPTION_HEALTH_PERCENT = "health-percent";
  private static final String OPTION_HEALTH_TIME_WINDOW_SECS = "health-time-window-secs";
  private static final String OPTION_HEALTH_INIT_DELAY_SECS = "health-init-delay-secs";
  private static final String OPTION_SERVICE_AM_CONTAINER_MB = "service-am-container-mb";
  private static final String OPTION_SERVICE_APPCONFIG_GLOBAL = "service-appconfig-global";

  private static final Options OPTIONS = new Options();
  static {
    OPTIONS.addOption(DIRECTORY);
    OPTIONS.addOption(NAME);
    OPTIONS.addOption(EXECUTORS);
    OPTIONS.addOption(IO_THREADS);
    OPTIONS.addOption(CACHE);
    OPTIONS.addOption(SIZE);
    OPTIONS.addOption(XMX);
    OPTIONS.addOption(AUXJARS);
    OPTIONS.addOption(AUXHBASE);
    OPTIONS.addOption(HIVECONF);
    OPTIONS.addOption(JAVAHOME);
    OPTIONS.addOption(QUEUE);
    OPTIONS.addOption(LOGGER);
    OPTIONS.addOption(START);
    OPTIONS.addOption(OUTPUT);
    OPTIONS.addOption(AUXHIVE);
    OPTIONS.addOption(HELP);

    // The options below are consumed by the python script only; they are declared here so
    // that parsing does not fail on them (see class javadoc).
    OPTIONS.addOption(OptionBuilder
        .withLongOpt(OPTION_INSTANCES)
        .withDescription("Specify the number of instances to run this on")
        .withArgName(OPTION_INSTANCES)
        .hasArg()
        .create('i'));

    OPTIONS.addOption(OptionBuilder
        .withLongOpt(OPTION_ARGS)
        .withDescription("java arguments to the llap instance")
        .withArgName(OPTION_ARGS)
        .hasArg()
        .create('a'));

    OPTIONS.addOption(OptionBuilder
        .withLongOpt(OPTION_LOGLEVEL)
        .withDescription("log levels for the llap instance")
        .withArgName(OPTION_LOGLEVEL)
        .hasArg()
        .create('l'));

    OPTIONS.addOption(OptionBuilder
        .withLongOpt(OPTION_SERVICE_KEYTAB_DIR)
        .withDescription("Service AM keytab directory on HDFS (where the headless user keytab is stored by Service " +
            "keytab installation, e.g. .yarn/keytabs/llap)")
        .withArgName(OPTION_SERVICE_KEYTAB_DIR)
        .hasArg()
        .create());

    OPTIONS.addOption(OptionBuilder
        .withLongOpt(OPTION_SERVICE_KEYTAB)
        .withDescription("Service AM keytab file name inside " + OPTION_SERVICE_KEYTAB_DIR)
        .withArgName(OPTION_SERVICE_KEYTAB)
        .hasArg()
        .create());

    OPTIONS.addOption(OptionBuilder
        .withLongOpt(OPTION_SERVICE_PRINCIPAL)
        .withDescription("Service AM principal; should be the user running the cluster, e.g. [email protected]")
        .withArgName(OPTION_SERVICE_PRINCIPAL)
        .hasArg()
        .create());

    OPTIONS.addOption(OptionBuilder
        .withLongOpt(OPTION_SERVICE_PLACEMENT)
        .withDescription("Service placement policy; see YARN documentation at " +
            "https://issues.apache.org/jira/browse/YARN-1042. This is unnecessary if LLAP is going to take more than " +
            "half of the YARN capacity of a node.")
        .withArgName(OPTION_SERVICE_PLACEMENT)
        .hasArg()
        .create());

    OPTIONS.addOption(OptionBuilder
        .withLongOpt(OPTION_SERVICE_DEFAULT_KEYTAB)
        .withDescription("try to set default settings for Service AM keytab; mostly for dev testing")
        .withArgName(OPTION_SERVICE_DEFAULT_KEYTAB)
        .hasArg(false)
        .create());

    OPTIONS.addOption(OptionBuilder
        .withLongOpt(OPTION_HEALTH_PERCENT)
        .withDescription("Percentage of running containers after which LLAP application is considered healthy" +
            " (Default: 80)")
        .withArgName(OPTION_HEALTH_PERCENT)
        .hasArg()
        .create());

    OPTIONS.addOption(OptionBuilder
        .withLongOpt(OPTION_HEALTH_TIME_WINDOW_SECS)
        .withDescription("Time window in seconds (after initial delay) for which LLAP application is allowed to be " +
            "in unhealthy state before being killed (Default: 300)")
        .withArgName(OPTION_HEALTH_TIME_WINDOW_SECS)
        .hasArg()
        .create());

    OPTIONS.addOption(OptionBuilder
        .withLongOpt(OPTION_HEALTH_INIT_DELAY_SECS)
        .withDescription("Delay in seconds after which health percentage is monitored (Default: 400)")
        .withArgName(OPTION_HEALTH_INIT_DELAY_SECS)
        .hasArg()
        .create());

    OPTIONS.addOption(OptionBuilder
        .withLongOpt(OPTION_SERVICE_AM_CONTAINER_MB)
        .withDescription("The size of the service AppMaster container in MB")
        .withArgName("b")
        .hasArg()
        .create('b'));

    OPTIONS.addOption(OptionBuilder
        .withLongOpt(OPTION_SERVICE_APPCONFIG_GLOBAL)
        .withDescription("Property (key=value) to be set in the global section of the Service appConfig")
        .withArgName("property=value")
        .hasArgs(2)
        .withValueSeparator()
        .create());
  }

  // The original argument array, kept verbatim for forwarding to the python script.
  private String[] args;

  private String directory;
  private String name;
  private int executors;
  private int ioThreads;
  private long cache;
  private long size;
  private long xmx;
  private String jars;
  private boolean isHbase;
  private Properties conf = new Properties();
  private String javaPath = null;
  private String llapQueueName;
  private String logger = null;
  private boolean isStarting;
  private String output;
  private boolean isHiveAux;
  private boolean isHelp;

  /**
   * Parses the arguments into a new instance.
   *
   * Exits the JVM with status 1 on a parse failure (after printing usage) and with status 0
   * when help was requested; otherwise returns the parsed command line.
   */
  static LlapServiceCommandLine parseArguments(String[] args) {
    LlapServiceCommandLine cl = null;
    try {
      cl = new LlapServiceCommandLine(args);
    } catch (Exception e) {
      LOG.error("Parsing the command line arguments failed", e);
      printUsage();
      System.exit(1);
    }

    if (cl.isHelp) {
      printUsage();
      System.exit(0);
    }

    return cl;
  }

  /**
   * @param args raw command line arguments
   * @throws ParseException if the arguments cannot be parsed or fail validation
   */
  LlapServiceCommandLine(String[] args) throws ParseException {
    LOG.info("LLAP invoked with arguments = {}", Arrays.toString(args));
    this.args = args;
    parseCommandLine(args);
  }

  private void parseCommandLine(String[] args) throws ParseException {
    CommandLine cl = new GnuParser().parse(OPTIONS, args);
    // Help short-circuits all validation, so "llap --help" works without mandatory options.
    if (cl.hasOption(HELP.getOpt())) {
      isHelp = true;
      return;
    }

    if (!cl.hasOption(OPTION_INSTANCES)) {
      printUsage();
      throw new ParseException("instance must be set");
    }

    // Validated here even though the value itself is only consumed by the python script.
    int instances = Integer.parseInt(cl.getOptionValue(OPTION_INSTANCES));
    if (instances <= 0) {
      throw new ParseException("Invalid configuration: " + instances + " (should be greater than 0)");
    }

    directory = cl.getOptionValue(DIRECTORY.getOpt());
    name = cl.getOptionValue(NAME.getOpt());
    executors = Integer.parseInt(cl.getOptionValue(EXECUTORS.getOpt(), "-1"));
    // io threads default to the executor count when not given explicitly
    ioThreads = Integer.parseInt(cl.getOptionValue(IO_THREADS.getOpt(), Integer.toString(executors)));
    // sizes accept binary suffixes (k/m/g...); -1 means "not specified"
    cache = TraditionalBinaryPrefix.string2long(cl.getOptionValue(CACHE.getOpt(), "-1"));
    size = TraditionalBinaryPrefix.string2long(cl.getOptionValue(SIZE.getOpt(), "-1"));
    xmx = TraditionalBinaryPrefix.string2long(cl.getOptionValue(XMX.getOpt(), "-1"));
    jars = cl.getOptionValue(AUXJARS.getOpt());
    isHbase = Boolean.parseBoolean(cl.getOptionValue(AUXHBASE.getOpt(), "true"));
    if (cl.hasOption(HIVECONF.getLongOpt())) {
      conf = cl.getOptionProperties(HIVECONF.getLongOpt());
    }
    if (cl.hasOption(JAVAHOME.getLongOpt())) {
      javaPath = cl.getOptionValue(JAVAHOME.getLongOpt());
    }
    llapQueueName = cl.getOptionValue(QUEUE.getOpt(), ConfVars.LLAP_DAEMON_QUEUE_NAME.getDefaultValue());
    if (cl.hasOption(LOGGER.getLongOpt())) {
      logger = cl.getOptionValue(LOGGER.getLongOpt());
      // reject loggers outside the known set early (case-insensitive)
      Preconditions.checkArgument(VALID_LOGGERS.contains(logger.toLowerCase()));
    }
    isStarting = cl.hasOption(START.getOpt());
    output = cl.getOptionValue(OUTPUT.getLongOpt());
    isHiveAux = Boolean.parseBoolean(cl.getOptionValue(AUXHIVE.getOpt(), "true"));
  }

  private static void printUsage() {
    HelpFormatter hf = new HelpFormatter();
    try {
      // Widen the help output to the actual terminal width (capped at 160 columns).
      int width = hf.getWidth();
      int jlineWidth = TerminalFactory.get().getWidth();
      width = Math.min(160, Math.max(jlineWidth, width));
      hf.setWidth(width);
    } catch (Throwable t) { // Ignore: terminal width detection is best-effort.
    }

    hf.printHelp("llap", OPTIONS);
  }

  String[] getArgs() {
    return args;
  }

  String getDirectory() {
    return directory;
  }

  String getName() {
    return name;
  }

  int getExecutors() {
    return executors;
  }

  int getIoThreads() {
    return ioThreads;
  }

  long getCache() {
    return cache;
  }

  long getSize() {
    return size;
  }

  long getXmx() {
    return xmx;
  }

  String getAuxJars() {
    return jars;
  }

  boolean getIsHBase() {
    return isHbase;
  }

  boolean getIsHiveAux() {
    return isHiveAux;
  }

  Properties getConfig() {
    return conf;
  }

  String getJavaPath() {
    return javaPath;
  }

  String getLlapQueueName() {
    return llapQueueName;
  }

  String getLogger() {
    return logger;
  }

  boolean isStarting() {
    return isStarting;
  }

  String getOutput() {
    return output;
  }
}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceDriver.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceDriver.java
new file mode 100644
index 0000000..bbc7265
--- /dev/null
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapServiceDriver.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.llap.LlapUtil;
+import org.apache.hadoop.hive.llap.cli.LlapSliderUtils;
+import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
+import org.apache.hadoop.hive.llap.daemon.impl.LlapConstants;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.service.client.ServiceClient;
+import org.apache.hadoop.yarn.service.utils.CoreFileSystem;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/** Starts the llap daemon. */
+public class LlapServiceDriver {
+ private static final Logger LOG =
LoggerFactory.getLogger(LlapServiceDriver.class.getName());
+
+ private static final String LLAP_PACKAGE_DIR = ".yarn/package/LLAP/";
+ private static final String OUTPUT_DIR_PREFIX = "llap-yarn-";
+
+ /**
+ * This is a working configuration for the instance to merge various
variables.
+ * It is not written out for llap server usage
+ */
+ private final HiveConf conf;
+ private final LlapServiceCommandLine cl;
+
+ public LlapServiceDriver(LlapServiceCommandLine cl) throws Exception {
+ this.cl = cl;
+
+ SessionState ss = SessionState.get();
+ this.conf = (ss != null) ? ss.getConf() : new HiveConf(SessionState.class);
+ if (conf == null) {
+ throw new Exception("Cannot load any configuration to run command");
+ }
+ }
+
+ private int run() throws Exception {
+ Properties propsDirectOptions = new Properties();
+
+ // Working directory.
+ Path tmpDir = new Path(cl.getDirectory());
+
+ long t0 = System.nanoTime();
+
+ FileSystem fs = FileSystem.get(conf);
+ FileSystem rawFs = FileSystem.getLocal(conf).getRawFileSystem();
+
+ int threadCount = Math.max(1, Runtime.getRuntime().availableProcessors() /
2);
+ ExecutorService executor = Executors.newFixedThreadPool(threadCount,
+ new ThreadFactoryBuilder().setNameFormat("llap-pkg-%d").build());
+
+ int rc = 0;
+ try {
+
+ setupConf(propsDirectOptions);
+
+ URL logger = conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
+ if (logger == null) {
+ throw new Exception("Unable to find required config file:
llap-daemon-log4j2.properties");
+ }
+
+ Path home = new Path(System.getenv("HIVE_HOME"));
+ Path scriptParent = new Path(new Path(home, "scripts"), "llap");
+ Path scripts = new Path(scriptParent, "bin");
+
+ if (!rawFs.exists(home)) {
+ throw new Exception("Unable to find HIVE_HOME:" + home);
+ } else if (!rawFs.exists(scripts)) {
+ LOG.warn("Unable to find llap scripts:" + scripts);
+ }
+
+ String javaHome = getJavaHome();
+
+ LlapTarComponentGatherer tarComponentGatherer = new
LlapTarComponentGatherer(cl, conf, propsDirectOptions,
+ fs, rawFs, executor, tmpDir);
+ tarComponentGatherer.createDirs();
+ tarComponentGatherer.submitTarComponentGatherTasks();
+
+ // TODO: need to move from Python to Java for the rest of the script.
+ LlapConfigJsonCreator lcjCreator = new LlapConfigJsonCreator(conf,
rawFs, tmpDir, cl.getCache(), cl.getXmx(),
+ javaHome);
+ lcjCreator.createLlapConfigJson();
+
+ LOG.debug("Config Json generation took " + (System.nanoTime() - t0) + "
ns");
+
+ tarComponentGatherer.waitForFinish();
+
+ if (cl.isStarting()) {
+ rc = startLlap(tmpDir, scriptParent);
+ } else {
+ rc = 0;
+ }
+ } finally {
+ executor.shutdown();
+ rawFs.close();
+ fs.close();
+ }
+
+ if (rc == 0) {
+ LOG.debug("Exiting successfully");
+ } else {
+ LOG.info("Exiting with rc = " + rc);
+ }
+ return rc;
+ }
+
+ private void setupConf(Properties propsDirectOptions) throws Exception {
+ // needed so that the file is actually loaded into configuration.
+ for (String f : LlapDaemonConfiguration.DAEMON_CONFIGS) {
+ conf.addResource(f);
+ if (conf.getResource(f) == null) {
+ throw new Exception("Unable to find required config file: " + f);
+ }
+ }
+ for (String f : LlapDaemonConfiguration.SSL_DAEMON_CONFIGS) {
+ conf.addResource(f);
+ }
+
+ conf.reloadConfiguration();
+
+ populateConfWithLlapProperties(conf, cl.getConfig());
+
+ if (cl.getName() != null) {
+ // update service registry configs - caveat: this has nothing to do with
the actual settings as read by the AM
+ // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to
dynamically switch between instances
+ conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + cl.getName());
+
propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@"
+ cl.getName());
+ }
+
+ if (cl.getLogger() != null) {
+ HiveConf.setVar(conf, ConfVars.LLAP_DAEMON_LOGGER, cl.getLogger());
+ propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_LOGGER.varname,
cl.getLogger());
+ }
+
+ boolean isDirect = HiveConf.getBoolVar(conf,
HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT);
+
+ String cacheStr = LlapUtil.humanReadableByteCount(cl.getCache());
+ String sizeStr = LlapUtil.humanReadableByteCount(cl.getSize());
+ String xmxStr = LlapUtil.humanReadableByteCount(cl.getXmx());
+
+ if (cl.getSize() != -1) {
+ if (cl.getCache() != -1) {
+ if (!HiveConf.getBoolVar(conf,
HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
+ // direct heap allocations need to be safer
+ Preconditions.checkArgument(cl.getCache() < cl.getSize(), "Cache
size (" + cacheStr + ") has to be smaller" +
+ " than the container sizing (" + sizeStr + ")");
+ } else if (cl.getCache() < cl.getSize()) {
+ LOG.warn("Note that this might need YARN physical memory monitoring
to be turned off "
+ + "(yarn.nodemanager.pmem-check-enabled=false)");
+ }
+ }
+ if (cl.getXmx() != -1) {
+ Preconditions.checkArgument(cl.getXmx() < cl.getSize(), "Working
memory (Xmx=" + xmxStr + ") has to be" +
+ " smaller than the container sizing (" + sizeStr + ")");
+ }
+ if (isDirect && !HiveConf.getBoolVar(conf,
HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED)) {
+ // direct and not memory mapped
+ Preconditions.checkArgument(cl.getXmx() + cl.getCache() <=
cl.getSize(), "Working memory (Xmx=" +
+ xmxStr + ") + cache size (" + cacheStr + ") has to be smaller than
the container sizing (" + sizeStr + ")");
+ }
+ }
+
+ if (cl.getExecutors() != -1) {
+ conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
cl.getExecutors());
+
propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
String.valueOf(cl.getExecutors()));
+ // TODO: vcpu settings - possibly when DRFA works right
+ }
+
+ if (cl.getIoThreads() != -1) {
+ conf.setLong(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname,
cl.getIoThreads());
+ propsDirectOptions.setProperty(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname,
String.valueOf(cl.getIoThreads()));
+ }
+
+ long cache = cl.getCache();
+ if (cache != -1) {
+ conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
Long.toString(cache));
+
propsDirectOptions.setProperty(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
Long.toString(cache));
+ }
+
+ long xmx = cl.getXmx();
+ if (xmx != -1) {
+ // Needs more explanation here
+ // Xmx is not the max heap value in JDK8. You need to subtract 50% of
the survivor fraction
+ // from this, to get actual usable memory before it goes into GC
+ long xmxMb = (xmx / (1024L * 1024L));
+ conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, xmxMb);
+
propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
String.valueOf(xmxMb));
+ }
+
+ long containerSize = cl.getSize();
+ if (containerSize == -1) {
+ long heapSize = xmx;
+ if (!isDirect) {
+ heapSize += cache;
+ }
+ containerSize = Math.min((long)(heapSize * 1.2), heapSize + 1024L * 1024
* 1024);
+ if (isDirect) {
+ containerSize += cache;
+ }
+ }
+ long containerSizeMB = containerSize / (1024 * 1024);
+ long minAllocMB =
conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1);
+ String containerSizeStr = LlapUtil.humanReadableByteCount(containerSize);
+ Preconditions.checkArgument(containerSizeMB >= minAllocMB, "Container size
(" + containerSizeStr + ") should be " +
+ "greater than minimum allocation(" +
LlapUtil.humanReadableByteCount(minAllocMB * 1024L * 1024L) + ")");
+ conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
containerSizeMB);
+
propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
String.valueOf(containerSizeMB));
+
+ LOG.info("Memory settings: container memory: {} executor memory: {} cache
memory: {}", containerSizeStr, xmxStr,
+ cacheStr);
+
+ if (!StringUtils.isEmpty(cl.getLlapQueueName())) {
+ conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, cl.getLlapQueueName());
+ propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
cl.getLlapQueueName());
+ }
+ }
+
+ private String getJavaHome() {
+ String javaHome = cl.getJavaPath();
+ if (StringUtils.isEmpty(javaHome)) {
+ javaHome = System.getenv("JAVA_HOME");
+ String jreHome = System.getProperty("java.home");
+ if (javaHome == null) {
+ javaHome = jreHome;
+ } else if (!javaHome.equals(jreHome)) {
+ LOG.warn("Java versions might not match : JAVA_HOME=[{}],process
jre=[{}]", javaHome, jreHome);
+ }
+ }
+ if (StringUtils.isEmpty(javaHome)) {
+ throw new RuntimeException(
+ "Could not determine JAVA_HOME from command line parameters,
environment or system properties");
+ }
+ LOG.info("Using [{}] for JAVA_HOME", javaHome);
+ return javaHome;
+ }
+
+ private static void populateConfWithLlapProperties(Configuration conf,
Properties properties) {
+ for(Entry<Object, Object> props : properties.entrySet()) {
+ String key = (String) props.getKey();
+ if (HiveConf.getLlapDaemonConfVars().contains(key)) {
+ conf.set(key, (String) props.getValue());
+ } else {
+ if (key.startsWith(HiveConf.PREFIX_LLAP) ||
key.startsWith(HiveConf.PREFIX_HIVE_LLAP)) {
+ LOG.warn("Adding key [{}] even though it is not in the set of known
llap-server keys", key);
+ conf.set(key, (String) props.getValue());
+ } else {
+ LOG.warn("Ignoring unknown llap server parameter: [{}]", key);
+ }
+ }
+ }
+ }
+
+ private int startLlap(Path tmpDir, Path scriptParent) throws IOException,
InterruptedException {
+ int rc;
+ String version = System.getenv("HIVE_VERSION");
+ if (StringUtils.isEmpty(version)) {
+ version = DateTime.now().toString("ddMMMyyyy");
+ }
+
+ String outputDir = cl.getOutput();
+ Path packageDir = null;
+ if (outputDir == null) {
+ outputDir = OUTPUT_DIR_PREFIX + version;
+ packageDir = new Path(Paths.get(".").toAbsolutePath().toString(),
OUTPUT_DIR_PREFIX + version);
+ } else {
+ packageDir = new Path(outputDir);
+ }
+
+ rc = runPackagePy(tmpDir, scriptParent, version, outputDir);
+ if (rc == 0) {
+ String tarballName = "llap-" + version + ".tar.gz";
+ startCluster(conf, cl.getName(), tarballName, packageDir,
conf.getVar(ConfVars.LLAP_DAEMON_QUEUE_NAME));
+ }
+ return rc;
+ }
+
+ private int runPackagePy(Path tmpDir, Path scriptParent, String version,
String outputDir)
+ throws IOException, InterruptedException {
+ Path scriptPath = new Path(new Path(scriptParent, "yarn"), "package.py");
+ List<String> scriptArgs = new ArrayList<>(cl.getArgs().length + 7);
+ scriptArgs.addAll(Arrays.asList("python", scriptPath.toString(),
"--input", tmpDir.toString(), "--output",
+ outputDir, "--javaChild"));
+ scriptArgs.addAll(Arrays.asList(cl.getArgs()));
+
+ LOG.debug("Calling package.py via: " + scriptArgs);
+ ProcessBuilder builder = new ProcessBuilder(scriptArgs);
+ builder.redirectError(ProcessBuilder.Redirect.INHERIT);
+ builder.redirectOutput(ProcessBuilder.Redirect.INHERIT);
+ builder.environment().put("HIVE_VERSION", version);
+ return builder.start().waitFor();
+ }
+
+ private void startCluster(Configuration conf, String name, String
packageName, Path packageDir, String queue) {
+ LOG.info("Starting cluster with " + name + ", " + packageName + ", " +
queue + ", " + packageDir);
+ ServiceClient sc;
+ try {
+ sc = LlapSliderUtils.createServiceClient(conf);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ try {
+ try {
+ LOG.info("Executing the stop command");
+ sc.actionStop(name, true);
+ } catch (Exception ex) { // Ignore exceptions from stop
+ LOG.info(ex.getLocalizedMessage());
+ }
+ try {
+ LOG.info("Executing the destroy command");
+ sc.actionDestroy(name);
+ } catch (Exception ex) { // Ignore exceptions from destroy
+ LOG.info(ex.getLocalizedMessage());
+ }
+ LOG.info("Uploading the app tarball");
+ CoreFileSystem fs = new CoreFileSystem(conf);
+ fs.createWithPermissions(new Path(LLAP_PACKAGE_DIR),
FsPermission.getDirDefault());
+ fs.copyLocalFileToHdfs(new File(packageDir.toString(), packageName), new
Path(LLAP_PACKAGE_DIR),
+ new FsPermission("755"));
+
+ LOG.info("Executing the launch command");
+ File yarnfile = new File(new Path(packageDir, "Yarnfile").toString());
+ Long lifetime = null; // unlimited lifetime
+ sc.actionLaunch(yarnfile.getAbsolutePath(), name, lifetime, queue);
+ LOG.debug("Started the cluster via service API");
+ } catch (YarnException | IOException e) {
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ sc.close();
+ } catch (IOException e) {
+ LOG.info("Failed to close service client", e);
+ }
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ LlapServiceCommandLine cl = new LlapServiceCommandLine(args);
+ int ret = 0;
+ try {
+ ret = new LlapServiceDriver(cl).run();
+ } catch (Throwable t) {
+ System.err.println("Failed: " + t.getMessage());
+ t.printStackTrace();
+ ret = 3;
+ } finally {
+ LOG.info("LLAP service driver finished");
+ }
+ LOG.debug("Completed processing - exiting with " + ret);
+ System.exit(ret);
+ }
+}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapTarComponentGatherer.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapTarComponentGatherer.java
new file mode 100644
index 0000000..a83647b
--- /dev/null
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/LlapTarComponentGatherer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Gathers all the jar files necessary to start llap.
+ */
+class LlapTarComponentGatherer {
+ private static final Logger LOG =
LoggerFactory.getLogger(LlapTarComponentGatherer.class.getName());
+
+ //using Callable<Void> instead of Runnable to be able to throw Exception
+ private final Map<String, Future<Void>> tasks = new HashMap<>();
+
+ private final LlapServiceCommandLine cl;
+ private final HiveConf conf;
+ private final Properties directProperties;
+ private final FileSystem fs;
+ private final FileSystem rawFs;
+ private final ExecutorService executor;
+ private final Path libDir;
+ private final Path tezDir;
+ private final Path udfDir;
+ private final Path confDir;
+
+ LlapTarComponentGatherer(LlapServiceCommandLine cl, HiveConf conf,
Properties directProperties, FileSystem fs,
+ FileSystem rawFs, ExecutorService executor, Path tmpDir) {
+ this.cl = cl;
+ this.conf = conf;
+ this.directProperties = directProperties;
+ this.fs = fs;
+ this.rawFs = rawFs;
+ this.executor = executor;
+ this.libDir = new Path(tmpDir, "lib");
+ this.tezDir = new Path(libDir, "tez");
+ this.udfDir = new Path(libDir, "udfs");
+ this.confDir = new Path(tmpDir, "conf");
+ }
+
+ void createDirs() throws Exception {
+ if (!rawFs.mkdirs(tezDir)) {
+ LOG.warn("mkdirs for " + tezDir + " returned false");
+ }
+ if (!rawFs.mkdirs(udfDir)) {
+ LOG.warn("mkdirs for " + udfDir + " returned false");
+ }
+ if (!rawFs.mkdirs(confDir)) {
+ LOG.warn("mkdirs for " + confDir + " returned false");
+ }
+ }
+
+ void submitTarComponentGatherTasks() {
+ CompletionService<Void> asyncRunner = new
ExecutorCompletionService<Void>(executor);
+
+ tasks.put("downloadTezJars", asyncRunner.submit(new
AsyncTaskDownloadTezJars(conf, fs, rawFs, libDir, tezDir)));
+ tasks.put("copyLocalJars", asyncRunner.submit(new
AsyncTaskCopyLocalJars(rawFs, libDir)));
+ tasks.put("copyAuxJars", asyncRunner.submit(new AsyncTaskCopyAuxJars(cl,
conf, rawFs, libDir)));
+ tasks.put("createUdfFile", asyncRunner.submit(new
AsyncTaskCreateUdfFile(conf, fs, rawFs, udfDir, confDir)));
+ tasks.put("copyConfigs", asyncRunner.submit(new AsyncTaskCopyConfigs(cl,
conf, directProperties, rawFs,
+ confDir)));
+ }
+
+ void waitForFinish() throws Exception {
+ for (Map.Entry<String, Future<Void>> task : tasks.entrySet()) {
+ long t1 = System.nanoTime();
+ task.getValue().get();
+ long t2 = System.nanoTime();
+ LOG.debug(task.getKey() + " waited for " + (t2 - t1) + " ns");
+ }
+ }
+}
diff --git
a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/package-info.java
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/package-info.java
new file mode 100644
index 0000000..46aacf8
--- /dev/null
+++
b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/service/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package containing the LlapServiceDriver program (and the other classes it uses), which starts up the llap daemon.
+ */
+package org.apache.hadoop.hive.llap.cli.service;
+
diff --git
a/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/TestLlapServiceCommandLine.java
b/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/TestLlapServiceCommandLine.java
new file mode 100644
index 0000000..bb2a99b
--- /dev/null
+++
b/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/TestLlapServiceCommandLine.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.cli.service;
+
+import static junit.framework.TestCase.assertEquals;
+
+import java.util.Properties;
+
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.configuration2.ConfigurationConverter;
+import org.apache.commons.configuration2.MapConfiguration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import com.google.common.collect.ImmutableMap;
+
+/** Tests for LlapServiceCommandLine. */
+public class TestLlapServiceCommandLine {
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ @Test
+ public void testArgumentParsingEmpty() throws Exception {
+ thrown.expect(ParseException.class);
+ thrown.expectMessage("instance must be set");
+
+ new LlapServiceCommandLine(new String[] {});
+ }
+
+ @Test
+ public void testArgumentParsingDefault() throws Exception {
+ LlapServiceCommandLine cl = new LlapServiceCommandLine(new String[]
{"--instances", "1"});
+ assertEquals(null, cl.getAuxJars());
+ assertEquals(-1, cl.getCache());
+ assertEquals(new Properties(), cl.getConfig());
+ assertEquals(null, cl.getDirectory());
+ assertEquals(-1, cl.getExecutors());
+ assertEquals(-1, cl.getIoThreads());
+ assertEquals(true, cl.getIsHBase());
+ assertEquals(true, cl.getIsHiveAux());
+ assertEquals(null, cl.getJavaPath());
+ assertEquals(null, cl.getLlapQueueName());
+ assertEquals(null, cl.getLogger());
+ assertEquals(null, cl.getName());
+ assertEquals(null, cl.getOutput());
+ assertEquals(-1, cl.getSize());
+ assertEquals(-1, cl.getXmx());
+ assertEquals(false, cl.isStarting());
+ }
+
+ @Test
+ public void testParsingArguments() throws Exception {
+ LlapServiceCommandLine cl = new LlapServiceCommandLine(new String[]
{"--instances", "2", "--auxjars", "auxjarsVal",
+ "--cache", "10k", "--hiveconf", "a=b", "--directory", "directoryVal",
"--executors", "4", "--iothreads", "5",
+ "--auxhbase", "false", "--auxhive", "false", "--javaHome",
"javaHomeVal", "--queue", "queueVal",
+ "--logger", "console", "--name", "nameVal", "--output", "outputVal",
"--size", "10m", "--xmx", "10g",
+ "--startImmediately"});
+ assertEquals("auxjarsVal", cl.getAuxJars());
+ assertEquals(10L * 1024, cl.getCache());
+ assertEquals(ConfigurationConverter.getProperties(new
MapConfiguration(ImmutableMap.of("a", "b"))), cl.getConfig());
+ assertEquals("directoryVal", cl.getDirectory());
+ assertEquals(4, cl.getExecutors());
+ assertEquals(5, cl.getIoThreads());
+ assertEquals(false, cl.getIsHBase());
+ assertEquals(false, cl.getIsHiveAux());
+ assertEquals("javaHomeVal", cl.getJavaPath());
+ assertEquals("queueVal", cl.getLlapQueueName());
+ assertEquals("console", cl.getLogger());
+ assertEquals("nameVal", cl.getName());
+ assertEquals("outputVal", cl.getOutput());
+ assertEquals(10L * 1024 * 1024, cl.getSize());
+ assertEquals(10L * 1024 * 1024 * 1024, cl.getXmx());
+ assertEquals(true, cl.isStarting());
+ }
+
+ @Test
+ public void testIllegalLogger() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ new LlapServiceCommandLine(new String[] {"--instances", "1", "--logger",
"someValue"});
+ }
+
+ @Test
+ public void testIllegalInstances() throws Exception {
+ thrown.expect(NumberFormatException.class);
+ new LlapServiceCommandLine(new String[] {"--instances", "a"});
+ }
+
+ @Test
+ public void testIllegalCache() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ new LlapServiceCommandLine(new String[] {"--instances", "1", "--cache",
"a"});
+ }
+
+ @Test
+ public void testIllegalExecutors() throws Exception {
+ thrown.expect(NumberFormatException.class);
+ new LlapServiceCommandLine(new String[] {"--instances", "1",
"--executors", "a"});
+ }
+
+ @Test
+ public void testIllegalIoThreads() throws Exception {
+ thrown.expect(NumberFormatException.class);
+ new LlapServiceCommandLine(new String[] {"--instances", "1",
"--iothreads", "a"});
+ }
+
+ @Test
+ public void testIllegalSize() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ new LlapServiceCommandLine(new String[] {"--instances", "1", "--size",
"a"});
+ }
+
+ @Test
+ public void testIllegalXmx() throws Exception {
+ thrown.expect(IllegalArgumentException.class);
+ new LlapServiceCommandLine(new String[] {"--instances", "1", "--xmx",
"a"});
+ }
+}
diff --git
a/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/package-info.java
b/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/package-info.java
new file mode 100644
index 0000000..e8746d2
--- /dev/null
+++
b/llap-server/src/test/org/apache/hadoop/hive/llap/cli/service/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package containing the tests for the LlapServiceDriver program and the other classes it uses.
+ */
+package org.apache.hadoop.hive.llap.cli.service;
+