Repository: hive Updated Branches: refs/heads/branch-2.0 e86abb9e6 -> 7feb1ca2e refs/heads/master 872260107 -> 0d379021e
HIVE-12853 : LLAP: localize permanent UDF jars to daemon and add them to classloader (Sergey Shelukhin, reviewed by Jason Dere) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8dd1d196 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8dd1d196 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8dd1d196 Branch: refs/heads/master Commit: 8dd1d1966f2f0b86604b4e991ebc865224f42b41 Parents: 8722601 Author: Sergey Shelukhin <[email protected]> Authored: Tue Jan 19 11:49:00 2016 -0800 Committer: Sergey Shelukhin <[email protected]> Committed: Tue Jan 19 11:49:00 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 + .../llap/daemon/impl/ContainerRunnerImpl.java | 6 +- .../llap/daemon/impl/FunctionLocalizer.java | 288 +++++++++++++++++++ .../hive/llap/daemon/impl/LlapDaemon.java | 36 ++- .../llap/daemon/impl/TaskExecutorService.java | 92 +++--- .../daemon/impl/TestTaskExecutorService.java | 3 +- .../hadoop/hive/ql/exec/FunctionTask.java | 7 +- .../apache/hadoop/hive/ql/metadata/Hive.java | 16 +- .../hive/ql/optimizer/physical/LlapDecider.java | 2 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 5 +- .../hive/ql/session/DependencyResolver.java | 183 ------------ .../hadoop/hive/ql/session/SessionState.java | 109 ++----- .../hadoop/hive/ql/util/DependencyResolver.java | 183 ++++++++++++ .../apache/hadoop/hive/ql/util/DosToUnix.java | 3 +- .../hadoop/hive/ql/util/ResourceDownloader.java | 136 +++++++++ .../hadoop/hive/ql/session/TestAddResource.java | 18 +- 16 files changed, 748 insertions(+), 341 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 2c25cae..d61ae39 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2483,6 +2483,8 @@ public class HiveConf extends Configuration { LLAP_DAEMON_COMMUNICATOR_NUM_THREADS("hive.llap.daemon.communicator.num.threads", 10, "Number of threads to use in LLAP task communicator in Tez AM.", "llap.daemon.communicator.num.threads"), + LLAP_DAEMON_ALLOW_PERMANENT_FNS("hive.llap.daemon.allow.permanent.fns", true, + "Whether LLAP daemon should localize the resources for permanent UDFs."), LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS( "hive.llap.task.scheduler.node.reenable.min.timeout.ms", "200ms", new TimeValidator(TimeUnit.MILLISECONDS), http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 0d85671..5353d4a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -90,7 +90,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu boolean enablePreemption, String[] localDirsBase, AtomicReference<Integer> localShufflePort, AtomicReference<InetSocketAddress> localAddress, long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics, - AMReporter amReporter) { + AMReporter amReporter, ClassLoader classLoader) { super("ContainerRunnerImpl"); this.conf = conf; Preconditions.checkState(numExecutors > 0, @@ -103,8 +103,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu addIfService(queryTracker); String waitQueueSchedulerClassName = HiveConf.getVar( conf, ConfVars.LLAP_DAEMON_WAIT_QUEUE_COMPARATOR_CLASS_NAME); - this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, waitQueueSchedulerClassName, - enablePreemption); + this.executorService = new TaskExecutorService(numExecutors, waitQueueSize, + waitQueueSchedulerClassName, enablePreemption, classLoader); addIfService(executorService); http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/FunctionLocalizer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/FunctionLocalizer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/FunctionLocalizer.java new file mode 100644 index 0000000..bc0ad02 --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/FunctionLocalizer.java @@ -0,0 +1,288 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon.impl; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URLClassLoader; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.metastore.api.Function; +import org.apache.hadoop.hive.metastore.api.ResourceUri; +import org.apache.hadoop.hive.ql.exec.FunctionTask; +import org.apache.hadoop.hive.ql.exec.Utilities; +import org.apache.hadoop.hive.ql.exec.FunctionInfo.FunctionResource; +import org.apache.hadoop.hive.ql.metadata.Hive; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.session.SessionState.ResourceType; +import org.apache.hadoop.hive.ql.util.ResourceDownloader; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This class localizes and manages jars for the functions allowed inside LLAP. + */ +public class FunctionLocalizer { + private static final String DIR_NAME = "fnresources"; + private static final Logger LOG = LoggerFactory.getLogger(FunctionLocalizer.class); + private ResourceDownloader resourceDownloader; + private final LinkedBlockingQueue<LocalizerWork> workQueue = new LinkedBlockingQueue<>(); + private volatile boolean isClosed = false; + private final List<String> recentlyLocalizedJars = new LinkedList<String>(); + private final Thread workThread; + private final File localDir; + private final Configuration conf; + private final URLClassLoader executorClassloader; + + private final ConcurrentHashMap<String, FnResources> resourcesByFn = new ConcurrentHashMap<>(); + private final ConcurrentHashMap<URI, RefCountedResource> localFiles = new ConcurrentHashMap<>(); + + public FunctionLocalizer(Configuration conf, String localDir) { + this.conf = conf; + this.localDir = new File(localDir, DIR_NAME); + this.executorClassloader = (URLClassLoader)Utilities.createUDFClassLoader( + (URLClassLoader)Thread.currentThread().getContextClassLoader(), new String[]{}); + this.workThread = new Thread(new Runnable() { + @Override + public void run() { + runWorkThread(); + } + }); + } + + void init() throws IOException { + if (localDir.exists()) { + // TODO: We don't want some random jars of unknown provenance sitting around. Or do we care? + // Ideally, we should try to reuse jars and verify using some checksum. + FileUtils.deleteDirectory(localDir); + } + this.resourceDownloader = new ResourceDownloader(conf, localDir.getAbsolutePath()); + workThread.start(); + } + + public ClassLoader getClassLoader() { + return executorClassloader; + } + + public void startLocalizeAllFunctions() throws HiveException { + List<Function> fns = Hive.get(false).getAllFunctions(); + for (Function fn : fns) { + String fqfn = fn.getDbName() + "." + fn.getFunctionName(); + List<ResourceUri> resources = fn.getResourceUris(); + if (resources == null || resources.isEmpty()) continue; // Nothing to localize. + FnResources result = new FnResources(); + resourcesByFn.put(fqfn, result); + workQueue.add(new LocalizeFn(fqfn, resources, result, false)); + } + workQueue.add(new RefreshClassloader()); + } + + public void close() { + isClosed = true; + workThread.interrupt(); + try { + workThread.join(1000); // Give it some time, then don't delay shutdown too much. + } catch (InterruptedException e) { + LOG.info("Interrupted during close"); + } + } + + private void runWorkThread() { + while (true) { + if (isClosed) { + deleteAllLocalResources(); + return; + } + LocalizerWork lw = null; + try { + lw = workQueue.take(); + } catch (InterruptedException ex) { + LOG.debug("Localizer thread interrupted"); + isClosed = true; + } + if (isClosed) { + deleteAllLocalResources(); + return; + } + try { + lw.run(this); + } catch (InterruptedException ex) { + LOG.debug("Localizer thread interrupted"); + isClosed = true; + } catch (Exception ex) { + LOG.error("Failed to run " + lw, ex); + } + } + } + + private interface LocalizerWork { + void run(FunctionLocalizer parent) + throws URISyntaxException, IOException, InterruptedException; + } + + private static class LocalizeFn implements LocalizerWork { + private final List<ResourceUri> resources; + private final FnResources result; + private final String fqfn; + private final boolean doRefreshClassloader; + public LocalizeFn(String fqfn, List<ResourceUri> resources, FnResources result, + boolean doRefreshClassloader) { + this.resources = resources; + this.result = result; + this.fqfn = fqfn; + this.doRefreshClassloader = doRefreshClassloader; + } + + public void run(FunctionLocalizer parent) throws URISyntaxException, IOException { + parent.localizeFunctionResources(fqfn, resources, result, doRefreshClassloader); + } + + public String toString() { + return "localize " + resources.size() + " resources for " + fqfn; + } + } + + private static class RefreshClassloader implements LocalizerWork { + public void run(FunctionLocalizer parent) throws URISyntaxException, IOException { + parent.refreshClassloader(); + } + + public String toString() { + return "load the recently localized jars"; + } + } + + private void deleteAllLocalResources() { + try { + executorClassloader.close(); + } catch (Exception ex) { + LOG.info("Failed to close the classloader", ex.getMessage()); + } + resourcesByFn.clear(); + for (RefCountedResource rcr : localFiles.values()) { + for (FunctionResource fr : rcr.resources) { + // We ignore refcounts (and errors) for now. + File file = new File(fr.getResourceURI()); + try { + if (!file.delete()) { + LOG.info("Failed to delete " + file); + } + } catch (Exception ex) { + LOG.info("Failed to delete " + file + ": " + ex.getMessage()); + } + } + } + } + + public void refreshClassloader() throws IOException { + if (recentlyLocalizedJars.isEmpty()) return; + String[] jars = recentlyLocalizedJars.toArray(new String[0]); + recentlyLocalizedJars.clear(); + ClassLoader updatedCl = null; + try { + updatedCl = Utilities.addToClassPath(executorClassloader, jars); + if (LOG.isInfoEnabled()) { + LOG.info("Added " + jars.length + " jars to classpath"); + } + } catch (Throwable t) { + // TODO: we could fall back to trying one by one and only ignore the failed ones. + String jarringError = "Unable to register jars: "; + for (String jar : jars) { + jarringError += (jar + ", "); + } + throw new IOException(jarringError, t); + } + if (updatedCl != executorClassloader) { + throw new AssertionError("Classloader was replaced despite using UDFClassLoader: new " + + updatedCl + ", old " + executorClassloader); + } + } + + private void localizeFunctionResources(String fqfn, List<ResourceUri> resources, + FnResources result, boolean doRefreshClassloader) throws URISyntaxException, IOException { + // We will download into fn-scoped subdirectories to avoid name collisions (we assume there + // are no collisions within the same fn). That doesn't mean we download for every fn. + if (LOG.isInfoEnabled()) { + LOG.info("Localizing " + resources.size() + " resources for " + fqfn); + } + for (ResourceUri resource : resources) { + URI srcUri = ResourceDownloader.createURI(resource.getUri()); + ResourceType rt = FunctionTask.getResourceType(resource.getResourceType()); + localizeOneResource(fqfn, srcUri, rt, result); + } + if (doRefreshClassloader) { + refreshClassloader(); + } + } + + private void localizeOneResource(String fqfn, URI srcUri, ResourceType rt, FnResources result) + throws URISyntaxException, IOException { + RefCountedResource rcr = localFiles.get(srcUri); + if (rcr != null && rcr.refCount > 0) { + logFilesUsed("Reusing", fqfn, srcUri, rcr); + ++rcr.refCount; + result.addResources(rcr); + return; + } + rcr = new RefCountedResource(); + List<URI> localUris = resourceDownloader.downloadExternal(srcUri, fqfn, false); + if (localUris == null || localUris.isEmpty()) { + LOG.error("Cannot download " + srcUri + " for " + fqfn); + return; + } + rcr.resources = new ArrayList<>(); + for (URI uri : localUris) { + // Reuse the same type for all. Only Ivy can return more than one, probably all jars. + String path = uri.getPath(); + rcr.resources.add(new FunctionResource(rt, path)); + if (rt == ResourceType.JAR) { + recentlyLocalizedJars.add(path); + } + } + ++rcr.refCount; + logFilesUsed("Using", fqfn, srcUri, rcr); + localFiles.put(srcUri, rcr); + result.addResources(rcr); + } + + private void logFilesUsed(String what, String fqfn, URI srcUri, RefCountedResource rcr) { + if (!LOG.isInfoEnabled()) return; + String desc = (rcr.resources.size() == 1 + ? rcr.resources.get(0).toString() : (rcr.resources.size() + " files")); + LOG.info(what + " files [" + desc + "] for [" + srcUri + "] resource for " + fqfn); + } + + private static class RefCountedResource { + List<FunctionResource> resources; + int refCount = 0; + } + + private static class FnResources { + final List<FunctionResource> localResources = new ArrayList<>(); + final List<RefCountedResource> originals = new ArrayList<>(); + public void addResources(RefCountedResource rcr) { + localResources.addAll(rcr.resources); + originals.add(rcr); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index ddedfbf..c5759d6 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -19,6 +19,7 @@ import java.lang.management.ManagementFactory; import java.lang.management.MemoryPoolMXBean; import java.lang.management.MemoryType; import java.net.InetSocketAddress; +import java.net.URLClassLoader; import java.util.Arrays; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -47,6 +48,7 @@ import org.apache.hadoop.hive.llap.metrics.LlapMetricsSystem; import org.apache.hadoop.hive.llap.metrics.MetricsUtils; import org.apache.hadoop.hive.llap.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; +import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.ExitUtil; @@ -74,6 +76,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla private final JvmPauseMonitor pauseMonitor; private final ObjectName llapDaemonInfoBean; private final LlapDaemonExecutorMetrics metrics; + private final FunctionLocalizer fnLocalizer; // Parameters used for JMX private final boolean llapIoEnabled; @@ -161,22 +164,23 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla LOG.info("Started LlapMetricsSystem with displayName: " + displayName + " sessionId: " + sessionId); - this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf); this.server = new LlapDaemonProtocolServerImpl( numHandlers, this, srvAddress, mngAddress, srvPort, mngPort); - this.containerRunner = new ContainerRunnerImpl(daemonConf, - numExecutors, - waitQueueSize, - enablePreemption, - localDirs, - this.shufflePort, - srvAddress, - executorMemoryBytes, - metrics, - amReporter); + ClassLoader executorClassLoader = null; + if (HiveConf.getBoolVar(daemonConf, ConfVars.LLAP_DAEMON_ALLOW_PERMANENT_FNS)) { + this.fnLocalizer = new FunctionLocalizer(daemonConf, localDirs[0]); + executorClassLoader = fnLocalizer.getClassLoader(); + } else { + this.fnLocalizer = null; + executorClassLoader = Thread.currentThread().getContextClassLoader(); + } + + this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, waitQueueSize, + enablePreemption, localDirs, this.shufflePort, srvAddress, executorMemoryBytes, metrics, + amReporter, executorClassLoader); addIfService(containerRunner); this.registry = new LlapRegistryService(true); @@ -235,7 +239,12 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla public void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); LlapProxy.setDaemon(true); + if (fnLocalizer != null) { + fnLocalizer.init(); + fnLocalizer.startLocalizeAllFunctions(); + } LlapProxy.initializeLlapIo(conf); + } @Override @@ -274,6 +283,10 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla } LlapProxy.close(); + + if (fnLocalizer != null) { + fnLocalizer.close(); + } } public static void main(String[] args) throws Exception { @@ -431,7 +444,6 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla } } - private class QueryFailedHandlerProxy implements QueryFailedHandler { @Override http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 34aa5c9..babdf14 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -33,10 +33,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; @@ -76,8 +78,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * new tasks. Shutting down of the task executor service can be done gracefully or immediately. */ public class TaskExecutorService extends AbstractService implements Scheduler<TaskRunnerCallable> { - - private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorService.class); private static final boolean isInfoEnabled = LOG.isInfoEnabled(); private static final boolean isDebugEnabled = LOG.isDebugEnabled(); @@ -105,8 +105,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta private final Object lock = new Object(); - public TaskExecutorService(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName, - boolean enablePreemption) { + public TaskExecutorService(int numExecutors, int waitQueueSize, + String waitQueueComparatorClassName, boolean enablePreemption, + ClassLoader classLoader) { super(TaskExecutorService.class.getSimpleName()); LOG.info("TaskExecutorService is being setup with parameters: " + "numExecutors=" + numExecutors @@ -114,31 +115,13 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta + ", waitQueueComparatorClassName=" + waitQueueComparatorClassName + ", enablePreemption=" + enablePreemption); - final Comparator<TaskWrapper> waitQueueComparator; - try { - Class<? extends Comparator> waitQueueComparatorClazz = - (Class<? extends Comparator>) Class.forName( - waitQueueComparatorClassName); - Constructor<? extends Comparator> ctor = waitQueueComparatorClazz.getConstructor(null); - waitQueueComparator = ctor.newInstance(null); - } catch (ClassNotFoundException e) { - throw new RuntimeException( - "Failed to load wait queue comparator, class=" + waitQueueComparatorClassName, e); - } catch (NoSuchMethodException e) { - throw new RuntimeException("Failed to find constructor for wait queue comparator, class=" + - waitQueueComparatorClassName, e); - } catch (InvocationTargetException | InstantiationException | IllegalAccessException e) { - throw new RuntimeException( - "Failed to find instantiate wait queue comparator, class=" + waitQueueComparatorClassName, - e); - } + final Comparator<TaskWrapper> waitQueueComparator = createComparator( + waitQueueComparatorClassName); this.waitQueue = new EvictingPriorityBlockingQueue<>(waitQueueComparator, waitQueueSize); this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size numExecutors, // max pool size - 1, TimeUnit.MINUTES, - new SynchronousQueue<Runnable>(), // direct hand-off - new ThreadFactoryBuilder().setDaemon(true).setNameFormat(TASK_EXECUTOR_THREAD_NAME_FORMAT) - .build()); + 1, TimeUnit.MINUTES, new SynchronousQueue<Runnable>(), // direct hand-off + new ExecutorThreadFactory(classLoader)); this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor); this.preemptionQueue = new PriorityBlockingQueue<>(numExecutors, new PreemptionQueueComparator()); @@ -146,18 +129,38 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta this.numSlotsAvailable = new AtomicInteger(numExecutors); // single threaded scheduler for tasks from wait queue to executor threads - ExecutorService wes = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true) - .setNameFormat(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT).build()); + ExecutorService wes = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT).build()); this.waitQueueExecutorService = MoreExecutors.listeningDecorator(wes); ExecutorService executionCompletionExecutorServiceRaw = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ExecutionCompletionThread #%d") .build()); - executionCompletionExecutorService = MoreExecutors.listeningDecorator(executionCompletionExecutorServiceRaw); + executionCompletionExecutorService = MoreExecutors.listeningDecorator( + executionCompletionExecutorServiceRaw); ListenableFuture<?> future = waitQueueExecutorService.submit(new WaitQueueWorker()); Futures.addCallback(future, new WaitQueueWorkerCallback()); + } - + private Comparator<TaskWrapper> createComparator( + String waitQueueComparatorClassName) { + final Comparator<TaskWrapper> waitQueueComparator; + try { + Class<? extends Comparator> waitQueueComparatorClazz = + (Class<? extends Comparator>) Class.forName(waitQueueComparatorClassName); + Constructor<? extends Comparator> ctor = waitQueueComparatorClazz.getConstructor(null); + waitQueueComparator = ctor.newInstance(null); + } catch (ClassNotFoundException e) { + throw new RuntimeException( + "Failed to load wait queue comparator, class=" + waitQueueComparatorClassName, e); + } catch (NoSuchMethodException e) { + throw new RuntimeException("Failed to find constructor for wait queue comparator, class=" + + waitQueueComparatorClassName, e); + } catch (InvocationTargetException | InstantiationException | IllegalAccessException e) { + throw new RuntimeException("Failed to find instantiate wait queue comparator, class=" + + waitQueueComparatorClassName, e); + } + return waitQueueComparator; } @Override @@ -424,9 +427,11 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta synchronized (lock) { boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); LOG.info("Attempting to execute {}", taskWrapper); - ListenableFuture<TaskRunner2Result> future = executorService.submit(taskWrapper.getTaskRunnerCallable()); + ListenableFuture<TaskRunner2Result> future = executorService.submit( + taskWrapper.getTaskRunnerCallable()); taskWrapper.setIsInWaitQueue(false); - FutureCallback<TaskRunner2Result> wrappedCallback = createInternalCompletionListener(taskWrapper); + FutureCallback<TaskRunner2Result> wrappedCallback = createInternalCompletionListener( + taskWrapper); // Callback on a separate thread so that when a task completes, the thread in the main queue // is actually available for execution and will not potentially result in a RejectedExecution Futures.addCallback(future, wrappedCallback, executionCompletionExecutorService); @@ -452,7 +457,8 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta } private void handleScheduleAttemptedRejection(TaskWrapper taskWrapper) { - if (enablePreemption && taskWrapper.getTaskRunnerCallable().canFinish() && !preemptionQueue.isEmpty()) { + if (enablePreemption && taskWrapper.getTaskRunnerCallable().canFinish() + && !preemptionQueue.isEmpty()) { if (isDebugEnabled) { LOG.debug("Preemption Queue: " + preemptionQueue); @@ -733,4 +739,24 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta taskExecutorService.finishableStateUpdated(this, finishableState); } } + + private static class ExecutorThreadFactory implements ThreadFactory { + private final ClassLoader classLoader; + private final ThreadFactory defaultFactory; + private final AtomicLong count = new AtomicLong(0); + + public ExecutorThreadFactory(ClassLoader classLoader) { + this.classLoader = classLoader; + this.defaultFactory = Executors.defaultThreadFactory(); + } + + @Override + public Thread newThread(Runnable r) { + Thread thread = defaultFactory.newThread(r); + thread.setName(String.format(TASK_EXECUTOR_THREAD_NAME_FORMAT, count.getAndIncrement())); + thread.setDaemon(true); + thread.setContextClassLoader(classLoader); + return thread; + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index 5491064..d1edd12 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -189,7 +189,8 @@ public class TestTaskExecutorService { public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName, boolean enablePreemption) { - super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption); + super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption, + Thread.currentThread().getContextClassLoader()); } private ConcurrentMap<String, InternalCompletionListenerForTest> completionListeners = new ConcurrentHashMap<>(); http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java index ed6f062..77f11b9 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hive.ql.plan.DropMacroDesc; import org.apache.hadoop.hive.ql.plan.FunctionWork; import org.apache.hadoop.hive.ql.plan.api.StageType; import org.apache.hadoop.hive.ql.session.SessionState; +import org.apache.hadoop.hive.ql.util.ResourceDownloader; import org.apache.hadoop.util.StringUtils; /** @@ -251,7 +252,7 @@ public class FunctionTask extends Task<FunctionWork> { for (ResourceUri res : resources) { String resUri = res.getUri(); - if (!SessionState.canDownloadResource(resUri)) { + if (ResourceDownloader.isFileUri(resUri)) { throw new HiveException("Hive warehouse is non-local, but " + res.getUri() + " specifies file on local filesystem. " + "Resources on non-local warehouse should specify a non-local scheme/path"); @@ -280,7 +281,7 @@ public class FunctionTask extends Task<FunctionWork> { return converted; } - private static SessionState.ResourceType getResourceType(ResourceType rt) throws HiveException { + public static SessionState.ResourceType getResourceType(ResourceType rt) { switch (rt) { case JAR: return SessionState.ResourceType.JAR; @@ -289,7 +290,7 @@ public class FunctionTask extends Task<FunctionWork> { case ARCHIVE: return SessionState.ResourceType.ARCHIVE; default: - throw new HiveException("Unexpected resource type " + rt); + throw new AssertionError("Unexpected resource type " + rt); } } http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java index 290ee99..efb50b2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java @@ -211,6 +211,7 @@ public class Hive { } } + public void reloadFunctions() throws HiveException { for (Function function : getAllFunctions()) { String functionName = function.getFunctionName(); @@ -287,7 +288,7 @@ public class Hive { } closeCurrent(); c.set("fs.scheme.class", "dfs"); - Hive newdb = new Hive(c); + Hive newdb = new Hive(c, true); hiveDB.set(newdb); return newdb; } @@ -296,6 +297,10 @@ public class Hive { } public static Hive get() throws HiveException { + return get(true); + } + + public static Hive get(boolean doRegisterAllFns) throws HiveException { Hive db = hiveDB.get(); if (db != null && !db.isCurrentUserOwner()) { LOG.debug("Creating new db. db.isCurrentUserOwner = " + db.isCurrentUserOwner()); @@ -304,7 +309,8 @@ public class Hive { } if (db == null) { SessionState session = SessionState.get(); - db = new Hive(session == null ? new HiveConf(Hive.class) : session.getConf()); + HiveConf conf = session == null ? new HiveConf(Hive.class) : session.getConf(); + db = new Hive(conf, doRegisterAllFns); hiveDB.set(db); } return db; @@ -324,9 +330,11 @@ public class Hive { * @param c * */ - private Hive(HiveConf c) throws HiveException { + private Hive(HiveConf c, boolean doRegisterAllFns) throws HiveException { conf = c; - registerAllFunctionsOnce(); + if (doRegisterAllFns) { + registerAllFunctionsOnce(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java index af6129a..ae64749 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java @@ -259,7 +259,7 @@ public class LlapDecider implements PhysicalPlanResolver { private boolean checkAggregator(AggregationDesc agg) throws SemanticException { if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Checking '%s'", agg.getExprString())); + LOG.debug(String.format("Checking '%s'", agg.getExprString())); } boolean result = checkExpressions(agg.getParameters()); http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index a0251fb..daa9d4a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -195,6 +195,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFHash; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr; import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF; +import org.apache.hadoop.hive.ql.util.ResourceDownloader; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.Deserializer; import org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe; @@ -3074,7 +3075,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { SessionState ss = SessionState.get(); String progName = getScriptProgName(cmd); - if (SessionState.canDownloadResource(progName)) { + if (!ResourceDownloader.isFileUri(progName)) { String filePath = ss.add_resource(ResourceType.FILE, progName, true); Path p = new Path(filePath); String fileName = p.getName(); @@ -3338,7 +3339,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (conf.getBoolVar(ConfVars.HIVE_CAPTURE_TRANSFORM_ENTITY)) { String scriptCmd = getScriptProgName(stripQuotes(trfm.getChild(execPos).getText())); getInputs().add(new ReadEntity(new Path(scriptCmd), - !SessionState.canDownloadResource(scriptCmd))); + ResourceDownloader.isFileUri(scriptCmd))); } return output; http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/ql/src/java/org/apache/hadoop/hive/ql/session/DependencyResolver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/DependencyResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/session/DependencyResolver.java deleted file mode 100644 index 7289426..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/DependencyResolver.java +++ /dev/null @@ -1,183 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.session; - -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.util.Arrays; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.io.File; -import java.io.IOException; - -import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; -import org.slf4j.LoggerFactory; - -import groovy.grape.Grape; -import groovy.lang.GroovyClassLoader; - - -public class DependencyResolver { - - private static final String HIVE_HOME = "HIVE_HOME"; - private static final String HIVE_CONF_DIR = "HIVE_CONF_DIR"; - private String ivysettingsPath; - private static LogHelper _console = new LogHelper(LoggerFactory.getLogger("DependencyResolver")); - - public DependencyResolver() { - - // Check if HIVE_CONF_DIR is defined - if (System.getenv().containsKey(HIVE_CONF_DIR)) { - ivysettingsPath = System.getenv().get(HIVE_CONF_DIR) + "/ivysettings.xml"; - } - - // If HIVE_CONF_DIR is not defined or file is not found in HIVE_CONF_DIR then check HIVE_HOME/conf - if (ivysettingsPath == null || !(new File(ivysettingsPath).exists())) { - if (System.getenv().containsKey(HIVE_HOME)) { - ivysettingsPath = System.getenv().get(HIVE_HOME) + "/conf/ivysettings.xml"; - } - } - - // If HIVE_HOME is not defined or file is not found in HIVE_HOME/conf then load default ivysettings.xml from class loader - if (ivysettingsPath == null || !(new File(ivysettingsPath).exists())) { - URL ivysetttingsResource = ClassLoader.getSystemResource("ivysettings.xml"); - if (ivysetttingsResource != null){ - ivysettingsPath = ivysetttingsResource.getFile(); - _console.printInfo("ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR," + ivysettingsPath + " will be used"); - } - } - - } - - /** - * - * @param uri - * @return List of URIs of downloaded jars - * @throws URISyntaxException - * @throws IOException - */ - public List<URI> downloadDependencies(URI uri) throws URISyntaxException, IOException { - Map<String, Object> dependencyMap = new HashMap<String, Object>(); - String authority = uri.getAuthority(); - if (authority == null) { - throw new URISyntaxException(authority, "Invalid url: Expected 'org:module:version', found null"); - } - String[] authorityTokens = authority.toLowerCase().split(":"); - - if (authorityTokens.length != 3) { - throw new URISyntaxException(authority, "Invalid url: Expected 'org:module:version', found " + authority); - } - - dependencyMap.put("org", authorityTokens[0]); - dependencyMap.put("module", authorityTokens[1]); - dependencyMap.put("version", authorityTokens[2]); - Map<String, Object> queryMap = parseQueryString(uri.getQuery()); - if (queryMap != null) { - dependencyMap.putAll(queryMap); - } - return grab(dependencyMap); - } - - /** - * @param queryString - * @return queryMap Map which contains grape parameters such as transitive, exclude, ext and classifier. - * Example: Input: ext=jar&exclude=org.mortbay.jetty:jetty&transitive=true - * Output: {[ext]:[jar], [exclude]:{[group]:[org.mortbay.jetty], [module]:[jetty]}, [transitive]:[true]} - * @throws URISyntaxException - */ - private Map<String, Object> parseQueryString(String queryString) throws URISyntaxException { - if (queryString == null || queryString.isEmpty()) { - return null; - } - List<Map<String, String>> excludeList = new LinkedList<Map<String, String>>(); - Map<String, Object> queryMap = new HashMap<String, Object>(); - String[] mapTokens = queryString.split("&"); - for (String tokens : mapTokens) { - String[] mapPair = tokens.split("="); - if (mapPair.length != 2) { - throw new RuntimeException("Invalid query string: " + queryString); - } - if (mapPair[0].equals("exclude")) { - excludeList.addAll(computeExcludeList(mapPair[1])); - } else if (mapPair[0].equals("transitive")) { - if (mapPair[1].toLowerCase().equals("true")) { - queryMap.put(mapPair[0], true); - } else { - queryMap.put(mapPair[0], false); - } - } else { - queryMap.put(mapPair[0], mapPair[1]); - } - } - if (!excludeList.isEmpty()) { - queryMap.put("exclude", excludeList); - } - return queryMap; - } - - private List<Map<String, String>> computeExcludeList(String excludeString) throws URISyntaxException { - String excludes[] = excludeString.split(","); - List<Map<String, String>> excludeList = new LinkedList<Map<String, String>>(); - for (String exclude : excludes) { - Map<String, String> tempMap = new HashMap<String, String>(); - String args[] = exclude.split(":"); - if (args.length != 2) { - throw new URISyntaxException(excludeString, - "Invalid exclude string: expected 'org:module,org:module,..', found " + excludeString); - } - tempMap.put("group", args[0]); - tempMap.put("module", args[1]); - excludeList.add(tempMap); - } - return excludeList; - } - - /** - * - * @param dependencies - * @return List of URIs of downloaded jars - * @throws IOException - */ - private List<URI> grab(Map<String, Object> dependencies) throws IOException { - Map<String, Object> args = new HashMap<String, Object>(); - URI[] localUrls; - - //grape expects excludes key in args map - if (dependencies.containsKey("exclude")) { - args.put("excludes", dependencies.get("exclude")); - } - - //Set transitive to true by default - if (!dependencies.containsKey("transitive")) { - dependencies.put("transitive", true); - } - - args.put("classLoader", new GroovyClassLoader()); - System.setProperty("grape.config", ivysettingsPath); - System.setProperty("groovy.grape.report.downloads", "true"); - localUrls = Grape.resolve(args, dependencies); - if (localUrls == null) { - throw new IOException("Not able to download all the dependencies.."); - } - return Arrays.asList(localUrls); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 5f15557..efeb70f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -46,7 +46,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.FileUtils; @@ -82,13 +81,13 @@ import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthorizerFac import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzSessionContext.CLIENT_TYPE; import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveMetastoreClientFactoryImpl; -import org.apache.hadoop.hive.ql.util.DosToUnix; +import org.apache.hadoop.hive.ql.util.ResourceDownloader; import org.apache.hadoop.hive.shims.HadoopShims; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.hive.shims.Utils; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Shell; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; /** @@ -267,7 +266,8 @@ public class SessionState { private final ResourceMaps resourceMaps; - private final DependencyResolver dependencyResolver; + private final ResourceDownloader resourceDownloader; + /** * Get the lineage state stored in this session. * @@ -354,7 +354,6 @@ public class SessionState { isSilent = conf.getBoolVar(HiveConf.ConfVars.HIVESESSIONSILENT); ls = new LineageState(); resourceMaps = new ResourceMaps(); - dependencyResolver = new DependencyResolver(); // Must be deterministic order map for consistent q-test output across Java versions overriddenConfigurations = new LinkedHashMap<String, String>(); overriddenConfigurations.putAll(HiveConf.getConfSystemProperties()); @@ -368,6 +367,8 @@ public class SessionState { // Make sure that each session has its own UDFClassloader. For details see {@link UDFClassLoader} final ClassLoader currentLoader = Utilities.createUDFClassLoader((URLClassLoader) parentLoader, new String[]{}); this.conf.setClassLoader(currentLoader); + resourceDownloader = new ResourceDownloader(conf, + HiveConf.getVar(conf, ConfVars.DOWNLOADED_RESOURCES_DIR)); } public void setCmd(String cmdString) { @@ -443,6 +444,7 @@ public class SessionState { } private void attach(HiveConf conf) { this.conf = conf; + ClassLoader classLoader = conf.getClassLoader(); if (classLoader != null) { Thread.currentThread().setContextClassLoader(classLoader); @@ -1210,11 +1212,11 @@ public class SessionState { String key; //get the local path of downloaded jars. - List<URI> downloadedURLs = resolveAndDownload(t, value, convertToUnix); + List<URI> downloadedURLs = resolveAndDownload(value, convertToUnix); - if (getURLType(value).equals("ivy")) { + if (ResourceDownloader.isIvyUri(value)) { // get the key to store in map - key = createURI(value).getAuthority(); + key = ResourceDownloader.createURI(value).getAuthority(); } else { // for local file and hdfs, key and value are same. key = downloadedURLs.get(0).toString(); @@ -1255,85 +1257,10 @@ public class SessionState { return localized; } - /** - * @param path - * @return URI corresponding to the path. - */ - private static URI createURI(String path) throws URISyntaxException { - if (!Shell.WINDOWS) { - // If this is not windows shell, path better follow unix convention. - // Else, the below call will throw an URISyntaxException - return new URI(path); - } else { - return new Path(path).toUri(); - } - } - - private static String getURLType(String value) throws URISyntaxException { - URI uri = createURI(value); - String scheme = uri.getScheme() == null ? null : uri.getScheme().toLowerCase(); - if (scheme == null || scheme.equals("file")) { - return "file"; - } - return scheme; - } - - protected List<URI> resolveAndDownload(ResourceType t, String value, boolean convertToUnix) throws URISyntaxException, - IOException { - URI uri = createURI(value); - if (getURLType(value).equals("file")) { - return Arrays.asList(uri); - } else if (getURLType(value).equals("ivy")) { - return dependencyResolver.downloadDependencies(uri); - } else { - return Arrays.asList(createURI(downloadResource(value, convertToUnix))); - } - } - - - - /** - * Returns true if it is from any external File Systems except local - */ - public static boolean canDownloadResource(String value) { - // Allow to download resources from any external FileSystem. - // And no need to download if it already exists on local file system. - String scheme = new Path(value).toUri().getScheme(); - return (scheme != null) && !scheme.equalsIgnoreCase("file"); - } - - private String downloadResource(String value, boolean convertToUnix) { - if (canDownloadResource(value)) { - getConsole().printInfo("converting to local " + value); - File resourceDir = new File(getConf().getVar(HiveConf.ConfVars.DOWNLOADED_RESOURCES_DIR)); - String destinationName = new Path(value).getName(); - File destinationFile = new File(resourceDir, destinationName); - if (resourceDir.exists() && ! resourceDir.isDirectory()) { - throw new RuntimeException("The resource directory is not a directory, resourceDir is set to" + resourceDir); - } - if (!resourceDir.exists() && !resourceDir.mkdirs()) { - throw new RuntimeException("Couldn't create directory " + resourceDir); - } - try { - FileSystem fs = FileSystem.get(createURI(value), conf); - fs.copyToLocalFile(new Path(value), new Path(destinationFile.getCanonicalPath())); - value = destinationFile.getCanonicalPath(); - - // add "execute" permission to downloaded resource file (needed when loading dll file) - FileUtil.chmod(value, "ugo+rx", true); - if (convertToUnix && DosToUnix.isWindowsScript(destinationFile)) { - try { - DosToUnix.convertWindowsScriptToUnix(destinationFile); - } catch (Exception e) { - throw new RuntimeException("Caught exception while converting file " + - destinationFile + " to unix line endings", e); - } - } - } catch (Exception e) { - throw new RuntimeException("Failed to read external resource " + value, e); - } - } - return value; + @VisibleForTesting + protected List<URI> resolveAndDownload(String value, boolean convertToUnix) + throws URISyntaxException, IOException { + return resourceDownloader.resolveAndDownload(value, convertToUnix); } public void delete_resources(ResourceType t, List<String> values) { @@ -1348,8 +1275,8 @@ public class SessionState { for (String value : values) { String key = value; try { - if (getURLType(value).equals("ivy")) { - key = createURI(value).getAuthority(); + if (ResourceDownloader.isIvyUri(value)) { + key = ResourceDownloader.createURI(value).getAuthority(); } } catch (URISyntaxException e) { throw new RuntimeException("Invalid uri string " + value + ", " + e.getMessage()); @@ -1687,6 +1614,10 @@ public class SessionState { public Timestamp getQueryCurrentTimestamp() { return queryCurrentTimestamp; } + + public ResourceDownloader getResourceDownloader() { + return resourceDownloader; + } } class ResourceMaps { http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java new file mode 100644 index 0000000..3891e59 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/DependencyResolver.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.util; + +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; +import org.slf4j.LoggerFactory; + +import groovy.grape.Grape; +import groovy.lang.GroovyClassLoader; + + +public class DependencyResolver { + + private static final String HIVE_HOME = "HIVE_HOME"; + private static final String HIVE_CONF_DIR = "HIVE_CONF_DIR"; + private String ivysettingsPath; + private static LogHelper _console = new LogHelper(LoggerFactory.getLogger("DependencyResolver")); + + public DependencyResolver() { + + // Check if HIVE_CONF_DIR is defined + if (System.getenv().containsKey(HIVE_CONF_DIR)) { + ivysettingsPath = System.getenv().get(HIVE_CONF_DIR) + "/ivysettings.xml"; + } + + // If HIVE_CONF_DIR is not defined or file is not found in HIVE_CONF_DIR then check HIVE_HOME/conf + if (ivysettingsPath == null || !(new File(ivysettingsPath).exists())) { + if (System.getenv().containsKey(HIVE_HOME)) { + ivysettingsPath = System.getenv().get(HIVE_HOME) + "/conf/ivysettings.xml"; + } + } + + // If HIVE_HOME is not defined or file is not found in HIVE_HOME/conf then load default ivysettings.xml from class loader + if (ivysettingsPath == null || !(new File(ivysettingsPath).exists())) { + URL ivysetttingsResource = ClassLoader.getSystemResource("ivysettings.xml"); + if (ivysetttingsResource != null){ + ivysettingsPath = ivysetttingsResource.getFile(); + _console.printInfo("ivysettings.xml file not found in HIVE_HOME or HIVE_CONF_DIR," + ivysettingsPath + " will be used"); + } + } + + } + + /** + * + * @param uri + * @return List of URIs of downloaded jars + * @throws URISyntaxException + * @throws IOException + */ + public List<URI> downloadDependencies(URI uri) throws URISyntaxException, IOException { + Map<String, Object> dependencyMap = new HashMap<String, Object>(); + String authority = uri.getAuthority(); + if (authority == null) { + throw new URISyntaxException(authority, "Invalid url: Expected 'org:module:version', found null"); + } + String[] authorityTokens = authority.toLowerCase().split(":"); + + if (authorityTokens.length != 3) { + throw new URISyntaxException(authority, "Invalid url: Expected 'org:module:version', found " + authority); + } + + dependencyMap.put("org", authorityTokens[0]); + dependencyMap.put("module", authorityTokens[1]); + dependencyMap.put("version", authorityTokens[2]); + Map<String, Object> queryMap = parseQueryString(uri.getQuery()); + if (queryMap != null) { + dependencyMap.putAll(queryMap); + } + return grab(dependencyMap); + } + + /** + * @param queryString + * @return queryMap Map which contains grape parameters such as transitive, exclude, ext and classifier. + * Example: Input: ext=jar&exclude=org.mortbay.jetty:jetty&transitive=true + * Output: {[ext]:[jar], [exclude]:{[group]:[org.mortbay.jetty], [module]:[jetty]}, [transitive]:[true]} + * @throws URISyntaxException + */ + private Map<String, Object> parseQueryString(String queryString) throws URISyntaxException { + if (queryString == null || queryString.isEmpty()) { + return null; + } + List<Map<String, String>> excludeList = new LinkedList<Map<String, String>>(); + Map<String, Object> queryMap = new HashMap<String, Object>(); + String[] mapTokens = queryString.split("&"); + for (String tokens : mapTokens) { + String[] mapPair = tokens.split("="); + if (mapPair.length != 2) { + throw new RuntimeException("Invalid query string: " + queryString); + } + if (mapPair[0].equals("exclude")) { + excludeList.addAll(computeExcludeList(mapPair[1])); + } else if (mapPair[0].equals("transitive")) { + if (mapPair[1].toLowerCase().equals("true")) { + queryMap.put(mapPair[0], true); + } else { + queryMap.put(mapPair[0], false); + } + } else { + queryMap.put(mapPair[0], mapPair[1]); + } + } + if (!excludeList.isEmpty()) { + queryMap.put("exclude", excludeList); + } + return queryMap; + } + + private List<Map<String, String>> computeExcludeList(String excludeString) throws URISyntaxException { + String excludes[] = excludeString.split(","); + List<Map<String, String>> excludeList = new LinkedList<Map<String, String>>(); + for (String exclude : excludes) { + Map<String, String> tempMap = new HashMap<String, String>(); + String args[] = exclude.split(":"); + if (args.length != 2) { + throw new URISyntaxException(excludeString, + "Invalid exclude string: expected 'org:module,org:module,..', found " + excludeString); + } + tempMap.put("group", args[0]); + tempMap.put("module", args[1]); + excludeList.add(tempMap); + } + return excludeList; + } + + /** + * + * @param dependencies + * @return List of URIs of downloaded jars + * @throws IOException + */ + private List<URI> grab(Map<String, Object> dependencies) throws IOException { + Map<String, Object> args = new HashMap<String, Object>(); + URI[] localUrls; + + //grape expects excludes key in args map + if (dependencies.containsKey("exclude")) { + args.put("excludes", dependencies.get("exclude")); + } + + //Set transitive to true by default + if (!dependencies.containsKey("transitive")) { + dependencies.put("transitive", true); + } + + args.put("classLoader", new GroovyClassLoader()); + System.setProperty("grape.config", ivysettingsPath); + System.setProperty("groovy.grape.report.downloads", "true"); + localUrls = Grape.resolve(args, dependencies); + if (localUrls == null) { + throw new IOException("Not able to download all the dependencies.."); + } + return Arrays.asList(localUrls); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/ql/src/java/org/apache/hadoop/hive/ql/util/DosToUnix.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/DosToUnix.java b/ql/src/java/org/apache/hadoop/hive/ql/util/DosToUnix.java index 4480d54..6aecd49 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/util/DosToUnix.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/DosToUnix.java @@ -24,11 +24,12 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileReader; import java.io.FileWriter; +import java.io.IOException; import java.io.InputStreamReader; public class DosToUnix { - public static String convertWindowsScriptToUnix(File windowsScriptFile) throws Exception { + public static String convertWindowsScriptToUnix(File windowsScriptFile) throws IOException { String windowsScriptFilename = windowsScriptFile.getName(); String unixScriptFilename = getUnixScriptNameFor(windowsScriptFilename); File unixScriptFile = null; http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java b/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java new file mode 100644 index 0000000..14102b5 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/util/ResourceDownloader.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.util; + +import java.io.File; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.util.DosToUnix; +import org.apache.hadoop.util.Shell; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +public class ResourceDownloader { + private static final Logger LOG = LoggerFactory.getLogger(ResourceDownloader.class); + private final DependencyResolver dependencyResolver; + private final Configuration conf; + private final File resourceDir; + + public ResourceDownloader(Configuration conf, String resourceDirPath) { + this.dependencyResolver = new DependencyResolver(); + this.conf = conf; + this.resourceDir = new File(resourceDirPath); + ensureDirectory(resourceDir); + } + + /** + * @param path + * @return URI corresponding to the path. + */ + public static URI createURI(String path) throws URISyntaxException { + if (!Shell.WINDOWS) { + // If this is not windows shell, path better follow unix convention. + // Else, the below call will throw an URISyntaxException + return new URI(path); + } else { + return new Path(path).toUri(); + } + } + + public static boolean isIvyUri(String value) throws URISyntaxException { + return "ivy".equalsIgnoreCase(createURI(value).getScheme()); + } + + public static boolean isFileUri(String value) { + String scheme = null; + try { + scheme = createURI(value).getScheme(); + } catch (URISyntaxException ex) { + throw new RuntimeException(ex); + } + return (scheme == null) || scheme.equalsIgnoreCase("file"); + } + + public List<URI> resolveAndDownload(String source, boolean convertToUnix) + throws URISyntaxException, IOException { + return resolveAndDownloadInternal(createURI(source), null, convertToUnix, true); + } + + public List<URI> downloadExternal(URI source, String subDir, boolean convertToUnix) + throws URISyntaxException, IOException { + return resolveAndDownloadInternal(source, subDir, convertToUnix, false); + } + + private List<URI> resolveAndDownloadInternal(URI source, String subDir, + boolean convertToUnix, boolean isLocalAllowed) throws URISyntaxException, IOException { + switch (getURLType(source)) { + case FILE: return isLocalAllowed ? Lists.newArrayList(source) : null; + case IVY: return dependencyResolver.downloadDependencies(source); + case OTHER: return Lists.newArrayList( + createURI(downloadResource(source, subDir, convertToUnix))); + default: throw new AssertionError(getURLType(source)); + } + } + + private String downloadResource(URI srcUri, String subDir, boolean convertToUnix) + throws IOException, URISyntaxException { + LOG.info("converting to local " + srcUri); + File destinationDir = (subDir == null) ? resourceDir : new File(resourceDir, subDir); + ensureDirectory(destinationDir); + File destinationFile = new File(destinationDir, new Path(srcUri.toString()).getName()); + FileSystem fs = FileSystem.get(srcUri, conf); + String dest = destinationFile.getCanonicalPath(); + fs.copyToLocalFile(new Path(srcUri.toString()), new Path(dest)); + // add "execute" permission to downloaded resource file (needed when loading dll file) + FileUtil.chmod(dest, "ugo+rx", true); + if (convertToUnix && DosToUnix.isWindowsScript(destinationFile)) { + DosToUnix.convertWindowsScriptToUnix(destinationFile); + } + return dest; + } + + private static void ensureDirectory(File resourceDir) { + boolean doesExist = resourceDir.exists(); + if (doesExist && !resourceDir.isDirectory()) { + throw new RuntimeException(resourceDir + " is not a directory"); + } + if (!doesExist && !resourceDir.mkdirs()) { + throw new RuntimeException("Couldn't create directory " + resourceDir); + } + } + + private enum UriType { IVY, FILE, OTHER }; + private static ResourceDownloader.UriType getURLType(URI value) throws URISyntaxException { + String scheme = value.getScheme(); + if (scheme == null) return UriType.FILE; + scheme = scheme.toLowerCase(); + if ("ivy".equals(scheme)) return UriType.IVY; + if ("file".equals(scheme)) return UriType.FILE; + return UriType.OTHER; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/8dd1d196/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java b/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java index 1f07235..2de1b25 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/session/TestAddResource.java @@ -83,7 +83,7 @@ public class TestAddResource { list.add(createURI(TEST_JAR_DIR + "testjar5.jar")); //return all the dependency urls - Mockito.when(ss.resolveAndDownload(t, query, false)).thenReturn(list); + Mockito.when(ss.resolveAndDownload(query, false)).thenReturn(list); addList.add(query); ss.add_resources(t, addList); Set<String> dependencies = ss.list_resource(t, null); @@ -119,7 +119,7 @@ public class TestAddResource { Collections.sort(list); - Mockito.when(ss.resolveAndDownload(t, query, false)).thenReturn(list); + Mockito.when(ss.resolveAndDownload(query, false)).thenReturn(list); for (int i = 0; i < 10; i++) { addList.add(query); } @@ -157,8 +157,8 @@ public class TestAddResource { list2.add(createURI(TEST_JAR_DIR + "testjar3.jar")); list2.add(createURI(TEST_JAR_DIR + "testjar4.jar")); - Mockito.when(ss.resolveAndDownload(t, query1, false)).thenReturn(list1); - Mockito.when(ss.resolveAndDownload(t, query2, false)).thenReturn(list2); + Mockito.when(ss.resolveAndDownload(query1, false)).thenReturn(list1); + Mockito.when(ss.resolveAndDownload(query2, false)).thenReturn(list2); addList.add(query1); addList.add(query2); ss.add_resources(t, addList); @@ -214,8 +214,8 @@ public class TestAddResource { Collections.sort(list1); Collections.sort(list2); - Mockito.when(ss.resolveAndDownload(t, query1, false)).thenReturn(list1); - Mockito.when(ss.resolveAndDownload(t, query2, false)).thenReturn(list2); + Mockito.when(ss.resolveAndDownload(query1, false)).thenReturn(list1); + Mockito.when(ss.resolveAndDownload(query2, false)).thenReturn(list2); addList.add(query1); addList.add(query2); ss.add_resources(t, addList); @@ -273,9 +273,9 @@ public class TestAddResource { Collections.sort(list2); Collections.sort(list3); - Mockito.when(ss.resolveAndDownload(t, query1, false)).thenReturn(list1); - Mockito.when(ss.resolveAndDownload(t, query2, false)).thenReturn(list2); - Mockito.when(ss.resolveAndDownload(t, query3, false)).thenReturn(list3); + Mockito.when(ss.resolveAndDownload(query1, false)).thenReturn(list1); + Mockito.when(ss.resolveAndDownload(query2, false)).thenReturn(list2); + Mockito.when(ss.resolveAndDownload(query3, false)).thenReturn(list3); addList.add(query1); addList.add(query2); addList.add(query3);
