Author: heyongqiang
Date: Fri Mar 11 21:44:10 2011
New Revision: 1080769
URL: http://svn.apache.org/viewvc?rev=1080769&view=rev
Log:
HIVE-2030 isEmptyPath() to use ContentSummary cache (Siying via He Yongqiang)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1080769&r1=1080768&r2=1080769&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Fri Mar 11 21:44:10 2011
@@ -377,7 +377,7 @@ public class ExecDriver extends Task<Map
}
}
- addInputPaths(job, work, emptyScratchDirStr);
+ addInputPaths(job, work, emptyScratchDirStr, ctx);
Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());
// remove the pwd from conf file so that job tracker doesn't show this
@@ -790,7 +790,8 @@ public class ExecDriver extends Task<Map
return numEmptyPaths;
}
- private void addInputPaths(JobConf job, MapredWork work, String hiveScratchDir) throws Exception {
+ private void addInputPaths(JobConf job, MapredWork work, String hiveScratchDir, Context ctx)
+ throws Exception {
int numEmptyPaths = 0;
List<String> pathsProcessed = new ArrayList<String>();
@@ -817,7 +818,7 @@ public class ExecDriver extends Task<Map
LOG.info("Adding input file " + path);
Path dirPath = new Path(path);
- if (!Utilities.isEmptyPath(job, dirPath)) {
+ if (!Utilities.isEmptyPath(job, path, ctx)) {
FileInputFormat.addInputPath(job, dirPath);
} else {
emptyPaths.add(path);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1080769&r1=1080768&r2=1080769&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Mar 11 21:44:10 2011
@@ -81,10 +81,10 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
-import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
-import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -1140,7 +1140,7 @@ public final class Utilities {
Path pathPattern = new Path(path, sb.toString());
return fs.globStatus(pathPattern);
}
-
+
public static void mvFileToFinalPath(String specPath, Configuration hconf,
boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf)
throws IOException, HiveException {
@@ -1172,7 +1172,7 @@ public final class Utilities {
fs.delete(tmpPath, true);
}
}
-
+
/**
* Check the existence of buckets according to bucket specification. Create empty buckets if
* needed.
@@ -1545,6 +1545,8 @@ public final class Utilities {
cs = fs.getContentSummary(p);
}
ctx.addCS(path, cs);
+ LOG.info("Cache Content Summary for " + path + " length: " + cs.getLength() + " file count: "
+ + cs.getFileCount() + " directory count: " + cs.getDirectoryCount());
}
summary[0] += cs.getLength();
@@ -1553,14 +1555,25 @@ public final class Utilities {
} catch (IOException e) {
LOG.info("Cannot get size of " + path + ". Safely ignored.");
- if (path != null) {
- ctx.addCS(path, new ContentSummary(0, 0, 0));
- }
}
}
return new ContentSummary(summary[0], summary[1], summary[2]);
}
+ public static boolean isEmptyPath(JobConf job, String dirPath, Context ctx)
+ throws Exception {
+ ContentSummary cs = ctx.getCS(dirPath);
+ if (cs != null) {
+ LOG.info("Content Summary " + dirPath + "length: " + cs.getLength() + " num files: "
+ + cs.getFileCount() + " num directories: " + cs.getDirectoryCount());
+ return (cs.getLength() == 0 && cs.getFileCount() == 0 && cs.getDirectoryCount() <= 1);
+ } else {
+ LOG.info("Content Summary not cached for " + dirPath);
+ }
+ Path p = new Path(dirPath);
+ return isEmptyPath(job, p);
+ }
+
public static boolean isEmptyPath(JobConf job, Path dirPath) throws Exception {
FileSystem inpFs = dirPath.getFileSystem(job);