Author: hashutosh
Date: Tue Dec 10 16:17:37 2013
New Revision: 1549884
URL: http://svn.apache.org/r1549884
Log:
HIVE-5982 : Remove redundant filesystem operations and methods in FileSink
(Ashutosh Chauhan via Thejas Nair)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1549884&r1=1549883&r2=1549884&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java	(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java	Tue Dec 10 16:17:37 2013
@@ -129,29 +129,7 @@ public class FileSinkOperator extends Te
* Update OutPath according to tmpPath.
*/
public Path getTaskOutPath(String taskId) {
- return getOutPath(taskId, this.taskOutputTempPath);
- }
-
-
- /**
- * Update OutPath according to tmpPath.
- */
- public Path getOutPath(String taskId) {
- return getOutPath(taskId, this.tmpPath);
- }
-
- /**
- * Update OutPath according to tmpPath.
- */
- public Path getOutPath(String taskId, Path tmp) {
- return new Path(tmp, Utilities.toTempPath(taskId));
- }
-
- /**
- * Update the final paths according to tmpPath.
- */
- public Path getFinalPath(String taskId) {
- return getFinalPath(taskId, this.tmpPath, null);
+ return new Path(this.taskOutputTempPath, Utilities.toTempPath(taskId));
}
/**
@@ -229,8 +207,6 @@ public class FileSinkOperator extends Te
private static final long serialVersionUID = 1L;
protected transient FileSystem fs;
protected transient Serializer serializer;
- protected transient BytesWritable commonKey = new BytesWritable();
- protected transient TableIdEnum tabIdEnum = null;
protected transient LongWritable row_count;
private transient boolean isNativeTable = true;
@@ -255,28 +231,6 @@ public class FileSinkOperator extends Te
private transient int timeOut; // JT timeout in msec.
private transient long lastProgressReport = System.currentTimeMillis();
- /**
- * TableIdEnum.
- *
- */
- public static enum TableIdEnum {
- TABLE_ID_1_ROWCOUNT,
- TABLE_ID_2_ROWCOUNT,
- TABLE_ID_3_ROWCOUNT,
- TABLE_ID_4_ROWCOUNT,
- TABLE_ID_5_ROWCOUNT,
- TABLE_ID_6_ROWCOUNT,
- TABLE_ID_7_ROWCOUNT,
- TABLE_ID_8_ROWCOUNT,
- TABLE_ID_9_ROWCOUNT,
- TABLE_ID_10_ROWCOUNT,
- TABLE_ID_11_ROWCOUNT,
- TABLE_ID_12_ROWCOUNT,
- TABLE_ID_13_ROWCOUNT,
- TABLE_ID_14_ROWCOUNT,
- TABLE_ID_15_ROWCOUNT;
- }
-
protected transient boolean autoDelete = false;
protected transient JobConf jc;
Class<? extends Writable> outputClass;
@@ -356,14 +310,7 @@ public class FileSinkOperator extends Te
prtner = (HivePartitioner<HiveKey, Object>)
ReflectionUtils.newInstance(
jc.getPartitionerClass(), null);
}
- int id = conf.getDestTableId();
- if ((id != 0) && (id <= TableIdEnum.values().length)) {
- String enumName = "TABLE_ID_" + String.valueOf(id) + "_ROWCOUNT";
- tabIdEnum = TableIdEnum.valueOf(enumName);
- row_count = new LongWritable();
- statsMap.put(tabIdEnum, row_count);
- }
-
+ row_count = new LongWritable();
if (dpCtx != null) {
dpSetup();
}
@@ -478,7 +425,7 @@ public class FileSinkOperator extends Te
taskId =
Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
}
if (isNativeTable) {
- fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId);
+ fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath,
null);
LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
@@ -803,20 +750,6 @@ public class FileSinkOperator extends Te
return FileUtils.makePartName(dpColNames, row);
}
- private String getPartitionSpec(Path path, int level) {
- Stack<String> st = new Stack<String>();
- Path p = path;
- for (int i = 0; i < level; ++i) {
- st.push(p.getName());
- p = p.getParent();
- }
- StringBuilder sb = new StringBuilder();
- while (!st.empty()) {
- sb.append(st.pop());
- }
- return sb.toString();
- }
-
@Override
public void closeOp(boolean abort) throws HiveException {
if (!bDynParts && !filesCreated) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1549884&r1=1549883&r2=1549884&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java	Tue Dec 10 16:17:37 2013
@@ -1623,27 +1623,20 @@ public final class Utilities {
FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
Path tmpPath = Utilities.toTempPath(specPath);
Path taskTmpPath = Utilities.toTaskTempPath(specPath);
- Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
- + ".intermediate");
Path finalPath = new Path(specPath);
if (success) {
if (fs.exists(tmpPath)) {
- // Step1: rename tmp output folder to intermediate path. After this
- // point, updates from speculative tasks still writing to tmpPath
- // will not appear in finalPath.
- log.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
- Utilities.rename(fs, tmpPath, intermediatePath);
- // Step2: remove any tmp file or double-committed output files
+ // remove any tmp file or double-committed output files
ArrayList<String> emptyBuckets =
- Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
+ Utilities.removeTempOrDuplicateFiles(fs, tmpPath, dpCtx);
// create empty buckets if necessary
if (emptyBuckets.size() > 0) {
createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
}
- // Step3: move to the file destination
- log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
- Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
+ // move to the file destination
+ log.info("Moving tmp dir: " + tmpPath + " to: " + finalPath);
+ Utilities.renameOrMoveFiles(fs, tmpPath, finalPath);
}
} else {
fs.delete(tmpPath, true);