Author: hashutosh
Date: Tue Dec 10 16:17:37 2013
New Revision: 1549884

URL: http://svn.apache.org/r1549884
Log:
HIVE-5982 : Remove redundant filesystem operations and methods in FileSink 
(Ashutosh Chauhan via Thejas Nair)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1549884&r1=1549883&r2=1549884&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Dec 10 16:17:37 2013
@@ -129,29 +129,7 @@ public class FileSinkOperator extends Te
      * Update OutPath according to tmpPath.
      */
     public Path getTaskOutPath(String taskId) {
-      return getOutPath(taskId, this.taskOutputTempPath);
-    }
-
-
-    /**
-     * Update OutPath according to tmpPath.
-     */
-    public Path getOutPath(String taskId) {
-      return getOutPath(taskId, this.tmpPath);
-    }
-
-    /**
-     * Update OutPath according to tmpPath.
-     */
-    public Path getOutPath(String taskId, Path tmp) {
-      return new Path(tmp, Utilities.toTempPath(taskId));
-    }
-
-    /**
-     * Update the final paths according to tmpPath.
-     */
-    public Path getFinalPath(String taskId) {
-      return getFinalPath(taskId, this.tmpPath, null);
+      return new Path(this.taskOutputTempPath, Utilities.toTempPath(taskId));
     }
 
     /**
@@ -229,8 +207,6 @@ public class FileSinkOperator extends Te
   private static final long serialVersionUID = 1L;
   protected transient FileSystem fs;
   protected transient Serializer serializer;
-  protected transient BytesWritable commonKey = new BytesWritable();
-  protected transient TableIdEnum tabIdEnum = null;
   protected transient LongWritable row_count;
   private transient boolean isNativeTable = true;
 
@@ -255,28 +231,6 @@ public class FileSinkOperator extends Te
   private transient int timeOut; // JT timeout in msec.
   private transient long lastProgressReport = System.currentTimeMillis();
 
-  /**
-   * TableIdEnum.
-   *
-   */
-  public static enum TableIdEnum {
-    TABLE_ID_1_ROWCOUNT,
-    TABLE_ID_2_ROWCOUNT,
-    TABLE_ID_3_ROWCOUNT,
-    TABLE_ID_4_ROWCOUNT,
-    TABLE_ID_5_ROWCOUNT,
-    TABLE_ID_6_ROWCOUNT,
-    TABLE_ID_7_ROWCOUNT,
-    TABLE_ID_8_ROWCOUNT,
-    TABLE_ID_9_ROWCOUNT,
-    TABLE_ID_10_ROWCOUNT,
-    TABLE_ID_11_ROWCOUNT,
-    TABLE_ID_12_ROWCOUNT,
-    TABLE_ID_13_ROWCOUNT,
-    TABLE_ID_14_ROWCOUNT,
-    TABLE_ID_15_ROWCOUNT;
-  }
-
   protected transient boolean autoDelete = false;
   protected transient JobConf jc;
   Class<? extends Writable> outputClass;
@@ -356,14 +310,7 @@ public class FileSinkOperator extends Te
         prtner = (HivePartitioner<HiveKey, Object>) ReflectionUtils.newInstance(
             jc.getPartitionerClass(), null);
       }
-      int id = conf.getDestTableId();
-      if ((id != 0) && (id <= TableIdEnum.values().length)) {
-        String enumName = "TABLE_ID_" + String.valueOf(id) + "_ROWCOUNT";
-        tabIdEnum = TableIdEnum.valueOf(enumName);
-        row_count = new LongWritable();
-        statsMap.put(tabIdEnum, row_count);
-      }
-
+      row_count = new LongWritable();
       if (dpCtx != null) {
         dpSetup();
       }
@@ -478,7 +425,7 @@ public class FileSinkOperator extends Te
           taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), bucketNum);
         }
         if (isNativeTable) {
-          fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId);
+          fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, null);
           LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
           fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
           LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
@@ -803,20 +750,6 @@ public class FileSinkOperator extends Te
     return FileUtils.makePartName(dpColNames, row);
   }
 
-  private String getPartitionSpec(Path path, int level) {
-    Stack<String> st = new Stack<String>();
-    Path p = path;
-    for (int i = 0; i < level; ++i) {
-      st.push(p.getName());
-      p = p.getParent();
-    }
-    StringBuilder sb = new StringBuilder();
-    while (!st.empty()) {
-      sb.append(st.pop());
-    }
-    return sb.toString();
-  }
-
   @Override
   public void closeOp(boolean abort) throws HiveException {
     if (!bDynParts && !filesCreated) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1549884&r1=1549883&r2=1549884&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Dec 10 16:17:37 2013
@@ -1623,27 +1623,20 @@ public final class Utilities {
     FileSystem fs = (new Path(specPath)).getFileSystem(hconf);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
-    Path intermediatePath = new Path(tmpPath.getParent(), tmpPath.getName()
-        + ".intermediate");
     Path finalPath = new Path(specPath);
     if (success) {
       if (fs.exists(tmpPath)) {
-        // Step1: rename tmp output folder to intermediate path. After this
-        // point, updates from speculative tasks still writing to tmpPath
-        // will not appear in finalPath.
-        log.info("Moving tmp dir: " + tmpPath + " to: " + intermediatePath);
-        Utilities.rename(fs, tmpPath, intermediatePath);
-        // Step2: remove any tmp file or double-committed output files
+        // remove any tmp file or double-committed output files
         ArrayList<String> emptyBuckets =
-            Utilities.removeTempOrDuplicateFiles(fs, intermediatePath, dpCtx);
+            Utilities.removeTempOrDuplicateFiles(fs, tmpPath, dpCtx);
         // create empty buckets if necessary
         if (emptyBuckets.size() > 0) {
           createEmptyBuckets(hconf, emptyBuckets, conf, reporter);
         }
 
-        // Step3: move to the file destination
-        log.info("Moving tmp dir: " + intermediatePath + " to: " + finalPath);
-        Utilities.renameOrMoveFiles(fs, intermediatePath, finalPath);
+        // move to the file destination
+        log.info("Moving tmp dir: " + tmpPath + " to: " + finalPath);
+        Utilities.renameOrMoveFiles(fs, tmpPath, finalPath);
       }
     } else {
       fs.delete(tmpPath, true);


Reply via email to