HIVE-14636 : pass information from FSOP/TezTask to commit to take care of 
speculative execution and failed tasks (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/87dcab47
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/87dcab47
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/87dcab47

Branch: refs/heads/hive-14535
Commit: 87dcab470f33ace818c775da6b0a9f18b10f66ac
Parents: 2cef25d
Author: Sergey Shelukhin <[email protected]>
Authored: Wed Aug 31 16:06:03 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Wed Aug 31 16:06:03 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    |   6 +-
 .../hadoop/hive/common/HiveStatsUtils.java      |  14 +-
 .../java/org/apache/hadoop/hive/ql/Context.java |   9 +-
 .../hive/ql/exec/AbstractFileMergeOperator.java |   4 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java   | 194 +++++++++---
 .../apache/hadoop/hive/ql/exec/MoveTask.java    | 315 ++++++++++---------
 .../apache/hadoop/hive/ql/exec/Utilities.java   |  10 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   2 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |   5 +-
 .../hadoop/hive/ql/parse/TypeCheckCtx.java      |   2 +-
 .../hadoop/hive/ql/plan/FileSinkDesc.java       |   9 +
 .../hadoop/hive/ql/plan/LoadFileDesc.java       |   2 +-
 .../hadoop/hive/ql/plan/LoadTableDesc.java      |  19 +-
 ql/src/test/queries/clientpositive/mm_current.q |  18 +-
 .../clientpositive/llap/mm_current.q.out        | 133 +++++++-
 15 files changed, 517 insertions(+), 225 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java 
b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index 3ed2d08..ad43610 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -329,9 +329,13 @@ public final class FileUtils {
    */
   public static void listStatusRecursively(FileSystem fs, FileStatus 
fileStatus,
       List<FileStatus> results) throws IOException {
+    listStatusRecursively(fs, fileStatus, HIDDEN_FILES_PATH_FILTER, results);
+  }
 
+  public static void listStatusRecursively(FileSystem fs, FileStatus 
fileStatus,
+      PathFilter filter, List<FileStatus> results) throws IOException {
     if (fileStatus.isDir()) {
-      for (FileStatus stat : fs.listStatus(fileStatus.getPath(), 
HIDDEN_FILES_PATH_FILTER)) {
+      for (FileStatus stat : fs.listStatus(fileStatus.getPath(), filter)) {
         listStatusRecursively(fs, stat, results);
       }
     } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java 
b/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
index 7c9d72f..111d99c 100644
--- a/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,15 +51,20 @@ public class HiveStatsUtils {
    * @return array of FileStatus
    * @throws IOException
    */
-  public static FileStatus[] getFileStatusRecurse(Path path, int level, 
FileSystem fs)
+  public static FileStatus[] getFileStatusRecurse(Path path, int level,  
FileSystem fs)
       throws IOException {
+    return getFileStatusRecurse(path, level, fs, 
FileUtils.HIDDEN_FILES_PATH_FILTER);
+  }
+
+  public static FileStatus[] getFileStatusRecurse(
+      Path path, int level, FileSystem fs, PathFilter filter) throws 
IOException {
 
     // if level is <0, the return all files/directories under the specified 
path
-    if ( level < 0) {
+    if (level < 0) {
       List<FileStatus> result = new ArrayList<FileStatus>();
       try {
         FileStatus fileStatus = fs.getFileStatus(path);
-        FileUtils.listStatusRecursively(fs, fileStatus, result);
+        FileUtils.listStatusRecursively(fs, fileStatus, filter, result);
       } catch (IOException e) {
         // globStatus() API returns empty FileStatus[] when the specified path
         // does not exist. But getFileStatus() throw IOException. To mimic the
@@ -75,7 +81,7 @@ public class HiveStatsUtils {
       sb.append(Path.SEPARATOR).append("*");
     }
     Path pathPattern = new Path(path, sb.toString());
-    return fs.globStatus(pathPattern, FileUtils.HIDDEN_FILES_PATH_FILTER);
+    return fs.globStatus(pathPattern, filter);
   }
 
   public static int getNumBitVectorsForNDVEstimation(Configuration conf) 
throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Context.java 
b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
index ceb257c..1013f7c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Context.java
@@ -233,7 +233,8 @@ public class Context {
       // Append task specific info to stagingPathName, instead of creating a 
sub-directory.
       // This way we don't have to worry about deleting the stagingPathName 
separately at
       // end of query execution.
-      dir = fs.makeQualified(new Path(stagingPathName + "_" + this.executionId 
+ "-" + TaskRunner.getTaskRunnerID()));
+      // TODO# HERE
+      dir = fs.makeQualified(new Path(stagingPathName + "_" + 
getExecutionPrefix()));
 
       LOG.debug("Created staging dir = " + dir + " for path = " + inputPath);
 
@@ -819,6 +820,10 @@ public class Context {
     this.skipTableMasking = skipTableMasking;
   }
 
+  public String getExecutionPrefix() {
+    return this.executionId + "-" + TaskRunner.getTaskRunnerID();
+  }
+
   public ExplainConfiguration getExplainConfig() {
     return explainConfig;
   }
@@ -827,7 +832,7 @@ public class Context {
     this.explainConfig = explainConfig;
   }
 
-  public void resetOpContext(){
+  public void resetOpContext() {
     opContext = new CompilationOpContext();
     sequencer = new AtomicInteger();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index dfad6c1..40c784b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -254,8 +254,8 @@ public abstract class AbstractFileMergeOperator<T extends 
FileMergeDesc>
       Path outputDir = conf.getOutputPath();
       FileSystem fs = outputDir.getFileSystem(hconf);
       Path backupPath = backupOutputPath(fs, outputDir);
-      Utilities
-          .mvFileToFinalPath(outputDir, hconf, success, LOG, conf.getDpCtx(),
+      // TODO# merge-related move
+      Utilities.mvFileToFinalPath(outputDir, hconf, success, LOG, 
conf.getDpCtx(),
               null, reporter);
       if (success) {
         LOG.info("jobCloseOp moved merged files to output dir: " + outputDir);

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 1f5dfea..b8a2c5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -27,16 +27,22 @@ import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -79,6 +85,7 @@ import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hive.common.util.HiveStringUtils;
 import org.slf4j.Logger;
@@ -92,6 +99,7 @@ import com.google.common.collect.Lists;
 public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     Serializable {
 
+  private static final String MANIFEST_EXTENSION = ".manifest";
   public static final Logger LOG = 
LoggerFactory.getLogger(FileSinkOperator.class);
   private static final boolean isInfoEnabled = LOG.isInfoEnabled();
   private static final boolean isDebugEnabled = LOG.isDebugEnabled();
@@ -165,7 +173,7 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
       } 
       Utilities.LOG14535.info("new FSPaths for " + numFiles + " files, 
dynParts = " + bDynParts
           + ": tmpPath " + tmpPath + ", task path " + taskOutputTempPath
-          + " (spec path " + specPath + ")", new Exception());
+          + " (spec path " + specPath + ")"/*, new Exception()*/);
 
       outPaths = new Path[numFiles];
       finalPaths = new Path[numFiles];
@@ -187,7 +195,7 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
     /**
      * Update the final paths according to tmpPath.
      */
-    public Path getFinalPath(String taskId, Path tmpPath, String extension) {
+    private Path getFinalPath(String taskId, Path tmpPath, String extension) {
       if (extension != null) {
         return new Path(tmpPath, taskId + extension);
       } else {
@@ -218,41 +226,64 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
     }
 
     private void commit(FileSystem fs) throws HiveException {
-      if (isMmTable) return;  // TODO#: need to propagate to MoveTask instead
+      List<Path> commitPaths = null;
+      if (isMmTable) {
+        commitPaths = new ArrayList<>();
+      }
       for (int idx = 0; idx < outPaths.length; ++idx) {
         try {
-          if ((bDynParts || isSkewedStoredAsSubDirectories)
-              && !fs.exists(finalPaths[idx].getParent())) {
-            Utilities.LOG14535.info("commit making path for dyn/skew: " + 
finalPaths[idx].getParent());
-            fs.mkdirs(finalPaths[idx].getParent());
-          }
-          boolean needToRename = true;
-          if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
-              conf.getWriteType() == AcidUtils.Operation.DELETE) {
-            // If we're updating or deleting there may be no file to close.  
This can happen
-            // because the where clause strained out all of the records for a 
given bucket.  So
-            // before attempting the rename below, check if our file exists.  
If it doesn't,
-            // then skip the rename.  If it does try it.  We could just 
blindly try the rename
-            // and avoid the extra stat, but that would mask other errors.
-            try {
-              if (outPaths[idx] != null) {
-                FileStatus stat = fs.getFileStatus(outPaths[idx]);
-              }
-            } catch (FileNotFoundException fnfe) {
-              needToRename = false;
-            }
-          }
-          Utilities.LOG14535.info("commit potentially moving " + outPaths[idx] 
+ " to " + finalPaths[idx]);
-          if (needToRename && outPaths[idx] != null && 
!fs.rename(outPaths[idx], finalPaths[idx])) {
-            throw new HiveException("Unable to rename output from: " +
-                outPaths[idx] + " to: " + finalPaths[idx]);
-          }
-          updateProgress();
+          commitOneOutPath(idx, fs, commitPaths);
         } catch (IOException e) {
           throw new HiveException("Unable to rename output from: " +
               outPaths[idx] + " to: " + finalPaths[idx], e);
         }
       }
+      if (isMmTable) {
+        Path manifestPath = new Path(specPath, "_tmp." + getPrefixedTaskId() + 
MANIFEST_EXTENSION);
+        Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with 
" + commitPaths);
+        try {
+          try (FSDataOutputStream out = fs.create(manifestPath)) {
+            out.writeInt(commitPaths.size());
+            for (Path path : commitPaths) {
+              out.writeUTF(path.toString());
+            }
+          }
+        } catch (IOException e) {
+          throw new HiveException(e);
+        }
+      }
+    }
+
+    private String getPrefixedTaskId() {
+      return conf.getExecutionPrefix() + "_" + taskId;
+    }
+
+    private void commitOneOutPath(int idx, FileSystem fs, List<Path> 
commitPaths)
+        throws IOException, HiveException {
+      if ((bDynParts || isSkewedStoredAsSubDirectories)
+          && !fs.exists(finalPaths[idx].getParent())) {
+        Utilities.LOG14535.info("commit making path for dyn/skew: " + 
finalPaths[idx].getParent());
+        fs.mkdirs(finalPaths[idx].getParent());
+      }
+      // If we're updating or deleting there may be no file to close.  This 
can happen
+      // because the where clause strained out all of the records for a given 
bucket.  So
+      // before attempting the rename below, check if our file exists.  If it 
doesn't,
+      // then skip the rename.  If it does try it.  We could just blindly try 
the rename
+      // and avoid the extra stat, but that would mask other errors.
+      boolean needToRename = (conf.getWriteType() != 
AcidUtils.Operation.UPDATE &&
+          conf.getWriteType() != AcidUtils.Operation.DELETE) || 
fs.exists(outPaths[idx]);
+      if (needToRename && outPaths[idx] != null) {
+        Utilities.LOG14535.info("committing " + outPaths[idx] + " to " + 
finalPaths[idx] + " (" + isMmTable + ")");
+        if (isMmTable) {
+          assert outPaths[idx].equals(finalPaths[idx]);
+          commitPaths.add(outPaths[idx]);
+        } else if (!fs.rename(outPaths[idx], finalPaths[idx])) {
+          throw new HiveException("Unable to rename output from: "
+              + outPaths[idx] + " to: " + finalPaths[idx]);
+        }
+      }
+
+      updateProgress();
     }
 
     public void abortWriters(FileSystem fs, boolean abort, boolean delete) 
throws HiveException {
@@ -297,10 +328,10 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
           outPaths[filesIdx] = getTaskOutPath(taskId);
         } else {
           if (!bDynParts && !isSkewedStoredAsSubDirectories) {
-            finalPaths[filesIdx] = getFinalPath(taskId, specPath, extension);
+            finalPaths[filesIdx] = getFinalPath(getPrefixedTaskId(), specPath, 
extension);
           } else {
             // TODO# wrong!
-            finalPaths[filesIdx] = getFinalPath(taskId, specPath, extension);
+            finalPaths[filesIdx] = getFinalPath(getPrefixedTaskId(), specPath, 
extension);
           }
           outPaths[filesIdx] = finalPaths[filesIdx];
         }
@@ -638,7 +669,7 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
       fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable, 
isSkewedStoredAsSubDirectories);
       Utilities.LOG14535.info("createBucketForFileIdx " + filesIdx + ": final 
path " + fsp.finalPaths[filesIdx]
           + "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath 
+ ", tmp path "
-          + fsp.getTmpPath() + ", task " + taskId + ")", new Exception());
+          + fsp.getTmpPath() + ", task " + taskId + ")"/*, new Exception()*/);
 
       if (isInfoEnabled) {
         LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
@@ -1150,9 +1181,13 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
         DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
         if (conf.isLinkedFileSink() && (dpCtx != null)) {
           specPath = conf.getParentDir();
+          Utilities.LOG14535.info("Setting specPath to " + specPath + " for 
dynparts");
+        }
+        if (!conf.isMmTable()) {
+          Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, 
conf, reporter); // TODO# other callers
+        } else {
+          handleMmTable(specPath, hconf, success, dpCtx, conf, reporter);
         }
-        Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf,
-          reporter);
       }
     } catch (IOException e) {
       throw new HiveException(e);
@@ -1160,6 +1195,95 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
     super.jobCloseOp(hconf, success);
   }
 
+  private static class ExecPrefixPathFilter implements PathFilter {
+    private final String prefix, tmpPrefix;
+    public ExecPrefixPathFilter(String prefix) {
+      this.prefix = prefix;
+      this.tmpPrefix = "_tmp." + prefix;
+    }
+
+    @Override
+    public boolean accept(Path path) {
+      String name = path.getName();
+      return name.startsWith(prefix) || name.startsWith(tmpPrefix);
+    }
+  }
+
+
+  private void handleMmTable(Path specPath, Configuration hconf, boolean 
success,
+      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Reporter reporter)
+          throws IOException, HiveException {
+    FileSystem fs = specPath.getFileSystem(hconf);
+    int targetLevel = (dpCtx == null) ? 1 : dpCtx.getNumDPCols();
+    if (!success) {
+      FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
+          specPath, targetLevel, fs, new 
ExecPrefixPathFilter(conf.getExecutionPrefix()));
+      for (FileStatus status : statuses) {
+        Utilities.LOG14535.info("Deleting " + status.getPath() + " on 
failure");
+        tryDelete(fs, status.getPath());
+      }
+      return;
+    }
+    FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
+        specPath, targetLevel, fs, new 
ExecPrefixPathFilter(conf.getExecutionPrefix()));
+    if (statuses == null) return;
+    LinkedList<FileStatus> results = new LinkedList<>();
+    List<Path> manifests = new ArrayList<>(statuses.length);
+    for (FileStatus status : statuses) {
+      if (status.getPath().getName().endsWith(MANIFEST_EXTENSION)) {
+        manifests.add(status.getPath());
+      } else {
+        results.add(status);
+      }
+    }
+    HashSet<String> committed = new HashSet<>();
+    for (Path mfp : manifests) {
+      try (FSDataInputStream mdis = fs.open(mfp)) {
+        int fileCount = mdis.readInt();
+        for (int i = 0; i < fileCount; ++i) {
+          String nextFile = mdis.readUTF();
+          if (!committed.add(nextFile)) {
+            throw new HiveException(nextFile + " was specified in multiple 
manifests");
+          }
+        }
+      }
+    }
+    Iterator<FileStatus> iter = results.iterator();
+    while (iter.hasNext()) {
+      FileStatus rfs = iter.next();
+      if (!committed.remove(rfs.getPath().toString())) {
+        iter.remove();
+        Utilities.LOG14535.info("Deleting " + rfs.getPath() + " that was not 
committed");
+        tryDelete(fs, rfs.getPath());
+      }
+    }
+    if (!committed.isEmpty()) {
+      throw new HiveException("The following files were committed but not 
found: " + committed);
+    }
+    for (Path mfp : manifests) {
+      Utilities.LOG14535.info("Deleting manifest " + mfp);
+      tryDelete(fs, mfp);
+    }
+
+    if (results.isEmpty()) return;
+    FileStatus[] finalResults = results.toArray(new 
FileStatus[results.size()]);
+
+    List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
+        fs, finalResults, dpCtx, conf, hconf);
+    // create empty buckets if necessary
+    if (emptyBuckets.size() > 0) {
+      Utilities.createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
+    }
+  }
+
+  private void tryDelete(FileSystem fs, Path path) {
+    try {
+      fs.delete(path, false);
+    } catch (IOException ex) {
+      LOG.error("Failed to delete " + path, ex);
+    }
+  }
+
   @Override
   public OperatorType getType() {
     return OperatorType.FILESINK;

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 2ab97f7..e3646da 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -241,6 +241,18 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
     return false;
   }
 
+  private final static class TaskInformation {
+    public List<BucketCol> bucketCols = null;
+    public List<SortCol> sortCols = null;
+    public int numBuckets = -1;
+    public Task task;
+    public String path;
+    public TaskInformation(Task task, String path) {
+      this.task = task;
+      this.path = path;
+    }
+  }
+
   @Override
   public int execute(DriverContext driverContext) {
 
@@ -318,155 +330,15 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
           LOG.info("Partition is: " + tbd.getPartitionSpec().toString());
 
           // Check if the bucketing and/or sorting columns were inferred
-          List<BucketCol> bucketCols = null;
-          List<SortCol> sortCols = null;
-          int numBuckets = -1;
-          Task task = this;
-          String path = tbd.getSourcePath().toUri().toString();
-          // Find the first ancestor of this MoveTask which is some form of 
map reduce task
-          // (Either standard, local, or a merge)
-          while (task.getParentTasks() != null && task.getParentTasks().size() 
== 1) {
-            task = (Task)task.getParentTasks().get(0);
-            // If it was a merge task or a local map reduce task, nothing can 
be inferred
-            if (task instanceof MergeFileTask || task instanceof 
MapredLocalTask) {
-              break;
-            }
-
-            // If it's a standard map reduce task, check what, if anything, it 
inferred about
-            // the directory this move task is moving
-            if (task instanceof MapRedTask) {
-              MapredWork work = (MapredWork)task.getWork();
-              MapWork mapWork = work.getMapWork();
-              bucketCols = mapWork.getBucketedColsByDirectory().get(path);
-              sortCols = mapWork.getSortedColsByDirectory().get(path);
-              if (work.getReduceWork() != null) {
-                numBuckets = work.getReduceWork().getNumReduceTasks();
-              }
-
-              if (bucketCols != null || sortCols != null) {
-                // This must be a final map reduce task (the task containing 
the file sink
-                // operator that writes the final output)
-                assert work.isFinalMapRed();
-              }
-              break;
-            }
-
-            // If it's a move task, get the path the files were moved from, 
this is what any
-            // preceding map reduce task inferred information about, and 
moving does not invalidate
-            // those assumptions
-            // This can happen when a conditional merge is added before the 
final MoveTask, but the
-            // condition for merging is not met, see GenMRFileSink1.
-            if (task instanceof MoveTask) {
-              if (((MoveTask)task).getWork().getLoadFileWork() != null) {
-                path = 
((MoveTask)task).getWork().getLoadFileWork().getSourcePath().toUri().toString();
-              }
-            }
-          }
+          TaskInformation ti = new TaskInformation(this, 
tbd.getSourcePath().toUri().toString());
+          inferTaskInformation(ti);
           // deal with dynamic partitions
           DynamicPartitionCtx dpCtx = tbd.getDPCtx();
           if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic 
partitions
-
-            List<LinkedHashMap<String, String>> dps = 
Utilities.getFullDPSpecs(conf, dpCtx);
-
-            // publish DP columns to its subscribers
-            if (dps != null && dps.size() > 0) {
-              pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
-            }
-            console.printInfo(System.getProperty("line.separator"));
-            long startTime = System.currentTimeMillis();
-            // load the list of DP partitions and return the list of partition 
specs
-            // TODO: In a follow-up to HIVE-1361, we should refactor 
loadDynamicPartitions
-            // to use Utilities.getFullDPSpecs() to get the list of full 
partSpecs.
-            // After that check the number of DPs created to not exceed the 
limit and
-            // iterate over it and call loadPartition() here.
-            // The reason we don't do inside HIVE-1361 is the latter is large 
and we
-            // want to isolate any potential issue it may introduce.
-            Map<Map<String, String>, Partition> dp =
-              db.loadDynamicPartitions(
-                tbd.getSourcePath(),
-                tbd.getTable().getTableName(),
-                tbd.getPartitionSpec(),
-                tbd.getReplace(),
-                dpCtx.getNumDPCols(),
-                isSkewedStoredAsDirs(tbd),
-                work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.NOT_ACID,
-                SessionState.get().getTxnMgr().getCurrentTxnId(), 
hasFollowingStatsTask(),
-                work.getLoadTableWork().getWriteType());
-
-            console.printInfo("\t Time taken to load dynamic partitions: "  +
-                (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
-
-            if (dp.size() == 0 && 
conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
-              throw new HiveException("This query creates no partitions." +
-                  " To turn off this error, set 
hive.error.on.empty.partition=false.");
-            }
-
-            startTime = System.currentTimeMillis();
-            // for each partition spec, get the partition
-            // and put it to WriteEntity for post-exec hook
-            for(Map.Entry<Map<String, String>, Partition> entry : 
dp.entrySet()) {
-              Partition partn = entry.getValue();
-
-              if (bucketCols != null || sortCols != null) {
-                updatePartitionBucketSortColumns(
-                    db, table, partn, bucketCols, numBuckets, sortCols);
-              }
-
-              WriteEntity enty = new WriteEntity(partn,
-                  (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
-                      WriteEntity.WriteType.INSERT));
-              if (work.getOutputs() != null) {
-                work.getOutputs().add(enty);
-              }
-              // Need to update the queryPlan's output as well so that 
post-exec hook get executed.
-              // This is only needed for dynamic partitioning since for SP the 
the WriteEntity is
-              // constructed at compile time and the queryPlan already 
contains that.
-              // For DP, WriteEntity creation is deferred at this stage so we 
need to update
-              // queryPlan here.
-              if (queryPlan.getOutputs() == null) {
-                queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
-              }
-              queryPlan.getOutputs().add(enty);
-
-              // update columnar lineage for each partition
-              dc = new DataContainer(table.getTTable(), partn.getTPartition());
-
-              // Don't set lineage on delete as we don't have all the columns
-              if (SessionState.get() != null &&
-                  work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.DELETE &&
-                  work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.UPDATE) {
-                
SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), dc,
-                    table.getCols());
-              }
-              LOG.info("\tLoading partition " + entry.getKey());
-            }
-            console.printInfo("\t Time taken for adding to write entity : " +
-                (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
-            dc = null; // reset data container to prevent it being added again.
+            dc = handleDynParts(db, table, tbd, ti, dpCtx);
           } else { // static partitions
-            List<String> partVals = 
MetaStoreUtils.getPvals(table.getPartCols(),
-                tbd.getPartitionSpec());
-            db.validatePartitionNameCharacters(partVals);
-            Utilities.LOG14535.info("loadPartition called from " + 
tbd.getSourcePath() + " into " + tbd.getTable());
-            db.loadPartition(tbd.getSourcePath(), 
tbd.getTable().getTableName(),
-                tbd.getPartitionSpec(), tbd.getReplace(),
-                tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), 
work.isSrcLocal(),
-                work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask(), tbd.isMmTable());
-            Partition partn = db.getPartition(table, tbd.getPartitionSpec(), 
false);
-
-            if (bucketCols != null || sortCols != null) {
-              updatePartitionBucketSortColumns(db, table, partn, bucketCols,
-                  numBuckets, sortCols);
-            }
-
-            dc = new DataContainer(table.getTTable(), partn.getTPartition());
-            // add this partition to post-execution hook
-            if (work.getOutputs() != null) {
-              work.getOutputs().add(new WriteEntity(partn,
-                  (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE
-                      : WriteEntity.WriteType.INSERT)));
-            }
-         }
+            dc = handleStaticParts(db, table, tbd, ti);
+          }
         }
         if (SessionState.get() != null && dc != null) {
           // If we are doing an update or a delete the number of columns in 
the table will not
@@ -500,6 +372,159 @@ public class MoveTask extends Task<MoveWork> implements 
Serializable {
     }
   }
 
+  private DataContainer handleStaticParts(Hive db, Table table, LoadTableDesc 
tbd,
+      TaskInformation ti) throws HiveException, IOException, 
InvalidOperationException {
+    List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),  
tbd.getPartitionSpec());
+    db.validatePartitionNameCharacters(partVals);
+    Utilities.LOG14535.info("loadPartition called from " + tbd.getSourcePath() 
+ " into " + tbd.getTable());
+    db.loadPartition(tbd.getSourcePath(), tbd.getTable().getTableName(),
+        tbd.getPartitionSpec(), tbd.getReplace(),
+        tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), 
work.isSrcLocal(),
+        work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.NOT_ACID, hasFollowingStatsTask(), tbd.isMmTable());
+    Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false);
+
+    if (ti.bucketCols != null || ti.sortCols != null) {
+      updatePartitionBucketSortColumns(db, table, partn, ti.bucketCols,
+          ti.numBuckets, ti.sortCols);
+    }
+
+    DataContainer dc = new DataContainer(table.getTTable(), 
partn.getTPartition());
+    // add this partition to post-execution hook
+    if (work.getOutputs() != null) {
+      work.getOutputs().add(new WriteEntity(partn,
+          (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE
+              : WriteEntity.WriteType.INSERT)));
+    }
+    return dc;
+  }
+
+  private DataContainer handleDynParts(Hive db, Table table, LoadTableDesc tbd,
+      TaskInformation ti, DynamicPartitionCtx dpCtx) throws HiveException,
+      IOException, InvalidOperationException {
+    DataContainer dc;
+    List<LinkedHashMap<String, String>> dps = Utilities.getFullDPSpecs(conf, 
dpCtx);
+
+    // publish DP columns to its subscribers
+    if (dps != null && dps.size() > 0) {
+      pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
+    }
+    console.printInfo(System.getProperty("line.separator"));
+    long startTime = System.currentTimeMillis();
+    // load the list of DP partitions and return the list of partition specs
+    // TODO: In a follow-up to HIVE-1361, we should refactor 
loadDynamicPartitions
+    // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
+    // After that check the number of DPs created to not exceed the limit and
+    // iterate over it and call loadPartition() here.
+    // The reason we don't do inside HIVE-1361 is the latter is large and we
+    // want to isolate any potential issue it may introduce.
+    Map<Map<String, String>, Partition> dp =
+      db.loadDynamicPartitions(
+        tbd.getSourcePath(),
+        tbd.getTable().getTableName(),
+        tbd.getPartitionSpec(),
+        tbd.getReplace(),
+        dpCtx.getNumDPCols(),
+        isSkewedStoredAsDirs(tbd),
+        work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID,
+        SessionState.get().getTxnMgr().getCurrentTxnId(), 
hasFollowingStatsTask(),
+        work.getLoadTableWork().getWriteType());
+
+    console.printInfo("\t Time taken to load dynamic partitions: "  +
+        (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
+
+    if (dp.size() == 0 && 
conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
+      throw new HiveException("This query creates no partitions." +
+          " To turn off this error, set hive.error.on.empty.partition=false.");
+    }
+
+    startTime = System.currentTimeMillis();
+    // for each partition spec, get the partition
+    // and put it to WriteEntity for post-exec hook
+    for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
+      Partition partn = entry.getValue();
+
+      if (ti.bucketCols != null || ti.sortCols != null) {
+        updatePartitionBucketSortColumns(
+            db, table, partn, ti.bucketCols, ti.numBuckets, ti.sortCols);
+      }
+
+      WriteEntity enty = new WriteEntity(partn,
+          (tbd.getReplace() ? WriteEntity.WriteType.INSERT_OVERWRITE :
+              WriteEntity.WriteType.INSERT));
+      if (work.getOutputs() != null) {
+        work.getOutputs().add(enty);
+      }
+      // Need to update the queryPlan's output as well so that post-exec hooks 
get executed.
+      // This is only needed for dynamic partitioning since for SP the 
WriteEntity is
+      // constructed at compile time and the queryPlan already contains that.
+      // For DP, WriteEntity creation is deferred at this stage so we need to 
update
+      // queryPlan here.
+      if (queryPlan.getOutputs() == null) {
+        queryPlan.setOutputs(new LinkedHashSet<WriteEntity>());
+      }
+      queryPlan.getOutputs().add(enty);
+
+      // update columnar lineage for each partition
+      dc = new DataContainer(table.getTTable(), partn.getTPartition());
+
+      // Don't set lineage on delete as we don't have all the columns
+      if (SessionState.get() != null &&
+          work.getLoadTableWork().getWriteType() != AcidUtils.Operation.DELETE 
&&
+          work.getLoadTableWork().getWriteType() != 
AcidUtils.Operation.UPDATE) {
+        SessionState.get().getLineageState().setLineage(tbd.getSourcePath(), 
dc,
+            table.getCols());
+      }
+      LOG.info("\tLoading partition " + entry.getKey());
+    }
+    console.printInfo("\t Time taken for adding to write entity : " +
+        (System.currentTimeMillis() - startTime)/1000.0 + " seconds");
+    dc = null; // reset data container to prevent it being added again.
+    return dc;
+  }
+
+  private void inferTaskInformation(TaskInformation ti) {
+    // Find the first ancestor of this MoveTask which is some form of map 
reduce task
+    // (Either standard, local, or a merge)
+    while (ti.task.getParentTasks() != null && ti.task.getParentTasks().size() 
== 1) {
+      ti.task = (Task)ti.task.getParentTasks().get(0);
+      // If it was a merge task or a local map reduce task, nothing can be 
inferred
+      if (ti.task instanceof MergeFileTask || ti.task instanceof 
MapredLocalTask) {
+        break;
+      }
+
+      // If it's a standard map reduce task, check what, if anything, it 
inferred about
+      // the directory this move task is moving
+      if (ti.task instanceof MapRedTask) {
+        MapredWork work = (MapredWork)ti.task.getWork();
+        MapWork mapWork = work.getMapWork();
+        ti.bucketCols = mapWork.getBucketedColsByDirectory().get(ti.path);
+        ti.sortCols = mapWork.getSortedColsByDirectory().get(ti.path);
+        if (work.getReduceWork() != null) {
+          ti.numBuckets = work.getReduceWork().getNumReduceTasks();
+        }
+
+        if (ti.bucketCols != null || ti.sortCols != null) {
+          // This must be a final map reduce task (the task containing the 
file sink
+          // operator that writes the final output)
+          assert work.isFinalMapRed();
+        }
+        break;
+      }
+
+      // If it's a move task, get the path the files were moved from, this is 
what any
+      // preceding map reduce task inferred information about, and moving does 
not invalidate
+      // those assumptions
+      // This can happen when a conditional merge is added before the final 
MoveTask, but the
+      // condition for merging is not met, see GenMRFileSink1.
+      if (ti.task instanceof MoveTask) {
+        MoveTask mt = (MoveTask)ti.task;
+        if (mt.getWork().getLoadFileWork() != null) {
+          ti.path = 
mt.getWork().getLoadFileWork().getSourcePath().toUri().toString();
+        }
+      }
+    }
+  }
+
   private void checkFileFormats(Hive db, LoadTableDesc tbd, Table table)
       throws HiveException {
     if (work.getCheckFileFormat()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index a7f7b9f..427f067 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1409,7 +1409,6 @@ public final class Utilities {
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
     if (success) {
-      // TODO# specPath instead of tmpPath
       FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
           tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
       if(statuses != null && statuses.length > 0) {
@@ -1423,8 +1422,6 @@ public final class Utilities {
         Utilities.LOG14535.info("Moving tmp dir: " + tmpPath + " to: " + 
specPath);
         Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
       }
-      List<Path> paths = new ArrayList<>();
-      // TODO#: HERE listFilesToCommit(specPath, fs, paths);
     } else {
       Utilities.LOG14535.info("deleting tmpPath " + tmpPath);
       fs.delete(tmpPath, true);
@@ -1445,7 +1442,7 @@ public final class Utilities {
    * @throws HiveException
    * @throws IOException
    */
-  private static void createEmptyBuckets(Configuration hconf, List<Path> paths,
+  static void createEmptyBuckets(Configuration hconf, List<Path> paths,
       FileSinkDesc conf, Reporter reporter)
       throws HiveException, IOException {
 
@@ -1586,19 +1583,18 @@ public final class Utilities {
 
     for (FileStatus one : items) {
       if (isTempPath(one)) {
-        Utilities.LOG14535.info("removeTempOrDuplicateFiles deleting " + 
one.getPath(), new Exception());
+        Utilities.LOG14535.info("removeTempOrDuplicateFiles deleting " + 
one.getPath()/*, new Exception()*/);
         if (!fs.delete(one.getPath(), true)) {
           throw new IOException("Unable to delete tmp file: " + one.getPath());
         }
       } else {
         String taskId = getPrefixedTaskIdFromFilename(one.getPath().getName());
-        Utilities.LOG14535.info("removeTempOrDuplicateFiles pondering " + 
one.getPath() + ", taskId " + taskId, new Exception());
+        Utilities.LOG14535.info("removeTempOrDuplicateFiles pondering " + 
one.getPath() + ", taskId " + taskId/*, new Exception()*/);
 
         FileStatus otherFile = taskIdToFile.get(taskId);
         if (otherFile == null) {
           taskIdToFile.put(taskId, one);
         } else {
-          // TODO# file choice!
           // Compare the file sizes of all the attempt files for the same 
task, the largest win
           // any attempt files could contain partial results (due to task 
failures or
           // speculative runs), but the largest should be the correct one 
since the result

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 7d8c961..e43c600 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1878,7 +1878,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
               Utilities.LOG14535.info("loadPartition called for DPP from " + 
partPath + " to " + tbl.getTableName());
               Partition newPartition = loadPartition(partPath, tbl, 
fullPartSpec,
                   replace, true, listBucketingEnabled,
-                  false, isAcid, hasFollowingStatsTask, false); // TODO# here
+                  false, isAcid, hasFollowingStatsTask, false); // TODO# 
special case #N
               partitionsMap.put(fullPartSpec, newPartition);
 
               if (inPlaceEligible) {

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 6ed379a..499530e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -6658,7 +6658,7 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         acidOp = getAcidType(table_desc.getOutputFileFormatClass());
         checkAcidConstraints(qb, table_desc, dest_tab);
       }
-      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), 
acidOp);
+      ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), 
acidOp, isMmTable);
       ltd.setReplace(!qb.getParseInfo().isInsertIntoTable(dest_tab.getDbName(),
           dest_tab.getTableName()));
       ltd.setLbCtx(lbCtx);
@@ -6860,6 +6860,9 @@ public class SemanticAnalyzer extends 
BaseSemanticAnalyzer {
         dest_path, currentTableId, destTableIsAcid, destTableIsTemporary,
         destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
         canBeMerged, isMmTable);
+    if (isMmTable) {
+      fileSinkDesc.setExecutionPrefix(ctx.getExecutionPrefix());
+    }
 
     Operator output = putOpInsertMap(OperatorFactory.getAndMakeChild(
         fileSinkDesc, fsRS, input), inputRR);

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
index 02896ff..26f1d70 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckCtx.java
@@ -160,7 +160,7 @@ public class TypeCheckCtx implements NodeProcessorCtx {
     if (LOG.isDebugEnabled()) {
       // Logger the callstack from which the error has been set.
       LOG.debug("Setting error: [" + error + "] from "
-          + ((errorSrcNode == null) ? "null" : errorSrcNode.toStringTree()), 
new Exception());
+          + ((errorSrcNode == null) ? "null" : errorSrcNode.toStringTree())/*, 
new Exception()*/);
     }
     this.error = error;
     this.errorSrcNode = errorSrcNode;

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 0a4848b..f51999d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -97,6 +97,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
   private Path destPath;
   private boolean isHiveServerQuery;
   private boolean isMmTable;
+  private String executionPrefix;
 
   public FileSinkDesc() {
   }
@@ -158,6 +159,7 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     ret.setWriteType(writeType);
     ret.setTransactionId(txnId);
     ret.setStatsTmpDir(statsTmpDir);
+    ret.setExecutionPrefix(executionPrefix);
     return ret;
   }
 
@@ -481,4 +483,11 @@ public class FileSinkDesc extends AbstractOperatorDesc {
     this.statsTmpDir = statsCollectionTempDir;
   }
 
+  public String getExecutionPrefix() {
+    return this.executionPrefix;
+  }
+
+  public void setExecutionPrefix(String value) {
+    this.executionPrefix = value;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
index 5e4e1fe..5cad65c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadFileDesc.java
@@ -56,7 +56,7 @@ public class LoadFileDesc extends LoadDesc implements 
Serializable {
       final boolean isDfsDir, final String columns, final String columnTypes) {
 
     super(sourcePath);
-    Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + 
targetDir, new Exception());
+    Utilities.LOG14535.info("creating LFD from " + sourcePath + " to " + 
targetDir/*, new Exception()*/);
     this.targetDir = targetDir;
     this.isDfsDir = isDfsDir;
     this.columns = columns;

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
index 1ac831d..3b49197 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java
@@ -52,10 +52,10 @@ public class LoadTableDesc extends 
org.apache.hadoop.hive.ql.plan.LoadDesc
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final Map<String, String> partitionSpec,
       final boolean replace,
-      final AcidUtils.Operation writeType) {
+      final AcidUtils.Operation writeType, boolean isMmTable) {
     super(sourcePath);
-    Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to " + 
table.getTableName(), new Exception());
-    init(table, partitionSpec, replace, writeType, false);
+    Utilities.LOG14535.info("creating part LTD from " + sourcePath + " to " + 
table.getTableName()/*, new Exception()*/);
+    init(table, partitionSpec, replace, writeType, isMmTable);
   }
 
   /**
@@ -69,14 +69,16 @@ public class LoadTableDesc extends 
org.apache.hadoop.hive.ql.plan.LoadDesc
                        final TableDesc table,
                        final Map<String, String> partitionSpec,
                        final boolean replace) {
-    this(sourcePath, table, partitionSpec, replace, 
AcidUtils.Operation.NOT_ACID);
+    // TODO# we assume mm=false here
+    this(sourcePath, table, partitionSpec, replace, 
AcidUtils.Operation.NOT_ACID, false);
   }
 
   public LoadTableDesc(final Path sourcePath,
       final org.apache.hadoop.hive.ql.plan.TableDesc table,
       final Map<String, String> partitionSpec,
-      final AcidUtils.Operation writeType) {
-    this(sourcePath, table, partitionSpec, true, writeType);
+      final AcidUtils.Operation writeType, boolean isMmTable) {
+    // TODO# we assume mm=false here
+    this(sourcePath, table, partitionSpec, true, writeType, isMmTable);
   }
 
   /**
@@ -88,7 +90,8 @@ public class LoadTableDesc extends 
org.apache.hadoop.hive.ql.plan.LoadDesc
   public LoadTableDesc(final Path sourcePath,
                        final org.apache.hadoop.hive.ql.plan.TableDesc table,
                        final Map<String, String> partitionSpec) {
-    this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID);
+    // TODO# we assume mm=false here
+    this(sourcePath, table, partitionSpec, true, AcidUtils.Operation.NOT_ACID, 
false);
   }
 
   public LoadTableDesc(final Path sourcePath,
@@ -98,7 +101,7 @@ public class LoadTableDesc extends 
org.apache.hadoop.hive.ql.plan.LoadDesc
       boolean isReplace,
       boolean isMmTable) {
     super(sourcePath);
-    Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + 
table.getTableName(), new Exception());
+    Utilities.LOG14535.info("creating LTD from " + sourcePath + " to " + 
table.getTableName()/*, new Exception()*/);
     this.dpCtx = dpCtx;
     if (dpCtx != null && dpCtx.getPartSpec() != null && partitionSpec == null) 
{
       init(table, dpCtx.getPartSpec(), isReplace, writeType, isMmTable);

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/test/queries/clientpositive/mm_current.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_current.q 
b/ql/src/test/queries/clientpositive/mm_current.q
index 882096b..11259cb 100644
--- a/ql/src/test/queries/clientpositive/mm_current.q
+++ b/ql/src/test/queries/clientpositive/mm_current.q
@@ -2,10 +2,20 @@ set hive.mapred.mode=nonstrict;
 set hive.explain.user=false;
 set hive.exec.dynamic.partition.mode=nonstrict;
 set hive.fetch.task.conversion=none;
+set tez.grouping.min-size=1;
+set tez.grouping.max-size=2;
+set  hive.tez.auto.reducer.parallelism=false;
 
-drop table simple_mm;
-
+create table intermediate(key int) partitioned by (p int) stored as orc;
+insert into table intermediate partition(p='455') select key from src limit 3;
+insert into table intermediate partition(p='456') select key from src limit 3;
+insert into table intermediate partition(p='457') select key from src limit 3;
   
-create table simple_mm(key int) partitioned by (key_mm int) tblproperties 
('hivecommit'='true');
-insert into table simple_mm partition(key_mm='455') select key from src limit 
3;
+create table simple_mm(key int) partitioned by (key_mm int) stored as orc 
tblproperties ('hivecommit'='true');
+
+explain insert into table simple_mm partition(key_mm='455') select key from 
intermediate;
+insert into table simple_mm partition(key_mm='455') select key from 
intermediate;
+
+drop table simple_mm;
+drop table intermediate;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/87dcab47/ql/src/test/results/clientpositive/llap/mm_current.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_current.q.out 
b/ql/src/test/results/clientpositive/llap/mm_current.q.out
index 129bb13..8f1af4c 100644
--- a/ql/src/test/results/clientpositive/llap/mm_current.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_current.q.out
@@ -1,21 +1,128 @@
-PREHOOK: query: drop table simple123
-PREHOOK: type: DROPTABLE
-POSTHOOK: query: drop table simple123
-POSTHOOK: type: DROPTABLE
-PREHOOK: query: create table simple123(key int) partitioned by (key123 int) 
tblproperties ('hivecommit'='true')
+PREHOOK: query: create table intermediate(key int) partitioned by (p int) 
stored as orc
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
-PREHOOK: Output: default@simple123
-POSTHOOK: query: create table simple123(key int) partitioned by (key123 int) 
tblproperties ('hivecommit'='true')
+PREHOOK: Output: default@intermediate
+POSTHOOK: query: create table intermediate(key int) partitioned by (p int) 
stored as orc
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
-POSTHOOK: Output: default@simple123
-PREHOOK: query: insert into table simple123 partition(key123='455') select key 
from src limit 3
+POSTHOOK: Output: default@intermediate
+PREHOOK: query: insert into table intermediate partition(p='455') select key 
from src limit 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@intermediate@p=455
+POSTHOOK: query: insert into table intermediate partition(p='455') select key 
from src limit 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@intermediate@p=455
+POSTHOOK: Lineage: intermediate PARTITION(p=455).key EXPRESSION 
[(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: insert into table intermediate partition(p='456') select key 
from src limit 3
 PREHOOK: type: QUERY
 PREHOOK: Input: default@src
-PREHOOK: Output: default@simple123@key123=455
-POSTHOOK: query: insert into table simple123 partition(key123='455') select 
key from src limit 3
+PREHOOK: Output: default@intermediate@p=456
+POSTHOOK: query: insert into table intermediate partition(p='456') select key 
from src limit 3
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
-POSTHOOK: Output: default@simple123@key123=455
-POSTHOOK: Lineage: simple123 PARTITION(key123=455).key EXPRESSION 
[(src)src.FieldSchema(name:key, type:string, comment:default), ]
+POSTHOOK: Output: default@intermediate@p=456
+POSTHOOK: Lineage: intermediate PARTITION(p=456).key EXPRESSION 
[(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: insert into table intermediate partition(p='457') select key 
from src limit 3
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+PREHOOK: Output: default@intermediate@p=457
+POSTHOOK: query: insert into table intermediate partition(p='457') select key 
from src limit 3
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@src
+POSTHOOK: Output: default@intermediate@p=457
+POSTHOOK: Lineage: intermediate PARTITION(p=457).key EXPRESSION 
[(src)src.FieldSchema(name:key, type:string, comment:default), ]
+PREHOOK: query: create table simple_mm(key int) partitioned by (key_mm int) 
stored as orc tblproperties ('hivecommit'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: create table simple_mm(key int) partitioned by (key_mm int) 
stored as orc tblproperties ('hivecommit'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@simple_mm
+PREHOOK: query: explain insert into table simple_mm partition(key_mm='455') 
select key from intermediate
+PREHOOK: type: QUERY
+POSTHOOK: query: explain insert into table simple_mm partition(key_mm='455') 
select key from intermediate
+POSTHOOK: type: QUERY
+STAGE DEPENDENCIES:
+  Stage-1 is a root stage
+  Stage-2 depends on stages: Stage-1
+  Stage-0 depends on stages: Stage-2
+  Stage-3 depends on stages: Stage-0
+
+STAGE PLANS:
+  Stage: Stage-1
+    Tez
+#### A masked pattern was here ####
+      Vertices:
+        Map 1 
+            Map Operator Tree:
+                TableScan
+                  alias: intermediate
+                  Statistics: Num rows: 9 Data size: 108 Basic stats: COMPLETE 
Column stats: NONE
+                  Select Operator
+                    expressions: key (type: int)
+                    outputColumnNames: _col0
+                    Statistics: Num rows: 9 Data size: 108 Basic stats: 
COMPLETE Column stats: NONE
+                    File Output Operator
+                      compressed: false
+                      Statistics: Num rows: 9 Data size: 108 Basic stats: 
COMPLETE Column stats: NONE
+                      table:
+                          input format: 
org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+                          output format: 
org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+                          serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+                          name: default.simple_mm
+            Execution mode: llap
+            LLAP IO: all inputs
+
+  Stage: Stage-2
+    Dependency Collection
+
+  Stage: Stage-0
+    Move Operator
+      tables:
+          partition:
+            key_mm 455
+          replace: false
+          table:
+              input format: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
+              output format: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
+              serde: org.apache.hadoop.hive.ql.io.orc.OrcSerde
+              name: default.simple_mm
+          micromanaged table: true
+
+  Stage: Stage-3
+    Stats-Aggr Operator
+
+PREHOOK: query: insert into table simple_mm partition(key_mm='455') select key 
from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Input: default@intermediate@p=457
+PREHOOK: Output: default@simple_mm@key_mm=455
+POSTHOOK: query: insert into table simple_mm partition(key_mm='455') select 
key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Input: default@intermediate@p=457
+POSTHOOK: Output: default@simple_mm@key_mm=455
+POSTHOOK: Lineage: simple_mm PARTITION(key_mm=455).key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: drop table simple_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@simple_mm
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: drop table simple_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@simple_mm
+POSTHOOK: Output: default@simple_mm
+PREHOOK: query: drop table intermediate
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@intermediate
+PREHOOK: Output: default@intermediate
+POSTHOOK: query: drop table intermediate
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Output: default@intermediate

Reply via email to