Repository: hive
Updated Branches:
  refs/heads/master ee6a53268 -> 758b913f2


HIVE-19282 : don't nest delta directories inside LB directories for ACID tables 
(Sergey Shelukhin, reviewed by Steve Yeom and Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/758b913f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/758b913f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/758b913f

Branch: refs/heads/master
Commit: 758b913f291016c78bf0b0e90b2a9cf8f333bb45
Parents: ee6a532
Author: sergey <[email protected]>
Authored: Mon Apr 30 19:26:44 2018 -0700
Committer: sergey <[email protected]>
Committed: Mon Apr 30 19:26:44 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/exec/FileSinkOperator.java   | 253 +++++++++----------
 .../apache/hadoop/hive/ql/exec/Utilities.java   |  52 +++-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   5 +-
 ql/src/test/queries/clientpositive/mm_all.q     |   3 +-
 4 files changed, 160 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/758b913f/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 962fc5d..01a5b4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -31,7 +31,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+
 import com.google.common.collect.Lists;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -162,8 +164,10 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
   }
 
   public class FSPaths implements Cloneable {
-    private Path tmpPath;
-    private Path taskOutputTempPath;
+    private Path tmpPathRoot;
+    private String subdirBeforeTxn, subdirAfterTxn;
+    private final String subdirForTxn;
+    private Path taskOutputTempPathRoot;
     Path[] outPaths;
     Path[] finalPaths;
     RecordWriter[] outWriters;
@@ -172,25 +176,23 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
     int acidLastBucket = -1;
     int acidFileOffset = -1;
     private boolean isMmTable;
-    private Long writeId;
-    private int stmtId;
-    String dpDir;
+    String dpDirForCounters;
 
     public FSPaths(Path specPath, boolean isMmTable) {
       this.isMmTable = isMmTable;
       if (!isMmTable) {
-        tmpPath = Utilities.toTempPath(specPath);
-        taskOutputTempPath = Utilities.toTaskTempPath(specPath);
+        tmpPathRoot = Utilities.toTempPath(specPath);
+        taskOutputTempPathRoot = Utilities.toTaskTempPath(specPath);
+        subdirForTxn = null;
       } else {
-        tmpPath = specPath;
-        taskOutputTempPath = null; // Should not be used.
-        writeId = conf.getTableWriteId();
-        stmtId = conf.getStatementId();
+        tmpPathRoot = specPath;
+        taskOutputTempPathRoot = null; // Should not be used.
+        subdirForTxn = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(),
+            conf.getTableWriteId(), conf.getTableWriteId(),  
conf.getStatementId());
       }
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
-        Utilities.FILE_OP_LOGGER.trace("new FSPaths for " + numFiles + " 
files, dynParts = " + bDynParts
-          + ": tmpPath " + tmpPath + ", task path " + taskOutputTempPath
-          + " (spec path " + specPath + ")"/*, new Exception()*/);
+        Utilities.FILE_OP_LOGGER.trace("new FSPaths for " + numFiles
+            + " files, dynParts = " + bDynParts + " (spec path " + specPath + 
")");
       }
 
       outPaths = new Path[numFiles];
@@ -203,24 +205,6 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
       stat = new Stat();
     }
 
-    /**
-     * Update OutPath according to tmpPath.
-     */
-    public Path getTaskOutPath(String taskId) {
-      return new Path(this.taskOutputTempPath, Utilities.toTempPath(taskId));
-    }
-
-    /**
-     * Update the final paths according to tmpPath.
-     */
-    private Path getFinalPath(String taskId, Path tmpPath, String extension) {
-      if (extension != null) {
-        return new Path(tmpPath, taskId + extension);
-      } else {
-        return new Path(tmpPath, taskId);
-      }
-    }
-
     public void closeWriters(boolean abort) throws HiveException {
       for (int idx = 0; idx < outWriters.length; idx++) {
         if (outWriters[idx] != null) {
@@ -307,45 +291,31 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
       }
     }
 
-    public void configureDynPartPath(String dirName, String 
childSpecPathDynLinkedPartitions) {
-      dirName = (childSpecPathDynLinkedPartitions == null) ? dirName :
-        dirName + Path.SEPARATOR + childSpecPathDynLinkedPartitions;
-      tmpPath = new Path(tmpPath, dirName);
-      if (taskOutputTempPath != null) {
-        taskOutputTempPath = new Path(taskOutputTempPath, dirName);
-      }
-    }
-
     public void initializeBucketPaths(int filesIdx, String taskId, boolean 
isNativeTable,
         boolean isSkewedStoredAsSubDirectories) {
       if (isNativeTable) {
         String extension = Utilities.getFileExtension(jc, isCompressed, 
hiveOutputFormat);
+        String taskWithExt = extension == null ? taskId : taskId + extension;
         if (!isMmTable) {
           if (!bDynParts && !isSkewedStoredAsSubDirectories) {
-            finalPaths[filesIdx] = getFinalPath(taskId, parent, extension);
+            finalPaths[filesIdx] = new Path(parent, taskWithExt);
           } else {
-            finalPaths[filesIdx] = getFinalPath(taskId, tmpPath, extension);
+            finalPaths[filesIdx] =  new Path(buildTmpPath(), taskWithExt);
           }
-          outPaths[filesIdx] = getTaskOutPath(taskId);
+          outPaths[filesIdx] = new Path(buildTaskOutputTempPath(), 
Utilities.toTempPath(taskId));
         } else {
-          String subdirPath = 
AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), writeId, writeId, 
stmtId);
-          if (unionPath != null) {
-            // Create the union directory inside the MM directory.
-            subdirPath += Path.SEPARATOR + unionPath;
-          }
-          subdirPath += Path.SEPARATOR + taskId;
+          String taskIdPath = taskId;
           if (conf.isMerge()) {
             // Make sure we don't collide with the source files.
             // MM tables don't support concat so we don't expect the merge of 
merged files.
-            subdirPath += ".merged";
+            taskIdPath += ".merged";
           }
-          Path finalPath = null;
-          if (!bDynParts && !isSkewedStoredAsSubDirectories) {
-            finalPath = getFinalPath(subdirPath, specPath, extension);
-          } else {
-            // Note: tmpPath here has the correct partition key
-            finalPath = getFinalPath(subdirPath, tmpPath, extension);
+          if (extension != null) {
+            taskIdPath += extension;
           }
+
+          Path finalPath = new Path(buildTmpPath(), taskIdPath);
+
           // In the cases that have multi-stage insert, e.g. a 
"hive.skewjoin.key"-based skew join,
           // it can happen that we want multiple commits into the same 
directory from different
           // tasks (not just task instances). In non-MM case, 
Utilities.renameOrMoveFiles ensures
@@ -373,12 +343,31 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
       }
     }
 
-    public Path getTmpPath() {
-      return tmpPath;
+    public Path buildTmpPath() {
+      String pathStr = tmpPathRoot.toString();
+      if (subdirBeforeTxn != null) {
+        pathStr += Path.SEPARATOR + subdirBeforeTxn;
+      }
+      if (subdirForTxn != null) {
+        pathStr += Path.SEPARATOR + subdirForTxn;
+      }
+      if (subdirAfterTxn != null) {
+        pathStr += Path.SEPARATOR + subdirAfterTxn;
+      }
+      return new Path(pathStr);
     }
 
-    public Path getTaskOutputTempPath() {
-      return taskOutputTempPath;
+    public Path buildTaskOutputTempPath() {
+      if (taskOutputTempPathRoot == null) return null;
+      assert subdirForTxn == null;
+      String pathStr = taskOutputTempPathRoot.toString();
+      if (subdirBeforeTxn != null) {
+        pathStr += Path.SEPARATOR + subdirBeforeTxn;
+      }
+      if (subdirAfterTxn != null) {
+        pathStr += Path.SEPARATOR + subdirAfterTxn;
+      }
+      return new Path(pathStr);
     }
 
     public void addToStat(String statType, long amount) {
@@ -440,7 +429,7 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
     // 'Parent'
     boolean isLinked = conf.isLinkedFileSink();
     if (!isLinked) {
-      // Simple case - no union.
+      // Simple case - no union. 
       specPath = conf.getDirName();
       unionPath = null;
     } else {
@@ -543,10 +532,11 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
 
       if (!bDynParts) {
         fsp = new FSPaths(specPath, conf.isMmTable());
+        fsp.subdirAfterTxn = 
combinePathFragments(generateListBucketingDirName(null), unionPath);
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
           Utilities.FILE_OP_LOGGER.trace("creating new paths " + 
System.identityHashCode(fsp)
-            + " from ctor; childSpec " + unionPath + ": tmpPath " + 
fsp.getTmpPath()
-            + ", task path " + fsp.getTaskOutputTempPath());
+            + " from ctor; childSpec " + unionPath + ": tmpPath " + 
fsp.buildTmpPath()
+            + ", task path " + fsp.buildTaskOutputTempPath());
         }
 
         // Create all the files - this is required because empty files need to 
be created for
@@ -562,7 +552,7 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
       if (isTemporary && fsp != null
           && tmpStorage != StoragePolicyValue.DEFAULT) {
         assert !conf.isMmTable(); // Not supported for temp tables.
-        final Path outputPath = fsp.taskOutputTempPath;
+        final Path outputPath = fsp.buildTaskOutputTempPath();
         StoragePolicyShim shim = ShimLoader.getHadoopShims()
             .getStoragePolicyShim(fs);
         if (shim != null) {
@@ -728,7 +718,7 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("createBucketForFileIdx " + filesIdx + 
": final path " + fsp.finalPaths[filesIdx]
           + "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath 
+ ", tmp path "
-          + fsp.getTmpPath() + ", task " + taskId + ")");
+          + fsp.buildTmpPath() + ", task " + taskId + ")");
       }
       if (LOG.isInfoEnabled()) {
         LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
@@ -782,8 +772,8 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
     // 1) Insert overwrite (all partitions are newly created)
     // 2) Insert into table which creates new partitions (some new partitions)
 
-    if (bDynParts && destTablePath != null && fsp.dpDir != null) {
-      Path destPartPath = new Path(destTablePath, fsp.dpDir);
+    if (bDynParts && destTablePath != null && fsp.dpDirForCounters != null) {
+      Path destPartPath = new Path(destTablePath, fsp.dpDirForCounters);
       // For MM tables, directory structure is
       // <table-dir>/<partition-dir>/<delta-dir>/
 
@@ -866,7 +856,9 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
 
     if (!bDynParts && !filesCreated) {
       if (lbDirName != null) {
-        FSPaths fsp2 = lookupListBucketingPaths(lbDirName);
+        if (valToPaths.get(lbDirName) == null) {
+          createNewPaths(null, lbDirName);
+        }
       } else {
         createBucketFiles(fsp);
       }
@@ -915,7 +907,10 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
         recordValue = serializer.serialize(row, subSetOI);
       } else {
         if (lbDirName != null) {
-          fpaths = lookupListBucketingPaths(lbDirName);
+          fpaths = valToPaths.get(lbDirName);
+          if (fpaths == null) {
+            fpaths = createNewPaths(null, lbDirName);
+          }
         } else {
           fpaths = fsp;
         }
@@ -1063,44 +1058,37 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
   }
 
   /**
-   * Lookup list bucketing path.
-   * @param lbDirName
-   * @return
-   * @throws HiveException
-   */
-  protected FSPaths lookupListBucketingPaths(String lbDirName) throws 
HiveException {
-    FSPaths fsp2 = valToPaths.get(lbDirName);
-    if (fsp2 == null) {
-      fsp2 = createNewPaths(lbDirName);
-    }
-    return fsp2;
-  }
-
-  /**
    * create new path.
    *
    * @param dirName
    * @return
    * @throws HiveException
    */
-  private FSPaths createNewPaths(String dirName) throws HiveException {
+  private FSPaths createNewPaths(String dpDir, String lbDir) throws 
HiveException {
     FSPaths fsp2 = new FSPaths(specPath, conf.isMmTable());
-    fsp2.configureDynPartPath(dirName, !conf.isMmTable() && isUnionDp ? 
unionPath : null);
+    fsp2.subdirAfterTxn = combinePathFragments(lbDir, unionPath);
+    fsp2.subdirBeforeTxn = dpDir;
+    String pathKey = combinePathFragments(dpDir, lbDir);
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
-      Utilities.FILE_OP_LOGGER.trace("creating new paths " + 
System.identityHashCode(fsp2) + " for "
-        + dirName + ", childSpec " + unionPath + ": tmpPath " + 
fsp2.getTmpPath()
-        + ", task path " + fsp2.getTaskOutputTempPath());
+      Utilities.FILE_OP_LOGGER.trace("creating new paths {} for {}, childSpec 
{}: tmpPath {},"
+          + " task path {}", System.identityHashCode(fsp2), pathKey, unionPath,
+          fsp2.buildTmpPath(), fsp2.buildTaskOutputTempPath());
     }
+
     if (bDynParts) {
-      fsp2.dpDir = dirName;
+      fsp2.dpDirForCounters = pathKey;
     }
     if(!conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
       createBucketFiles(fsp2);
-      valToPaths.put(dirName, fsp2);
+      valToPaths.put(pathKey, fsp2);
     }
     return fsp2;
   }
 
+  private static String combinePathFragments(String first, String second) {
+    return first == null ? second : (second == null ? first : first + 
Path.SEPARATOR + second);
+  }
+
   /**
    * Generate list bucketing directory name from a row.
    * @param row row to process.
@@ -1112,42 +1100,52 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
     }
 
     String lbDirName = null;
-    List<Object> standObjs = new ArrayList<Object>();
     List<String> skewedCols = lbCtx.getSkewedColNames();
     List<List<String>> allSkewedVals = lbCtx.getSkewedColValues();
-    List<String> skewedValsCandidate = null;
     Map<List<String>, String> locationMap = lbCtx.getLbLocationMap();
 
-    /* Convert input row to standard objects. */
-    ObjectInspectorUtils.copyToStandardObject(standObjs, row,
-        (StructObjectInspector) inputObjInspectors[0], 
ObjectInspectorCopyOption.WRITABLE);
+    if (row != null) {
+      List<Object> standObjs = new ArrayList<Object>();
+      List<String> skewedValsCandidate = null;
+      /* Convert input row to standard objects. */
+      ObjectInspectorUtils.copyToStandardObject(standObjs, row,
+          (StructObjectInspector) inputObjInspectors[0], 
ObjectInspectorCopyOption.WRITABLE);
 
-    assert (standObjs.size() >= skewedCols.size()) :
-      "The row has less number of columns than no. of skewed column.";
+      assert (standObjs.size() >= skewedCols.size()) :
+        "The row has less number of columns than no. of skewed column.";
 
-    skewedValsCandidate = new ArrayList<String>(skewedCols.size());
-    for (SkewedColumnPositionPair posPair : lbCtx.getRowSkewedIndex()) {
-      skewedValsCandidate.add(posPair.getSkewColPosition(),
-          standObjs.get(posPair.getTblColPosition()).toString());
-    }
-    /* The row matches skewed column names. */
-    if (allSkewedVals.contains(skewedValsCandidate)) {
-      /* matches skewed values. */
-      lbDirName = FileUtils.makeListBucketingDirName(skewedCols, 
skewedValsCandidate);
-      locationMap.put(skewedValsCandidate, lbDirName);
+      skewedValsCandidate = new ArrayList<String>(skewedCols.size());
+      for (SkewedColumnPositionPair posPair : lbCtx.getRowSkewedIndex()) {
+        skewedValsCandidate.add(posPair.getSkewColPosition(),
+            standObjs.get(posPair.getTblColPosition()).toString());
+      }
+      /* The row matches skewed column names. */
+      if (allSkewedVals.contains(skewedValsCandidate)) {
+        /* matches skewed values. */
+        lbDirName = FileUtils.makeListBucketingDirName(skewedCols, 
skewedValsCandidate);
+        locationMap.put(skewedValsCandidate, lbDirName);
+      } else {
+        lbDirName = createDefaultLbDir(skewedCols, locationMap);
+      }
     } else {
-      /* create default directory. */
-      lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols,
+      lbDirName = createDefaultLbDir(skewedCols, locationMap);
+    }
+    return lbDirName;
+  }
+
+  private String createDefaultLbDir(List<String> skewedCols,
+      Map<List<String>, String> locationMap) {
+    String lbDirName;
+    lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols,
           lbCtx.getDefaultDirName());
-      List<String> defaultKey = Lists.newArrayList(lbCtx.getDefaultKey());
-      if (!locationMap.containsKey(defaultKey)) {
-        locationMap.put(defaultKey, lbDirName);
-      }
+    List<String> defaultKey = Lists.newArrayList(lbCtx.getDefaultKey());
+    if (!locationMap.containsKey(defaultKey)) {
+      locationMap.put(defaultKey, lbDirName);
     }
     return lbDirName;
   }
 
-  protected FSPaths getDynOutPaths(List<String> row, String lbDirName) throws 
HiveException {
+  protected FSPaths getDynOutPaths(List<String> row, String lbDir) throws 
HiveException {
 
     FSPaths fp;
 
@@ -1156,12 +1154,12 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
 
     String pathKey = null;
     if (dpDir != null) {
-      dpDir = appendToSource(lbDirName, dpDir);
-      pathKey = dpDir;
-      if(conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
+      String dpAndLbDir = combinePathFragments(dpDir, lbDir);
+      pathKey = dpAndLbDir;
+      if (conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
         String buckNum = row.get(row.size() - 1);
         taskId = Utilities.replaceTaskIdFromFilename(taskId, buckNum);
-        pathKey = appendToSource(taskId, dpDir);
+        pathKey = dpAndLbDir + Path.SEPARATOR + taskId;
       }
       FSPaths fsp2 = valToPaths.get(pathKey);
 
@@ -1205,7 +1203,7 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
           prevFsp = null;
         }
 
-        fsp2 = createNewPaths(dpDir);
+        fsp2 = createNewPaths(dpDir, lbDir);
         if (prevFsp == null) {
           prevFsp = fsp2;
         }
@@ -1223,19 +1221,6 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
     return fp;
   }
 
-  /**
-   * Append dir to source dir
-   * @param appendDir
-   * @param srcDir
-   * @return
-   */
-  private String appendToSource(String appendDir, String srcDir) {
-    StringBuilder builder = new StringBuilder(srcDir);
-    srcDir = (appendDir == null) ? srcDir : 
builder.append(Path.SEPARATOR).append(appendDir)
-          .toString();
-    return srcDir;
-  }
-
   // given the current input row, the mapping for input col info to dp 
columns, and # of dp cols,
   // return the relative path corresponding to the row.
   // e.g., ds=2008-04-08/hr=11
@@ -1462,7 +1447,7 @@ public class FileSinkOperator extends 
TerminalOperator<FileSinkDesc> implements
       FSPaths fspValue = entry.getValue();
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("Observing entry for stats " + fspKey
-          + " => FSP with tmpPath " + fspValue.getTmpPath());
+          + " => FSP with tmpPath " + fspValue.buildTmpPath());
       }
       // for bucketed tables, hive.optimize.sort.dynamic.partition optimization
       // adds the taskId to the fspKey.

http://git-wip-us.apache.org/repos/asf/hive/blob/758b913f/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index b5a7853..31846a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -4072,9 +4072,9 @@ public final class Utilities {
   }
 
   public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int 
dpLevels,
-      int lbLevels, PathFilter filter, long writeId, int stmtId, Configuration 
conf,
+      PathFilter filter, long writeId, int stmtId, Configuration conf,
       Boolean isBaseDir) throws IOException {
-    int skipLevels = dpLevels + lbLevels;
+    int skipLevels = dpLevels;
     if (filter == null) {
       filter = new AcidUtils.IdPathFilter(writeId, stmtId);
     }
@@ -4186,7 +4186,7 @@ public final class Utilities {
       int dpLevels, int lbLevels, AcidUtils.IdPathFilter filter, long writeId, 
int stmtId,
       Configuration conf) throws IOException {
     Path[] files = getMmDirectoryCandidates(
-        fs, specPath, dpLevels, lbLevels, filter, writeId, stmtId, conf, null);
+        fs, specPath, dpLevels, filter, writeId, stmtId, conf, null);
     if (files != null) {
       for (Path path : files) {
         Utilities.FILE_OP_LOGGER.info("Deleting {} on failure", path);
@@ -4281,7 +4281,7 @@ public final class Utilities {
       FileUtils.mkdir(fs, specPath, hconf);
     }
     Path[] files = getMmDirectoryCandidates(
-        fs, specPath, dpLevels, lbLevels, filter, writeId, stmtId, hconf, 
isInsertOverwrite);
+        fs, specPath, dpLevels, filter, writeId, stmtId, hconf, 
isInsertOverwrite);
     ArrayList<Path> mmDirectories = new ArrayList<>();
     if (files != null) {
       for (Path path : files) {
@@ -4296,6 +4296,7 @@ public final class Utilities {
         int fileCount = mdis.readInt();
         for (int i = 0; i < fileCount; ++i) {
           String nextFile = mdis.readUTF();
+          Utilities.FILE_OP_LOGGER.trace("Looking at committed file: {}", 
nextFile);
           if (!committed.add(nextFile)) {
             throw new HiveException(nextFile + " was specified in multiple 
manifests");
           }
@@ -4318,7 +4319,7 @@ public final class Utilities {
     }
 
     for (Path path : mmDirectories) {
-      cleanMmDirectory(path, fs, unionSuffix, committed);
+      cleanMmDirectory(path, fs, unionSuffix, lbLevels, committed);
     }
 
     if (!committed.isEmpty()) {
@@ -4356,10 +4357,28 @@ public final class Utilities {
     }
   }
 
-  private static void cleanMmDirectory(Path dir, FileSystem fs,
-      String unionSuffix, HashSet<String> committed) throws IOException, 
HiveException {
+  private static void cleanMmDirectory(Path dir, FileSystem fs, String 
unionSuffix,
+      int lbLevels, HashSet<String> committed) throws IOException, 
HiveException {
     for (FileStatus child : fs.listStatus(dir)) {
       Path childPath = child.getPath();
+      if (lbLevels > 0) {
+        // We need to recurse into some LB directories. We don't check the 
directories themselves
+        // for matches; if they are empty they don't matter, and we will 
delete bad files.
+        // This recursion is not the most efficient way to do this but LB is 
rarely used.
+        if (child.isDirectory()) {
+          Utilities.FILE_OP_LOGGER.trace(
+              "Recursion into LB directory {}; levels remaining ", childPath, 
lbLevels - 1);
+          cleanMmDirectory(childPath, fs, unionSuffix, lbLevels - 1, 
committed);
+        } else {
+          if (committed.contains(childPath.toString())) {
+            throw new HiveException("LB FSOP has commited "
+                + childPath + " outside of LB directory levels " + lbLevels);
+          }
+          deleteUncommitedFile(childPath, fs);
+        }
+        continue;
+      }
+      // No more LB directories expected.
       if (unionSuffix == null) {
         if (committed.remove(childPath.toString())) {
           continue; // A good file.
@@ -4368,15 +4387,21 @@ public final class Utilities {
       } else if (!child.isDirectory()) {
         if (committed.contains(childPath.toString())) {
           throw new HiveException("Union FSOP has commited "
-              + childPath + " outside of union directory" + unionSuffix);
+              + childPath + " outside of union directory " + unionSuffix);
         }
         deleteUncommitedFile(childPath, fs);
       } else if (childPath.getName().equals(unionSuffix)) {
         // Found the right union directory; treat it as "our" MM directory.
-        cleanMmDirectory(childPath, fs, null, committed);
+        cleanMmDirectory(childPath, fs, null, 0, committed);
       } else {
-        Utilities.FILE_OP_LOGGER.trace("FSOP for {} is ignoring the other side 
of the union {}",
-          unionSuffix, childPath);
+        String childName = childPath.getName();
+        if 
(!childName.startsWith(AbstractFileMergeOperator.UNION_SUDBIR_PREFIX)
+            && !childName.startsWith(".") && !childName.startsWith("_")) {
+          throw new HiveException("Union FSOP has an unknown directory "
+              + childPath + " outside of union directory " + unionSuffix);
+        }
+        Utilities.FILE_OP_LOGGER.trace(
+            "FSOP for {} is ignoring the other side of the union {}", 
unionSuffix, childPath);
       }
     }
   }
@@ -4395,13 +4420,12 @@ public final class Utilities {
    * if the entire directory is valid (has no uncommitted/temporary files).
    */
   public static List<Path> getValidMmDirectoriesFromTableOrPart(Path path, 
Configuration conf,
-      ValidWriteIdList validWriteIdList, int lbLevels) throws IOException {
+      ValidWriteIdList validWriteIdList) throws IOException {
     Utilities.FILE_OP_LOGGER.trace("Looking for valid MM paths under {}", 
path);
     // NULL means this directory is entirely valid.
     List<Path> result = null;
     FileSystem fs = path.getFileSystem(conf);
-    FileStatus[] children = (lbLevels == 0) ? fs.listStatus(path)
-        : fs.globStatus(new Path(path, StringUtils.repeat("*" + 
Path.SEPARATOR, lbLevels) + "*"));
+    FileStatus[] children = fs.listStatus(path);
     for (int i = 0; i < children.length; ++i) {
       FileStatus file = children[i];
       Path childPath = file.getPath();

http://git-wip-us.apache.org/repos/asf/hive/blob/758b913f/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java 
b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 4661881..f6608eb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2074,12 +2074,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
         Utilities.FILE_OP_LOGGER.trace(
             "Looking for dynamic partitions in {} ({} levels)", loadPath, 
numDP);
         Path[] leafStatus = Utilities.getMmDirectoryCandidates(
-            fs, loadPath, numDP, numLB, null, writeId, -1, conf, 
isInsertOverwrite);
+            fs, loadPath, numDP, null, writeId, -1, conf, isInsertOverwrite);
         for (Path p : leafStatus) {
           Path dpPath = p.getParent(); // Skip the MM directory that we have 
found.
-          for (int i = 0; i < numLB; ++i) {
-            dpPath = dpPath.getParent(); // Now skip the LB directories, if 
any...
-          }
           if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
             Utilities.FILE_OP_LOGGER.trace("Found DP " + dpPath);
           }

http://git-wip-us.apache.org/repos/asf/hive/blob/758b913f/ql/src/test/queries/clientpositive/mm_all.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/mm_all.q 
b/ql/src/test/queries/clientpositive/mm_all.q
index 4ffbb6b..ceef0c0 100644
--- a/ql/src/test/queries/clientpositive/mm_all.q
+++ b/ql/src/test/queries/clientpositive/mm_all.q
@@ -9,7 +9,6 @@ set hive.exec.dynamic.partition.mode=nonstrict;
 set hive.support.concurrency=true;
 set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
 
-
 -- Force multiple writers when reading
 drop table intermediate;
 create table intermediate(key int) partitioned by (p int) stored as orc;
@@ -119,6 +118,8 @@ drop table partunion_mm;
 
 
 
+set mapreduce.input.fileinputformat.input.dir.recursive=true;
+
 create table skew_mm(k1 int, k2 int, k4 int) skewed by (k1, k4) on 
((0,0),(1,1),(2,2),(3,3))
  stored as directories tblproperties ("transactional"="true", 
"transactional_properties"="insert_only");
 

Reply via email to