This is an automated email from the ASF dual-hosted git repository.
djaiswal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new fae6256 HIVE-21214 : MoveTask : Use attemptId instead of file size
for deduplication of files compareTempOrDuplicateFiles() (Deepak Jaiswal,
reviewed by Jason Dere)
fae6256 is described below
commit fae6256ace38d106b62d3bcade7b84b51bf4e1ec
Author: Deepak Jaiswal <[email protected]>
AuthorDate: Tue Feb 5 14:06:19 2019 -0800
HIVE-21214 : MoveTask : Use attemptId instead of file size for
deduplication of files compareTempOrDuplicateFiles() (Deepak Jaiswal, reviewed
by Jason Dere)
---
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 71 ++++++++++++++++++----
1 file changed, 59 insertions(+), 12 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 8937b43..b84b052 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1308,7 +1308,7 @@ public final class Utilities {
    *          filename to extract taskid from
    */
   public static String getTaskIdFromFilename(String filename) {
-    return getIdFromFilename(filename, FILE_NAME_TO_TASK_ID_REGEX);
+    return getTaskIdFromFilename(filename, FILE_NAME_TO_TASK_ID_REGEX);
   }
 
   /**
@@ -1319,10 +1319,35 @@ public final class Utilities {
    *          filename to extract taskid from
    */
   public static String getPrefixedTaskIdFromFilename(String filename) {
-    return getIdFromFilename(filename, FILE_NAME_PREFIXED_TASK_ID_REGEX);
+    return getTaskIdFromFilename(filename, FILE_NAME_PREFIXED_TASK_ID_REGEX);
   }
 
-  private static String getIdFromFilename(String filename, Pattern pattern) {
+  private static String getTaskIdFromFilename(String filename, Pattern pattern) {
+    return getIdFromFilename(filename, pattern, 1);
+  }
+
+  /**
+   * Get the attempt id from a filename.
+   *
+   * @param filename
+   *          filename to extract the attempt id from
+   * @return capture group 3 of FILE_NAME_PREFIXED_TASK_ID_REGEX without its first
+   *         character, parsed as an int
+   * @throws IllegalArgumentException if no attempt id can be parsed from the name
+   */
+  public static int getAttemptIdFromFilename(String filename) {
+    String attemptStr = getIdFromFilename(filename, FILE_NAME_PREFIXED_TASK_ID_REGEX, 3);
+    // m.group(3) yields null when that group did not take part in the match; report
+    // the file name instead of failing with a bare NullPointerException.
+    if (attemptStr == null) {
+      throw new IllegalArgumentException("Unable to get attempt id from file name: " + filename);
+    }
+    return Integer.parseInt(attemptStr.substring(1));
+  }
+
+  // Returns the given capture group of pattern for filename; when the pattern does not
+  // match, the fallback value logged below is returned instead.
+  private static String getIdFromFilename(String filename, Pattern pattern, int group) {
     String taskId = filename;
     int dirEnd = filename.lastIndexOf(Path.SEPARATOR);
     if (dirEnd != -1) {
@@ -1334,7 +1359,7 @@ public final class Utilities {
       LOG.warn("Unable to get task id from file name: {}. Using last component {}"
           + " as task id.", filename, taskId);
     } else {
-      taskId = m.group(1);
+      taskId = m.group(group);
     }
     LOG.debug("TaskId for {} = {}", filename, taskId);
     return taskId;
@@ -1823,10 +1832,11 @@ public final class Utilities {
 
   private static FileStatus compareTempOrDuplicateFiles(FileSystem fs,
       FileStatus file, FileStatus existingFile) throws IOException {
-    // Compare the file sizes of all the attempt files for the same task, the largest win
-    // any attempt files could contain partial results (due to task failures or
-    // speculative runs), but the largest should be the correct one since the result
-    // of a successful run should never be smaller than a failed/speculative run.
+    // Pick the one with newest attempt ID. For sanity, check the file sizes too.
+    // If the file size of newest attempt is less than that for older one,
+    // throw an exception as it may be a correctness issue causing it.
+    // Two files with the same attempt ID cannot be ordered and are reported as well.
+    // This breaks speculative execution if it ends prematurely.
     FileStatus toDelete = null, toRetain = null;
 
     // "LOAD .. INTO" and "INSERT INTO" commands will generate files with
@@ -1847,12 +1857,36 @@ public final class Utilities {
       return existingFile;
     }
 
-    if (existingFile.getLen() >= file.getLen()) {
-      toDelete = file;
+    int existingFileAttemptId = getAttemptIdFromFilename(existingFile.getPath().getName());
+    int fileAttemptId = getAttemptIdFromFilename(file.getPath().getName());
+
+    long existingFileSz = getFileSizeRecursively(fs, existingFile);
+    long fileSz = getFileSizeRecursively(fs, file);
+    // Files may come in any order irrespective of their attempt IDs
+    if (existingFileAttemptId > fileAttemptId &&
+        existingFileSz >= fileSz) {
+      // keep existing
       toRetain = existingFile;
-    } else {
-      toDelete = existingFile;
+      toDelete = file;
+    } else if (existingFileAttemptId < fileAttemptId &&
+        existingFileSz <= fileSz) {
+      // keep file
       toRetain = file;
+      toDelete = existingFile;
+    } else if (existingFileAttemptId == fileAttemptId) {
+      // Neither file is newer, so there is no attempt to keep.
+      throw new IOException(" File " + filePath + " has the same attempt ID "
+          + fileAttemptId + " as the file " + existingFile.getPath());
+    } else if (existingFileAttemptId > fileAttemptId) {
+      // The existing file is the newer attempt but is smaller.
+      throw new IOException(" File " + existingFile.getPath() + " with newer attempt ID "
+          + existingFileAttemptId + " is smaller than the file " + filePath
+          + " with older attempt ID " + fileAttemptId);
+    } else {
+      // The incoming file is the newer attempt but is smaller.
+      throw new IOException(" File " + filePath +
+          " with newer attempt ID " + fileAttemptId + " is smaller than the file "
+          + existingFile.getPath() + " with older attempt ID " + existingFileAttemptId);
     }
     if (!fs.delete(toDelete.getPath(), true)) {
       throw new IOException(
@@ -1863,9 +1897,33 @@ public final class Utilities {
           + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length "
           + toRetain.getLen());
     }
+
     return toRetain;
   }
 
+  // Returns the length of src, or for a directory the recursive sum over its listed entries.
+  private static long getFileSizeRecursively(FileSystem fs, FileStatus src)
+      throws IOException {
+    long size = 0;
+    if (src.isDirectory()) {
+      LOG.debug(" src {} is a directory", src.getPath());
+      // Failures are rethrown with the directory name, keeping the original cause.
+      try {
+        FileStatus[] files = fs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
+        // Recursively fetch sizes of each file
+        for (FileStatus file : files) {
+          size += getFileSizeRecursively(fs, file);
+        }
+      } catch (IOException e) {
+        throw new IOException("Unable to fetch files in directory " + src.getPath(), e);
+      }
+    } else {
+      size = src.getLen();
+      LOG.debug("src {} is a file of size {}", src.getPath(), size);
+    }
+    return size;
+  }
+
public static boolean isCopyFile(String filename) {
String taskId = filename;
String copyFileSuffix = null;