HIVE-14954 : put FSOP manifests for the instances of the same vertex into a directory (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/65a380dd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/65a380dd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/65a380dd

Branch: refs/heads/hive-14535
Commit: 65a380ddb67b3836b5c3f14ba679f5d52abbbeda
Parents: 423537a
Author: Sergey Shelukhin <[email protected]>
Authored: Tue Oct 25 14:05:36 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Tue Oct 25 14:05:36 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/common/ValidWriteIds.java       |  2 +-
 .../hadoop/hive/metastore/MmCleanerThread.java  |  2 +-
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 73 +++++++++-----------
 .../rcfile/truncate/ColumnTruncateMapper.java   |  1 -
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  1 -
 .../optimizer/unionproc/UnionProcFactory.java   |  1 -
 .../hadoop/hive/ql/parse/GenTezProcContext.java |  2 +-
 .../hadoop/hive/ql/parse/TaskCompiler.java      |  1 -
 8 files changed, 35 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
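In short, the layout change: previously each FSOP task wrote its manifest as a sibling file named "_tmp.<mmPrefix>_<taskId>.manifest" directly under specPath (or under the union subdirectory), and cleanup had to scan for matching files. With this patch, all manifests for the task instances of one vertex land in a single "_tmp.<mmPrefix>" directory, so both failure cleanup and post-commit cleanup become a single recursive delete of that directory. A minimal sketch of the resulting paths follows; the exact string produced by ValidWriteIds.getMmFilePrefix ("mm_<writeId>" below) is an assumption for illustration only.

import org.apache.hadoop.fs.Path;

public class ManifestLayoutSketch {
  // Hypothetical stand-in for ValidWriteIds.getMmFilePrefix(mmWriteId).
  static String mmFilePrefix(long mmWriteId) {
    return "mm_" + mmWriteId;
  }

  // Mirrors the new Utilities.getManifestDir: one temp directory per write ID,
  // with an optional per-union-branch subdirectory.
  static Path manifestDir(Path specPath, long mmWriteId, String unionSuffix) {
    Path dir = new Path(specPath, "_tmp." + mmFilePrefix(mmWriteId));
    return (unionSuffix == null) ? dir : new Path(dir, unionSuffix);
  }

  public static void main(String[] args) {
    Path spec = new Path("/warehouse/t");
    // Before: /warehouse/t/_tmp.mm_17_task0.manifest (one file per task, at the root).
    // After, all task manifests share one deletable directory:
    System.out.println(new Path(manifestDir(spec, 17, null), "task0.manifest"));
    //   /warehouse/t/_tmp.mm_17/task0.manifest
    System.out.println(new Path(manifestDir(spec, 17, "1"), "task0.manifest"));
    //   /warehouse/t/_tmp.mm_17/1/task0.manifest
  }
}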


http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
index df0278c..6b38247 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
@@ -133,7 +133,7 @@ public class ValidWriteIds {
     @Override
     public boolean accept(Path path) {
       String name = path.getName();
-      return isMatch == (name.equals(mmDirName) || name.startsWith(tmpPrefix));
+      return isMatch == (name.equals(mmDirName) || name.equals(tmpPrefix));
     }
   }
 

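A note on the ValidWriteIds hunk above: startsWith(tmpPrefix) can become equals(tmpPrefix) because manifests no longer appear as "_tmp.<prefix>_<taskId>.manifest" siblings; the filter now only ever needs to match the single "_tmp.<prefix>" directory by its exact name. A minimal standalone sketch of that accept() logic, with assumed example values for the fields (the real filter derives them from the write ID):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

class IdPathFilterSketch implements PathFilter {
  // Assumed example values for illustration.
  private final String mmDirName = "mm_17";
  private final String tmpPrefix = "_tmp.mm_17";
  // true: accept only matching paths; false: accept only non-matching ones.
  private final boolean isMatch = true;

  @Override
  public boolean accept(Path path) {
    String name = path.getName();
    // An exact-name match is now sufficient for both the output directory
    // and the manifest temp directory.
    return isMatch == (name.equals(mmDirName) || name.equals(tmpPrefix));
  }
}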
http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
index 6a7f588..d99b0d7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
@@ -288,7 +288,7 @@ public class MmCleanerThread extends Thread implements MetaStoreThread {
         LOG.warn(path + " does not exist; assuming that the cleanup is not needed.");
         return;
       }
-      // TODO# do we need to account for any subdirectories here? decide after special-case jiras
+      // TODO# this doesn't account for list bucketing. Do nothing now, ACID will solve all problems.
       files = fs.listStatus(path);
     } catch (Exception ex) {
       LOG.error("Failed to get files for " + path + "; cannot ensure cleanup for any writes");

http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 0f8384d..a7050ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -35,7 +35,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.net.URLDecoder;
@@ -50,7 +49,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -149,7 +147,6 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
@@ -213,10 +210,10 @@ import com.google.common.base.Preconditions;
  * Utilities.
  *
  */
-@SuppressWarnings("nls")
+@SuppressWarnings({ "nls", "deprecation" })
 public final class Utilities {
 
-  // TODO: remove when merging
+  // TODO# remove when merging; convert some statements to local loggers, remove others
   public static final Logger LOG14535 = LoggerFactory.getLogger("Log14535");
 
   /**
@@ -651,8 +648,8 @@ public final class Utilities {
     }
 
     @Override
-    protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) {
-      Iterator ite = ((Collection) oldInstance).iterator();
+    protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
+      Iterator<?> ite = ((Collection<?>) oldInstance).iterator();
       while (ite.hasNext()) {
         out.writeStatement(new Statement(oldInstance, "add", new Object[] {ite.next()}));
       }
       }
@@ -3798,10 +3795,6 @@ public final class Utilities {
 
   private static final String MANIFEST_EXTENSION = ".manifest";
 
-  private static Path getManifestDir(Path specPath, String unionSuffix) {
-    return (unionSuffix == null) ? specPath : new Path(specPath, unionSuffix);
-  }
-
   private static void tryDelete(FileSystem fs, Path path) {
     try {
       fs.delete(path, true);
@@ -3837,26 +3830,20 @@ public final class Utilities {
         tryDelete(fs, status.getPath());
       }
     }
-    files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, fs, filter);
-    if (files != null) {
-      for (FileStatus status : files) {
-        Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure");
-        tryDelete(fs, status.getPath());
-      }
-    }
+    Utilities.LOG14535.info("Deleting " + manifestDir + " on failure");
+    fs.delete(manifestDir, true);
   }
 
 
   public static void writeMmCommitManifest(List<Path> commitPaths, Path specPath, FileSystem fs,
       String taskId, Long mmWriteId, String unionSuffix) throws HiveException {
     if (commitPaths.isEmpty()) return;
-    Path manifestPath = getManifestDir(specPath, unionSuffix);
-    manifestPath = new Path(manifestPath, "_tmp." + ValidWriteIds.getMmFilePrefix(
-        mmWriteId) + "_" + taskId + MANIFEST_EXTENSION);
+    // We assume one FSOP per task (per specPath), so we create it in specPath.
+    Path manifestPath = getManifestDir(specPath, mmWriteId, unionSuffix);
+    manifestPath = new Path(manifestPath, taskId + MANIFEST_EXTENSION);
     Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths);
     try {
       // Don't overwrite the manifest... should fail if we have collisions.
-      // We assume one FSOP per task (per specPath), so we create it in specPath.
       try (FSDataOutputStream out = fs.create(manifestPath, false)) {
         if (out == null) {
           throw new HiveException("Failed to create manifest at " + manifestPath);
@@ -3871,6 +3858,11 @@ public final class Utilities {
     }
   }
 
+  private static Path getManifestDir(Path specPath, long mmWriteId, String unionSuffix) {
+    Path manifestPath = new Path(specPath, "_tmp." + ValidWriteIds.getMmFilePrefix(mmWriteId));
+    return (unionSuffix == null) ? manifestPath : new Path(manifestPath, unionSuffix);
+  }
+
   public static final class MissingBucketsContext {
     public final TableDesc tableInfo;
     public final int numBuckets;
@@ -3886,17 +3878,16 @@ public final class Utilities {
       boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc, long mmWriteId,
       Reporter reporter) throws IOException, HiveException {
     FileSystem fs = specPath.getFileSystem(hconf);
-    // Manifests would be at the root level, but the results at target level.
-    Path manifestDir = getManifestDir(specPath, unionSuffix);
-
-    ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
+    Path manifestDir = getManifestDir(specPath, mmWriteId, unionSuffix);
     if (!success) {
+      ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
       tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels,
           unionSuffix, filter, mmWriteId);
       return;
     }
-    FileStatus[] files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, fs, filter);
+
     Utilities.LOG14535.info("Looking for manifests in: " + manifestDir + " (" 
+ mmWriteId + ")");
+    FileStatus[] files = fs.listStatus(manifestDir);
     List<Path> manifests = new ArrayList<>();
     if (files != null) {
       for (FileStatus status : files) {
@@ -3909,6 +3900,7 @@ public final class Utilities {
     }
 
     Utilities.LOG14535.info("Looking for files in: " + specPath);
+    ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
     files = getMmDirectoryCandidates(
         fs, specPath, dpLevels, lbLevels, filter, mmWriteId);
     ArrayList<FileStatus> mmDirectories = new ArrayList<>();
@@ -3940,6 +3932,18 @@ public final class Utilities {
       }
     }
 
+    Utilities.LOG14535.info("Deleting manifest directory " + manifestDir);
+    tryDelete(fs, manifestDir);
+    if (unionSuffix != null) {
+      // Also delete the parent directory if we are the last union FSOP to execute.
+      manifestDir = manifestDir.getParent();
+      FileStatus[] remainingFiles = fs.listStatus(manifestDir);
+      if (remainingFiles == null || remainingFiles.length == 0) {
+        Utilities.LOG14535.info("Deleting manifest directory " + manifestDir);
+        tryDelete(fs, manifestDir);
+      }
+    }
+
     for (FileStatus status : mmDirectories) {
       cleanMmDirectory(status.getPath(), fs, unionSuffix, committed);
     }
@@ -3947,19 +3951,6 @@ public final class Utilities {
     if (!committed.isEmpty()) {
       throw new HiveException("The following files were committed but not 
found: " + committed);
     }
-    for (Path mfp : manifests) {
-      Utilities.LOG14535.info("Deleting manifest " + mfp);
-      tryDelete(fs, mfp);
-    }
-    // Delete the manifest directory if we only created it for manifests; otherwise the
-    // dynamic partition loader will find it and try to load it as a partition... what a mess.
-    if (manifestDir != specPath) {
-      FileStatus[] remainingFiles = fs.listStatus(manifestDir);
-      if (remainingFiles == null || remainingFiles.length == 0) {
-        Utilities.LOG14535.info("Deleting directory " + manifestDir);
-        tryDelete(fs, manifestDir);
-      }
-    }
 
     if (mmDirectories.isEmpty()) return;
 
@@ -3984,7 +3975,7 @@ public final class Utilities {
         if (committed.remove(childPath.toString())) continue; // A good file.
         deleteUncommitedFile(childPath, fs);
       } else if (!child.isDirectory()) {
-        if (childPath.getName().endsWith(MANIFEST_EXTENSION)) continue;
+        // TODO# needed? if (childPath.getName().endsWith(MANIFEST_EXTENSION)) continue;
         if (committed.contains(childPath.toString())) {
           throw new HiveException("Union FSOP has commited "
               + childPath + " outside of union directory" + unionSuffix);

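Putting the Utilities.java changes above together, the write/read/cleanup cycle for manifests now looks roughly like this. This is a condensed sketch, not the committed code: the ".manifest" naming, the non-overwriting create, the flat listStatus, and the union-parent delete are taken from the hunks, while the method signatures, byte-array payload, and error handling are simplified assumptions.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class ManifestCycleSketch {
  // Task side: write "<taskId>.manifest" into the shared manifest directory.
  // create(..., false) refuses to overwrite, so two tasks claiming the same
  // task ID fail fast instead of silently clobbering each other.
  static void writeManifest(FileSystem fs, Path manifestDir, String taskId,
      byte[] commitList) throws Exception {
    Path manifestPath = new Path(manifestDir, taskId + ".manifest");
    try (FSDataOutputStream out = fs.create(manifestPath, false)) {
      out.write(commitList);
    }
  }

  // Commit side: a flat listStatus replaces the old recursive, filtered scan,
  // because every manifest now sits directly in manifestDir.
  static List<Path> collectManifests(FileSystem fs, Path manifestDir) throws Exception {
    List<Path> manifests = new ArrayList<>();
    FileStatus[] files = fs.listStatus(manifestDir);
    if (files != null) {
      for (FileStatus status : files) {
        if (status.getPath().getName().endsWith(".manifest")) {
          manifests.add(status.getPath());
        }
      }
    }
    return manifests;
  }

  // Cleanup side: one recursive delete removes every task's manifest. For
  // unions, manifestDir is _tmp.<prefix>/<unionSuffix>, and whichever FSOP
  // finds the parent empty last also removes _tmp.<prefix> itself.
  static void cleanup(FileSystem fs, Path manifestDir, String unionSuffix) throws Exception {
    fs.delete(manifestDir, true);
    if (unionSuffix != null) {
      Path parent = manifestDir.getParent();
      FileStatus[] remaining = fs.listStatus(parent);
      if (remaining == null || remaining.length == 0) {
        fs.delete(parent, true);
      }
    }
  }
}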
http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
index d013c6f..bd537cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
@@ -234,7 +234,6 @@ public class ColumnTruncateMapper extends MapReduceBase implements
       ) throws HiveException, IOException {
     FileSystem fs = outputPath.getFileSystem(job);
     Path backupPath = backupOutputPath(fs, outputPath, job);
-    // TODO# special case - what is this about?
     Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null,
       reporter);
     fs.delete(backupPath, true);

http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 30b22d7..0a29895 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1853,7 +1853,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
       } else {
         // The non-MM path only finds new partitions, as it is looking at the temp path.
         // To produce the same effect, we will find all the partitions affected by this write ID.
-        // TODO# how would this work with multi-insert into the same table? how does the existing one work?
         leafStatus = Utilities.getMmDirectoryCandidates(
             fs, loadPath, numDP, numLB, null, mmWriteId);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
index 3c37709..3a38a6d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
@@ -218,7 +218,6 @@ public final class UnionProcFactory {
         // each parent
         List<FileSinkDesc> fileDescLists = new ArrayList<FileSinkDesc>();
 
-        // TODO# special case #N - unions
         for (Operator<? extends OperatorDesc> parent : parents) {
           FileSinkDesc fileSinkDesc = (FileSinkDesc) fileSinkOp.getConf().clone();
           fileSinkDesc.setDirName(new Path(parentDirName, parent.getIdentifier()));

http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index e1fc103..0c160ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -65,7 +65,7 @@ public class GenTezProcContext implements NodeProcessorCtx{
 
   public final ParseContext parseContext;
   public final HiveConf conf;
-  public final List<Task<MoveWork>> moveTask; // TODO#
+  public final List<Task<MoveWork>> moveTask;
 
   // rootTasks is the entry point for all generated tasks
   public final List<Task<? extends Serializable>> rootTasks;

http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index d09e401..9b2f005 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -208,7 +208,6 @@ public abstract class TaskCompiler {
       }
     } else if (!isCStats) {
       for (LoadTableDesc ltd : loadTableWork) {
-        // TODO#  What is this path? special case for MM?
         Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
         mvTask.add(tsk);
         // Check to see if we are stale'ing any indexes and auto-update them if we want
