http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
index af1fa66..afe1484 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
@@ -38,6 +38,7 @@ import 
org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorSparkHashTableSinkOperator;
 import 
org.apache.hadoop.hive.ql.exec.vector.VectorSparkPartitionPruningSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
+import 
org.apache.hadoop.hive.ql.exec.vector.reducesink.VectorReduceSinkCommonOperator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
 import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
@@ -143,13 +144,17 @@ public final class OperatorFactory {
 
   public static <T extends OperatorDesc> Operator<T> getVectorOperator(
     Class<? extends Operator<?>> opClass, CompilationOpContext cContext, T 
conf,
-        VectorizationContext vContext) throws HiveException {
+        VectorizationContext vContext, Operator<? extends OperatorDesc> 
originalOp) throws HiveException {
     try {
       VectorDesc vectorDesc = ((AbstractOperatorDesc) conf).getVectorDesc();
       vectorDesc.setVectorOp(opClass);
-      Operator<T> op = (Operator<T>) opClass.getDeclaredConstructor(
-          CompilationOpContext.class, VectorizationContext.class, 
OperatorDesc.class)
-          .newInstance(cContext, vContext, conf);
+      Operator<T> op = (Operator<T>) 
opClass.getDeclaredConstructor(CompilationOpContext.class,
+          VectorizationContext.class, 
OperatorDesc.class).newInstance(cContext, vContext, conf);
+      op.setOperatorId(originalOp.getOperatorId());
+      if (op instanceof VectorReduceSinkOperator || op instanceof 
VectorReduceSinkCommonOperator) {
+        ((ReduceSinkDesc) op.getConf()).setOutputOperators(((ReduceSinkDesc) 
originalOp.getConf())
+            .getOutputOperators());
+      }
       return op;
     } catch (Exception e) {
       e.printStackTrace();
@@ -158,11 +163,12 @@ public final class OperatorFactory {
   }
 
+  /**
+   * Creates the vectorized operator registered for the descriptor class of {@code conf}.
+   * The operator class is looked up in {@code vectorOpvec} by {@code conf.getClass()} and
+   * construction is delegated to the class-based {@code getVectorOperator} overload.
+   *
+   * @param cContext compilation context, passed through to the class-based overload
+   * @param conf operator descriptor; its runtime class selects the vector operator class
+   * @param vContext vectorization context, passed through to the class-based overload
+   * @param originalOp the original operator, passed through to the class-based overload
+   * @return the operator produced by the class-based overload
+   * @throws HiveException if no vector operator class is registered for the descriptor class
+   */
   public static <T extends OperatorDesc> Operator<T> getVectorOperator(
-      CompilationOpContext cContext, T conf, VectorizationContext vContext) 
throws HiveException {
+      CompilationOpContext cContext, T conf, VectorizationContext vContext,
+      Operator<? extends OperatorDesc> originalOp) throws HiveException {
     Class<T> descClass = (Class<T>) conf.getClass();
-    Class<?> opClass = vectorOpvec.get(descClass);
+    Class<? extends Operator<? extends OperatorDesc>> opClass = 
vectorOpvec.get(descClass);
     if (opClass != null) {
-      return getVectorOperator(vectorOpvec.get(descClass), cContext, conf, 
vContext);
+      return getVectorOperator(opClass, cContext, conf, vContext, originalOp);
     }
     throw new HiveException("No vector operator for descriptor class " + 
descClass.getName());
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 3b10bfd..5412ef1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -107,8 +107,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> 
implements Serializable {
       srcFiles.addAll(Arrays.asList(srcs));
       LOG.debug("ReplCopyTask numFiles:" + (srcFiles == null ? "null" : 
srcFiles.size()));
 
-      boolean inheritPerms = 
conf.getBoolVar(HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
-      if (!FileUtils.mkdir(dstFs, toPath, inheritPerms, conf)) {
+      if (!FileUtils.mkdir(dstFs, toPath, conf)) {
         console.printError("Cannot make target directory: " + 
toPath.toString());
         return 2;
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
index 247d589..01a652d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SerializationUtilities.java
@@ -33,10 +33,12 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.CopyOnFirstWriteProperties;
 import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
@@ -223,6 +225,7 @@ public class SerializationUtilities {
       kryo.register(java.sql.Timestamp.class, new TimestampSerializer());
       kryo.register(Path.class, new PathSerializer());
       kryo.register(Arrays.asList("").getClass(), new 
ArraysAsListSerializer());
+      kryo.register(CopyOnFirstWriteProperties.class, new 
CopyOnFirstWritePropertiesSerializer());
 
       ((Kryo.DefaultInstantiatorStrategy) kryo.getInstantiatorStrategy())
           .setFallbackInstantiatorStrategy(
@@ -422,6 +425,33 @@ public class SerializationUtilities {
   }
 
   /**
+   * Kryo serializer for CopyOnFirstWriteProperties.
+   *
+   * CopyOnFirstWriteProperties extends Properties, which implements Map, so MapSerializer
+   * would be used for it by default. However, the class has an additional 'interned' field
+   * that the standard MapSerializer does not handle properly. FieldSerializer does not work
+   * for it either, because the Hashtable superclass declares most of its fields transient.
+   * This serializer therefore writes the map entries with MapSerializer and appends the
+   * 'interned' Properties object after them.
+   */
+  private static class CopyOnFirstWritePropertiesSerializer extends
+      com.esotericsoftware.kryo.serializers.MapSerializer {
+
+    /**
+     * Writes the map entries first, then the interned Properties. The interned object is
+     * written with writeObjectOrNull, so a null value is accepted.
+     */
+    @Override
+    public void write(Kryo kryo, Output output, Map map) {
+      super.write(kryo, output, map);
+      Properties interned = ((CopyOnFirstWriteProperties) map).getInterned();
+      kryo.writeObjectOrNull(output, interned, Properties.class);
+    }
+
+    /**
+     * Reads the data back in the order used by write: map entries first, then the
+     * interned Properties, which is installed on the freshly read map.
+     */
+    @Override
+    public Map read(Kryo kryo, Input input, Class<Map> type) {
+      Map result = super.read(kryo, input, type);
+      Properties interned = kryo.readObjectOrNull(input, Properties.class);
+      CopyOnFirstWriteProperties props = (CopyOnFirstWriteProperties) result;
+      props.setInterned(interned);
+      return result;
+    }
+  }
+
+  /**
    * Serializes the plan.
    *
    * @param plan The plan, such as QueryPlan, MapredWork, etc.

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
index 65227e9..65363ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
@@ -347,14 +347,15 @@ public class StatsNoJobTask extends Task<StatsNoJobWork> 
implements Serializable
     try {
 
       // Wait a while for existing tasks to terminate
-      if (!threadPool.awaitTermination(100, TimeUnit.SECONDS)) {
-        // Cancel currently executing tasks
-        threadPool.shutdownNow();
+      while (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) {
+        LOG.debug("Waiting for all stats tasks to finish...");
+      }
+      // Cancel currently executing tasks
+      threadPool.shutdownNow();
 
-        // Wait a while for tasks to respond to being cancelled
-        if (!threadPool.awaitTermination(100, TimeUnit.SECONDS)) {
-          LOG.debug("Stats collection thread pool did not terminate");
-        }
+      // Wait a while for tasks to respond to being cancelled
+      if (!threadPool.awaitTermination(100, TimeUnit.SECONDS)) {
+        LOG.debug("Stats collection thread pool did not terminate");
       }
     } catch (InterruptedException ie) {
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
index a596e92..eddc31e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskRunner.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.session.OperationLog;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,7 +34,6 @@ public class TaskRunner extends Thread {
   protected Task<? extends Serializable> tsk;
   protected TaskResult result;
   protected SessionState ss;
-  private OperationLog operationLog;
   private static AtomicLong taskCounter = new AtomicLong(0);
   private static ThreadLocal<Long> taskRunnerID = new ThreadLocal<Long>() {
     @Override
@@ -74,7 +72,6 @@ public class TaskRunner extends Thread {
   public void run() {
     runner = Thread.currentThread();
     try {
-      OperationLog.setCurrentOperationLog(operationLog);
       SessionState.start(ss);
       runSequential();
     } finally {
@@ -113,8 +110,4 @@ public class TaskRunner extends Thread {
+  /**
+   * Returns the value held by the {@code taskRunnerID} thread-local for the calling thread.
+   *
+   * @return the calling thread's task runner id
+   */
   public static long getTaskRunnerID () {
     return taskRunnerID.get();
   }
-
-  public void setOperationLog(OperationLog operationLog) {
-    this.operationLog = operationLog;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
index f3c7c77..48ae02f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
@@ -27,6 +27,7 @@ import java.util.TreeMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.llap.LlapDaemonInfo;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -105,8 +106,8 @@ public class TopNHash {
     }
 
     final boolean isTez = HiveConf.getVar(hconf, 
HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
-    final boolean isLlap = isTez && HiveConf.getVar(hconf, 
HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("llap");
-    final int numExecutors = isLlap ? HiveConf.getIntVar(hconf, 
HiveConf.ConfVars.LLAP_DAEMON_NUM_EXECUTORS) : 1;
+    final boolean isLlap = LlapDaemonInfo.INSTANCE.isLlap();
+    final int numExecutors = isLlap ? 
LlapDaemonInfo.INSTANCE.getNumExecutors() : 1;
 
     // Used Memory = totalMemory() - freeMemory();
     // Total Free Memory = maxMemory() - Used Memory;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 5b5ddc3..777c119 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -76,6 +76,7 @@ import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.WordUtils;
@@ -109,6 +110,8 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver.DriverState;
+import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
@@ -203,7 +206,6 @@ import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.Shell;
 import org.apache.hive.common.util.ACLConfigurationParser;
 import org.apache.hive.common.util.ReflectionUtil;
 import org.slf4j.Logger;
@@ -280,11 +282,11 @@ public final class Utilities {
    * The object in the reducer are composed of these top level fields.
    */
 
-  public static String HADOOP_LOCAL_FS = "file:///";
+  public static final String HADOOP_LOCAL_FS = "file:///";
   public static final String HADOOP_LOCAL_FS_SCHEME = "file";
-  public static String MAP_PLAN_NAME = "map.xml";
-  public static String REDUCE_PLAN_NAME = "reduce.xml";
-  public static String MERGE_PLAN_NAME = "merge.xml";
+  public static final String MAP_PLAN_NAME = "map.xml";
+  public static final String REDUCE_PLAN_NAME = "reduce.xml";
+  public static final String MERGE_PLAN_NAME = "merge.xml";
   public static final String INPUT_NAME = "iocontext.input.name";
   public static final String HAS_MAP_WORK = "has.map.work";
   public static final String HAS_REDUCE_WORK = "has.reduce.work";
@@ -293,11 +295,11 @@ public final class Utilities {
   public static final String HIVE_ADDED_JARS = "hive.added.jars";
   public static final String VECTOR_MODE = "VECTOR_MODE";
   public static final String USE_VECTORIZED_INPUT_FILE_FORMAT = 
"USE_VECTORIZED_INPUT_FILE_FORMAT";
-  public static String MAPNAME = "Map ";
-  public static String REDUCENAME = "Reducer ";
+  public static final String MAPNAME = "Map ";
+  public static final String REDUCENAME = "Reducer ";
 
   @Deprecated
-  protected static String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX = 
"mapred.dfsclient.parallelism.max";
+  protected static final String DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX = 
"mapred.dfsclient.parallelism.max";
 
   /**
    * ReduceField:
@@ -603,7 +605,7 @@ public final class Utilities {
   public static void setMapRedWork(Configuration conf, MapredWork w, Path 
hiveScratchDir) {
     String useName = conf.get(INPUT_NAME);
     if (useName == null) {
-      useName = "mapreduce";
+      useName = "mapreduce:" + hiveScratchDir;
     }
     conf.set(INPUT_NAME, useName);
     setMapWork(conf, w.getMapWork(), hiveScratchDir, true);
@@ -767,8 +769,8 @@ public final class Utilities {
   // Note: When DDL supports specifying what string to represent null,
   // we should specify "NULL" to represent null in the temp table, and then
   // we can make the following translation deprecated.
-  public static String nullStringStorage = "\\N";
-  public static String nullStringOutput = "NULL";
+  public static final String nullStringStorage = "\\N";
+  public static final String nullStringOutput = "NULL";
 
   public static Random randGen = new Random();
 
@@ -2681,7 +2683,7 @@ public final class Utilities {
     setColumnTypeList(jobConf, rowSchema, excludeVCs);
   }
 
-  public static String suffix = ".hashtable";
+  public static final String suffix = ".hashtable";
 
   public static Path generatePath(Path basePath, String dumpFilePrefix,
       Byte tag, String bigBucketFileName) {
@@ -3162,6 +3164,7 @@ public final class Utilities {
 
     Set<Path> pathsProcessed = new HashSet<Path>();
     List<Path> pathsToAdd = new LinkedList<Path>();
+    LockedDriverState lDrvStat = LockedDriverState.getLockedDriverState();
     // AliasToWork contains all the aliases
     Collection<String> aliasToWork = work.getAliasToWork().keySet();
     if (!skipDummy) {
@@ -3182,6 +3185,9 @@ public final class Utilities {
       boolean hasLogged = false;
       Path path = null;
       for (Map.Entry<Path, ArrayList<String>> e : pathToAliases) {
+        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+          throw new IOException("Operation is Canceled. ");
+
         Path file = e.getKey();
         List<String> aliases = e.getValue();
         if (aliases.contains(alias)) {
@@ -3235,6 +3241,8 @@ public final class Utilities {
     List<Path> finalPathsToAdd = new LinkedList<>();
     List<Future<Path>> futures = new LinkedList<>();
     for (final Path path : pathsToAdd) {
+      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+        throw new IOException("Operation is Canceled. ");
       if (pool == null) {
         finalPathsToAdd.add(new GetInputPathsCallable(path, job, work, 
hiveScratchDir, ctx, skipDummy).call());
       } else {
@@ -3244,6 +3252,8 @@ public final class Utilities {
 
     if (pool != null) {
       for (Future<Path> future : futures) {
+        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+          throw new IOException("Operation is Canceled. ");
         finalPathsToAdd.add(future.get());
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java
new file mode 100644
index 0000000..4ad4f98
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionError.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.mapjoin;
+
+/**
+ * Error carrying a message that describes a map-join memory exhaustion condition.
+ * When this Error is thrown, better not retry.
+ *
+ * This type extends {@link Error}, so it is not caught by handlers that only catch
+ * {@link Exception}.
+ */
+public class MapJoinMemoryExhaustionError extends Error {
+  private static final long serialVersionUID = 3678353959830506881L;
+  /**
+   * @param msg detail message, passed unchanged to {@link Error#Error(String)}
+   */
+  public MapJoinMemoryExhaustionError(String msg) {
+    super(msg);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
deleted file mode 100644
index dbe00b6..0000000
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.mapjoin;
-
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-
-
-
-public class MapJoinMemoryExhaustionException extends HiveException {
-  private static final long serialVersionUID = 3678353959830506881L;
-  public MapJoinMemoryExhaustionException(String msg) {
-    super(msg);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
index 7fc3226..d5e81e1 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mapjoin/MapJoinMemoryExhaustionHandler.java
@@ -86,17 +86,17 @@ public class MapJoinMemoryExhaustionHandler {
    *
    * @param tableContainerSize currently table container size
    * @param numRows number of rows processed
-   * @throws MapJoinMemoryExhaustionException
+   * @throws MapJoinMemoryExhaustionError
    */
   public void checkMemoryStatus(long tableContainerSize, long numRows)
-  throws MapJoinMemoryExhaustionException {
+  throws MapJoinMemoryExhaustionError {
     long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
+    // Used heap as a fraction of maxHeapSize.
     double percentage = (double) usedMemory / (double) maxHeapSize;
+    // The status line is built unconditionally: it is always printed below and is also
+    // reused as the message of the Error when the limit is exceeded.
     String msg = Utilities.now() + "\tProcessing rows:\t" + numRows + 
"\tHashtable size:\t"
         + tableContainerSize + "\tMemory usage:\t" + usedMemory + 
"\tpercentage:\t" + percentageNumberFormat.format(percentage);
     console.printInfo(msg);
+    // Strictly greater than: usage exactly equal to maxMemoryUsage does not throw.
     if(percentage > maxMemoryUsage) {
-      throw new MapJoinMemoryExhaustionException(msg);
+      throw new MapJoinMemoryExhaustionError(msg);
     }
    }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 1945163..93a36c6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -32,6 +32,7 @@ import java.util.Properties;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hadoop.hive.ql.log.LogDivertAppenderForTest;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,6 +69,7 @@ import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
 import org.apache.hadoop.hive.ql.io.IOPrepareCache;
+import org.apache.hadoop.hive.ql.log.LogDivertAppender;
 import org.apache.hadoop.hive.ql.log.NullAppender;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
@@ -116,6 +118,8 @@ public class ExecDriver extends Task<MapredWork> implements 
Serializable, Hadoop
   protected transient JobConf job;
   public static MemoryMXBean memoryMXBean;
   protected HadoopJobExecHelper jobExecHelper;
+  private transient boolean isShutdown = false;
+  private transient boolean jobKilled = false;
 
   protected static transient final Logger LOG = 
LoggerFactory.getLogger(ExecDriver.class);
 
@@ -412,10 +416,7 @@ public class ExecDriver extends Task<MapredWork> 
implements Serializable, Hadoop
 
       if (driverContext.isShutdown()) {
         LOG.warn("Task was cancelled");
-        if (rj != null) {
-          rj.killJob();
-          rj = null;
-        }
+        killJob();
         return 5;
       }
 
@@ -448,7 +449,7 @@ public class ExecDriver extends Task<MapredWork> implements 
Serializable, Hadoop
 
         if (rj != null) {
           if (returnVal != 0) {
-            rj.killJob();
+            killJob();
           }
           jobID = rj.getID().toString();
         }
@@ -632,6 +633,8 @@ public class ExecDriver extends Task<MapredWork> implements 
Serializable, Hadoop
   private static void setupChildLog4j(Configuration conf) {
     try {
       LogUtils.initHiveExecLog4j();
+      LogDivertAppender.registerRoutingAppender(conf);
+      LogDivertAppenderForTest.registerRoutingAppenderIfInTest(conf);
     } catch (LogInitializationException e) {
       System.err.println(e.getMessage());
     }
@@ -703,6 +706,8 @@ public class ExecDriver extends Task<MapredWork> implements 
Serializable, Hadoop
     }
     System.setProperty(HiveConf.ConfVars.HIVEQUERYID.toString(), queryId);
 
+    LogUtils.registerLoggingContext(conf);
+
     if (noLog) {
       // If started from main(), and noLog is on, we should not output
       // any logs. To turn the log on, please set -Dtest.silent=false
@@ -853,22 +858,37 @@ public class ExecDriver extends Task<MapredWork> 
implements Serializable, Hadoop
     ss.getHiveHistory().logPlanProgress(queryPlan);
   }
 
+  /**
+   * Returns the current value of the {@code isShutdown} flag, which {@code shutdown()}
+   * sets to true.
+   *
+   * NOTE(review): {@code isShutdown} is declared as a plain (non-volatile) field; if this
+   * method is called from a thread other than the one running shutdown(), confirm that
+   * visibility of the update is guaranteed by other means.
+   *
+   * @return true once shutdown() has set the flag
+   */
+  public boolean isTaskShutdown() {
+    return isShutdown;
+  }
+
+  /**
+   * Shuts this task down: runs the superclass shutdown, then calls {@code killJob()},
+   * and only after that sets the {@code isShutdown} flag.
+   */
   @Override
   public void shutdown() {
     super.shutdown();
-    if (rj != null) {
+    killJob();
+    isShutdown = true;
+  }
+
+  /**
+   * Returns the {@code jobID} field as this task's external handle.
+   *
+   * @return the current value of {@code jobID}
+   */
+  @Override
+  public String getExternalHandle() {
+    return this.jobID;
+  }
+
+  /**
+   * Kills the running job at most once.
+   *
+   * The decision is taken while holding this object's monitor: the first caller that
+   * finds {@code rj} non-null and {@code jobKilled} unset sets the flag and goes on to
+   * kill the job; every other caller does nothing. The {@code rj.killJob()} call itself
+   * is made outside the synchronized block, and any exception it throws is logged as a
+   * warning instead of being propagated.
+   */
+  private void killJob() {
+    boolean needToKillJob = false;
+    synchronized(this) {
+      if (rj != null && !jobKilled) {
+        jobKilled = true;
+        needToKillJob = true;
+      }
+    }
+    // rj is not cleared by this method; the jobKilled flag is what prevents a second kill.
+    if (needToKillJob) {
       try {
         rj.killJob();
       } catch (Exception e) {
         LOG.warn("failed to kill job " + rj.getID(), e);
       }
-      rj = null;
+    }
   }
-
-  @Override
-  public String getExternalHandle() {
-    return this.jobID;
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
index 591ea97..c5d4f9a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
@@ -60,7 +60,7 @@ import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionException;
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -69,7 +69,6 @@ import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.ql.session.OperationLog;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.session.SessionState.LogHelper;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -327,18 +326,8 @@ public class MapredLocalTask extends Task<MapredLocalWork> 
implements Serializab
 
       CachingPrintStream errPrintStream = new CachingPrintStream(System.err);
 
-      StreamPrinter outPrinter;
-      StreamPrinter errPrinter;
-      OperationLog operationLog = OperationLog.getCurrentOperationLog();
-      if (operationLog != null) {
-        outPrinter = new StreamPrinter(executor.getInputStream(), null, 
System.out,
-            operationLog.getPrintStream());
-        errPrinter = new StreamPrinter(executor.getErrorStream(), null, 
errPrintStream,
-            operationLog.getPrintStream());
-      } else {
-        outPrinter = new StreamPrinter(executor.getInputStream(), null, 
System.out);
-        errPrinter = new StreamPrinter(executor.getErrorStream(), null, 
errPrintStream);
-      }
+      StreamPrinter outPrinter = new StreamPrinter(executor.getInputStream(), 
null, System.out);
+      StreamPrinter errPrinter = new StreamPrinter(executor.getErrorStream(), 
null, errPrintStream);
 
       outPrinter.start();
       errPrinter.start();
@@ -395,7 +384,7 @@ public class MapredLocalTask extends Task<MapredLocalWork> 
implements Serializab
           + Utilities.showTime(elapsed) + " sec.");
     } catch (Throwable throwable) {
       if (throwable instanceof OutOfMemoryError
-          || (throwable instanceof MapJoinMemoryExhaustionException)) {
+          || (throwable instanceof MapJoinMemoryExhaustionError)) {
         l4j.error("Hive Runtime Error: Map local work exhausted memory", 
throwable);
         return 3;
       } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
index 04e24bd..360b639 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java
@@ -24,6 +24,8 @@ import java.util.Map;
 import java.util.TreeMap;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hive.common.MemoryEstimate;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -46,7 +48,7 @@ import com.google.common.annotations.VisibleForTesting;
  * Initially inspired by HPPC LongLongOpenHashMap; however, the code is almost 
completely reworked
  * and there's very little in common left save for quadratic probing (and that 
with some changes).
  */
-public final class BytesBytesMultiHashMap {
+public final class BytesBytesMultiHashMap implements MemoryEstimate {
   public static final Logger LOG = 
LoggerFactory.getLogger(BytesBytesMultiHashMap.class);
 
   /*
@@ -521,7 +523,18 @@ public final class BytesBytesMultiHashMap {
    * @return number of bytes
    */
   public long memorySize() {
-    return writeBuffers.size() + refs.length * 8 + 100;
+    return getEstimatedMemorySize();
+  }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    JavaDataModel jdm = JavaDataModel.get();
+    long size = 0;
+    size += writeBuffers.getEstimatedMemorySize();
+    size += jdm.lengthForLongArrayOfSize(refs.length);
+    // 11 primitive1 fields, 2 refs above with alignment (NOTE(review): 15 presumably counts each ref as 2 primitive1 - confirm)
+    size += JavaDataModel.alignUp(15 * jdm.primitive1(), jdm.memoryAlign());
+    return size;
   }
 
   public void seal() {

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
index a3bccc6..adf1a90 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
@@ -53,7 +53,7 @@ import org.apache.hadoop.io.Writable;
 public class HashMapWrapper extends AbstractMapJoinTableContainer implements 
Serializable {
   private static final long serialVersionUID = 1L;
   protected static final Logger LOG = 
LoggerFactory.getLogger(HashMapWrapper.class);
-
+  private static final long DEFAULT_HASHMAP_ENTRY_SIZE = 1024L;
   // default threshold for using main memory based HashMap
   private static final int THRESHOLD = 1000000;
   private static final float LOADFACTOR = 0.75f;
@@ -140,6 +140,14 @@ public class HashMapWrapper extends 
AbstractMapJoinTableContainer implements Ser
     return new GetAdaptor(keyTypeFromLoader);
   }
 
+  @Override
+  public long getEstimatedMemorySize() {
+    // TODO: Keys and values are Object[] which can be eagerly or lazily deserialized. To accurately
+    // estimate the entry size, every possible object in a key or value would have to implement the
+    // MemoryEstimate interface, which is very intrusive. So a default entry size is assumed here.
+    return size() * DEFAULT_HASHMAP_ENTRY_SIZE;
+  }
+
   private class GetAdaptor implements ReusableGetAdaptor {
 
     private Object[] currentKey;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
index 04e89e8..6523f00 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java
@@ -118,6 +118,11 @@ public class HybridHashTableContainer
 
   private final String spillLocalDirs;
 
+  @Override
+  public long getEstimatedMemorySize() {
+    return memoryUsed;
+  }
+
   /**
    * This class encapsulates the triplet together since they are closely 
related to each other
    * The triplet: hashmap (either in memory or on disk), small table 
container, big table container

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
index c86e5f5..014d17a 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.MemoryEstimate;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
@@ -33,6 +34,7 @@ import 
org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper;
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
 import 
org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.ByteStream.Output;
 import org.apache.hadoop.hive.serde2.ByteStream.RandomAccessOutput;
 import org.apache.hadoop.hive.serde2.AbstractSerDe;
@@ -72,6 +74,11 @@ public class MapJoinBytesTableContainer
          implements MapJoinTableContainer, MapJoinTableContainerDirectAccess {
   private static final Logger LOG = 
LoggerFactory.getLogger(MapJoinTableContainer.class);
 
+  // TODO: For object inspector fields, assign 16KB for now. To better estimate the memory size, every
+  // object inspector would have to implement the MemoryEstimate interface, which is a lot of change
+  // with little benefit compared to writing an instrumentation agent for object size estimation.
+  public static final long DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE = 16 * 1024L;
+
   private final BytesBytesMultiHashMap hashMap;
   /** The OI used to deserialize values. We never deserialize keys. */
   private LazyBinaryStructObjectInspector internalValueOi;
@@ -147,7 +154,7 @@ public class MapJoinBytesTableContainer
     this.notNullMarkers = notNullMarkers;
   }
 
-  public static interface KeyValueHelper extends 
BytesBytesMultiHashMap.KvSource {
+  public static interface KeyValueHelper extends 
BytesBytesMultiHashMap.KvSource, MemoryEstimate {
     void setKeyValue(Writable key, Writable val) throws SerDeException;
     /** Get hash value from the key. */
     int getHashFromKey() throws SerDeException;
@@ -216,6 +223,22 @@ public class MapJoinBytesTableContainer
     public int getHashFromKey() throws SerDeException {
       throw new UnsupportedOperationException("Not supported for 
MapJoinBytesTableContainer");
     }
+
+    @Override
+    public long getEstimatedMemorySize() {
+      JavaDataModel jdm = JavaDataModel.get();
+      long size = 0;
+      size += keySerDe == null ? 0 : jdm.object();
+      size += valSerDe == null ? 0 : jdm.object();
+      size += keySoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
+      size += valSoi == null ? 0 : DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
+      size += keyOis == null ? 0 : jdm.arrayList() + keyOis.size() * 
DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
+      size += valOis == null ? 0 : jdm.arrayList() + valOis.size() * 
DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
+      size += keyObjs == null ? 0 : jdm.array() + keyObjs.length * 
jdm.object();
+      size += valObjs == null ? 0 : jdm.array() + valObjs.length * 
jdm.object();
+      size += jdm.primitive1();
+      return size;
+    }
   }
 
   static class LazyBinaryKvWriter implements KeyValueHelper {
@@ -319,6 +342,15 @@ public class MapJoinBytesTableContainer
       aliasFilter &= filterGetter.getShort();
       return aliasFilter;
     }
+
+    @Override
+    public long getEstimatedMemorySize() {
+      JavaDataModel jdm = JavaDataModel.get();
+      long size = 0;
+      size += (4 * jdm.object());
+      size += jdm.primitive1();
+      return size;
+    }
   }
 
   /*
@@ -361,6 +393,15 @@ public class MapJoinBytesTableContainer
       int keyLength = key.getLength();
       return HashCodeUtil.murmurHash(keyBytes, 0, keyLength);
     }
+
+    @Override
+    public long getEstimatedMemorySize() {
+      JavaDataModel jdm = JavaDataModel.get();
+      long size = 0;
+      size += jdm.object() + (key == null ? 0 : key.getCapacity());
+      size += jdm.object() + (val == null ? 0 : val.getCapacity());
+      return size;
+    }
   }
 
   @Override
@@ -768,4 +809,19 @@ public class MapJoinBytesTableContainer
   public int size() {
     return hashMap.size();
   }
+
+  @Override
+  public long getEstimatedMemorySize() {
+    JavaDataModel jdm = JavaDataModel.get();
+    long size = 0;
+    size += hashMap.getEstimatedMemorySize();
+    size += directWriteHelper == null ? 0 : 
directWriteHelper.getEstimatedMemorySize();
+    size += writeHelper == null ? 0 : writeHelper.getEstimatedMemorySize();
+    size += sortableSortOrders == null ? 0 : 
jdm.lengthForBooleanArrayOfSize(sortableSortOrders.length);
+    size += nullMarkers == null ? 0 : 
jdm.lengthForByteArrayOfSize(nullMarkers.length);
+    size += notNullMarkers == null ? 0 : 
jdm.lengthForByteArrayOfSize(notNullMarkers.length);
+    size += jdm.arrayList(); // empty list
+    size += DEFAULT_OBJECT_INSPECTOR_MEMORY_SIZE;
+    return size;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
index 6d71fef..5ca5ff6 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainer.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
 import org.apache.hadoop.hive.ql.exec.JoinUtil;
+import org.apache.hadoop.hive.common.MemoryEstimate;
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapper;
 import org.apache.hadoop.hive.ql.exec.vector.VectorHashKeyWrapperBatch;
 import 
org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
@@ -31,7 +32,7 @@ import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.Writable;
 
-public interface MapJoinTableContainer {
+public interface MapJoinTableContainer extends MemoryEstimate {
   /**
    * Retrieve rows from hashtable key by key, one key at a time, w/o copying 
the structures
    * for each key. "Old" HashMapWrapper will still create/retrieve new objects 
for java HashMap;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
index 4c69899..4ca8f93 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/RemoteHiveSparkClient.java
@@ -356,12 +356,9 @@ public class RemoteHiveSparkClient implements 
HiveSparkClient {
     private void logConfigurations(JobConf localJobConf) {
       if (LOG.isInfoEnabled()) {
         LOG.info("Logging job configuration: ");
-        StringWriter outWriter = new StringWriter();
-        try {
-          Configuration.dumpConfiguration(localJobConf, outWriter);
-        } catch (IOException e) {
-          LOG.warn("Error logging job configuration", e);
-        }
+        StringBuilder outWriter = new StringBuilder();
+        // redact sensitive information before logging
+        HiveConfUtil.dumpConfig(localJobConf, outWriter);
         LOG.info(outWriter.toString());
       }
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
index 12a76a7..5f85f9e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
@@ -298,12 +298,13 @@ public class SparkPlanGenerator {
       throw new IllegalArgumentException(msg, e);
     }
     if (work instanceof MapWork) {
+      MapWork mapWork = (MapWork) work;
       cloned.setBoolean("mapred.task.is.map", true);
-      List<Path> inputPaths = Utilities.getInputPaths(cloned, (MapWork) work,
+      List<Path> inputPaths = Utilities.getInputPaths(cloned, mapWork,
           scratchDir, context, false);
       Utilities.setInputPaths(cloned, inputPaths);
-      Utilities.setMapWork(cloned, (MapWork) work, scratchDir, false);
-      Utilities.createTmpDirs(cloned, (MapWork) work);
+      Utilities.setMapWork(cloned, mapWork, scratchDir, false);
+      Utilities.createTmpDirs(cloned, mapWork);
       if (work instanceof MergeFileWork) {
         MergeFileWork mergeFileWork = (MergeFileWork) work;
         cloned.set(Utilities.MAPRED_MAPPER_CLASS, 
MergeFileMapper.class.getName());
@@ -313,9 +314,21 @@ public class SparkPlanGenerator {
       } else {
         cloned.set(Utilities.MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
       }
-      if (((MapWork) work).getMinSplitSize() != null) {
+      if (mapWork.getMaxSplitSize() != null) {
+        HiveConf.setLongVar(cloned, HiveConf.ConfVars.MAPREDMAXSPLITSIZE,
+            mapWork.getMaxSplitSize());
+      }
+      if (mapWork.getMinSplitSize() != null) {
         HiveConf.setLongVar(cloned, HiveConf.ConfVars.MAPREDMINSPLITSIZE,
-            ((MapWork) work).getMinSplitSize());
+            mapWork.getMinSplitSize());
+      }
+      if (mapWork.getMinSplitSizePerNode() != null) {
+        HiveConf.setLongVar(cloned, 
HiveConf.ConfVars.MAPREDMINSPLITSIZEPERNODE,
+            mapWork.getMinSplitSizePerNode());
+      }
+      if (mapWork.getMinSplitSizePerRack() != null) {
+        HiveConf.setLongVar(cloned, 
HiveConf.ConfVars.MAPREDMINSPLITSIZEPERRACK,
+            mapWork.getMinSplitSizePerRack());
       }
       // remember the JobConf cloned for each MapWork, so we won't clone for 
it again
       workToJobConf.put(work, cloned);

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
index 27bed9c..7eaad18 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
@@ -95,6 +95,7 @@ public class SparkReduceRecordHandler extends 
SparkRecordHandler {
   // number of columns pertaining to keys in a vectorized row batch
   private int keysColumnOffset;
   private static final int BATCH_SIZE = VectorizedRowBatch.DEFAULT_SIZE;
+  private static final int BATCH_BYTES = VectorizedRowBatch.DEFAULT_BYTES;
   private StructObjectInspector keyStructInspector;
   private StructObjectInspector[] valueStructInspectors;
   /* this is only used in the error code path */
@@ -373,6 +374,7 @@ public class SparkReduceRecordHandler extends 
SparkRecordHandler {
     }
 
     int rowIdx = 0;
+    int batchBytes = 0;
     try {
       while (values.hasNext()) {
         /* deserialize value into columns */
@@ -381,11 +383,13 @@ public class SparkReduceRecordHandler extends 
SparkRecordHandler {
 
         VectorizedBatchUtil.addRowToBatchFrom(valueObj, 
valueStructInspectors[tag], rowIdx,
             keysColumnOffset, batch, buffer);
+        batchBytes += valueWritable.getLength();
         rowIdx++;
-        if (rowIdx >= BATCH_SIZE) {
+        if (rowIdx >= BATCH_SIZE || batchBytes > BATCH_BYTES) {
           VectorizedBatchUtil.setBatchSize(batch, rowIdx);
           reducer.process(batch, tag);
           rowIdx = 0;
+          batchBytes = 0;
           if (isLogInfoEnabled) {
             logMemoryInfo();
           }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 4c01329..98b1605 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -83,6 +83,8 @@ public class SparkTask extends Task<SparkWork> {
   private transient int totalTaskCount;
   private transient int failedTaskCount;
   private transient List<Integer> stageIds;
+  private transient SparkJobRef jobRef = null;
+  private transient boolean isShutdown = false;
 
   @Override
   public void initialize(QueryState queryState, QueryPlan queryPlan, 
DriverContext driverContext,
@@ -107,7 +109,7 @@ public class SparkTask extends Task<SparkWork> {
 
       perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
       submitTime = perfLogger.getStartTime(PerfLogger.SPARK_SUBMIT_JOB);
-      SparkJobRef jobRef = sparkSession.submit(driverContext, sparkWork);
+      jobRef = sparkSession.submit(driverContext, sparkWork);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.SPARK_SUBMIT_JOB);
 
       addToHistory(jobRef);
@@ -127,8 +129,14 @@ public class SparkTask extends Task<SparkWork> {
         // TODO: If the timeout is because of lack of resources in the 
cluster, we should
         // ideally also cancel the app request here. But w/o facilities from 
Spark or YARN,
         // it's difficult to do it on hive side alone. See HIVE-12650.
+        LOG.info("Failed to submit Spark job " + sparkJobID);
+        jobRef.cancelJob();
+      } else if (rc == 4) {
+        LOG.info("The number of tasks reaches above the limit " + 
conf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS) +
+            ". Cancelling Spark job " + sparkJobID + " with application ID " + 
jobID );
         jobRef.cancelJob();
       }
+
       if (this.jobID == null) {
         this.jobID = sparkJobStatus.getAppID();
       }
@@ -290,6 +298,23 @@ public class SparkTask extends Task<SparkWork> {
     return finishTime;
   }
 
+  public boolean isTaskShutdown() {
+    return isShutdown;
+  }
+
+  @Override
+  public void shutdown() {
+    super.shutdown();
+    if (jobRef != null && !isShutdown) {
+      try {
+        jobRef.cancelJob();
+      } catch (Exception e) {
+        LOG.warn("failed to kill job", e);
+      }
+    }
+    isShutdown = true;
+  }
+
   /**
    * Set the number of reducers for the spark work.
    */

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
index 7d18c0a..eb9883a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkUtilities.java
@@ -78,7 +78,7 @@ public class SparkUtilities {
     Path localFile = new Path(source.getPath());
     Path remoteFile = new 
Path(SessionState.get().getSparkSession().getHDFSSessionDir(),
         getFileName(source));
-    FileSystem fileSystem = FileSystem.get(conf);
+    FileSystem fileSystem = FileSystem.get(remoteFile.toUri(), conf);
     // Overwrite if the remote file already exists. Whether the file can be 
added
     // on executor is up to spark, i.e. spark.files.overwrite
     fileSystem.copyFromLocalFile(false, true, localFile, remoteFile);
@@ -92,7 +92,7 @@ public class SparkUtilities {
     String deployMode = sparkConf.contains("spark.submit.deployMode") ?
         sparkConf.get("spark.submit.deployMode") : null;
     return SparkClientUtilities.isYarnClusterMode(master, deployMode) &&
-        !source.getScheme().equals("hdfs");
+        !(source.getScheme().equals("hdfs") || 
source.getScheme().equals("viewfs"));
   }
 
   private static String getFileName(URI uri) {

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index dd73f3e..9dfb65e 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -34,7 +34,8 @@ import org.apache.spark.JobExecutionStatus;
  * It print current job status to console and sleep current thread between 
monitor interval.
  */
 public class RemoteSparkJobMonitor extends SparkJobMonitor {
-
+  private int sparkJobMaxTaskCount = -1;
+  private int totalTaskCount = 0;
   private RemoteSparkJobStatus sparkJobStatus;
   private final HiveConf hiveConf;
 
@@ -42,6 +43,7 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
     super(hiveConf);
     this.sparkJobStatus = sparkJobStatus;
     this.hiveConf = hiveConf;
+    sparkJobMaxTaskCount = 
hiveConf.getIntVar(HiveConf.ConfVars.SPARK_JOB_MAX_TASKS);
   }
 
   @Override
@@ -100,6 +102,17 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor 
{
               } else {
                 console.logInfo(format);
               }
+            } else {
+              // Count the number of tasks, and kill the application if it goes beyond the limit.
+              if (sparkJobMaxTaskCount != -1 && totalTaskCount == 0) {
+                totalTaskCount = getTotalTaskCount(progressMap);
+                if (totalTaskCount > sparkJobMaxTaskCount) {
+                  rc = 4;
+                  done = true;
+                  console.printInfo("\nThe total number of task in the Spark 
job [" + totalTaskCount + "] is greater than the limit [" +
+                      sparkJobMaxTaskCount + "]. The Spark job will be 
cancelled.");
+                }
+              }
             }
 
             printStatus(progressMap, lastProgressMap);

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
index 0b224f2..41730b5 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/SparkJobMonitor.java
@@ -66,7 +66,6 @@ abstract class SparkJobMonitor {
   private int lines = 0;
   private final PrintStream out;
 
-
   private static final int COLUMN_1_WIDTH = 16;
   private static final String HEADER_FORMAT = "%16s%10s %13s  %5s  %9s  %7s  
%7s  %6s  ";
   private static final String STAGE_FORMAT = "%-16s%10s %13s  %5s  %9s  %7s  
%7s  %6s  ";
@@ -173,6 +172,15 @@ abstract class SparkJobMonitor {
     lastPrintTime = System.currentTimeMillis();
   }
 
+  protected int getTotalTaskCount(Map<String, SparkStageProgress> progressMap) 
{
+    int totalTasks = 0;
+    for (SparkStageProgress progress: progressMap.values() ) {
+      totalTasks += progress.getTotalTaskCount();
+    }
+
+    return totalTasks;
+  }
+
   private String getReport(Map<String, SparkStageProgress> progressMap) {
     StringBuilder reportBuffer = new StringBuilder();
     SimpleDateFormat dt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
index 951dbb4..67db303 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/impl/RemoteSparkJobStatus.java
@@ -67,6 +67,9 @@ public class RemoteSparkJobStatus implements SparkJobStatus {
       return getAppID.get(sparkClientTimeoutInSeconds, TimeUnit.SECONDS);
     } catch (Exception e) {
       LOG.warn("Failed to get APP ID.", e);
+      if (Thread.interrupted()) {
+        error = e;
+      }
       return null;
     }
   }
@@ -186,6 +189,9 @@ public class RemoteSparkJobStatus implements SparkJobStatus 
{
   }
 
   public JobHandle.State getRemoteJobState() {
+    if (error != null) {
+      return JobHandle.State.FAILED;
+    }
     return jobHandle.getState();
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index aa2dfc7..6497495 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -336,6 +336,11 @@ public class DagUtils {
       setupAutoReducerParallelism(edgeProp, w);
       break;
     }
+    case CUSTOM_SIMPLE_EDGE: {
+      setupQuickStart(edgeProp, w);
+      break;
+    }
+
     default:
       // nothing
     }
@@ -965,10 +970,9 @@ public class DagUtils {
    * @return true if the file names match else returns false.
    * @throws IOException when any file system related call fails
    */
-  private boolean checkPreExisting(Path src, Path dest, Configuration conf)
+  private boolean checkPreExisting(FileSystem sourceFS, Path src, Path dest, 
Configuration conf)
     throws IOException {
     FileSystem destFS = dest.getFileSystem(conf);
-    FileSystem sourceFS = src.getFileSystem(conf);
     FileStatus destStatus = FileUtils.getFileStatusOrNull(destFS, dest);
     if (destStatus != null) {
       return (sourceFS.getFileStatus(src).getLen() == destStatus.getLen());
@@ -988,7 +992,9 @@ public class DagUtils {
   public LocalResource localizeResource(
       Path src, Path dest, LocalResourceType type, Configuration conf) throws 
IOException {
     FileSystem destFS = dest.getFileSystem(conf);
-    if (src != null && !checkPreExisting(src, dest, conf)) {
+    // We call copyFromLocal below, so we basically assume src is a local file.
+    FileSystem srcFs = FileSystem.getLocal(conf);
+    if (src != null && !checkPreExisting(srcFs, src, dest, conf)) {
       // copy the src to the destination and create local resource.
       // do not overwrite.
       String srcStr = src.toString();
@@ -1000,7 +1006,7 @@ public class DagUtils {
       // authoritative one), don't wait infinitely for the notifier, just wait 
a little bit
       // and check HDFS before and after.
       if (notifierOld != null
-          && checkOrWaitForTheFile(src, dest, conf, notifierOld, 1, 150, 
false)) {
+          && checkOrWaitForTheFile(srcFs, src, dest, conf, notifierOld, 1, 
150, false)) {
         return createLocalResource(destFS, dest, type, 
LocalResourceVisibility.PRIVATE);
       }
       try {
@@ -1022,7 +1028,7 @@ public class DagUtils {
             conf, HiveConf.ConfVars.HIVE_LOCALIZE_RESOURCE_WAIT_INTERVAL, 
TimeUnit.MILLISECONDS);
         // Only log on the first wait, and check after wait on the last 
iteration.
         if (!checkOrWaitForTheFile(
-            src, dest, conf, notifierOld, waitAttempts, sleepInterval, true)) {
+            srcFs, src, dest, conf, notifierOld, waitAttempts, sleepInterval, 
true)) {
           LOG.error("Could not find the jar that was being uploaded");
           throw new IOException("Previous writer likely failed to write " + 
dest +
               ". Failing because I am unlikely to write too.");
@@ -1037,10 +1043,10 @@ public class DagUtils {
         LocalResourceVisibility.PRIVATE);
   }
 
-  public boolean checkOrWaitForTheFile(Path src, Path dest, Configuration 
conf, Object notifier,
-      int waitAttempts, long sleepInterval, boolean doLog) throws IOException {
+  public boolean checkOrWaitForTheFile(FileSystem srcFs, Path src, Path dest, 
Configuration conf,
+      Object notifier, int waitAttempts, long sleepInterval, boolean doLog) 
throws IOException {
     for (int i = 0; i < waitAttempts; i++) {
-      if (checkPreExisting(src, dest, conf)) return true;
+      if (checkPreExisting(srcFs, src, dest, conf)) return true;
       if (doLog && i == 0) {
         LOG.info("Waiting for the file " + dest + " (" + waitAttempts + " 
attempts, with "
             + sleepInterval + "ms interval)");
@@ -1059,7 +1065,7 @@ public class DagUtils {
         throw new IOException(interruptedException);
       }
     }
-    return checkPreExisting(src, dest, conf); // One last check.
+    return checkPreExisting(srcFs, src, dest, conf); // One last check.
   }
 
   /**
@@ -1265,6 +1271,20 @@ public class DagUtils {
     }
   }
 
+  private void setupQuickStart(TezEdgeProperty edgeProp, Vertex v)
+    throws IOException {
+    if (!edgeProp.isSlowStart()) {
+      Configuration pluginConf = new Configuration(false);
+      VertexManagerPluginDescriptor desc =
+              
VertexManagerPluginDescriptor.create(ShuffleVertexManager.class.getName());
+      
pluginConf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION,
 0);
+      
pluginConf.setFloat(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION,
 0);
+      UserPayload payload = TezUtils.createUserPayloadFromConf(pluginConf);
+      desc.setUserPayload(payload);
+      v.setVertexManagerPlugin(desc);
+    }
+  }
+
   public String createDagName(Configuration conf, QueryPlan plan) {
     String name = getUserSpecifiedDagName(conf);
     if (name == null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
index 7b13e90..7011d23 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -146,7 +147,20 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
       }
       nwayConf.setNumberOfPartitions(numPartitions);
     }
-
+    final float inflationFactor = HiveConf.getFloatVar(hconf, HiveConf.ConfVars.HIVE_HASH_TABLE_INFLATION_FACTOR);
+    final long memoryCheckInterval = HiveConf.getLongVar(hconf,
+      HiveConf.ConfVars.LLAP_MAPJOIN_MEMORY_MONITOR_CHECK_INTERVAL);
+    final boolean isLlap = "llap".equals(HiveConf.getVar(hconf, HiveConf.ConfVars.HIVE_EXECUTION_MODE));
+    long numEntries = 0;
+    long noCondTaskSize = desc.getNoConditionalTaskSize();
+    boolean doMemCheck = isLlap && inflationFactor > 0.0f && noCondTaskSize > 0 && memoryCheckInterval > 0;
+    if (!doMemCheck) {
+      LOG.info("Not doing hash table memory monitoring. isLlap: {} inflationFactor: {} noConditionalTaskSize: {} " +
+        "memoryCheckInterval: {}", isLlap, inflationFactor, noCondTaskSize, memoryCheckInterval);
+    } else {
+      LOG.info("Memory monitoring for hash table loader enabled. noconditionalTaskSize: {} inflationFactor: {} ",
+        noCondTaskSize, inflationFactor);
+    }
     for (int pos = 0; pos < mapJoinTables.length; pos++) {
       if (pos == desc.getPosBigTable()) {
         continue;
@@ -205,12 +219,32 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable
           tableContainer = new HashMapWrapper(hconf, keyCount);
         }
 
-        LOG.info("Using tableContainer " + tableContainer.getClass().getSimpleName());
+        LOG.info("Using tableContainer: " + tableContainer.getClass().getSimpleName());
 
         tableContainer.setSerde(keyCtx, valCtx);
         while (kvReader.next()) {
-          tableContainer.putRow(
-              (Writable)kvReader.getCurrentKey(), (Writable)kvReader.getCurrentValue());
+          tableContainer.putRow((Writable) kvReader.getCurrentKey(), (Writable) kvReader.getCurrentValue());
+          numEntries++;
+          if (doMemCheck && ((numEntries % memoryCheckInterval) == 0)) {
+            final long estMemUsage = tableContainer.getEstimatedMemorySize();
+            final long threshold = (long) (inflationFactor * noCondTaskSize);
+            // guard against poor configuration of noconditional task size. We let hash table grow till 2/3'rd memory
+            // available for container/executor
+            final long effectiveThreshold = (long) Math.max(threshold, (2.0/3.0) * desc.getMaxMemoryAvailable());
+            if (estMemUsage > effectiveThreshold) {
+              String msg = "Hash table loading exceeded memory limits." +
+                " estimatedMemoryUsage: " + estMemUsage + " noconditionalTaskSize: " + noCondTaskSize +
+                " inflationFactor: " + inflationFactor + " threshold: " + threshold +
+                " effectiveThreshold: " + effectiveThreshold;
+              LOG.error(msg);
+              throw new MapJoinMemoryExhaustionError(msg);
+            } else {
+              if (LOG.isInfoEnabled()) {
+                LOG.info("Checking hash table loader memory usage.. numEntries: {} estimatedMemoryUsage: {} " +
+                  "effectiveThreshold: {}", numEntries, estMemUsage, effectiveThreshold);
+              }
+            }
+          }
         }
         tableContainer.seal();
         LOG.info("Finished loading hashtable using " + tableContainer.getClass() + ". Small table position: " + pos);

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
index ad8b9e0..60660ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
@@ -103,6 +103,8 @@ public class ReduceRecordSource implements RecordSource {
   // number of columns pertaining to keys in a vectorized row batch
   private int firstValueColumnOffset;
 
+  private final int BATCH_BYTES = VectorizedRowBatch.DEFAULT_BYTES;
+
   private StructObjectInspector keyStructInspector;
   private StructObjectInspector valueStructInspectors;
 
@@ -190,7 +192,9 @@ public class ReduceRecordSource implements RecordSource {
                                   VectorizedBatchUtil.typeInfosFromStructObjectInspector(
                                       keyStructInspector),
                                   /* useExternalBuffer */ true,
-                                  binarySortableSerDe.getSortOrders()));
+                                  binarySortableSerDe.getSortOrders(),
+                                  binarySortableSerDe.getNullMarkers(),
+                                  binarySortableSerDe.getNotNullMarkers()));
         keyBinarySortableDeserializeToRow.init(0);
 
         final int valuesSize = valueStructInspectors.getAllStructFieldRefs().size();
@@ -435,6 +439,7 @@ public class ReduceRecordSource implements RecordSource {
     final int maxSize = batch.getMaxSize();
     Preconditions.checkState(maxSize > 0);
     int rowIdx = 0;
+    int batchBytes = keyBytes.length;
     try {
       for (Object value : values) {
         if (valueLazyBinaryDeserializeToRow != null) {
@@ -442,6 +447,7 @@ public class ReduceRecordSource implements RecordSource {
           BytesWritable valueWritable = (BytesWritable) value;
           byte[] valueBytes = valueWritable.getBytes();
           int valueLength = valueWritable.getLength();
+          batchBytes += valueLength;
 
           // l4j.info("ReduceRecordSource processVectorGroup valueBytes " + valueLength + " " +
           //     VectorizedBatchUtil.displayBytes(valueBytes, 0, valueLength));
@@ -450,7 +456,7 @@ public class ReduceRecordSource implements RecordSource {
           valueLazyBinaryDeserializeToRow.deserialize(batch, rowIdx);
         }
         rowIdx++;
-        if (rowIdx >= maxSize) {
+        if (rowIdx >= maxSize || batchBytes >= BATCH_BYTES) {
 
           // Batch is full.
           batch.size = rowIdx;
@@ -462,6 +468,7 @@ public class ReduceRecordSource implements RecordSource {
             batch.cols[i].reset();
           }
           rowIdx = 0;
+          batchBytes = 0;
         }
       }
       if (rowIdx > 0) {

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
index 486d43a..4242262 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezProcessor.java
@@ -23,6 +23,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.hadoop.hive.ql.exec.mapjoin.MapJoinMemoryExhaustionError;
+import org.apache.tez.runtime.api.TaskFailureType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +43,8 @@ import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
+import com.google.common.base.Throwables;
+
 /**
  * Hive processor for Tez that forms the vertices in Tez and processes the data.
  * Does what ExecMapper and ExecReducer does for hive in MR framework.
@@ -189,8 +193,11 @@ public class TezProcessor extends AbstractLogicalIOProcessor {
     } catch (Throwable t) {
       originalThrowable = t;
     } finally {
-      if (originalThrowable != null && originalThrowable instanceof Error) {
-        LOG.error(StringUtils.stringifyException(originalThrowable));
+      if (originalThrowable != null && (originalThrowable instanceof Error ||
+        Throwables.getRootCause(originalThrowable) instanceof Error)) {
+        LOG.error("Cannot recover from this FATAL error", StringUtils.stringifyException(originalThrowable));
+        getContext().reportFailure(TaskFailureType.FATAL, originalThrowable,
+                      "Cannot recover from this error");
         throw new RuntimeException(originalThrowable);
       }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 8f45947..b4d8ffa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -76,6 +76,7 @@ public class TezSessionPoolManager {
   private static final Logger LOG = LoggerFactory.getLogger(TezSessionPoolManager.class);
   private static final Random rdm = new Random();
 
+  private volatile SessionState initSessionState;
   private BlockingQueue<TezSessionPoolSession> defaultQueuePool;
 
   /** Priority queue sorted by expiration time of live sessions that could be expired. */
@@ -136,6 +137,8 @@ public class TezSessionPoolManager {
 
   public void startPool() throws Exception {
     if (initialSessions.isEmpty()) return;
+    // Hive SessionState available at this point.
+    initSessionState = SessionState.get();
     int threadCount = Math.min(initialSessions.size(),
         HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS));
     Preconditions.checkArgument(threadCount > 0);
@@ -259,13 +262,27 @@ public class TezSessionPoolManager {
       expirationThread = new Thread(new Runnable() {
         @Override
         public void run() {
-          runExpirationThread();
+          try {
+            SessionState.setCurrentSessionState(initSessionState);
+            runExpirationThread();
+          } catch (Exception e) {
+            LOG.warn("Exception in TezSessionPool-expiration thread. Thread will shut down", e);
+          } finally {
+            LOG.info("TezSessionPool-expiration thread exiting");
+          }
         }
       }, "TezSessionPool-expiration");
       restartThread = new Thread(new Runnable() {
         @Override
         public void run() {
-          runRestartThread();
+          try {
+            SessionState.setCurrentSessionState(initSessionState);
+            runRestartThread();
+          } catch (Exception e) {
+            LOG.warn("Exception in TezSessionPool-cleanup thread. Thread will shut down", e);
+          } finally {
+            LOG.info("TezSessionPool-cleanup thread exiting");
+          }
         }
       }, "TezSessionPool-cleanup");
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index ed1ba9c..036e918 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -345,6 +345,7 @@ public class TezSessionState {
       String user, final Configuration conf) throws IOException {
     // TODO: parts of this should be moved out of TezSession to reuse the clients, but there's
     //       no good place for that right now (HIVE-13698).
+    // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool.
     SessionState session = SessionState.get();
     boolean isInHs2 = session != null && session.isHiveServerQuery();
     Token<LlapTokenIdentifier> token = null;
@@ -438,6 +439,7 @@ public class TezSessionState {
   private void setupSessionAcls(Configuration tezConf, HiveConf hiveConf) throws
       IOException {
 
+    // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool.
     String user = SessionState.getUserFromAuthenticator();
     UserGroupInformation loginUserUgi = UserGroupInformation.getLoginUser();
     String loginUser =
@@ -451,6 +453,7 @@ public class TezSessionState {
             TezConfiguration.TEZ_AM_MODIFY_ACLS, addHs2User, user, loginUser);
 
     if (LOG.isDebugEnabled()) {
+      // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool.
       LOG.debug(
           "Setting Tez Session access for sessionId={} with viewAclString={}, modifyStr={}",
           SessionState.get().getSessionId(), viewStr, modifyStr);
@@ -592,6 +595,7 @@ public class TezSessionState {
    */
   private Path createTezDir(String sessionId) throws IOException {
     // tez needs its own scratch dir (per session)
+    // TODO: De-link from SessionState. A TezSession can be linked to different Hive Sessions via the pool.
     Path tezDir = new Path(SessionState.get().getHdfsScratchDirURIString(), TEZ_DIR);
     tezDir = new Path(tezDir, sessionId);
     FileSystem fs = tezDir.getFileSystem(conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 6c8bf29..1c84c6a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.Serializable;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -324,6 +328,14 @@ public class TezTask extends Task<TezWork> {
     }
   }
 
+  void checkOutputSpec(BaseWork work, JobConf jc) throws IOException {
+    for (Operator<?> op : work.getAllOperators()) {
+      if (op instanceof FileSinkOperator) {
+        ((FileSinkOperator) op).checkOutputSpecs(null, jc);
+      }
+    }
+  }
+
   DAG build(JobConf conf, TezWork work, Path scratchDir,
       LocalResource appJarLr, List<LocalResource> additionalLr, Context ctx)
       throws Exception {
@@ -357,7 +369,6 @@ public class TezTask extends Task<TezWork> {
     setAccessControlsForCurrentUser(dag, queryPlan.getQueryId(), conf);
 
     for (BaseWork w: ws) {
-
       boolean isFinal = work.getLeaves().contains(w);
 
       // translate work to vertex
@@ -379,6 +390,8 @@ public class TezTask extends Task<TezWork> {
             children.add(v);
           }
         }
+        JobConf parentConf = workToConf.get(unionWorkItems.get(0));
+        checkOutputSpec(w, parentConf);
 
         // create VertexGroup
         Vertex[] vertexArray = new Vertex[unionWorkItems.size()];
@@ -391,7 +404,7 @@ public class TezTask extends Task<TezWork> {
 
         // For a vertex group, all Outputs use the same Key-class, Val-class and partitioner.
         // Pick any one source vertex to figure out the Edge configuration.
-        JobConf parentConf = workToConf.get(unionWorkItems.get(0));
+       
 
         // now hook up the children
         for (BaseWork v: children) {
@@ -404,6 +417,7 @@ public class TezTask extends Task<TezWork> {
       } else {
         // Regular vertices
         JobConf wxConf = utils.initializeVertexConf(conf, ctx, w);
+        checkOutputSpec(w, wxConf);
         Vertex wx =
             utils.createVertex(wxConf, w, scratchDir, appJarLr, additionalLr, fs, ctx, !isFinal,
                 work, work.getVertexType(w));

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
index eccbbb6..cd3404a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/Constants.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.ql.exec.tez.monitoring;
 
 import org.apache.hadoop.hive.common.log.InPlaceUpdate;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
index 1400be4..7cb74a5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/DAGSummary.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.ql.exec.tez.monitoring;
 
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -11,6 +28,7 @@ import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.api.DAG;
+import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
@@ -59,24 +77,58 @@ class DAGSummary implements PrintSummary {
     this.hiveCounters = hiveCounters(dagClient);
   }
 
+  private long hiveInputRecordsFromTezCounters(String vertexName, String inputVertexName) {
+    // Get the counters for the input vertex.
+    Set<StatusGetOpts> statusOptions = new HashSet<>(1);
+    statusOptions.add(StatusGetOpts.GET_COUNTERS);
+    VertexStatus inputVertexStatus = vertexStatus(statusOptions, inputVertexName);
+    final TezCounters inputVertexCounters = inputVertexStatus.getVertexCounters();
+
+    // eg, group name TaskCounter_Map_7_OUTPUT_Reducer_8, counter name OUTPUT_RECORDS
+    String groupName = formattedName("TaskCounter", inputVertexName, vertexName);
+    String counterName = "OUTPUT_RECORDS";
+
+    // Do not create counter if it does not exist -
+    // instead fall back to default behavior for determining input records.
+    TezCounter tezCounter = inputVertexCounters.getGroup(groupName).findCounter(counterName, false);
+    if (tezCounter == null) {
+      return -1;
+    } else {
+      return tezCounter.getValue();
+    }
+  }
+
+  private long hiveInputRecordsFromHiveCounters(String inputVertexName) {
+    // The record count from these counters may not be correct if the input vertex has
+    // edges to more than one vertex, since this value counts the records going to all
+    // destination vertices.
+
+    String intermediateRecordsCounterName = formattedName(
+        ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(),
+        inputVertexName
+    );
+    String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(),
+        inputVertexName);
+    return hiveCounterValue(intermediateRecordsCounterName) + hiveCounterValue(recordsOutCounterName);
+  }
+
   private long hiveInputRecordsFromOtherVertices(String vertexName) {
     List<Vertex> inputVerticesList = dag.getVertex(vertexName).getInputVertices();
     long result = 0;
     for (Vertex inputVertex : inputVerticesList) {
-      String intermediateRecordsCounterName = formattedName(
-          ReduceSinkOperator.Counter.RECORDS_OUT_INTERMEDIATE.toString(),
-          inputVertex.getName()
-      );
-      String recordsOutCounterName = formattedName(FileSinkOperator.Counter.RECORDS_OUT.toString(),
-          inputVertex.getName());
-      result += (
-          hiveCounterValue(intermediateRecordsCounterName)
-              + hiveCounterValue(recordsOutCounterName)
-      );
+      long inputVertexRecords = hiveInputRecordsFromTezCounters(vertexName, inputVertex.getName());
+      if (inputVertexRecords < 0) {
+        inputVertexRecords = hiveInputRecordsFromHiveCounters(inputVertex.getName());
+      }
+      result += inputVertexRecords;
     }
     return result;
   }
 
+  private String formattedName(String counterName, String srcVertexName, String destVertexName) {
+    return String.format("%s_", counterName) + srcVertexName.replace(" ", "_") + "_OUTPUT_" + destVertexName.replace(" ", "_");
+  }
+
   private String formattedName(String counterName, String vertexName) {
     return String.format("%s_", counterName) + vertexName.replace(" ", "_");
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
index 0a28edd..fd85504 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/FSCountersSummary.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.ql.exec.tez.monitoring;
 
 import org.apache.hadoop.fs.FileSystem;

http://git-wip-us.apache.org/repos/asf/hive/blob/187eb760/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
index 81f1755..10e9f57 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/monitoring/LLAPioSummary.java
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.hadoop.hive.ql.exec.tez.monitoring;
 
 import org.apache.hadoop.hive.llap.counters.LlapIOCounters;

Reply via email to