http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java index af1fa66..afe1484 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator; import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext; +import org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkCommonOperator; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator; @@ -143,13 +144,17 @@ public final class OperatorFactory { public static <T extends OperatorDesc> Operator<T> getVectorOperator( Class<? extends Operator<?>> opClass, CompilationOpContext cContext, T conf, - VectorizationContext vContext) throws HiveException { + VectorizationContext vContext, Operator<? extends OperatorDesc> originalOp) throws HiveException { try { VectorDesc vectorDesc = ((AbstractOperatorDesc) conf).getVectorDesc(); vectorDesc.setVectorOp(opClass); - Operator<T> op = (Operator<T>) opClass.getDeclaredConstructor( - CompilationOpContext.class, VectorizationContext.class, OperatorDesc.class) - .newInstance(cContext, vContext, conf); + Operator<T> op = (Operator<T>) opClass.getDeclaredConstructor(CompilationOpContext.class, + VectorizationContext.class, OperatorDesc.class).newInstance(cContext, vContext, conf); + op.setOperatorId(originalOp.getOperatorId()); + if (op instanceof VectorReduceSinkOperator || op instanceof VectorReduceSinkCommonOperator) { + ((ReduceSinkDesc) op.getConf()).setOutputOperators(((ReduceSinkDesc) originalOp.getConf()) + .getOutputOperators()); + } return op; } catch (Exception e) { e.printStackTrace(); @@ -158,11 +163,12 @@ public final class OperatorFactory { } public static <T extends OperatorDesc> Operator<T> getVectorOperator( - CompilationOpContext cContext, T conf, VectorizationContext vContext) throws HiveException { + CompilationOpContext cContext, T conf, VectorizationContext vContext, + Operator<? extends OperatorDesc> originalOp) throws HiveException { Class<T> descClass = (Class<T>) conf.getClass(); - Class<?> opClass = vectorOpvec.get(descClass); + Class<? extends Operator<? extends OperatorDesc>> opClass = vectorOpvec.get(descClass); if (opClass != null) { - return getVectorOperator(vectorOpvec.get(descClass), cContext, conf, vContext); + return getVectorOperator(opClass, cContext, conf, vContext, originalOp); } throw new HiveException("No vector operator for descriptor class " + descClass.getName()); }
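The OperatorFactory change above follows a common replace-and-copy pattern: the vectorized operator is constructed reflectively through a known constructor shape, and then identifying state (the operator id, and for ReduceSink the list of output operators) is copied over from the operator it replaces, so the rest of the plan still resolves to the same node. A minimal self-contained sketch of that pattern, using illustrative stand-in types rather than Hive's real Operator/OperatorDesc hierarchy:

    import java.lang.reflect.Constructor;

    public final class VectorOpFactorySketch {

        // Stand-ins for Hive's OperatorDesc/Operator hierarchy.
        static class Desc { }

        static class Op {
            final Desc conf;
            String operatorId;

            Op(Desc conf) {
                this.conf = conf;
            }
        }

        // Build the replacement via its (Desc) constructor, then carry over
        // the identity of the original operator.
        static Op getVectorOperator(Class<? extends Op> opClass, Desc conf, Op originalOp)
                throws ReflectiveOperationException {
            Constructor<? extends Op> ctor = opClass.getDeclaredConstructor(Desc.class);
            Op op = ctor.newInstance(conf);
            op.operatorId = originalOp.operatorId;
            return op;
        }
    }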
http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index 3b10bfd..5412ef1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -107,8 +107,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable { srcFiles.addAll(Arrays.asList(srcs)); LOG.debug("ReplCopyTask numFiles:" + (srcFiles == null ? "null" : srcFiles.size())); - boolean inheritPerms = conf.getBoolVar(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS); - if (!FileUtils.mkdir(dstFs, toPath, inheritPerms, conf)) { + if (!FileUtils.mkdir(dstFs, toPath, conf)) { console.printError("Cannot make target directory: " + toPath.toString()); return 2; } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java index 247d589..01a652d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java @@ -33,10 +33,12 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.CopyOnFirstWriteProperties; import org.apache.hadoop.hive.ql.CompilationOpContext; import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; @@ -223,6 +225,7 @@ public class SerializationUtilities { kryo.register(java.sql.Timestamp.class, new TimestampSerializer()); kryo.register(Path.class, new PathSerializer()); kryo.register(Arrays.asList("").getClass(), new ArraysAsListSerializer()); + kryo.register(CopyOnFirstWriteProperties.class, new CopyOnFirstWritePropertiesSerializer()); ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy()) .setFallbackInstantiatorStrategy( @@ -422,6 +425,33 @@ public class SerializationUtilities { } /** + * CopyOnFirstWriteProperties needs a special serializer, since it extends Properties, + * which implements Map, so MapSerializer would be used for it by default. Yet it has + * the additional 'interned' field that the standard MapSerializer doesn't handle + * properly. But FieldSerializer doesn't work for it as well, because the Hashtable + * superclass declares most of its fields transient. 
+ */ + private static class CopyOnFirstWritePropertiesSerializer extends + com.esotericsoftware.kryo.serializers.MapSerializer { + + @Override + public void write(Kryo kryo, Output output, Map map) { + super.write(kryo, output, map); + CopyOnFirstWriteProperties p = (CopyOnFirstWriteProperties) map; + Properties ip = p.getInterned(); + kryo.writeObjectOrNull(output, ip, Properties.class); + } + + @Override + public Map read(Kryo kryo, Input input, Class<Map> type) { + Map map = super.read(kryo, input, type); + Properties ip = kryo.readObjectOrNull(input, Properties.class); + ((CopyOnFirstWriteProperties) map).setInterned(ip); + return map; + } + } + + /** * Serializes the plan. * * @param plan The plan, such as QueryPlan, MapredWork, etc. http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java index 65227e9..65363ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java @@ -347,14 +347,15 @@ public class StatsNoJobTask extends Task<StatsNoJobWork> implements Serializable try { // Wait a while for existing tasks to terminate - if (!threadPool.awaitTermination(100, TimeUnit.SECONDS)) { - // Cancel currently executing tasks - threadPool.shutdownNow(); + while (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) { + LOG.debug("Waiting for all stats tasks to finish..."); + } + // Cancel currently executing tasks + threadPool.shutdownNow(); - // Wait a while for tasks to respond to being cancelled - if (!threadPool.awaitTermination(100, TimeUnit.SECONDS)) { - LOG.debug("Stats collection thread pool did not terminate"); - } + // Wait a while for tasks to respond to being cancelled + if (!threadPool.awaitTermination(100, TimeUnit.SECONDS)) { + LOG.debug("Stats collection thread pool did not terminate"); } } catch (InterruptedException ie) { http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java index a596e92..eddc31e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java @@ -22,7 +22,6 @@ import java.io.Serializable; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.hive.ql.metadata.Hive; -import org.apache.hadoop.hive.ql.session.OperationLog; import org.apache.hadoop.hive.ql.session.SessionState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +34,6 @@ public class TaskRunner extends Thread { protected Task<? 
extends Serializable> tsk; protected TaskResult result; protected SessionState ss; - private OperationLog operationLog; private static AtomicLong taskCounter = new AtomicLong(0); private static ThreadLocal<Long> taskRunnerID = new ThreadLocal<Long>() { @Override @@ -74,7 +72,6 @@ public class TaskRunner extends Thread { public void run() { runner = Thread.currentThread(); try { - OperationLog.setCurrentOperationLog(operationLog); SessionState.start(ss); runSequential(); } finally { @@ -113,8 +110,4 @@ public class TaskRunner extends Thread { public static long getTaskRunnerID () { return taskRunnerID.get(); } - - public void setOperationLog(OperationLog operationLog) { - this.operationLog = operationLog; - } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java index f3c7c77..48ae02f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java @@ -27,6 +27,7 @@ import java.util.TreeMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.LlapDaemonInfo; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -105,8 +106,8 @@ public class TopNHash { } final boolean isTez = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez"); - final boolean isLlap = isTez && HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap"); - final int numExecutors = isLlap ? HiveConf.getIntVar(hconf, HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS) : 1; + final boolean isLlap = LlapDaemonInfo.INSTANCE.isLlap(); + final int numExecutors = isLlap ? 
LlapDaemonInfo.INSTANCE.getNumExecutors() : 1; // Used Memory = totalMemory() - freeMemory(); // Total Free Memory = maxMemory() - Used Memory; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 5b5ddc3..777c119 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -76,6 +76,7 @@ import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.WordUtils; @@ -109,6 +110,8 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Order; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.Context; +import org.apache.hadoop.hive.ql.Driver.DriverState; +import org.apache.hadoop.hive.ql.Driver.LockedDriverState; import org.apache.hadoop.hive.ql.ErrorMsg; import org.apache.hadoop.hive.ql.QueryPlan; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; @@ -203,7 +206,6 @@ import org.apache.hadoop.mapred.SequenceFileInputFormat; import org.apache.hadoop.mapred.SequenceFileOutputFormat; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.Shell; import org.apache.hive.common.util.ACLConfigurationParser; import org.apache.hive.common.util.ReflectionUtil; import org.slf4j.Logger; @@ -280,11 +282,11 @@ public final class Utilities { * The object in the reducer are composed of these top level fields. 
*/ - public static String HADOOP_LOCAL_FS = "file:///"; + public static final String HADOOP_LOCAL_FS = "file:///"; public static final String HADOOP_LOCAL_FS_SCHEME = "file"; - public static String MAP_PLAN_NAME = "map.xml"; - public static String REDUCE_PLAN_NAME = "reduce.xml"; - public static String MERGE_PLAN_NAME = "merge.xml"; + public static final String MAP_PLAN_NAME = "map.xml"; + public static final String REDUCE_PLAN_NAME = "reduce.xml"; + public static final String MERGE_PLAN_NAME = "merge.xml"; public static final String INPUT_NAME = "iocontext.input.name"; public static final String HAS_MAP_WORK = "has.map.work"; public static final String HAS_REDUCE_WORK = "has.reduce.work"; @@ -293,11 +295,11 @@ public final class Utilities { public static final String HIVE_ADDED_JARS = "hive.added.jars"; public static final String VECTOR_MODE = "VECTOR_MODE"; public static final String USE_VECTORIZED_INPUT_FILE_FORMAT = "USE_VECTORIZED_INPUT_FILE_FORMAT"; - public static String MAPNAME = "Map "; - public static String REDUCENAME = "Reducer "; + public static final String MAPNAME = "Map "; + public static final String REDUCENAME = "Reducer "; @Deprecated - protected static String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX = "mapred.dfsclient.parallelism.max"; + protected static final String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX = "mapred.dfsclient.parallelism.max"; /** * ReduceField: @@ -603,7 +605,7 @@ public final class Utilities { public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) { String useName = conf.get(INPUT_NAME); if (useName == null) { - useName = "mapreduce"; + useName = "mapreduce:" + hiveScratchDir; } conf.set(INPUT_NAME, useName); setMapWork(conf, w.getMapWork(), hiveScratchDir, true); @@ -767,8 +769,8 @@ public final class Utilities { // Note: When DDL supports specifying what string to represent null, // we should specify "NULL" to represent null in the temp table, and then // we can make the following translation deprecated. - public static String nullStringStorage = "\\N"; - public static String nullStringOutput = "NULL"; + public static final String nullStringStorage = "\\N"; + public static final String nullStringOutput = "NULL"; public static Random randGen = new Random(); @@ -2681,7 +2683,7 @@ public final class Utilities { setColumnTypeList(jobConf, rowSchema, excludeVCs); } - public static String suffix = ".hashtable"; + public static final String suffix = ".hashtable"; public static Path generatePath(Path basePath, String dumpFilePrefix, Byte tag, String bigBucketFileName) { @@ -3162,6 +3164,7 @@ public final class Utilities { Set<Path> pathsProcessed = new HashSet<Path>(); List<Path> pathsToAdd = new LinkedList<Path>(); + LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState(); // AliasToWork contains all the aliases Collection<String> aliasToWork = work.getAliasToWork().keySet(); if (!skipDummy) { @@ -3182,6 +3185,9 @@ public final class Utilities { boolean hasLogged = false; Path path = null; for (Map.Entry<Path, ArrayList<String>> e : pathToAliases) { + if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) + throw new IOException("Operation is Canceled. 
"); + Path file = e.getKey(); List<String> aliases = e.getValue(); if (aliases.contains(alias)) { @@ -3235,6 +3241,8 @@ public final class Utilities { List<Path> finalPathsToAdd = new LinkedList<>(); List<Future<Path>> futures = new LinkedList<>(); for (final Path path : pathsToAdd) { + if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) + throw new IOException("Operation is Canceled. "); if (pool == null) { finalPathsToAdd.add(new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call()); } else { @@ -3244,6 +3252,8 @@ public final class Utilities { if (pool != null) { for (Future<Path> future : futures) { + if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) + throw new IOException("Operation is Canceled. "); finalPathsToAdd.add(future.get()); } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java new file mode 100644 index 0000000..4ad4f98 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.exec.mapjoin; + +/** + * When this Error is thrown, better not retry. + */ +public class MapJoinMemoryExhaustionError extends Error { + private static final long serialVersionUID = 3678353959830506881L; + public MapJoinMemoryExhaustionError(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java deleted file mode 100644 index dbe00b6..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.exec.mapjoin; - -import org.apache.hadoop.hive.ql.metadata.HiveException; - - - -public class MapJoinMemoryExhaustionException extends HiveException { - private static final long serialVersionUID = 3678353959830506881L; - public MapJoinMemoryExhaustionException(String msg) { - super(msg); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java index 7fc3226..d5e81e1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java @@ -86,17 +86,17 @@ public class MapJoinMemoryExhaustionHandler { * * @param tableContainerSize currently table container size * @param numRows number of rows processed - * @throws MapJoinMemoryExhaustionException + * @throws MapJoinMemoryExhaustionError */ public void checkMemoryStatus(long tableContainerSize, long numRows) - throws MapJoinMemoryExhaustionException { + throws MapJoinMemoryExhaustionError { long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed(); double percentage = (double) usedMemory / (double) maxHeapSize; String msg = Utilities.now() + "\tProcessing rows:\t" + numRows + "\tHashtable size:\t" + tableContainerSize + "\tMemory usage:\t" + usedMemory + "\tpercentage:\t" + percentageNumberFormat.format(percentage); console.printInfo(msg); if(percentage > maxMemoryUsage) { - throw new MapJoinMemoryExhaustionException(msg); + throw new MapJoinMemoryExhaustionError(msg); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java index 1945163..93a36c6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java @@ -32,6 +32,7 @@ import java.util.Properties; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; +import org.apache.hadoop.hive.ql.log.LogDivertAppenderForTest; import org.apache.hadoop.mapreduce.MRJobConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +69,7 @@ import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.HiveKey; import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl; import org.apache.hadoop.hive.ql.io.IOPrepareCache; +import org.apache.hadoop.hive.ql.log.LogDivertAppender; import org.apache.hadoop.hive.ql.log.NullAppender; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.FetchWork; @@ 
-116,6 +118,8 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop protected transient JobConf job; public static MemoryMXBean memoryMXBean; protected HadoopJobExecHelper jobExecHelper; + private transient boolean isShutdown = false; + private transient boolean jobKilled = false; protected static transient final Logger LOG = LoggerFactory.getLogger(ExecDriver.class); @@ -412,10 +416,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop if (driverContext.isShutdown()) { LOG.warn("Task was cancelled"); - if (rj != null) { - rj.killJob(); - rj = null; - } + killJob(); return 5; } @@ -448,7 +449,7 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop if (rj != null) { if (returnVal != 0) { - rj.killJob(); + killJob(); } jobID = rj.getID().toString(); } @@ -632,6 +633,8 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop private static void setupChildLog4j(Configuration conf) { try { LogUtils.initHiveExecLog4j(); + LogDivertAppender.registerRoutingAppender(conf); + LogDivertAppenderForTest.registerRoutingAppenderIfInTest(conf); } catch (LogInitializationException e) { System.err.println(e.getMessage()); } @@ -703,6 +706,8 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop } System.setProperty(HiveConf.ConfVars.HIVEQUERYID.toString(), queryId); + LogUtils.registerLoggingContext(conf); + if (noLog) { // If started from main(), and noLog is on, we should not output // any logs. To turn the log on, please set -Dtest.silent=false @@ -853,22 +858,37 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop ss.getHiveHistory().logPlanProgress(queryPlan); } + public boolean isTaskShutdown() { + return isShutdown; + } + @Override public void shutdown() { super.shutdown(); - if (rj != null) { + killJob(); + isShutdown = true; + } + + @Override + public String getExternalHandle() { + return this.jobID; + } + + private void killJob() { + boolean needToKillJob = false; + synchronized(this) { + if (rj != null && !jobKilled) { + jobKilled = true; + needToKillJob = true; + } + } + if (needToKillJob) { try { rj.killJob(); } catch (Exception e) { LOG.warn("failed to kill job " + rj.getID(), e); } - rj = null; } } - - @Override - public String getExternalHandle() { - return this.jobID; - } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 591ea97..c5d4f9a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -60,7 +60,7 @@ import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.Utilities; -import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -69,7 +69,6 @@ import org.apache.hadoop.hive.ql.plan.FetchWork; import 
org.apache.hadoop.hive.ql.plan.MapredLocalWork; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.api.StageType; -import org.apache.hadoop.hive.ql.session.OperationLog; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionState.LogHelper; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; @@ -327,18 +326,8 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab CachingPrintStream errPrintStream = new CachingPrintStream(System.err); - StreamPrinter outPrinter; - StreamPrinter errPrinter; - OperationLog operationLog = OperationLog.getCurrentOperationLog(); - if (operationLog != null) { - outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out, - operationLog.getPrintStream()); - errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream, - operationLog.getPrintStream()); - } else { - outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out); - errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream); - } + StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), null, System.out); + StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), null, errPrintStream); outPrinter.start(); errPrinter.start(); @@ -395,7 +384,7 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab + Utilities.showTime(elapsed) + " sec."); } catch (Throwable throwable) { if (throwable instanceof OutOfMemoryError - || (throwable instanceof MapJoinMemoryExhaustionException)) { + || (throwable instanceof MapJoinMemoryExhaustionError)) { l4j.error("Hive Runtime Error: Map local work exhausted memory", throwable); return 3; } else { http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java index 04e24bd..360b639 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java @@ -24,6 +24,8 @@ import java.util.Map; import java.util.TreeMap; import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hive.common.MemoryEstimate; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; @@ -46,7 +48,7 @@ import com.google.common.annotations.VisibleForTesting; * Initially inspired by HPPC LongLongOpenHashMap; however, the code is almost completely reworked * and there's very little in common left save for quadratic probing (and that with some changes). 
*/ -public final class BytesBytesMultiHashMap { +public final class BytesBytesMultiHashMap implements MemoryEstimate { public static final Logger LOG = LoggerFactory.getLogger(BytesBytesMultiHashMap.class); /* @@ -521,7 +523,18 @@ public final class BytesBytesMultiHashMap { * @return number of bytes */ public long memorySize() { - return writeBuffers.size() + refs.length * 8 + 100; + return getEstimatedMemorySize(); + } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += writeBuffers.getEstimatedMemorySize(); + size += jdm.lengthForLongArrayOfSize(refs.length); + // 11 primitive1 fields, 2 refs above with alignment + size += JavaDataModel.alignUp(15 * jdm.primitive1(), jdm.memoryAlign()); + return size; } public void seal() { http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java index a3bccc6..adf1a90 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java @@ -53,7 +53,7 @@ import org.apache.hadoop.io.Writable; public class HashMapWrapper extends AbstractMapJoinTableContainer implements Serializable { private static final long serialVersionUID = 1L; protected static final Logger LOG = LoggerFactory.getLogger(HashMapWrapper.class); - + private static final long DEFAULT_HASHMAP_ENTRY_SIZE = 1024L; // default threshold for using main memory based HashMap private static final int THRESHOLD = 1000000; private static final float LOADFACTOR = 0.75f; @@ -140,6 +140,14 @@ public class HashMapWrapper extends AbstractMapJoinTableContainer implements Ser return new GetAdaptor(keyTypeFromLoader); } + @Override + public long getEstimatedMemorySize() { + // TODO: Key and Values are Object[] which can be eagerly deserialized or lazily deserialized. To accurately + // estimate the entry size, every possible Objects in Key, Value should implement MemoryEstimate interface which + // is very intrusive. So assuming default entry size here. 
+ return size() * DEFAULT_HASHMAP_ENTRY_SIZE; + } + private class GetAdaptor implements ReusableGetAdaptor { private Object[] currentKey; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java index 04e89e8..6523f00 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java @@ -118,6 +118,11 @@ public class HybridHashTableContainer private final String spillLocalDirs; + @Override + public long getEstimatedMemorySize() { + return memoryUsed; + } + /** * This class encapsulates the triplet together since they are closely related to each other * The triplet: hashmap (either in memory or on disk), small table container, big table container http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java index c86e5f5..014d17a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.JoinUtil; @@ -33,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.util.JavaDataModel; import org.apache.hadoop.hive.serde2.ByteStream.Output; import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput; import org.apache.hadoop.hive.serde2.AbstractSerDe; @@ -72,6 +74,11 @@ public class MapJoinBytesTableContainer implements MapJoinTableContainer, MapJoinTableContainerDirectAccess { private static final Logger LOG = LoggerFactory.getLogger(MapJoinTableContainer.class); + // TODO: For object inspector fields, assigning 16KB for now. To better estimate the memory size every + // object inspectors have to implement MemoryEstimate interface which is a lot of change with little benefit compared + // to writing an instrumentation agent for object size estimation + public static final long DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE = 16 * 1024L; + private final BytesBytesMultiHashMap hashMap; /** The OI used to deserialize values. We never deserialize keys. 
*/ private LazyBinaryStructObjectInspector internalValueOi; @@ -147,7 +154,7 @@ public class MapJoinBytesTableContainer this.notNullMarkers = notNullMarkers; } - public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource { + public static interface KeyValueHelper extends BytesBytesMultiHashMap.KvSource, MemoryEstimate { void setKeyValue(Writable key, Writable val) throws SerDeException; /** Get hash value from the key. */ int getHashFromKey() throws SerDeException; @@ -216,6 +223,22 @@ public class MapJoinBytesTableContainer public int getHashFromKey() throws SerDeException { throw new UnsupportedOperationException("Not supported for MapJoinBytesTableContainer"); } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += keySerDe == null ? 0 : jdm.object(); + size += valSerDe == null ? 0 : jdm.object(); + size += keySoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += valSoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += keyOis == null ? 0 : jdm.arrayList() + keyOis.size() * DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += valOis == null ? 0 : jdm.arrayList() + valOis.size() * DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + size += keyObjs == null ? 0 : jdm.array() + keyObjs.length * jdm.object(); + size += valObjs == null ? 0 : jdm.array() + valObjs.length * jdm.object(); + size += jdm.primitive1(); + return size; + } } static class LazyBinaryKvWriter implements KeyValueHelper { @@ -319,6 +342,15 @@ public class MapJoinBytesTableContainer aliasFilter &= filterGetter.getShort(); return aliasFilter; } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += (4 * jdm.object()); + size += jdm.primitive1(); + return size; + } } /* @@ -361,6 +393,15 @@ public class MapJoinBytesTableContainer int keyLength = key.getLength(); return HashCodeUtil.murmurHash(keyBytes, 0, keyLength); } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += jdm.object() + (key == null ? 0 : key.getCapacity()); + size += jdm.object() + (val == null ? 0 : val.getCapacity()); + return size; + } } @Override @@ -768,4 +809,19 @@ public class MapJoinBytesTableContainer public int size() { return hashMap.size(); } + + @Override + public long getEstimatedMemorySize() { + JavaDataModel jdm = JavaDataModel.get(); + long size = 0; + size += hashMap.getEstimatedMemorySize(); + size += directWriteHelper == null ? 0 : directWriteHelper.getEstimatedMemorySize(); + size += writeHelper == null ? 0 : writeHelper.getEstimatedMemorySize(); + size += sortableSortOrders == null ? 0 : jdm.lengthForBooleanArrayOfSize(sortableSortOrders.length); + size += nullMarkers == null ? 0 : jdm.lengthForByteArrayOfSize(nullMarkers.length); + size += notNullMarkers == null ? 
0 : jdm.lengthForByteArrayOfSize(notNullMarkers.length); + size += jdm.arrayList(); // empty list + size += DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE; + return size; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java index 6d71fef..5ca5ff6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.JoinUtil; +import org.apache.hadoop.hive.common.MemoryEstimate; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper; import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter; @@ -31,7 +32,7 @@ import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Writable; -public interface MapJoinTableContainer { +public interface MapJoinTableContainer extends MemoryEstimate { /** * Retrieve rows from hashtable key by key, one key at a time, w/o copying the structures * for each key. "Old" HashMapWrapper will still create/retrieve new objects for java HashMap; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java index 4c69899..4ca8f93 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java @@ -356,12 +356,9 @@ public class RemoteHiveSparkClient implements HiveSparkClient { private void logConfigurations(JobConf localJobConf) { if (LOG.isInfoEnabled()) { LOG.info("Logging job configuration: "); - StringWriter outWriter = new StringWriter(); - try { - Configuration.dumpConfiguration(localJobConf, outWriter); - } catch (IOException e) { - LOG.warn("Error logging job configuration", e); - } + StringBuilder outWriter = new StringBuilder(); + // redact sensitive information before logging + HiveConfUtil.dumpConfig(localJobConf, outWriter); LOG.info(outWriter.toString()); } } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java index 12a76a7..5f85f9e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java @@ -298,12 +298,13 @@ public class SparkPlanGenerator { throw new IllegalArgumentException(msg, e); } if (work instanceof MapWork) { + MapWork mapWork = (MapWork) work; 
cloned.setBoolean("mapred.task.is.map", true); - List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work, + List<Path> inputPaths = Utilities.getInputPaths(cloned, mapWork, scratchDir, context, false); Utilities.setInputPaths(cloned, inputPaths); - Utilities.setMapWork(cloned, (MapWork) work, scratchDir, false); - Utilities.createTmpDirs(cloned, (MapWork) work); + Utilities.setMapWork(cloned, mapWork, scratchDir, false); + Utilities.createTmpDirs(cloned, mapWork); if (work instanceof MergeFileWork) { MergeFileWork mergeFileWork = (MergeFileWork) work; cloned.set(Utilities.MAPRED_MAPPER_CLASS, MergeFileMapper.class.getName()); @@ -313,9 +314,21 @@ public class SparkPlanGenerator { } else { cloned.set(Utilities.MAPRED_MAPPER_CLASS, ExecMapper.class.getName()); } - if (((MapWork) work).getMinSplitSize() != null) { + if (mapWork.getMaxSplitSize() != null) { + HiveConf.setLongVar(cloned, HiveConf.ConfVars.MAPREDMAXSPLITSIZE, + mapWork.getMaxSplitSize()); + } + if (mapWork.getMinSplitSize() != null) { HiveConf.setLongVar(cloned, HiveConf.ConfVars.MAPREDMINSPLITSIZE, - ((MapWork) work).getMinSplitSize()); + mapWork.getMinSplitSize()); + } + if (mapWork.getMinSplitSizePerNode() != null) { + HiveConf.setLongVar(cloned, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE, + mapWork.getMinSplitSizePerNode()); + } + if (mapWork.getMinSplitSizePerRack() != null) { + HiveConf.setLongVar(cloned, HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK, + mapWork.getMinSplitSizePerRack()); } // remember the JobConf cloned for each MapWork, so we won't clone for it again workToJobConf.put(work, cloned); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java index 27bed9c..7eaad18 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java @@ -95,6 +95,7 @@ public class SparkReduceRecordHandler extends SparkRecordHandler { // number of columns pertaining to keys in a vectorized row batch private int keysColumnOffset; private static final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE; + private static final int BATCH_BYTES = VectorizedRowBatch.DEFAULT_BYTES; private StructObjectInspector keyStructInspector; private StructObjectInspector[] valueStructInspectors; /* this is only used in the error code path */ @@ -373,6 +374,7 @@ public class SparkReduceRecordHandler extends SparkRecordHandler { } int rowIdx = 0; + int batchBytes = 0; try { while (values.hasNext()) { /* deserialize value into columns */ @@ -381,11 +383,13 @@ public class SparkReduceRecordHandler extends SparkRecordHandler { VectorizedBatchUtil.addRowToBatchFrom(valueObj, valueStructInspectors[tag], rowIdx, keysColumnOffset, batch, buffer); + batchBytes += valueWritable.getLength(); rowIdx++; - if (rowIdx >= BATCH_SIZE) { + if (rowIdx >= BATCH_SIZE || batchBytes > BATCH_BYTES) { VectorizedBatchUtil.setBatchSize(batch, rowIdx); reducer.process(batch, tag); rowIdx = 0; + batchBytes = 0; if (isLogInfoEnabled) { logMemoryInfo(); } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java 
---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java index 4c01329..98b1605 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java @@ -83,6 +83,8 @@ public class SparkTask extends Task<SparkWork> { private transient int totalTaskCount; private transient int failedTaskCount; private transient List<Integer> stageIds; + private transient SparkJobRef jobRef = null; + private transient boolean isShutdown = false; @Override public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext driverContext, @@ -107,7 +109,7 @@ public class SparkTask extends Task<SparkWork> { perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB); submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB); - SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork); + jobRef = sparkSession.submit(driverContext, sparkWork); perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB); addToHistory(jobRef); @@ -127,8 +129,14 @@ public class SparkTask extends Task<SparkWork> { // TODO: If the timeout is because of lack of resources in the cluster, we should // ideally also cancel the app request here. But w/o facilities from Spark or YARN, // it's difficult to do it on hive side alone. See HIVE-12650. + LOG.info("Failed to submit Spark job " + sparkJobID); + jobRef.cancelJob(); + } else if (rc == 4) { + LOG.info("The number of tasks reaches above the limit " + conf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS) + + ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID ); jobRef.cancelJob(); } + if (this.jobID == null) { this.jobID = sparkJobStatus.getAppID(); } @@ -290,6 +298,23 @@ public class SparkTask extends Task<SparkWork> { return finishTime; } + public boolean isTaskShutdown() { + return isShutdown; + } + + @Override + public void shutdown() { + super.shutdown(); + if (jobRef != null && !isShutdown) { + try { + jobRef.cancelJob(); + } catch (Exception e) { + LOG.warn("failed to kill job", e); + } + } + isShutdown = true; + } + /** * Set the number of reducers for the spark work. */ http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java index 7d18c0a..eb9883a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java @@ -78,7 +78,7 @@ public class SparkUtilities { Path localFile = new Path(source.getPath()); Path remoteFile = new Path(SessionState.get().getSparkSession().getHDFSSessionDir(), getFileName(source)); - FileSystem fileSystem = FileSystem.get(conf); + FileSystem fileSystem = FileSystem.get(remoteFile.toUri(), conf); // Overwrite if the remote file already exists. Whether the file can be added // on executor is up to spark, i.e. spark.files.overwrite fileSystem.copyFromLocalFile(false, true, localFile, remoteFile); @@ -92,7 +92,7 @@ public class SparkUtilities { String deployMode = sparkConf.contains("spark.submit.deployMode") ? 
sparkConf.get("spark.submit.deployMode") : null; return SparkClientUtilities.isYarnClusterMode(master, deployMode) && - !source.getScheme().equals("hdfs"); + !(source.getScheme().equals("hdfs") || source.getScheme().equals("viewfs")); } private static String getFileName(URI uri) { http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java index dd73f3e..9dfb65e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java @@ -34,7 +34,8 @@ import org.apache.spark.JobExecutionStatus; * It print current job status to console and sleep current thread between monitor interval. */ public class RemoteSparkJobMonitor extends SparkJobMonitor { - + private int sparkJobMaxTaskCount = -1; + private int totalTaskCount = 0; private RemoteSparkJobStatus sparkJobStatus; private final HiveConf hiveConf; @@ -42,6 +43,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor { super(hiveConf); this.sparkJobStatus = sparkJobStatus; this.hiveConf = hiveConf; + sparkJobMaxTaskCount = hiveConf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS); } @Override @@ -100,6 +102,17 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor { } else { console.logInfo(format); } + } else { + // Count the number of tasks, and kill application if it goes beyond the limit. + if (sparkJobMaxTaskCount != -1 && totalTaskCount == 0) { + totalTaskCount = getTotalTaskCount(progressMap); + if (totalTaskCount > sparkJobMaxTaskCount) { + rc = 4; + done = true; + console.printInfo("\nThe total number of task in the Spark job [" + totalTaskCount + "] is greater than the limit [" + + sparkJobMaxTaskCount + "]. 
The Spark job will be cancelled."); + } + } } printStatus(progressMap, lastProgressMap); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java index 0b224f2..41730b5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java @@ -66,7 +66,6 @@ abstract class SparkJobMonitor { private int lines = 0; private final PrintStream out; - private static final int COLUMN_1_WIDTH = 16; private static final String HEADER_FORMAT = "%16s%10s %13s %5s %9s %7s %7s %6s "; private static final String STAGE_FORMAT = "%-16s%10s %13s %5s %9s %7s %7s %6s "; @@ -173,6 +172,15 @@ abstract class SparkJobMonitor { lastPrintTime = System.currentTimeMillis(); } + protected int getTotalTaskCount(Map<String, SparkStageProgress> progressMap) { + int totalTasks = 0; + for (SparkStageProgress progress: progressMap.values() ) { + totalTasks += progress.getTotalTaskCount(); + } + + return totalTasks; + } + private String getReport(Map<String, SparkStageProgress> progressMap) { StringBuilder reportBuffer = new StringBuilder(); SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java index 951dbb4..67db303 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java @@ -67,6 +67,9 @@ public class RemoteSparkJobStatus implements SparkJobStatus { return getAppID.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS); } catch (Exception e) { LOG.warn("Failed to get APP ID.", e); + if (Thread.interrupted()) { + error = e; + } return null; } } @@ -186,6 +189,9 @@ public class RemoteSparkJobStatus implements SparkJobStatus { } public JobHandle.State getRemoteJobState() { + if (error != null) { + return JobHandle.State.FAILED; + } return jobHandle.getState(); } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java index aa2dfc7..6497495 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java @@ -336,6 +336,11 @@ public class DagUtils { setupAutoReducerParallelism(edgeProp, w); break; } + case CUSTOM_SIMPLE_EDGE: { + setupQuickStart(edgeProp, w); + break; + } + default: // nothing } @@ -965,10 +970,9 @@ public class DagUtils { * @return true if the file names match else returns false. 
* @throws IOException when any file system related call fails */ - private boolean checkPreExisting(Path src, Path dest, Configuration conf) + private boolean checkPreExisting(FileSystem sourceFS, Path src, Path dest, Configuration conf) throws IOException { FileSystem destFS = dest.getFileSystem(conf); - FileSystem sourceFS = src.getFileSystem(conf); FileStatus destStatus = FileUtils.getFileStatusOrNull(destFS, dest); if (destStatus != null) { return (sourceFS.getFileStatus(src).getLen() == destStatus.getLen()); @@ -988,7 +992,9 @@ public class DagUtils { public LocalResource localizeResource( Path src, Path dest, LocalResourceType type, Configuration conf) throws IOException { FileSystem destFS = dest.getFileSystem(conf); - if (src != null && !checkPreExisting(src, dest, conf)) { + // We call copyFromLocal below, so we basically assume src is a local file. + FileSystem srcFs = FileSystem.getLocal(conf); + if (src != null && !checkPreExisting(srcFs, src, dest, conf)) { // copy the src to the destination and create local resource. // do not overwrite. String srcStr = src.toString(); @@ -1000,7 +1006,7 @@ public class DagUtils { // authoritative one), don't wait infinitely for the notifier, just wait a little bit // and check HDFS before and after. if (notifierOld != null - && checkOrWaitForTheFile(src, dest, conf, notifierOld, 1, 150, false)) { + && checkOrWaitForTheFile(srcFs, src, dest, conf, notifierOld, 1, 150, false)) { return createLocalResource(destFS, dest, type, LocalResourceVisibility.PRIVATE); } try { @@ -1022,7 +1028,7 @@ public class DagUtils { conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, TimeUnit.MILLISECONDS); // Only log on the first wait, and check after wait on the last iteration. if (!checkOrWaitForTheFile( - src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) { + srcFs, src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) { LOG.error("Could not find the jar that was being uploaded"); throw new IOException("Previous writer likely failed to write " + dest + ". Failing because I am unlikely to write too."); @@ -1037,10 +1043,10 @@ public class DagUtils { LocalResourceVisibility.PRIVATE); } - public boolean checkOrWaitForTheFile(Path src, Path dest, Configuration conf, Object notifier, - int waitAttempts, long sleepInterval, boolean doLog) throws IOException { + public boolean checkOrWaitForTheFile(FileSystem srcFs, Path src, Path dest, Configuration conf, + Object notifier, int waitAttempts, long sleepInterval, boolean doLog) throws IOException { for (int i = 0; i < waitAttempts; i++) { - if (checkPreExisting(src, dest, conf)) return true; + if (checkPreExisting(srcFs, src, dest, conf)) return true; if (doLog && i == 0) { LOG.info("Waiting for the file " + dest + " (" + waitAttempts + " attempts, with " + sleepInterval + "ms interval)"); @@ -1059,7 +1065,7 @@ public class DagUtils { throw new IOException(interruptedException); } } - return checkPreExisting(src, dest, conf); // One last check. + return checkPreExisting(srcFs, src, dest, conf); // One last check. 
} /** @@ -1265,6 +1271,20 @@ public class DagUtils { } } + private void setupQuickStart(TezEdgeProperty edgeProp, Vertex v) + throws IOException { + if (!edgeProp.isSlowStart()) { + Configuration pluginConf = new Configuration(false); + VertexManagerPluginDescriptor desc = + VertexManagerPluginDescriptor.create(ShuffleVertexManager.class.getName()); + pluginConf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, 0); + pluginConf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, 0); + UserPayload payload = TezUtils.createUserPayloadFromConf(pluginConf); + desc.setUserPayload(payload); + v.setVertexManagerPlugin(desc); + } + } + public String createDagName(Configuration conf, QueryPlan plan) { String name = getUserSpecifiedDagName(conf); if (name == null) { http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java index 7b13e90..7011d23 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -146,7 +147,20 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable } nwayConf.setNumberOfPartitions(numPartitions); } - + final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR); + final long memoryCheckInterval = HiveConf.getLongVar(hconf, + HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL); + final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE)); + long numEntries = 0; + long noCondTaskSize = desc.getNoConditionalTaskSize(); + boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0; + if (!doMemCheck) { + LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " + + "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval); + } else { + LOG.info("Memory monitoring for hash table loader enabled. 
noconditionalTaskSize: {} inflationFactor: {} ", + noCondTaskSize, inflationFactor); + } for (int pos = 0; pos < mapJoinTables.length; pos++) { if (pos == desc.getPosBigTable()) { continue; @@ -205,12 +219,32 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable tableContainer = new HashMapWrapper(hconf, keyCount); } - LOG.info("Using tableContainer " + tableContainer.getClass().getSimpleName()); + LOG.info("Using tableContainer: " + tableContainer.getClass().getSimpleName()); tableContainer.setSerde(keyCtx, valCtx); while (kvReader.next()) { - tableContainer.putRow( - (Writable)kvReader.getCurrentKey(), (Writable)kvReader.getCurrentValue()); + tableContainer.putRow((Writable) kvReader.getCurrentKey(), (Writable) kvReader.getCurrentValue()); + numEntries++; + if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) { + final long estMemUsage = tableContainer.getEstimatedMemorySize(); + final long threshold = (long) (inflationFactor * noCondTaskSize); + // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory + // available for container/executor + final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable()); + if (estMemUsage > effectiveThreshold) { + String msg = "Hash table loading exceeded memory limits." + + " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize + + " inflationFactor: " + inflationFactor + " threshold: " + threshold + + " effectiveThreshold: " + effectiveThreshold; + LOG.error(msg); + throw new MapJoinMemoryExhaustionError(msg); + } else { + if (LOG.isInfoEnabled()) { + LOG.info("Checking hash table loader memory usage.. numEntries: {} estimatedMemoryUsage: {} " + + "effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold); + } + } + } } tableContainer.seal(); LOG.info("Finished loading hashtable using " + tableContainer.getClass() + ". 
Small table position: " + pos); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java index ad8b9e0..60660ac 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java @@ -103,6 +103,8 @@ public class ReduceRecordSource implements RecordSource { // number of columns pertaining to keys in a vectorized row batch private int firstValueColumnOffset; + private final int BATCH_BYTES = VectorizedRowBatch.DEFAULT_BYTES; + private StructObjectInspector keyStructInspector; private StructObjectInspector valueStructInspectors; @@ -190,7 +192,9 @@ public class ReduceRecordSource implements RecordSource { VectorizedBatchUtil.typeInfosFromStructObjectInspector( keyStructInspector), /* useExternalBuffer */ true, - binarySortableSerDe.getSortOrders())); + binarySortableSerDe.getSortOrders(), + binarySortableSerDe.getNullMarkers(), + binarySortableSerDe.getNotNullMarkers())); keyBinarySortableDeserializeToRow.init(0); final int valuesSize = valueStructInspectors.getAllStructFieldRefs().size(); @@ -435,6 +439,7 @@ public class ReduceRecordSource implements RecordSource { final int maxSize = batch.getMaxSize(); Preconditions.checkState(maxSize > 0); int rowIdx = 0; + int batchBytes = keyBytes.length; try { for (Object value : values) { if (valueLazyBinaryDeserializeToRow != null) { @@ -442,6 +447,7 @@ public class ReduceRecordSource implements RecordSource { BytesWritable valueWritable = (BytesWritable) value; byte[] valueBytes = valueWritable.getBytes(); int valueLength = valueWritable.getLength(); + batchBytes += valueLength; // l4j.info("ReduceRecordSource processVectorGroup valueBytes " + valueLength + " " + // VectorizedBatchUtil.displayBytes(valueBytes, 0, valueLength)); @@ -450,7 +456,7 @@ public class ReduceRecordSource implements RecordSource { valueLazyBinaryDeserializeToRow.deserialize(batch, rowIdx); } rowIdx++; - if (rowIdx >= maxSize) { + if (rowIdx >= maxSize || batchBytes >= BATCH_BYTES) { // Batch is full. 
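
The dual trigger above — flush when the batch reaches its row cap (batch.getMaxSize()) or when the accumulated key/value bytes reach VectorizedRowBatch.DEFAULT_BYTES — keeps a group of very wide rows from ballooning memory before the row-count cap ever fires. A generic sketch of the pattern (hypothetical class, not the Hive code):

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical sketch of dual-trigger batching: emit a batch when either
    // the row-count cap or the byte budget is reached, whichever comes first.
    final class ByteBudgetBatcher {
      private final int maxRows;   // analogous to batch.getMaxSize()
      private final int maxBytes;  // analogous to VectorizedRowBatch.DEFAULT_BYTES
      private final List<byte[]> rows = new ArrayList<>();
      private int bytes;

      ByteBudgetBatcher(int maxRows, int maxBytes) {
        this.maxRows = maxRows;
        this.maxBytes = maxBytes;
      }

      void add(byte[] row) {
        rows.add(row);
        bytes += row.length;
        if (rows.size() >= maxRows || bytes >= maxBytes) {
          flush();
        }
      }

      void flush() {
        if (rows.isEmpty()) {
          return;
        }
        System.out.println("flushing " + rows.size() + " rows / " + bytes + " bytes");
        rows.clear();
        bytes = 0;
      }
    }
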
batch.size = rowIdx; @@ -462,6 +468,7 @@ public class ReduceRecordSource implements RecordSource { batch.cols[i].reset(); } rowIdx = 0; + batchBytes = 0; } } if (rowIdx > 0) { http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java index 486d43a..4242262 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError; +import org.apache.tez.runtime.api.TaskFailureType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -41,6 +43,8 @@ import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.ProcessorContext; import org.apache.tez.runtime.library.api.KeyValueWriter; +import com.google.common.base.Throwables; + /** * Hive processor for Tez that forms the vertices in Tez and processes the data. * Does what ExecMapper and ExecReducer does for hive in MR framework. @@ -189,8 +193,11 @@ public class TezProcessor extends AbstractLogicalIOProcessor { } catch (Throwable t) { originalThrowable = t; } finally { - if (originalThrowable != null && originalThrowable instanceof Error) { - LOG.error(StringUtils.stringifyException(originalThrowable)); + if (originalThrowable != null && (originalThrowable instanceof Error || + Throwables.getRootCause(originalThrowable) instanceof Error)) { + LOG.error("Cannot recover from this FATAL error", StringUtils.stringifyException(originalThrowable)); + getContext().reportFailure(TaskFailureType.FATAL, originalThrowable, + "Cannot recover from this error"); throw new RuntimeException(originalThrowable); } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 8f45947..b4d8ffa 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -76,6 +76,7 @@ public class TezSessionPoolManager { private static final Logger LOG = LoggerFactory.getLogger(TezSessionPoolManager.class); private static final Random rdm = new Random(); + private volatile SessionState initSessionState; private BlockingQueue<TezSessionPoolSession> defaultQueuePool; /** Priority queue sorted by expiration time of live sessions that could be expired. */ @@ -136,6 +137,8 @@ public class TezSessionPoolManager { public void startPool() throws Exception { if (initialSessions.isEmpty()) return; + // Hive SessionState available at this point. 
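
SessionState is held in a thread-local, so the two maintenance threads created below would otherwise start with no session installed; capturing the creating thread's state in initSessionState and re-installing it inside each Runnable is the standard fix. A generic, runnable sketch of that propagation pattern (names hypothetical):

    // Hedged sketch: capture a thread-local on the creating thread and
    // re-install it at the top of the worker's run(), as the pool does above.
    final class ThreadLocalPropagation {
      private static final ThreadLocal<String> SESSION = new ThreadLocal<>();

      public static void main(String[] args) throws InterruptedException {
        SESSION.set("init-session");
        final String captured = SESSION.get(); // capture on the creating thread
        Thread worker = new Thread(() -> {
          SESSION.set(captured);               // re-install before doing work
          System.out.println("worker sees: " + SESSION.get());
        }, "pool-expiration");
        worker.start();
        worker.join();
      }
    }
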
+ initSessionState = SessionState.get(); int threadCount = Math.min(initialSessions.size(), HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS)); Preconditions.checkArgument(threadCount > 0); @@ -259,13 +262,27 @@ public class TezSessionPoolManager { expirationThread = new Thread(new Runnable() { @Override public void run() { - runExpirationThread(); + try { + SessionState.setCurrentSessionState(initSessionState); + runExpirationThread(); + } catch (Exception e) { + LOG.warn("Exception in TezSessionPool-expiration thread. Thread will shut down", e); + } finally { + LOG.info("TezSessionPool-expiration thread exiting"); + } } }, "TezSessionPool-expiration"); restartThread = new Thread(new Runnable() { @Override public void run() { - runRestartThread(); + try { + SessionState.setCurrentSessionState(initSessionState); + runRestartThread(); + } catch (Exception e) { + LOG.warn("Exception in TezSessionPool-cleanup thread. Thread will shut down", e); + } finally { + LOG.info("TezSessionPool-cleanup thread exiting"); + } } }, "TezSessionPool-cleanup"); } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index ed1ba9c..036e918 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -345,6 +345,7 @@ public class TezSessionState { String user, final Configuration conf) throws IOException { // TODO: parts of this should be moved out of TezSession to reuse the clients, but there's // no good place for that right now (HIVE-13698). + // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. SessionState session = SessionState.get(); boolean isInHs2 = session != null && session.isHiveServerQuery(); Token<LlapTokenIdentifier> token = null; @@ -438,6 +439,7 @@ public class TezSessionState { private void setupSessionAcls(Configuration tezConf, HiveConf hiveConf) throws IOException { + // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. String user = SessionState.getUserFromAuthenticator(); UserGroupInformation loginUserUgi = UserGroupInformation.getLoginUser(); String loginUser = @@ -451,6 +453,7 @@ public class TezSessionState { TezConfiguration.TEZ_AM_MODIFY_ACLS, addHs2User, user, loginUser); if (LOG.isDebugEnabled()) { + // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. LOG.debug( "Setting Tez Session access for sessionId={} with viewAclString={}, modifyStr={}", SessionState.get().getSessionId(), viewStr, modifyStr); @@ -592,6 +595,7 @@ public class TezSessionState { */ private Path createTezDir(String sessionId) throws IOException { // tez needs its own scratch dir (per session) + // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool. 
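
createTezDir, continued below, derives a per-session scratch directory from the Hive session's HDFS scratch root — which is exactly the SessionState coupling the TODOs flag. A hedged sketch of the layout, assuming TEZ_DIR is the usual "_tez_session_dir" constant (an assumption, not confirmed by this diff):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical sketch: <hdfs scratch root>/_tez_session_dir/<sessionId>,
    // created on the filesystem resolved from the path itself.
    final class TezDirSketch {
      static Path createSessionDir(String scratchRoot, String sessionId,
          Configuration conf) throws IOException {
        Path tezDir = new Path(new Path(scratchRoot, "_tez_session_dir"), sessionId);
        FileSystem fs = tezDir.getFileSystem(conf);
        fs.mkdirs(tezDir);
        return tezDir;
      }
    }
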
Path tezDir = new Path(SessionState.get().getHdfsScratchDirURIString(), TEZ_DIR); tezDir = new Path(tezDir, sessionId); FileSystem fs = tezDir.getFileSystem(conf); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 6c8bf29..1c84c6a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -18,6 +18,10 @@ package org.apache.hadoop.hive.ql.exec.tez; +import java.io.Serializable; +import org.apache.hadoop.hive.ql.exec.ConditionalTask; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator; + import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -324,6 +328,14 @@ public class TezTask extends Task<TezWork> { } } + void checkOutputSpec(BaseWork work, JobConf jc) throws IOException { + for (Operator<?> op : work.getAllOperators()) { + if (op instanceof FileSinkOperator) { + ((FileSinkOperator) op).checkOutputSpecs(null, jc); + } + } + } + DAG build(JobConf conf, TezWork work, Path scratchDir, LocalResource appJarLr, List<LocalResource> additionalLr, Context ctx) throws Exception { @@ -357,7 +369,6 @@ public class TezTask extends Task<TezWork> { setAccessControlsForCurrentUser(dag, queryPlan.getQueryId(), conf); for (BaseWork w: ws) { - boolean isFinal = work.getLeaves().contains(w); // translate work to vertex @@ -379,6 +390,8 @@ public class TezTask extends Task<TezWork> { children.add(v); } } + JobConf parentConf = workToConf.get(unionWorkItems.get(0)); + checkOutputSpec(w, parentConf); // create VertexGroup Vertex[] vertexArray = new Vertex[unionWorkItems.size()]; @@ -391,7 +404,7 @@ public class TezTask extends Task<TezWork> { // For a vertex group, all Outputs use the same Key-class, Val-class and partitioner. // Pick any one source vertex to figure out the Edge configuration. - JobConf parentConf = workToConf.get(unionWorkItems.get(0)); + // now hook up the children for (BaseWork v: children) { @@ -404,6 +417,7 @@ public class TezTask extends Task<TezWork> { } else { // Regular vertices JobConf wxConf = utils.initializeVertexConf(conf, ctx, w); + checkOutputSpec(w, wxConf); Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal, work, work.getVertexType(w)); http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java index eccbbb6..cd3404a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hive.ql.exec.tez.monitoring; import org.apache.hadoop.hive.common.log.InPlaceUpdate; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java index 1400be4..7cb74a5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hive.ql.exec.tez.monitoring; import org.apache.hadoop.hive.conf.HiveConf; @@ -11,6 +28,7 @@ import org.apache.tez.common.counters.TaskCounter; import org.apache.tez.common.counters.TezCounter; import org.apache.tez.common.counters.TezCounters; import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.TezException; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.client.DAGClient; @@ -59,24 +77,58 @@ class DAGSummary implements PrintSummary { this.hiveCounters = hiveCounters(dagClient); } + private long hiveInputRecordsFromTezCounters(String vertexName, String inputVertexName) { + // Get the counters for the input vertex. + Set<StatusGetOpts> statusOptions = new HashSet<>(1); + statusOptions.add(StatusGetOpts.GET_COUNTERS); + VertexStatus inputVertexStatus = vertexStatus(statusOptions, inputVertexName); + final TezCounters inputVertexCounters = inputVertexStatus.getVertexCounters(); + + // eg, group name TaskCounter_Map_7_OUTPUT_Reducer_8, counter name OUTPUT_RECORDS + String groupName = formattedName("TaskCounter", inputVertexName, vertexName); + String counterName = "OUTPUT_RECORDS"; + + // Do not create counter if it does not exist - + // instead fall back to default behavior for determining input records. 
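
The lookup that follows uses findCounter(name, false) so a missing edge-level counter comes back as null rather than being auto-created, and the -1 sentinel tells the caller to fall back to the coarser Hive counters. The idiom in isolation (helper name hypothetical):

    import org.apache.tez.common.counters.TezCounter;
    import org.apache.tez.common.counters.TezCounters;

    // Hypothetical helper: prefer the exact edge-level Tez counter; report
    // "absent" as -1 instead of auto-creating a zero-valued counter.
    final class EdgeCounterLookup {
      static long counterOrSentinel(TezCounters counters, String group, String name) {
        TezCounter c = counters.getGroup(group).findCounter(name, false /* create */);
        return c == null ? -1 : c.getValue();
      }
    }
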
+ TezCounter tezCounter = inputVertexCounters.getGroup(groupName).findCounter(counterName, false); + if (tezCounter == null) { + return -1; + } else { + return tezCounter.getValue(); + } + } + + private long hiveInputRecordsFromHiveCounters(String inputVertexName) { + // The record count from these counters may not be correct if the input vertex has + // edges to more than one vertex, since this value counts the records going to all + // destination vertices. + + String intermediateRecordsCounterName = formattedName( + ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(), + inputVertexName + ); + String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(), + inputVertexName); + return hiveCounterValue(intermediateRecordsCounterName) + hiveCounterValue(recordsOutCounterName); + } + private long hiveInputRecordsFromOtherVertices(String vertexName) { List<Vertex> inputVerticesList = dag.getVertex(vertexName).getInputVertices(); long result = 0; for (Vertex inputVertex : inputVerticesList) { - String intermediateRecordsCounterName = formattedName( - ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(), - inputVertex.getName() - ); - String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(), - inputVertex.getName()); - result += ( - hiveCounterValue(intermediateRecordsCounterName) - + hiveCounterValue(recordsOutCounterName) - ); + long inputVertexRecords = hiveInputRecordsFromTezCounters(vertexName, inputVertex.getName()); + if (inputVertexRecords < 0) { + inputVertexRecords = hiveInputRecordsFromHiveCounters(inputVertex.getName()); + } + result += inputVertexRecords; } return result; } + private String formattedName(String counterName, String srcVertexName, String destVertexName) { + return String.format("%s_", counterName) + srcVertexName.replace(" ", "_") + "_OUTPUT_" + destVertexName.replace(" ", "_"); + } + private String formattedName(String counterName, String vertexName) { return String.format("%s_", counterName) + vertexName.replace(" ", "_"); } http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java index 0a28edd..fd85504 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package org.apache.hadoop.hive.ql.exec.tez.monitoring; import org.apache.hadoop.fs.FileSystem; http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java index 81f1755..10e9f57 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java @@ -1,3 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.hive.ql.exec.tez.monitoring; import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
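
Returning to the HashTableLoader change earlier in this commit: the memory guard allows the hash table to grow to the larger of inflationFactor * noConditionalTaskSize and two thirds of the memory available to the container/executor, re-checked every memoryCheckInterval entries. The arithmetic in isolation — a sketch only; the commit throws MapJoinMemoryExhaustionError where a plain RuntimeException stands in here:

    // Hedged sketch of the guard above: the effective threshold is the max of
    // the configured inflation limit and 2/3 of available executor memory.
    final class MemoryGuardSketch {
      static void check(long numEntries, long memoryCheckInterval, long estMemUsage,
          float inflationFactor, long noCondTaskSize, long maxMemoryAvailable) {
        if (numEntries % memoryCheckInterval != 0) {
          return; // only probe the estimator every memoryCheckInterval entries
        }
        long threshold = (long) (inflationFactor * noCondTaskSize);
        long effectiveThreshold = (long) Math.max(threshold,
            (2.0 / 3.0) * maxMemoryAvailable);
        if (estMemUsage > effectiveThreshold) {
          throw new RuntimeException("Hash table loading exceeded memory limits: "
              + estMemUsage + " > " + effectiveThreshold);
        }
      }
    }
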

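One last note on the TezProcessor change earlier in this commit: the failure classification now also inspects the root cause, so an Error wrapped in a RuntimeException (for example the MapJoinMemoryExhaustionError introduced above) is still reported to Tez as a FATAL, non-retriable failure. As an aside, the LOG.error call there passes the stringified exception as an argument to a message with no {} placeholder, so slf4j will drop it. The classification test in isolation:

    import com.google.common.base.Throwables;

    // Sketch of the fatal-failure test: an Error anywhere down the cause
    // chain means the task must not be retried.
    final class FatalErrorCheck {
      static boolean isFatal(Throwable t) {
        return t instanceof Error || Throwables.getRootCause(t) instanceof Error;
      }
    }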