HIVE-14954 : put FSOP manifests for the instances of the same vertex into a directory (Sergey Shelukhin)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/65a380dd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/65a380dd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/65a380dd

Branch: refs/heads/hive-14535
Commit: 65a380ddb67b3836b5c3f14ba679f5d52abbbeda
Parents: 423537a
Author: Sergey Shelukhin <[email protected]>
Authored: Tue Oct 25 14:05:36 2016 -0700
Committer: Sergey Shelukhin <[email protected]>
Committed: Tue Oct 25 14:05:36 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/common/ValidWriteIds.java       |  2 +-
 .../hadoop/hive/metastore/MmCleanerThread.java  |  2 +-
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 73 +++++++++-----------
 .../rcfile/truncate/ColumnTruncateMapper.java   |  1 -
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  1 -
 .../optimizer/unionproc/UnionProcFactory.java   |  1 -
 .../hadoop/hive/ql/parse/GenTezProcContext.java |  2 +-
 .../hadoop/hive/ql/parse/TaskCompiler.java      |  1 -
 8 files changed, 35 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
index df0278c..6b38247 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
@@ -133,7 +133,7 @@ public class ValidWriteIds {
     @Override
     public boolean accept(Path path) {
       String name = path.getName();
-      return isMatch == (name.equals(mmDirName) || name.startsWith(tmpPrefix));
+      return isMatch == (name.equals(mmDirName) || name.equals(tmpPrefix));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java b/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
index 6a7f588..d99b0d7 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/MmCleanerThread.java
@@ -288,7 +288,7 @@ public class MmCleanerThread extends Thread implements MetaStoreThread {
         LOG.warn(path + " does not exist; assuming that the cleanup is not needed.");
         return;
       }
-      // TODO# do we need to account for any subdirectories here? decide after special-case jiras
+      // TODO# this doesn't account for list bucketing. Do nothing now, ACID will solve all problems.
       files = fs.listStatus(path);
     } catch (Exception ex) {
       LOG.error("Failed to get files for " + path + "; cannot ensure cleanup for any writes");

http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 0f8384d..a7050ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -35,7 +35,6 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.net.URI;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.net.URLDecoder;
@@ -50,7 +49,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -149,7 +147,6 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
-import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
@@ -213,10 +210,10 @@ import com.google.common.base.Preconditions;
  * Utilities.
  *
  */
-@SuppressWarnings("nls")
+@SuppressWarnings({ "nls", "deprecation" })
 public final class Utilities {
 
-  // TODO: remove when merging
+  // TODO# remove when merging; convert some statements to local loggers, remove others
   public static final Logger LOG14535 = LoggerFactory.getLogger("Log14535");
 
   /**
@@ -651,8 +648,8 @@ public final class Utilities {
     }
 
     @Override
-    protected void initialize(Class type, Object oldInstance, Object newInstance, Encoder out) {
-      Iterator ite = ((Collection) oldInstance).iterator();
+    protected void initialize(Class<?> type, Object oldInstance, Object newInstance, Encoder out) {
+      Iterator<?> ite = ((Collection<?>) oldInstance).iterator();
       while (ite.hasNext()) {
         out.writeStatement(new Statement(oldInstance, "add", new Object[] {ite.next()}));
       }
@@ -3798,10 +3795,6 @@ private static final String MANIFEST_EXTENSION = ".manifest";
 
-  private static Path getManifestDir(Path specPath, String unionSuffix) {
-    return (unionSuffix == null) ? specPath : new Path(specPath, unionSuffix);
-  }
-
   private static void tryDelete(FileSystem fs, Path path) {
     try {
       fs.delete(path, true);
@@ -3837,26 +3830,20 @@
         tryDelete(fs, status.getPath());
       }
     }
-    files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, fs, filter);
-    if (files != null) {
-      for (FileStatus status : files) {
-        Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure");
-        tryDelete(fs, status.getPath());
-      }
-    }
+    Utilities.LOG14535.info("Deleting " + manifestDir + " on failure");
+    fs.delete(manifestDir, true);
   }
 
   public static void writeMmCommitManifest(List<Path> commitPaths, Path specPath,
       FileSystem fs, String taskId, Long mmWriteId, String unionSuffix) throws HiveException {
     if (commitPaths.isEmpty()) return;
-    Path manifestPath = getManifestDir(specPath, unionSuffix);
-    manifestPath = new Path(manifestPath, "_tmp." + ValidWriteIds.getMmFilePrefix(
-        mmWriteId) + "_" + taskId + MANIFEST_EXTENSION);
+    // We assume one FSOP per task (per specPath), so we create it in specPath.
+    Path manifestPath = getManifestDir(specPath, mmWriteId, unionSuffix);
+    manifestPath = new Path(manifestPath, taskId + MANIFEST_EXTENSION);
     Utilities.LOG14535.info("Writing manifest to " + manifestPath + " with " + commitPaths);
     try {
       // Don't overwrite the manifest... should fail if we have collisions.
-      // We assume one FSOP per task (per specPath), so we create it in specPath.
       try (FSDataOutputStream out = fs.create(manifestPath, false)) {
         if (out == null) {
           throw new HiveException("Failed to create manifest at " + manifestPath);
@@ -3871,6 +3858,11 @@
     }
   }
 
+  private static Path getManifestDir(Path specPath, long mmWriteId, String unionSuffix) {
+    Path manifestPath = new Path(specPath, "_tmp." + ValidWriteIds.getMmFilePrefix(mmWriteId));
+    return (unionSuffix == null) ? manifestPath : new Path(manifestPath, unionSuffix);
+  }
+
   public static final class MissingBucketsContext {
     public final TableDesc tableInfo;
     public final int numBuckets;
@@ -3886,17 +3878,16 @@
       boolean success, int dpLevels, int lbLevels, MissingBucketsContext mbc,
       long mmWriteId, Reporter reporter) throws IOException, HiveException {
     FileSystem fs = specPath.getFileSystem(hconf);
-    // Manifests would be at the root level, but the results at target level.
-    Path manifestDir = getManifestDir(specPath, unionSuffix);
-
-    ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
+    Path manifestDir = getManifestDir(specPath, mmWriteId, unionSuffix);
     if (!success) {
+      ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
       tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels,
           unionSuffix, filter, mmWriteId);
       return;
     }
 
-    FileStatus[] files = HiveStatsUtils.getFileStatusRecurse(manifestDir, 1, fs, filter);
+    Utilities.LOG14535.info("Looking for manifests in: " + manifestDir + " (" + mmWriteId + ")");
+    FileStatus[] files = fs.listStatus(manifestDir);
     List<Path> manifests = new ArrayList<>();
     if (files != null) {
       for (FileStatus status : files) {
@@ -3909,6 +3900,7 @@
     }
 
     Utilities.LOG14535.info("Looking for files in: " + specPath);
+    ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
     files = getMmDirectoryCandidates(
         fs, specPath, dpLevels, lbLevels, filter, mmWriteId);
     ArrayList<FileStatus> mmDirectories = new ArrayList<>();
@@ -3940,6 +3932,18 @@
       }
     }
 
+    Utilities.LOG14535.info("Deleting manifest directory " + manifestDir);
+    tryDelete(fs, manifestDir);
+    if (unionSuffix != null) {
+      // Also delete the parent directory if we are the last union FSOP to execute.
+      manifestDir = manifestDir.getParent();
+      FileStatus[] remainingFiles = fs.listStatus(manifestDir);
+      if (remainingFiles == null || remainingFiles.length == 0) {
+        Utilities.LOG14535.info("Deleting manifest directory " + manifestDir);
+        tryDelete(fs, manifestDir);
+      }
+    }
+
     for (FileStatus status : mmDirectories) {
       cleanMmDirectory(status.getPath(), fs, unionSuffix, committed);
     }
@@ -3947,19 +3951,6 @@
     if (!committed.isEmpty()) {
       throw new HiveException("The following files were committed but not found: " + committed);
     }
-    for (Path mfp : manifests) {
-      Utilities.LOG14535.info("Deleting manifest " + mfp);
-      tryDelete(fs, mfp);
-    }
-    // Delete the manifest directory if we only created it for manifests; otherwise the
-    // dynamic partition loader will find it and try to load it as a partition... what a mess.
-    if (manifestDir != specPath) {
-      FileStatus[] remainingFiles = fs.listStatus(manifestDir);
-      if (remainingFiles == null || remainingFiles.length == 0) {
-        Utilities.LOG14535.info("Deleting directory " + manifestDir);
-        tryDelete(fs, manifestDir);
-      }
-    }
 
     if (mmDirectories.isEmpty()) return;
@@ -3984,7 +3975,7 @@
         if (committed.remove(childPath.toString())) continue; // A good file.
         deleteUncommitedFile(childPath, fs);
       } else if (!child.isDirectory()) {
-        if (childPath.getName().endsWith(MANIFEST_EXTENSION)) continue;
+        // TODO# needed? if (childPath.getName().endsWith(MANIFEST_EXTENSION)) continue;
         if (committed.contains(childPath.toString())) {
           throw new HiveException("Union FSOP has commited " + childPath
               + " outside of union directory" + unionSuffix);

http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
index d013c6f..bd537cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
@@ -234,7 +234,6 @@ public class ColumnTruncateMapper extends MapReduceBase implements
       ) throws HiveException, IOException {
     FileSystem fs = outputPath.getFileSystem(job);
     Path backupPath = backupOutputPath(fs, outputPath, job);
-    // TODO# special case - what is this about?
     Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null,
         reporter);
     fs.delete(backupPath, true);

http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 30b22d7..0a29895 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1853,7 +1853,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
     } else {
       // The non-MM path only finds new partitions, as it is looking at the temp path.
       // To produce the same effect, we will find all the partitions affected by this write ID.
-      // TODO# how would this work with multi-insert into the same table? how does the existing one work?
       leafStatus = Utilities.getMmDirectoryCandidates(
           fs, loadPath, numDP, numLB, null, mmWriteId);
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
index 3c37709..3a38a6d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
@@ -218,7 +218,6 @@ public final class UnionProcFactory {
       // each parent
       List<FileSinkDesc> fileDescLists = new ArrayList<FileSinkDesc>();
 
-      // TODO# special case #N - unions
       for (Operator<? extends OperatorDesc> parent : parents) {
         FileSinkDesc fileSinkDesc = (FileSinkDesc) fileSinkOp.getConf().clone();
         fileSinkDesc.setDirName(new Path(parentDirName, parent.getIdentifier()));

http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
index e1fc103..0c160ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
@@ -65,7 +65,7 @@ public class GenTezProcContext implements NodeProcessorCtx{
   public final ParseContext parseContext;
   public final HiveConf conf;
 
-  public final List<Task<MoveWork>> moveTask; // TODO#
+  public final List<Task<MoveWork>> moveTask;
 
   // rootTasks is the entry point for all generated tasks
   public final List<Task<? extends Serializable>> rootTasks;

http://git-wip-us.apache.org/repos/asf/hive/blob/65a380dd/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index d09e401..9b2f005 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -208,7 +208,6 @@ public abstract class TaskCompiler {
       }
     } else if (!isCStats) {
       for (LoadTableDesc ltd : loadTableWork) {
-        // TODO# What is this path? special case for MM?
         Task<MoveWork> tsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false), conf);
         mvTask.add(tsk);
         // Check to see if we are stale'ing any indexes and auto-update them if we want
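
----------------------------------------------------------------------

For context, the layout change this patch makes to manifest placement can
be summarized with a small standalone sketch. It is illustrative only and
not part of the commit: the class name ManifestLayoutSketch, the helper
names, the example paths, and the literal "mm_1" (standing in for whatever
ValidWriteIds.getMmFilePrefix(mmWriteId) returns) are all made up here.

import org.apache.hadoop.fs.Path;

public class ManifestLayoutSketch {
  private static final String MANIFEST_EXTENSION = ".manifest";

  // Old layout: one manifest per task, written next to the data, with the
  // write id prefix and task id mangled into the file name.
  static Path oldManifestPath(Path specPath, String unionSuffix,
      String mmPrefix, String taskId) {
    Path dir = (unionSuffix == null) ? specPath : new Path(specPath, unionSuffix);
    return new Path(dir, "_tmp." + mmPrefix + "_" + taskId + MANIFEST_EXTENSION);
  }

  // New layout (this patch): the manifests of all task instances of one
  // vertex share a single "_tmp.<prefix>" directory under specPath.
  static Path newManifestPath(Path specPath, String unionSuffix,
      String mmPrefix, String taskId) {
    Path dir = new Path(specPath, "_tmp." + mmPrefix);
    if (unionSuffix != null) {
      dir = new Path(dir, unionSuffix);
    }
    return new Path(dir, taskId + MANIFEST_EXTENSION);
  }

  public static void main(String[] args) {
    Path spec = new Path("/warehouse/t/part=1");
    System.out.println(oldManifestPath(spec, null, "mm_1", "000000_0"));
    // prints /warehouse/t/part=1/_tmp.mm_1_000000_0.manifest
    System.out.println(newManifestPath(spec, null, "mm_1", "000000_0"));
    // prints /warehouse/t/part=1/_tmp.mm_1/000000_0.manifest
  }
}

Because every per-task manifest now lives under that one directory, the
failure path in tryDeleteAllMmFiles no longer needs to search for manifest
files recursively; it deletes manifestDir with a single recursive
fs.delete, and the commit path likewise drops the whole directory once all
manifests have been read, which is what the Utilities.java hunks above do.
----------------------------------------------------------------------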
