HIVE-14638 : handle unions (Sergey Shelukhin)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0ce24b93
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0ce24b93
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0ce24b93

Branch: refs/heads/hive-14535
Commit: 0ce24b93e1ba92930c316dee0eb1262a27a101c8
Parents: 754443e
Author: Sergey Shelukhin <[email protected]>
Authored: Tue Oct 4 11:46:19 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Tue Oct 4 11:46:19 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/common/HiveStatsUtils.java      |  37 +-
 .../hadoop/hive/common/ValidWriteIds.java       |   5 -
 .../hadoop/hive/ql/exec/FileSinkOperator.java   | 137 ++++---
 .../hadoop/hive/ql/io/HiveInputFormat.java      |  46 ++-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   3 +
 ql/src/test/queries/clientpositive/mm_all.q     |  67 +++-
 ql/src/test/queries/clientpositive/mm_current.q |  31 +-
 .../results/clientpositive/llap/mm_all.q.out    | 377 ++++++++++++++++---
 .../clientpositive/llap/mm_current.q.out        | 186 +++++----
 9 files changed, 690 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0ce24b93/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java b/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
index 111d99c..745a868 100644
--- a/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/HiveStatsUtils.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.common;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -30,6 +31,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Lists;
+
 /**
  * HiveStatsUtils.
  * A collection of utilities used for hive statistics.
@@ -53,11 +56,17 @@ public class HiveStatsUtils {
    */
   public static FileStatus[] getFileStatusRecurse(Path path, int level,  
FileSystem fs)
       throws IOException {
-    return getFileStatusRecurse(path, level, fs, 
FileUtils.HIDDEN_FILES_PATH_FILTER);
+    return getFileStatusRecurse(path, level, fs, 
FileUtils.HIDDEN_FILES_PATH_FILTER, false);
   }
 
   public static FileStatus[] getFileStatusRecurse(
       Path path, int level, FileSystem fs, PathFilter filter) throws 
IOException {
+    return getFileStatusRecurse(path, level, fs, filter, false);
+  }
+
+  public static FileStatus[] getFileStatusRecurse(
+      Path path, int level, FileSystem fs, PathFilter filter, boolean 
allLevelsBelow)
+          throws IOException {
 
     // if level is <0, the return all files/directories under the specified 
path
     if (level < 0) {
@@ -81,7 +90,31 @@ public class HiveStatsUtils {
       sb.append(Path.SEPARATOR).append("*");
     }
     Path pathPattern = new Path(path, sb.toString());
-    return fs.globStatus(pathPattern, filter);
+    if (!allLevelsBelow) {
+      return fs.globStatus(pathPattern, filter);
+    }
+    LinkedList<FileStatus> queue = new LinkedList<>();
+    List<FileStatus> results = new ArrayList<FileStatus>();
+    for (FileStatus status : fs.globStatus(pathPattern)) {
+      if (filter.accept(status.getPath())) {
+        results.add(status);
+      }
+      if (status.isDirectory()) {
+        queue.add(status);
+      }
+    }
+    while (!queue.isEmpty()) {
+      FileStatus status = queue.poll();
+      for (FileStatus child : fs.listStatus(status.getPath())) {
+        if (filter.accept(child.getPath())) {
+          results.add(child);
+        }
+        if (child.isDirectory()) {
+          queue.add(child);
+        }
+      }
+    }
+    return results.toArray(new FileStatus[results.size()]);
   }
 
   public static int getNumBitVectorsForNDVEstimation(Configuration conf) 
throws Exception {

http://git-wip-us.apache.org/repos/asf/hive/blob/0ce24b93/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
index b939b43..160f4c0 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
@@ -116,11 +116,6 @@ public class ValidWriteIds {
     return ids != null && (areIdsValid == ids.contains(writeId));
   }
 
-  public boolean isValidInput(Path file) {
-    Long writeId = extractWriteId(file);
-    return (writeId != null) && isValid(writeId);
-  }
-
   public static String getMmFilePrefix(long mmWriteId) {
     return MM_PREFIX + "_" + mmWriteId;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/0ce24b93/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index f11a7c3..00115fe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -20,15 +20,12 @@ package org.apache.hadoop.hive.ql.exec;
 
 import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_TEMPORARY_TABLE_STORAGE;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -40,6 +37,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.StatsSetupConst;
@@ -96,6 +94,7 @@ import com.google.common.collect.Lists;
 /**
  * File Sink operator implementation.
  **/
+@SuppressWarnings("deprecation")
 public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     Serializable {
 
@@ -386,12 +385,16 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     // 'Parent'
     if ((!conf.isLinkedFileSink()) || (dpCtx == null)) {
       specPath = conf.getDirName();
+      Utilities.LOG14535.info("Setting up FSOP " + 
System.identityHashCode(this) + " ("
+          + conf.isLinkedFileSink() + ") with " + taskId + " and " + specPath);
       childSpecPathDynLinkedPartitions = null;
       return;
     }
 
     specPath = conf.getParentDir();
     childSpecPathDynLinkedPartitions = conf.getDirName().getName();
+    Utilities.LOG14535.info("Setting up FSOP " + System.identityHashCode(this) 
+ " ("
+        + conf.isLinkedFileSink() + ") with " + taskId + " and " + specPath);
   }
 
   /** Kryo ctor. */
@@ -1126,7 +1129,8 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         }
       }
       if (!commitPaths.isEmpty()) {
-        Path manifestPath = new Path(specPath, "_tmp." + 
ValidWriteIds.getMmFilePrefix(
+        Path manifestPath = getManifestDir(specPath, 
childSpecPathDynLinkedPartitions);
+        manifestPath = new Path(manifestPath, "_tmp." + 
ValidWriteIds.getMmFilePrefix(
             conf.getMmWriteId()) + "_" + taskId + MANIFEST_EXTENSION);
         Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with 
" + commitPaths);
         try {
@@ -1161,6 +1165,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     super.closeOp(abort);
   }
 
+  private static Path getManifestDir(Path specPath, String unionSuffix) {
+    return (unionSuffix == null) ? specPath : new Path(specPath, unionSuffix);
+  }
+
   /**
    * @return the name of the operator
    */
@@ -1179,15 +1187,17 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     try {
       if ((conf != null) && isNativeTable) {
         Path specPath = conf.getDirName();
+        String unionSuffix = null;
         DynamicPartitionCtx dpCtx = conf.getDynPartCtx();
         if (conf.isLinkedFileSink() && (dpCtx != null)) {
           specPath = conf.getParentDir();
           Utilities.LOG14535.info("Setting specPath to " + specPath + " for 
dynparts");
+          unionSuffix = conf.getDirName().getName();
         }
         if (!conf.isMmTable()) {
           Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, 
conf, reporter); // TODO# other callers
         } else {
-          handleMmTable(specPath, hconf, success, dpCtx, conf, reporter);
+          handleMmTable(specPath, unionSuffix, hconf, success, dpCtx, conf, 
reporter);
         }
       }
     } catch (IOException e) {
@@ -1196,33 +1206,64 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     super.jobCloseOp(hconf, success);
   }
 
-  private void handleMmTable(Path specPath, Configuration hconf, boolean 
success,
-      DynamicPartitionCtx dpCtx, FileSinkDesc conf, Reporter reporter)
+  private static FileStatus[] getMmDirectoryCandidates(FileSystem fs, Path 
path,
+      int dpLevels, String unionSuffix, PathFilter filter) throws IOException {
+    StringBuilder sb = new StringBuilder(path.toUri().getPath());
+    for (int i = 0; i < dpLevels; i++) {
+      sb.append(Path.SEPARATOR).append("*");
+    }
+    if (unionSuffix != null) {
+      sb.append(Path.SEPARATOR).append(unionSuffix);
+    }
+    sb.append(Path.SEPARATOR).append("*"); // TODO: we could add exact mm 
prefix here
+    Path pathPattern = new Path(path, sb.toString());
+    return fs.globStatus(pathPattern, filter);
+  }
+
+  private void handleMmTable(Path specPath, String unionSuffix, Configuration 
hconf,
+      boolean success, DynamicPartitionCtx dpCtx, FileSinkDesc conf, Reporter 
reporter)
           throws IOException, HiveException {
     FileSystem fs = specPath.getFileSystem(hconf);
     // Manifests would be at the root level, but the results at target level.
     // TODO# special case - doesn't take bucketing into account
-    int targetLevel = (dpCtx == null) ? 1 : (dpCtx.getNumDPCols() + 1);
-    int manifestLevel = 1;
+    Path manifestDir = getManifestDir(specPath, unionSuffix);
+
     ValidWriteIds.IdPathFilter filter = new 
ValidWriteIds.IdPathFilter(conf.getMmWriteId(), true);
     if (!success) {
-      deleteMatchingFiles(specPath, fs, targetLevel, filter);
-      deleteMatchingFiles(specPath, fs, manifestLevel, filter);
+      tryDeleteAllMmFiles(fs, specPath, manifestDir, dpCtx, unionSuffix, 
filter);
       return;
     }
-    FileStatus[] files = HiveStatsUtils.getFileStatusRecurse(specPath, 
manifestLevel, fs, filter);
-    List<Path> manifests = new ArrayList<>(files.length);
+    FileStatus[] files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, 
fs, filter);
+    Utilities.LOG14535.info("Looking for manifests in: " + manifestDir);
+    List<Path> manifests = new ArrayList<>();
     if (files != null) {
       for (FileStatus status : files) {
-        if (status.getPath().getName().endsWith(MANIFEST_EXTENSION)) {
-          manifests.add(status.getPath());
-        } else if (!status.isDirectory()) {
-          Path path = status.getPath();
-          Utilities.LOG14535.warn("Unknown file found - neither a manifest nor 
directory: " + path);
-          tryDelete(fs, path);
+        Path path = status.getPath();
+        if (path.getName().endsWith(MANIFEST_EXTENSION)) {
+          manifests.add(path);
         }
       }
     }
+
+    Utilities.LOG14535.info("Looking for files in: " + specPath);
+    files = getMmDirectoryCandidates(fs, specPath,
+        dpCtx == null ? 0 : dpCtx.getNumDPCols(), unionSuffix, filter);
+    ArrayList<FileStatus> results = new ArrayList<>();
+    if (files != null) {
+      for (FileStatus status : files) {
+        Path path = status.getPath();
+        Utilities.LOG14535.info("Looking at path: " + path + " from " + 
System.identityHashCode(this));
+        if (!status.isDirectory()) {
+          if (!path.getName().endsWith(MANIFEST_EXTENSION)) {
+            Utilities.LOG14535.warn("Unknown file found, deleting: " + path);
+            tryDelete(fs, path);
+          }
+        } else {
+          results.add(status);
+        }
+      }
+    }
+
     HashSet<String> committed = new HashSet<>();
     for (Path mfp : manifests) {
       try (FSDataInputStream mdis = fs.open(mfp)) {
@@ -1236,22 +1277,14 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       }
     }
 
-    files = HiveStatsUtils.getFileStatusRecurse(specPath, targetLevel, fs, 
filter);
-    LinkedList<FileStatus> results = new LinkedList<>();
-    for (FileStatus status : files) {
-      if (!status.isDirectory()) {
-        Path path = status.getPath();
-        Utilities.LOG14535.warn("Unknown file found - neither a manifest nor 
directory: " + path);
-        tryDelete(fs, path);
-      } else {
-        for (FileStatus child : fs.listStatus(status.getPath())) {
-          Path path = child.getPath();
-          if (committed.remove(path.toString())) continue; // A good file.
-          Utilities.LOG14535.info("Deleting " + path + " that was not 
committed");
-          // We should actually succeed here - if we fail, don't commit the 
query.
-          if (!fs.delete(path, true)) {
-            throw new HiveException("Failed to delete an uncommitted path " + 
path);
-          }
+    for (FileStatus status : results) {
+      for (FileStatus child : fs.listStatus(status.getPath())) {
+        Path childPath = child.getPath();
+        if (committed.remove(childPath.toString())) continue; // A good file.
+        Utilities.LOG14535.info("Deleting " + childPath + " that was not 
committed");
+        // We should actually succeed here - if we fail, don't commit the 
query.
+        if (!fs.delete(childPath, true)) {
+          throw new HiveException("Failed to delete an uncommitted path " + 
childPath);
         }
       }
     }
@@ -1263,11 +1296,19 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       Utilities.LOG14535.info("Deleting manifest " + mfp);
       tryDelete(fs, mfp);
     }
+    // Delete the manifest directory if we only created it for manifests; 
otherwise the
+    // dynamic partition loader will find it and try to load it as a 
partition... what a mess.
+    if (manifestDir != specPath) {
+      FileStatus[] remainingFiles = fs.listStatus(manifestDir);
+      if (remainingFiles == null || remainingFiles.length == 0) {
+        Utilities.LOG14535.info("Deleting directory " + manifestDir);
+        tryDelete(fs, manifestDir);
+      }
+    }
 
     if (results.isEmpty()) return;
     FileStatus[] finalResults = results.toArray(new 
FileStatus[results.size()]);
 
-    // TODO# dp may break - removeTempOrDuplicateFiles assumes dirs in 
results. Why? We recurse...
     List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
         fs, finalResults, dpCtx, conf, hconf);
     // create empty buckets if necessary
@@ -1276,15 +1317,27 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     }
   }
 
-  private void deleteMatchingFiles(Path specPath, FileSystem fs,
-      int targetLevel, ValidWriteIds.IdPathFilter filter) throws IOException {
-    for (FileStatus status : HiveStatsUtils.getFileStatusRecurse(specPath, 
targetLevel, fs,
-        filter)) {
-      Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure");
-      tryDelete(fs, status.getPath());
+  private void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path 
manifestDir,
+      DynamicPartitionCtx dpCtx, String unionSuffix,
+      ValidWriteIds.IdPathFilter filter) throws IOException {
+    FileStatus[] files = getMmDirectoryCandidates(fs, specPath,
+        dpCtx == null ? 0 : dpCtx.getNumDPCols(), unionSuffix, filter);
+    if (files != null) {
+      for (FileStatus status : files) {
+        Utilities.LOG14535.info("Deleting " + status.getPath() + " on 
failure");
+        tryDelete(fs, status.getPath());
+      }
+    }
+    files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, fs, filter);
+    if (files != null) {
+      for (FileStatus status : files) {
+        Utilities.LOG14535.info("Deleting " + status.getPath() + " on 
failure");
+        tryDelete(fs, status.getPath());
+      }
     }
   }
 
+
   private void tryDelete(FileSystem fs, Path path) {
     try {
       fs.delete(path, true);

http://git-wip-us.apache.org/repos/asf/hive/blob/0ce24b93/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 0510e08..c3e2681 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -29,6 +29,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -42,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidWriteIds;
@@ -352,7 +354,9 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       TableDesc table, Map<String, ValidWriteIds> writeIdMap, List<InputSplit> 
result)
           throws IOException {
     ValidWriteIds writeIds = extractWriteIds(writeIdMap, conf, 
table.getTableName());
-    Utilities.LOG14535.info("Observing " + table.getTableName() + ": " + 
writeIds);
+    if (writeIds != null) {
+      Utilities.LOG14535.info("Observing " + table.getTableName() + ": " + 
writeIds);
+    }
 
     Utilities.copyTablePropertiesToConf(table, conf);
 
@@ -394,22 +398,40 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
 
   private void processForWriteIds(Path dir, JobConf conf,
       ValidWriteIds writeIds, List<Path> finalPaths) throws IOException {
-    FileStatus[] files = dir.getFileSystem(conf).listStatus(dir); // TODO: 
batch?
+    FileSystem fs = dir.getFileSystem(conf);
+    FileStatus[] files = fs.listStatus(dir); // TODO: batch?
+    LinkedList<Path> subdirs = new LinkedList<>();
     for (FileStatus file : files) {
-      Path subdir = file.getPath();
-      if (!file.isDirectory()) {
-        Utilities.LOG14535.warn("Found a file not in subdirectory " + subdir);
-        continue;
-      }
-      if (!writeIds.isValidInput(subdir)) {
-        Utilities.LOG14535.warn("Ignoring an uncommitted directory " + subdir);
-        continue;
+      handleNonMmDirChild(file, writeIds, subdirs, finalPaths);
+    }
+    while (!subdirs.isEmpty()) {
+      Path subdir = subdirs.poll();
+      for (FileStatus file : fs.listStatus(subdir)) {
+        handleNonMmDirChild(file, writeIds, subdirs, finalPaths);
       }
-      Utilities.LOG14535.info("Adding input " + subdir);
-      finalPaths.add(subdir);
     }
   }
 
+  private void handleNonMmDirChild(FileStatus file, ValidWriteIds writeIds,
+      LinkedList<Path> subdirs, List<Path> finalPaths) {
+    Path path = file.getPath();
+    if (!file.isDirectory()) {
+      Utilities.LOG14535.warn("Ignoring a file not in MM directory " + path);
+      return;
+    }
+    Long writeId = ValidWriteIds.extractWriteId(path);
+    if (writeId == null) {
+      subdirs.add(path);
+      return;
+    }
+    if (!writeIds.isValid(writeId)) {
+      Utilities.LOG14535.warn("Ignoring an uncommitted directory " + path);
+      return;
+    }
+    Utilities.LOG14535.info("Adding input " + path);
+    finalPaths.add(path);
+  }
+
   Path[] getInputPaths(JobConf job) throws IOException {
     Path[] dirs;
     if (HiveConf.getVar(job, 
HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {

http://git-wip-us.apache.org/repos/asf/hive/blob/0ce24b93/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 6cd0500..73a3b19 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1597,6 +1597,7 @@ public class Hive {
               getConf(), new ValidWriteIds.IdPathFilter(mmWriteId, false));
         }
       } else {
+        Utilities.LOG14535.info("moving " + loadPath + " to " + newPartPath);
         if (replace || (oldPart == null && !isAcid)) {
           replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, 
getConf(),
               isSrcLocal);
@@ -2016,6 +2017,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       newFiles = Collections.synchronizedList(new ArrayList<Path>());
     }
     if (mmWriteId == null) {
+      Utilities.LOG14535.info("moving " + loadPath + " to " + tbl.getPath());
       if (replace) {
         Path tableDest = tbl.getPath();
         replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, 
isSrcLocal);
@@ -2029,6 +2031,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
         }
       }
     } else {
+      Utilities.LOG14535.info("not moving " + loadPath + " to " + 
tbl.getPath());
       if (replace) {
         Path tableDest = tbl.getPath();
         deleteOldPathForReplace(tableDest, tableDest, sessionConf,

http://git-wip-us.apache.org/repos/asf/hive/blob/0ce24b93/ql/src/test/queries/clientpositive/mm_all.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_all.q b/ql/src/test/queries/clientpositive/mm_all.q
index 59171af..cc44c19 100644
--- a/ql/src/test/queries/clientpositive/mm_all.q
+++ b/ql/src/test/queries/clientpositive/mm_all.q
@@ -1,6 +1,10 @@
 set hive.mapred.mode=nonstrict;
 set hive.explain.user=false;
 set hive.fetch.task.conversion=none;
+set tez.grouping.min-size=1;
+set tez.grouping.max-size=2;
+set hive.exec.dynamic.partition.mode=nonstrict;
+
 
 -- Force multiple writers when reading
 drop table intermediate;
@@ -22,6 +26,8 @@ create table simple_mm(key int) stored as orc tblproperties ('hivecommit'='true'
 insert into table simple_mm select key from intermediate;
 insert overwrite table simple_mm select key from intermediate;
 select * from simple_mm;
+insert into table simple_mm select key from intermediate;
+select * from simple_mm;
 drop table simple_mm;
 
 
@@ -50,6 +56,65 @@ drop table dp_no_mm;
 drop table dp_mm;
 
 
+-- union
+
+create table union_mm(id int)  tblproperties ('hivecommit'='true'); 
+insert into table union_mm 
+select temps.p from (
+select key as p from intermediate 
+union all 
+select key + 1 as p from intermediate ) temps;
+
+select * from union_mm order by id;
+
+insert into table union_mm 
+select p from
+(
+select key + 1 as p from intermediate
+union all
+select key from intermediate
+) tab group by p
+union all
+select key + 2 as p from intermediate;
+
+select * from union_mm order by id;
+
+insert into table union_mm
+SELECT p FROM
+(
+  SELECT key + 1 as p FROM intermediate
+  UNION ALL
+  SELECT key as p FROM ( 
+    SELECT distinct key FROM (
+      SELECT key FROM (
+        SELECT key + 2 as key FROM intermediate
+        UNION ALL
+        SELECT key FROM intermediate
+      )t1 
+    group by key)t2
+  )t3
+)t4
+group by p;
+
+
+select * from union_mm order by id;
+drop table union_mm;
+
+
+create table partunion_mm(id int) partitioned by (key int) tblproperties 
('hivecommit'='true'); 
+insert into table partunion_mm partition(key)
+select temps.* from (
+select key as p, key from intermediate 
+union all 
+select key + 1 as p, key + 1 from intermediate ) temps;
+
+select * from partunion_mm;
+drop table partunion_mm;
+
+-- TODO# from here, fix it
+
+
+
 
 -- future
 
@@ -110,7 +175,7 @@ drop table dp_mm;
 --INSERT OVERWRITE TABLE skew_mm
 --SELECT a.key as k1, a.val as k2, b.key as k3, b.val as k4 FROM T1 a JOIN T2 
b ON a.key = b.key;
 --
----- TODO load, acid, etc
+---- TODO load, multi-insert etc
 --
 --
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0ce24b93/ql/src/test/queries/clientpositive/mm_current.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_current.q b/ql/src/test/queries/clientpositive/mm_current.q
index b551176..e1fb3d9 100644
--- a/ql/src/test/queries/clientpositive/mm_current.q
+++ b/ql/src/test/queries/clientpositive/mm_current.q
@@ -12,27 +12,28 @@ insert into table intermediate partition(p='455') select key from src limit 2;
 insert into table intermediate partition(p='456') select key from src limit 2;
 
 
+create table partunion_no_mm(id int) partitioned by (key int); 
+insert into table partunion_no_mm partition(key)
+select temps.* from (
+select key as p, key from intermediate 
+union all 
+select key + 1 as p, key + 1 from intermediate ) temps;
 
-drop table dp_no_mm;
-drop table dp_mm;
+select * from partunion_no_mm;
+drop table partunion_no_mm;
 
-set hive.merge.mapredfiles=false;
-set hive.merge.sparkfiles=false;
-set hive.merge.tezfiles=false;
 
-create table dp_no_mm (key int) partitioned by (key1 string, key2 int) stored 
as orc;
-create table dp_mm (key int) partitioned by (key1 string, key2 int) stored as 
orc
-  tblproperties ('hivecommit'='true');
+create table partunion_mm(id int) partitioned by (key int) tblproperties 
('hivecommit'='true'); 
+insert into table partunion_mm partition(key)
+select temps.* from (
+select key as p, key from intermediate 
+union all 
+select key + 1 as p, key + 1 from intermediate ) temps;
 
-insert into table dp_no_mm partition (key1='123', key2) select key, key from 
intermediate;
+select * from partunion_mm;
+drop table partunion_mm;
 
-insert into table dp_mm partition (key1='123', key2) select key, key from 
intermediate;
 
-select * from dp_no_mm;
-select * from dp_mm;
-
-drop table dp_no_mm;
-drop table dp_mm;
 
 drop table intermediate;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0ce24b93/ql/src/test/results/clientpositive/llap/mm_all.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_all.q.out b/ql/src/test/results/clientpositive/llap/mm_all.q.out
index b0c9c0a..0a8bb40 100644
--- a/ql/src/test/results/clientpositive/llap/mm_all.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_all.q.out
@@ -147,18 +147,18 @@ POSTHOOK: Input: default@part_mm
 POSTHOOK: Input: default@part_mm@key_mm=455
 POSTHOOK: Input: default@part_mm@key_mm=456
 #### A masked pattern was here ####
-238    455
-86     455
-238    455
-86     455
-238    455
-86     455
-238    455
-86     455
-238    456
-86     456
-238    456
-86     456
+0      455
+455    455
+0      455
+455    455
+0      455
+455    455
+0      455
+455    455
+0      456
+455    456
+0      456
+455    456
 PREHOOK: query: drop table part_mm
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@part_mm
@@ -213,10 +213,39 @@ POSTHOOK: query: select * from simple_mm
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@simple_mm
 #### A masked pattern was here ####
-238
-86
-238
-86
+0
+455
+0
+455
+PREHOOK: query: insert into table simple_mm select key from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@simple_mm
+POSTHOOK: query: insert into table simple_mm select key from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@simple_mm
+POSTHOOK: Lineage: simple_mm.key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from simple_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@simple_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from simple_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@simple_mm
+#### A masked pattern was here ####
+0
+455
+0
+455
+0
+455
+0
+455
 PREHOOK: query: drop table simple_mm
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@simple_mm
@@ -264,10 +293,10 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@intermediate
 POSTHOOK: Input: default@intermediate@p=455
 POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Output: default@dp_no_mm@key1=123/key2=238
-POSTHOOK: Output: default@dp_no_mm@key1=123/key2=86
-POSTHOOK: Lineage: dp_no_mm PARTITION(key1=123,key2=238).key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-POSTHOOK: Lineage: dp_no_mm PARTITION(key1=123,key2=86).key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Output: default@dp_no_mm@key1=123/key2=0
+POSTHOOK: Output: default@dp_no_mm@key1=123/key2=455
+POSTHOOK: Lineage: dp_no_mm PARTITION(key1=123,key2=0).key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: dp_no_mm PARTITION(key1=123,key2=455).key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
 PREHOOK: query: insert into table dp_mm partition (key1='123', key2) select 
key, key from intermediate
 PREHOOK: type: QUERY
 PREHOOK: Input: default@intermediate
@@ -279,42 +308,42 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@intermediate
 POSTHOOK: Input: default@intermediate@p=455
 POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Output: default@dp_mm@key1=123/key2=238
-POSTHOOK: Output: default@dp_mm@key1=123/key2=86
-POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=238).key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=86).key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Output: default@dp_mm@key1=123/key2=0
+POSTHOOK: Output: default@dp_mm@key1=123/key2=455
+POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=0).key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=455).key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
 PREHOOK: query: select * from dp_no_mm
 PREHOOK: type: QUERY
 PREHOOK: Input: default@dp_no_mm
-PREHOOK: Input: default@dp_no_mm@key1=123/key2=238
-PREHOOK: Input: default@dp_no_mm@key1=123/key2=86
+PREHOOK: Input: default@dp_no_mm@key1=123/key2=0
+PREHOOK: Input: default@dp_no_mm@key1=123/key2=455
 #### A masked pattern was here ####
 POSTHOOK: query: select * from dp_no_mm
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@dp_no_mm
-POSTHOOK: Input: default@dp_no_mm@key1=123/key2=238
-POSTHOOK: Input: default@dp_no_mm@key1=123/key2=86
+POSTHOOK: Input: default@dp_no_mm@key1=123/key2=0
+POSTHOOK: Input: default@dp_no_mm@key1=123/key2=455
 #### A masked pattern was here ####
-238    123     238
-238    123     238
-86     123     86
-86     123     86
+455    123     455
+455    123     455
+0      123     0
+0      123     0
 PREHOOK: query: select * from dp_mm
 PREHOOK: type: QUERY
 PREHOOK: Input: default@dp_mm
-PREHOOK: Input: default@dp_mm@key1=123/key2=238
-PREHOOK: Input: default@dp_mm@key1=123/key2=86
+PREHOOK: Input: default@dp_mm@key1=123/key2=0
+PREHOOK: Input: default@dp_mm@key1=123/key2=455
 #### A masked pattern was here ####
 POSTHOOK: query: select * from dp_mm
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@dp_mm
-POSTHOOK: Input: default@dp_mm@key1=123/key2=238
-POSTHOOK: Input: default@dp_mm@key1=123/key2=86
+POSTHOOK: Input: default@dp_mm@key1=123/key2=0
+POSTHOOK: Input: default@dp_mm@key1=123/key2=455
 #### A masked pattern was here ####
-238    123     238
-238    123     238
-86     123     86
-86     123     86
+455    123     455
+455    123     455
+0      123     0
+0      123     0
 PREHOOK: query: drop table dp_no_mm
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@dp_no_mm
@@ -331,7 +360,262 @@ POSTHOOK: query: drop table dp_mm
 POSTHOOK: type: DROPTABLE
 POSTHOOK: Input: default@dp_mm
 POSTHOOK: Output: default@dp_mm
-PREHOOK: query: -- future
+PREHOOK: query: -- union
+
+create table union_mm(id int)  tblproperties ('hivecommit'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@union_mm
+POSTHOOK: query: -- union
+
+create table union_mm(id int)  tblproperties ('hivecommit'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@union_mm
+PREHOOK: query: insert into table union_mm 
+select temps.p from (
+select key as p from intermediate 
+union all 
+select key + 1 as p from intermediate ) temps
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@union_mm
+POSTHOOK: query: insert into table union_mm 
+select temps.p from (
+select key as p from intermediate 
+union all 
+select key + 1 as p from intermediate ) temps
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@union_mm
+POSTHOOK: Lineage: union_mm.id EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from union_mm order by id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@union_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from union_mm order by id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@union_mm
+#### A masked pattern was here ####
+0
+0
+1
+1
+455
+455
+456
+456
+PREHOOK: query: insert into table union_mm 
+select p from
+(
+select key + 1 as p from intermediate
+union all
+select key from intermediate
+) tab group by p
+union all
+select key + 2 as p from intermediate
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@union_mm
+POSTHOOK: query: insert into table union_mm 
+select p from
+(
+select key + 1 as p from intermediate
+union all
+select key from intermediate
+) tab group by p
+union all
+select key + 2 as p from intermediate
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@union_mm
+POSTHOOK: Lineage: union_mm.id EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from union_mm order by id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@union_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from union_mm order by id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@union_mm
+#### A masked pattern was here ####
+0
+0
+0
+1
+1
+1
+2
+2
+455
+455
+455
+456
+456
+456
+457
+457
+PREHOOK: query: insert into table union_mm
+SELECT p FROM
+(
+  SELECT key + 1 as p FROM intermediate
+  UNION ALL
+  SELECT key as p FROM ( 
+    SELECT distinct key FROM (
+      SELECT key FROM (
+        SELECT key + 2 as key FROM intermediate
+        UNION ALL
+        SELECT key FROM intermediate
+      )t1 
+    group by key)t2
+  )t3
+)t4
+group by p
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@union_mm
+POSTHOOK: query: insert into table union_mm
+SELECT p FROM
+(
+  SELECT key + 1 as p FROM intermediate
+  UNION ALL
+  SELECT key as p FROM ( 
+    SELECT distinct key FROM (
+      SELECT key FROM (
+        SELECT key + 2 as key FROM intermediate
+        UNION ALL
+        SELECT key FROM intermediate
+      )t1 
+    group by key)t2
+  )t3
+)t4
+group by p
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@union_mm
+POSTHOOK: Lineage: union_mm.id EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from union_mm order by id
+PREHOOK: type: QUERY
+PREHOOK: Input: default@union_mm
+#### A masked pattern was here ####
+POSTHOOK: query: select * from union_mm order by id
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@union_mm
+#### A masked pattern was here ####
+0
+0
+0
+0
+1
+1
+1
+1
+2
+2
+2
+455
+455
+455
+455
+456
+456
+456
+456
+457
+457
+457
+PREHOOK: query: drop table union_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@union_mm
+PREHOOK: Output: default@union_mm
+POSTHOOK: query: drop table union_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@union_mm
+POSTHOOK: Output: default@union_mm
+PREHOOK: query: create table partunion_mm(id int) partitioned by (key int) 
tblproperties ('hivecommit'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@partunion_mm
+POSTHOOK: query: create table partunion_mm(id int) partitioned by (key int) 
tblproperties ('hivecommit'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@partunion_mm
+PREHOOK: query: insert into table partunion_mm partition(key)
+select temps.* from (
+select key as p, key from intermediate 
+union all 
+select key + 1 as p, key + 1 from intermediate ) temps
+PREHOOK: type: QUERY
+PREHOOK: Input: default@intermediate
+PREHOOK: Input: default@intermediate@p=455
+PREHOOK: Input: default@intermediate@p=456
+PREHOOK: Output: default@partunion_mm
+POSTHOOK: query: insert into table partunion_mm partition(key)
+select temps.* from (
+select key as p, key from intermediate 
+union all 
+select key + 1 as p, key + 1 from intermediate ) temps
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@intermediate
+POSTHOOK: Input: default@intermediate@p=455
+POSTHOOK: Input: default@intermediate@p=456
+POSTHOOK: Output: default@partunion_mm@key=0
+POSTHOOK: Output: default@partunion_mm@key=1
+POSTHOOK: Output: default@partunion_mm@key=455
+POSTHOOK: Output: default@partunion_mm@key=456
+POSTHOOK: Lineage: partunion_mm PARTITION(key=0).id EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: partunion_mm PARTITION(key=1).id EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: partunion_mm PARTITION(key=455).id EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: partunion_mm PARTITION(key=456).id EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from partunion_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@partunion_mm
+PREHOOK: Input: default@partunion_mm@key=0
+PREHOOK: Input: default@partunion_mm@key=1
+PREHOOK: Input: default@partunion_mm@key=455
+PREHOOK: Input: default@partunion_mm@key=456
+#### A masked pattern was here ####
+POSTHOOK: query: select * from partunion_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@partunion_mm
+POSTHOOK: Input: default@partunion_mm@key=0
+POSTHOOK: Input: default@partunion_mm@key=1
+POSTHOOK: Input: default@partunion_mm@key=455
+POSTHOOK: Input: default@partunion_mm@key=456
+#### A masked pattern was here ####
+0      0
+0      0
+1      1
+1      1
+455    455
+455    455
+456    456
+456    456
+PREHOOK: query: drop table partunion_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@partunion_mm
+PREHOOK: Output: default@partunion_mm
+POSTHOOK: query: drop table partunion_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@partunion_mm
+POSTHOOK: Output: default@partunion_mm
+PREHOOK: query: -- TODO# from here, fix it
+
+
+
+
+-- future
 
 
 
@@ -390,7 +674,7 @@ PREHOOK: query: -- future
 --INSERT OVERWRITE TABLE skew_mm
 --SELECT a.key as k1, a.val as k2, b.key as k3, b.val as k4 FROM T1 a JOIN T2 
b ON a.key = b.key;
 --
----- TODO load, acid, etc
+---- TODO load, multi-insert etc
 --
 --
 
@@ -398,7 +682,12 @@ drop table intermediate
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@intermediate
 PREHOOK: Output: default@intermediate
-POSTHOOK: query: -- future
+POSTHOOK: query: -- TODO# from here, fix it
+
+
+
+
+-- future
 
 
 
@@ -457,7 +746,7 @@ POSTHOOK: query: -- future
 --INSERT OVERWRITE TABLE skew_mm
 --SELECT a.key as k1, a.val as k2, b.key as k3, b.val as k4 FROM T1 a JOIN T2 
b ON a.key = b.key;
 --
----- TODO load, acid, etc
+---- TODO load, multi-insert etc
 --
 --
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0ce24b93/ql/src/test/results/clientpositive/llap/mm_current.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientpositive/llap/mm_current.q.out 
b/ql/src/test/results/clientpositive/llap/mm_current.q.out
index fe1caee..87214ba 100644
--- a/ql/src/test/results/clientpositive/llap/mm_current.q.out
+++ b/ql/src/test/results/clientpositive/llap/mm_current.q.out
@@ -28,110 +28,140 @@ POSTHOOK: type: QUERY
 POSTHOOK: Input: default@src
 POSTHOOK: Output: default@intermediate@p=456
 POSTHOOK: Lineage: intermediate PARTITION(p=456).key EXPRESSION 
[(src)src.FieldSchema(name:key, type:string, comment:default), ]
-PREHOOK: query: drop table dp_no_mm
-PREHOOK: type: DROPTABLE
-POSTHOOK: query: drop table dp_no_mm
-POSTHOOK: type: DROPTABLE
-PREHOOK: query: drop table dp_mm
-PREHOOK: type: DROPTABLE
-POSTHOOK: query: drop table dp_mm
-POSTHOOK: type: DROPTABLE
-PREHOOK: query: create table dp_no_mm (key int) partitioned by (key1 string, 
key2 int) stored as orc
-PREHOOK: type: CREATETABLE
-PREHOOK: Output: database:default
-PREHOOK: Output: default@dp_no_mm
-POSTHOOK: query: create table dp_no_mm (key int) partitioned by (key1 string, 
key2 int) stored as orc
-POSTHOOK: type: CREATETABLE
-POSTHOOK: Output: database:default
-POSTHOOK: Output: default@dp_no_mm
-PREHOOK: query: create table dp_mm (key int) partitioned by (key1 string, key2 
int) stored as orc
-  tblproperties ('hivecommit'='true')
+PREHOOK: query: create table partunion_no_mm(id int) partitioned by (key int)
 PREHOOK: type: CREATETABLE
 PREHOOK: Output: database:default
-PREHOOK: Output: default@dp_mm
-POSTHOOK: query: create table dp_mm (key int) partitioned by (key1 string, 
key2 int) stored as orc
-  tblproperties ('hivecommit'='true')
+PREHOOK: Output: default@partunion_no_mm
+POSTHOOK: query: create table partunion_no_mm(id int) partitioned by (key int)
 POSTHOOK: type: CREATETABLE
 POSTHOOK: Output: database:default
-POSTHOOK: Output: default@dp_mm
-PREHOOK: query: insert into table dp_no_mm partition (key1='123', key2) select 
key, key from intermediate
+POSTHOOK: Output: default@partunion_no_mm
+PREHOOK: query: insert into table partunion_no_mm partition(key)
+select temps.* from (
+select key as p, key from intermediate 
+union all 
+select key + 1 as p, key + 1 from intermediate ) temps
 PREHOOK: type: QUERY
 PREHOOK: Input: default@intermediate
 PREHOOK: Input: default@intermediate@p=455
 PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Output: default@dp_no_mm@key1=123
-POSTHOOK: query: insert into table dp_no_mm partition (key1='123', key2) 
select key, key from intermediate
+PREHOOK: Output: default@partunion_no_mm
+POSTHOOK: query: insert into table partunion_no_mm partition(key)
+select temps.* from (
+select key as p, key from intermediate 
+union all 
+select key + 1 as p, key + 1 from intermediate ) temps
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@intermediate
 POSTHOOK: Input: default@intermediate@p=455
 POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Output: default@dp_no_mm@key1=123/key2=0
-POSTHOOK: Output: default@dp_no_mm@key1=123/key2=455
-POSTHOOK: Lineage: dp_no_mm PARTITION(key1=123,key2=0).key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-POSTHOOK: Lineage: dp_no_mm PARTITION(key1=123,key2=455).key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: insert into table dp_mm partition (key1='123', key2) select 
key, key from intermediate
+POSTHOOK: Output: default@partunion_no_mm@key=0
+POSTHOOK: Output: default@partunion_no_mm@key=1
+POSTHOOK: Output: default@partunion_no_mm@key=455
+POSTHOOK: Output: default@partunion_no_mm@key=456
+POSTHOOK: Lineage: partunion_no_mm PARTITION(key=0).id EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: partunion_no_mm PARTITION(key=1).id EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: partunion_no_mm PARTITION(key=455).id EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: partunion_no_mm PARTITION(key=456).id EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from partunion_no_mm
+PREHOOK: type: QUERY
+PREHOOK: Input: default@partunion_no_mm
+PREHOOK: Input: default@partunion_no_mm@key=0
+PREHOOK: Input: default@partunion_no_mm@key=1
+PREHOOK: Input: default@partunion_no_mm@key=455
+PREHOOK: Input: default@partunion_no_mm@key=456
+#### A masked pattern was here ####
+POSTHOOK: query: select * from partunion_no_mm
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@partunion_no_mm
+POSTHOOK: Input: default@partunion_no_mm@key=0
+POSTHOOK: Input: default@partunion_no_mm@key=1
+POSTHOOK: Input: default@partunion_no_mm@key=455
+POSTHOOK: Input: default@partunion_no_mm@key=456
+#### A masked pattern was here ####
+0      0
+0      0
+1      1
+1      1
+455    455
+455    455
+456    456
+456    456
+PREHOOK: query: drop table partunion_no_mm
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@partunion_no_mm
+PREHOOK: Output: default@partunion_no_mm
+POSTHOOK: query: drop table partunion_no_mm
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@partunion_no_mm
+POSTHOOK: Output: default@partunion_no_mm
+PREHOOK: query: create table partunion_mm(id int) partitioned by (key int) 
tblproperties ('hivecommit'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@partunion_mm
+POSTHOOK: query: create table partunion_mm(id int) partitioned by (key int) 
tblproperties ('hivecommit'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@partunion_mm
+PREHOOK: query: insert into table partunion_mm partition(key)
+select temps.* from (
+select key as p, key from intermediate 
+union all 
+select key + 1 as p, key + 1 from intermediate ) temps
 PREHOOK: type: QUERY
 PREHOOK: Input: default@intermediate
 PREHOOK: Input: default@intermediate@p=455
 PREHOOK: Input: default@intermediate@p=456
-PREHOOK: Output: default@dp_mm@key1=123
-POSTHOOK: query: insert into table dp_mm partition (key1='123', key2) select 
key, key from intermediate
+PREHOOK: Output: default@partunion_mm
+POSTHOOK: query: insert into table partunion_mm partition(key)
+select temps.* from (
+select key as p, key from intermediate 
+union all 
+select key + 1 as p, key + 1 from intermediate ) temps
 POSTHOOK: type: QUERY
 POSTHOOK: Input: default@intermediate
 POSTHOOK: Input: default@intermediate@p=455
 POSTHOOK: Input: default@intermediate@p=456
-POSTHOOK: Output: default@dp_mm@key1=123/key2=0
-POSTHOOK: Output: default@dp_mm@key1=123/key2=455
-POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=0).key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-POSTHOOK: Lineage: dp_mm PARTITION(key1=123,key2=455).key SIMPLE 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
-PREHOOK: query: select * from dp_no_mm
+POSTHOOK: Output: default@partunion_mm@key=0
+POSTHOOK: Output: default@partunion_mm@key=1
+POSTHOOK: Output: default@partunion_mm@key=455
+POSTHOOK: Output: default@partunion_mm@key=456
+POSTHOOK: Lineage: partunion_mm PARTITION(key=0).id EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: partunion_mm PARTITION(key=1).id EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: partunion_mm PARTITION(key=455).id EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+POSTHOOK: Lineage: partunion_mm PARTITION(key=456).id EXPRESSION 
[(intermediate)intermediate.FieldSchema(name:key, type:int, comment:null), ]
+PREHOOK: query: select * from partunion_mm
 PREHOOK: type: QUERY
-PREHOOK: Input: default@dp_no_mm
-PREHOOK: Input: default@dp_no_mm@key1=123/key2=0
-PREHOOK: Input: default@dp_no_mm@key1=123/key2=455
+PREHOOK: Input: default@partunion_mm
+PREHOOK: Input: default@partunion_mm@key=0
+PREHOOK: Input: default@partunion_mm@key=1
+PREHOOK: Input: default@partunion_mm@key=455
+PREHOOK: Input: default@partunion_mm@key=456
 #### A masked pattern was here ####
-POSTHOOK: query: select * from dp_no_mm
+POSTHOOK: query: select * from partunion_mm
 POSTHOOK: type: QUERY
-POSTHOOK: Input: default@dp_no_mm
-POSTHOOK: Input: default@dp_no_mm@key1=123/key2=0
-POSTHOOK: Input: default@dp_no_mm@key1=123/key2=455
+POSTHOOK: Input: default@partunion_mm
+POSTHOOK: Input: default@partunion_mm@key=0
+POSTHOOK: Input: default@partunion_mm@key=1
+POSTHOOK: Input: default@partunion_mm@key=455
+POSTHOOK: Input: default@partunion_mm@key=456
 #### A masked pattern was here ####
-455    123     455
-455    123     455
-0      123     0
-0      123     0
-PREHOOK: query: select * from dp_mm
-PREHOOK: type: QUERY
-PREHOOK: Input: default@dp_mm
-PREHOOK: Input: default@dp_mm@key1=123/key2=0
-PREHOOK: Input: default@dp_mm@key1=123/key2=455
-#### A masked pattern was here ####
-POSTHOOK: query: select * from dp_mm
-POSTHOOK: type: QUERY
-POSTHOOK: Input: default@dp_mm
-POSTHOOK: Input: default@dp_mm@key1=123/key2=0
-POSTHOOK: Input: default@dp_mm@key1=123/key2=455
-#### A masked pattern was here ####
-455    123     455
-455    123     455
-0      123     0
-0      123     0
-PREHOOK: query: drop table dp_no_mm
-PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@dp_no_mm
-PREHOOK: Output: default@dp_no_mm
-POSTHOOK: query: drop table dp_no_mm
-POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@dp_no_mm
-POSTHOOK: Output: default@dp_no_mm
-PREHOOK: query: drop table dp_mm
+0      0
+0      0
+1      1
+1      1
+455    455
+455    455
+456    456
+456    456
+PREHOOK: query: drop table partunion_mm
 PREHOOK: type: DROPTABLE
-PREHOOK: Input: default@dp_mm
-PREHOOK: Output: default@dp_mm
-POSTHOOK: query: drop table dp_mm
+PREHOOK: Input: default@partunion_mm
+PREHOOK: Output: default@partunion_mm
+POSTHOOK: query: drop table partunion_mm
 POSTHOOK: type: DROPTABLE
-POSTHOOK: Input: default@dp_mm
-POSTHOOK: Output: default@dp_mm
+POSTHOOK: Input: default@partunion_mm
+POSTHOOK: Output: default@partunion_mm
 PREHOOK: query: drop table intermediate
 PREHOOK: type: DROPTABLE
 PREHOOK: Input: default@intermediate

Reply via email to