Author: hashutosh
Date: Sat Sep 28 04:26:55 2013
New Revision: 1527149

URL: http://svn.apache.org/r1527149
Log:
HIVE-5324 : Extend record writer and ORC reader/writer interfaces to provide statistics (Prasanth J via Ashutosh Chauhan)

Added:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java
Modified:
    hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java
    hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
    hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java
    hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java
    hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java

Modified: hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java (original)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -24,7 +24,7 @@ import java.util.Properties;
 
 import org.apache.commons.codec.binary.Base64;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
@@ -53,13 +53,13 @@ public class Base64TextOutputFormat<K ex
    * Base64RecordWriter.
    *
    */
-  public static class Base64RecordWriter implements RecordWriter,
+  public static class Base64RecordWriter implements FSRecordWriter,
       JobConfigurable {
 
-    RecordWriter writer;
+    FSRecordWriter writer;
     BytesWritable bytesWritable;
 
-    public Base64RecordWriter(RecordWriter writer) {
+    public Base64RecordWriter(FSRecordWriter writer) {
       this.writer = writer;
       bytesWritable = new BytesWritable();
     }
@@ -119,7 +119,7 @@ public class Base64TextOutputFormat<K ex
   }
 
   @Override
-  public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+  public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
       Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProperties, Progressable progress) throws IOException {
 

Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.Text;
@@ -71,7 +71,7 @@ public class HiveHFileOutputFormat exten
   }
 
   @Override
-  public RecordWriter getHiveRecordWriter(
+  public FSRecordWriter getHiveRecordWriter(
     final JobConf jc,
     final Path finalOutPath,
     Class<? extends Writable> valueClass,
@@ -120,7 +120,7 @@ public class HiveHFileOutputFormat exten
       ++i;
     }
 
-    return new RecordWriter() {
+    return new FSRecordWriter() {
 
       @Override
       public void close(boolean abort) throws IOException {

Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java Sat Sep 28 04:26:55 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.HiveMetaHook;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -285,7 +286,7 @@ class DummyStorageHandler extends HCatSt
      * org.apache.hadoop.util.Progressable)
      */
     @Override
-    public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+    public FSRecordWriter getHiveRecordWriter(
       JobConf jc, Path finalOutPath,
       Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProperties, Progressable progress)

Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -25,6 +25,7 @@ import java.util.Properties;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -40,7 +41,7 @@ public class HBaseBaseOutputFormat imple
   HiveOutputFormat<WritableComparable<?>, Put> {
 
   @Override
-  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+  public FSRecordWriter getHiveRecordWriter(
     JobConf jc, Path finalOutPath,
     Class<? extends Writable> valueClass, boolean isCompressed,
     Properties tableProperties, Progressable progress)

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Sat Sep 28 04:26:55 2013
@@ -35,11 +35,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
 import org.apache.hadoop.hive.ql.io.HivePartitioner;
+import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -84,11 +86,13 @@ public class FileSinkOperator extends Te
   protected transient int dpStartCol; // start column # for DP columns
   protected transient List<String> dpVals; // array of values corresponding to DP columns
   protected transient List<Object> dpWritables;
-  protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters
+  protected transient FSRecordWriter[] rowOutWriters; // row specific RecordWriters
   protected transient int maxPartitions;
   protected transient ListBucketingCtx lbCtx;
   protected transient boolean isSkewedStoredAsSubDirectories;
   private transient boolean statsCollectRawDataSize;
+  private transient boolean[] statsFromRecordWriter;
+  private transient boolean isCollectRWStats;
 
 
   private static final transient String[] FATAL_ERR_MSG = {
@@ -96,22 +100,12 @@ public class FileSinkOperator extends Te
       "Number of dynamic partitions exceeded 
hive.exec.max.dynamic.partitions.pernode."
   };
 
-  /**
-   * RecordWriter.
-   *
-   */
-  public static interface RecordWriter {
-    void write(Writable w) throws IOException;
-
-    void close(boolean abort) throws IOException;
-  }
-
   public class FSPaths implements Cloneable {
     Path tmpPath;
     Path taskOutputTempPath;
     Path[] outPaths;
     Path[] finalPaths;
-    RecordWriter[] outWriters;
+    FSRecordWriter[] outWriters;
     Stat stat;
 
     public FSPaths() {
@@ -122,7 +116,7 @@ public class FileSinkOperator extends Te
       taskOutputTempPath = Utilities.toTaskTempPath(specPath);
       outPaths = new Path[numFiles];
       finalPaths = new Path[numFiles];
-      outWriters = new RecordWriter[numFiles];
+      outWriters = new FSRecordWriter[numFiles];
       stat = new Stat();
     }
 
@@ -166,11 +160,11 @@ public class FileSinkOperator extends Te
       }
     }
 
-    public void setOutWriters(RecordWriter[] out) {
+    public void setOutWriters(FSRecordWriter[] out) {
       outWriters = out;
     }
 
-    public RecordWriter[] getOutWriters() {
+    public FSRecordWriter[] getOutWriters() {
       return outWriters;
     }
 
@@ -324,6 +318,7 @@ public class FileSinkOperator extends Te
       isCompressed = conf.getCompressed();
       parent = Utilities.toTempPath(conf.getDirName());
       statsCollectRawDataSize = conf.isStatsCollectRawDataSize();
+      statsFromRecordWriter = new boolean[numFiles];
 
      serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
       serializer.initialize(null, conf.getTableInfo().getProperties());
@@ -516,6 +511,8 @@ public class FileSinkOperator extends Te
         fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
             jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx],
             reporter);
+        // If the record writer provides stats, get it from there instead of the serde
+        statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter;
         // increment the CREATED_FILES counter
         if (reporter != null) {
           reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
@@ -619,7 +616,11 @@ public class FileSinkOperator extends Te
       }
 
       rowOutWriters = fpaths.outWriters;
-      if (conf.isGatherStats()) {
+      // check if all record writers implement statistics. if at least one RW
+      // doesn't implement the stats interface we will fall back to the conventional
+      // way of gathering stats
+      isCollectRWStats = areAllTrue(statsFromRecordWriter);
+      if (conf.isGatherStats() && !isCollectRWStats) {
         if (statsCollectRawDataSize) {
           SerDeStats stats = serializer.getSerDeStats();
           if (stats != null) {
@@ -630,12 +631,14 @@ public class FileSinkOperator extends Te
       }
 
 
+      FSRecordWriter rowOutWriter = null;
+
       if (row_count != null) {
         row_count.set(row_count.get() + 1);
       }
 
       if (!multiFileSpray) {
-        rowOutWriters[0].write(recordValue);
+        rowOutWriter = rowOutWriters[0];
       } else {
         int keyHashCode = 0;
         for (int i = 0; i < partitionEval.length; i++) {
@@ -646,8 +649,9 @@ public class FileSinkOperator extends Te
         key.setHashCode(keyHashCode);
         int bucketNum = prtner.getBucket(key, null, totalFiles);
         int idx = bucketMap.get(bucketNum);
-        rowOutWriters[idx].write(recordValue);
+        rowOutWriter = rowOutWriters[idx];
       }
+      rowOutWriter.write(recordValue);
     } catch (IOException e) {
       throw new HiveException(e);
     } catch (SerDeException e) {
@@ -655,6 +659,15 @@ public class FileSinkOperator extends Te
     }
   }
 
+  private boolean areAllTrue(boolean[] statsFromRW) {
+    for(boolean b : statsFromRW) {
+      if (!b) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   /**
    * Lookup list bucketing path.
    * @param lbDirName
@@ -864,6 +877,27 @@ public class FileSinkOperator extends Te
     if (!abort) {
       for (FSPaths fsp : valToPaths.values()) {
         fsp.closeWriters(abort);
+
+        // before closing the operator check if statistics gathering is requested
+        // and is provided by the record writer. this is different from the statistics
+        // gathering done in processOp(). In processOp(), for each row added,
+        // serde statistics about the row are gathered and accumulated in a hashmap.
+        // this adds more overhead to the actual processing of each row. But if the
+        // record writer already gathers the statistics, it can simply return the
+        // accumulated statistics, which will be aggregated in case of spray writers
+        if (conf.isGatherStats() && isCollectRWStats) {
+          for (int idx = 0; idx < fsp.outWriters.length; idx++) {
+            FSRecordWriter outWriter = fsp.outWriters[idx];
+            if (outWriter != null) {
+              SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
+              if (stats != null) {
+                fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+                fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+              }
+            }
+          }
+        }
+
         if (isNativeTable) {
           fsp.commit(fs);
         }
@@ -934,7 +968,7 @@ public class FileSinkOperator extends Te
                 hiveOutputFormat = ReflectionUtils.newInstance(conf.getTableInfo().getOutputFileFormatClass(),job);
           }
          else {
-                 hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance(); 
+                 hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
           }
         }
         else {

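For readability, here is a minimal, self-contained sketch of the pattern the FileSinkOperator change introduces: use writer-provided statistics only when every open writer implements StatsProvidingRecordWriter, and aggregate them at close time. This is illustrative code, not part of the commit; the class name StatsAwareSink is invented.

import java.io.IOException;

import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
import org.apache.hadoop.hive.serde2.SerDeStats;

public class StatsAwareSink {
  private final FSRecordWriter[] writers;

  public StatsAwareSink(FSRecordWriter[] writers) {
    this.writers = writers;
  }

  // Mirrors areAllTrue()/statsFromRecordWriter above: writer-side stats are
  // usable only if every writer provides them; otherwise fall back to
  // per-row serde accounting.
  public boolean allWritersProvideStats() {
    for (FSRecordWriter w : writers) {
      if (!(w instanceof StatsProvidingRecordWriter)) {
        return false;
      }
    }
    return true;
  }

  // Mirrors closeOp(): close each writer, then pull and aggregate its
  // accumulated stats (one writer per bucket in the multi-file spray case).
  // Callers must have verified allWritersProvideStats() first.
  public long[] closeAndAggregate() throws IOException {
    long rawDataSize = 0;
    long rowCount = 0;
    for (FSRecordWriter w : writers) {
      w.close(false);
      SerDeStats stats = ((StatsProvidingRecordWriter) w).getStats();
      if (stats != null) {
        rawDataSize += stats.getRawDataSize();
        rowCount += stats.getRowCount();
      }
    }
    return new long[] {rawDataSize, rowCount};
  }
}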
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sat Sep 28 04:26:55 2013
@@ -102,12 +102,12 @@ import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -1694,7 +1694,7 @@ public final class Utilities {
 
     for (String p : paths) {
       Path path = new Path(p);
-      RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
+      FSRecordWriter writer = HiveFileFormatUtils.getRecordWriter(
           jc, hiveOutputFormat, outputClass, isCompressed,
           tableInfo.getProperties(), path, reporter);
       writer.close(false);
@@ -2853,7 +2853,7 @@ public final class Utilities {
     Path newFilePath = new Path(newFile);
 
     String onefile = newPath.toString();
-    RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
+    FSRecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
         Text.class, false, props, null);
     if (dummyRow) {
       // empty files are omitted at CombineHiveInputFormat.

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java Sat Sep 28 04:26:55 2013
@@ -28,8 +28,8 @@ import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
@@ -240,7 +240,7 @@ public class PTFRowContainer<Row extends
   }
 
 
-  private static class PTFRecordWriter implements RecordWriter {
+  private static class PTFRecordWriter implements FSRecordWriter {
     BytesWritable EMPTY_KEY = new BytesWritable();
 
     SequenceFile.Writer outStream;
@@ -262,7 +262,7 @@ public class PTFRowContainer<Row extends
     extends HiveSequenceFileOutputFormat<K,V> {
 
     @Override
-    public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+    public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
         Class<? extends Writable> valueClass, boolean isCompressed,
         Properties tableProperties, Progressable progress) throws IOException {
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Sat Sep 28 04:26:55 2013
@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -105,7 +105,7 @@ public class RowContainer<ROW extends Li
   int acutalSplitNum = 0;
   int currentSplitPointer = 0;
   org.apache.hadoop.mapred.RecordReader rr = null; // record reader
-  RecordWriter rw = null;
+  FSRecordWriter rw = null;
   InputFormat<WritableComparable, Writable> inputFormat = null;
   InputSplit[] inputSplits = null;
   private ROW dummyRow = null;
@@ -531,7 +531,7 @@ public class RowContainer<ROW extends Li
 
   }
 
-  protected RecordWriter getRecordWriter() {
+  protected FSRecordWriter getRecordWriter() {
     return rw;
   }
 

Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java?rev=1527149&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java Sat Sep 28 04:26:55 2013
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Record writer used by file sink operator.
+ *
+ * FSRecordWriter.
+ *
+ */
+public interface FSRecordWriter {
+  void write(Writable w) throws IOException;
+
+  void close(boolean abort) throws IOException;
+
+  /**
+   * If a file format internally gathers statistics (like ORC) while writing then
+   * it can expose the statistics through this record writer interface. Writer side
+   * statistics are useful for updating the metastore with table/partition level
+   * statistics.
+   * StatsProvidingRecordWriter.
+   *
+   */
+  public interface StatsProvidingRecordWriter extends FSRecordWriter{
+    /**
+     * Returns the statistics information
+     * @return SerDeStats
+     */
+    SerDeStats getStats();
+  }
+
+}
\ No newline at end of file

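For illustration only (this class is not part of the commit; CountingTextRecordWriter is a made-up name), a writer implementing the new nested interface could track rows and bytes as it writes and hand them back as SerDeStats:

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class CountingTextRecordWriter implements StatsProvidingRecordWriter {
  private final OutputStream out;
  private final SerDeStats stats = new SerDeStats();
  private long rows;
  private long bytes;

  public CountingTextRecordWriter(OutputStream out) {
    this.out = out;
  }

  @Override
  public void write(Writable w) throws IOException {
    Text t = (Text) w; // assumes Text rows, for brevity
    out.write(t.getBytes(), 0, t.getLength());
    out.write('\n');
    rows++;
    bytes += t.getLength();
  }

  @Override
  public void close(boolean abort) throws IOException {
    out.close();
  }

  @Override
  public SerDeStats getStats() {
    // Counters gathered during write() go back to the file sink, which
    // aggregates them instead of asking the serde for per-row stats.
    stats.setRawDataSize(bytes);
    stats.setRowCount(rows);
    return stats;
  }
}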
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -24,7 +24,6 @@ import java.util.Properties;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -43,7 +42,7 @@ public class HiveBinaryOutputFormat<K ex
   /**
    * create the final out file, and output row by row. After one row is
    * appended, a configured row separator is appended
-   * 
+   *
    * @param jc
    *          the job configuration file
    * @param outPath
@@ -59,14 +58,14 @@ public class HiveBinaryOutputFormat<K ex
    * @return the RecordWriter
    */
   @Override
-  public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
+  public FSRecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
       Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProperties, Progressable progress) throws IOException {
 
     FileSystem fs = outPath.getFileSystem(jc);
     final OutputStream outStream = fs.create(outPath);
 
-    return new RecordWriter() {
+    return new FSRecordWriter() {
       public void write(Writable r) throws IOException {
         if (r instanceof Text) {
           Text tr = (Text) r;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Sat Sep 28 04:26:55 2013
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
@@ -246,7 +245,7 @@ public final class HiveFileFormatUtils {
     return true;
   }
 
-  public static RecordWriter getHiveRecordWriter(JobConf jc,
+  public static FSRecordWriter getHiveRecordWriter(JobConf jc,
       TableDesc tableInfo, Class<? extends Writable> outputClass,
      FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
     boolean storagehandlerofhivepassthru = false;
@@ -287,7 +286,7 @@ public final class HiveFileFormatUtils {
     }
   }
 
-  public static RecordWriter getRecordWriter(JobConf jc,
+  public static FSRecordWriter getRecordWriter(JobConf jc,
       HiveOutputFormat<?, ?> hiveOutputFormat,
       final Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProp, Path outPath, Reporter reporter

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -25,7 +25,6 @@ import java.util.Properties;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
@@ -39,7 +38,7 @@ import org.apache.hadoop.util.Progressab
 /**
  * HiveIgnoreKeyTextOutputFormat replaces key with null before feeding the <key,
  * value> to TextOutputFormat.RecordWriter.
- * 
+ *
  */
 public class HiveIgnoreKeyTextOutputFormat<K extends WritableComparable, V extends Writable>
     extends TextOutputFormat<K, V> implements HiveOutputFormat<K, V> {
@@ -47,7 +46,7 @@ public class HiveIgnoreKeyTextOutputForm
   /**
    * create the final out file, and output row by row. After one row is
    * appended, a configured row separator is appended
-   * 
+   *
    * @param jc
    *          the job configuration file
    * @param outPath
@@ -63,7 +62,7 @@ public class HiveIgnoreKeyTextOutputForm
    * @return the RecordWriter
    */
   @Override
-  public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
+  public FSRecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
       Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProperties, Progressable progress) throws IOException {
     int rowSeparator = 0;
@@ -79,7 +78,7 @@ public class HiveIgnoreKeyTextOutputForm
     FileSystem fs = outPath.getFileSystem(jc);
     final OutputStream outStream = Utilities.createCompressedStream(jc, fs
         .create(outPath), isCompressed);
-    return new RecordWriter() {
+    return new FSRecordWriter() {
       public void write(Writable r) throws IOException {
         if (r instanceof Text) {
           Text tr = (Text) r;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -23,7 +23,6 @@ import java.util.Properties;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -48,7 +47,7 @@ public class HiveNullValueSequenceFileOu
   private boolean keyIsText;
 
   @Override
-  public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+  public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
       Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProperties, Progressable progress) throws IOException {
 
@@ -58,7 +57,7 @@ public class HiveNullValueSequenceFileOu
 
     keyWritable = new HiveKey();
     keyIsText = valueClass.equals(Text.class);
-    return new RecordWriter() {
+    return new FSRecordWriter() {
       public void write(Writable r) throws IOException {
         if (keyIsText) {
           Text text = (Text) r;

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.Properties;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -58,7 +57,7 @@ public interface HiveOutputFormat<K, V> 
    *          progress used for status report
    * @return the RecordWriter for the output file
    */
-  RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+  FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
       final Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProperties, Progressable progress) throws IOException;
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -49,7 +49,7 @@ public class HivePassThroughOutputFormat
                                  "org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat";
 
   public static final String HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY =
-                                 "hive.passthrough.storagehandler.of"; 
+                                 "hive.passthrough.storagehandler.of";
 
   public HivePassThroughOutputFormat() {
     //construct this class through ReflectionUtils from FileSinkOperator
@@ -99,7 +99,7 @@ public class HivePassThroughOutputFormat
   }
 
   @Override
-  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+  public FSRecordWriter getHiveRecordWriter(
      JobConf jc, Path finalOutPath, Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProperties, Progressable progress) throws IOException {
     if (this.initialized == false) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java Sat Sep 28 04:26:55 2013
@@ -20,13 +20,12 @@ package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 
 public class HivePassThroughRecordWriter <K extends WritableComparable<?>, V extends Writable>
-implements RecordWriter {
+implements FSRecordWriter {
 
   private final org.apache.hadoop.mapred.RecordWriter<K, V> mWriter;
 

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -23,7 +23,6 @@ import java.util.Properties;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -56,7 +55,7 @@ public class HiveSequenceFileOutputForma
    * @return the RecordWriter for the output file
    */
   @Override
-  public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+  public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
       Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProperties, Progressable progress) throws IOException {
 
@@ -64,7 +63,7 @@ public class HiveSequenceFileOutputForma
     final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
         fs, finalOutPath, BytesWritable.class, valueClass, isCompressed);
 
-    return new RecordWriter() {
+    return new FSRecordWriter() {
       public void write(Writable r) throws IOException {
         outStream.append(EMPTY_KEY, r);
       }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -118,7 +118,7 @@ public class RCFileOutputFormat extends
    * @throws IOException
    */
   @Override
-  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+  public FSRecordWriter getHiveRecordWriter(
       JobConf jc, Path finalOutPath, Class<? extends Writable> valueClass,
      boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException {
 
@@ -135,7 +135,7 @@ public class RCFileOutputFormat extends
       (jc, finalOutPath.getFileSystem(jc),
        finalOutPath, isCompressed);
 
-    return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
+    return new FSRecordWriter() {
       public void write(Writable r) throws IOException {
         outWriter.append(r);
       }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -17,6 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.io.avro;
 
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
+import static org.apache.avro.mapred.AvroJob.OUTPUT_CODEC;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
+
+import java.io.IOException;
+import java.util.Properties;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
@@ -24,7 +32,7 @@ import org.apache.avro.generic.GenericDa
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
@@ -36,14 +44,6 @@ import org.apache.hadoop.mapred.RecordWr
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Progressable;
 
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
-import static org.apache.avro.mapred.AvroJob.OUTPUT_CODEC;
-import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
-import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
-
 /**
  * Write to an Avro file from a Hive process.
  */
@@ -51,7 +51,7 @@ public class AvroContainerOutputFormat
         implements HiveOutputFormat<LongWritable, AvroGenericRecordWritable> {
 
   @Override
-  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf,
+  public FSRecordWriter getHiveRecordWriter(JobConf jobConf,
          Path path, Class<? extends Writable> valueClass, boolean isCompressed,
          Properties properties, Progressable progressable) throws IOException {
     Schema schema;
@@ -62,7 +62,7 @@ public class AvroContainerOutputFormat
     }
     GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>(schema);
     DataFileWriter<GenericRecord> dfw = new DataFileWriter<GenericRecord>(gdw);
-    
+
     if (isCompressed) {
       int level = jobConf.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
       String codecName = jobConf.get(OUTPUT_CODEC, DEFLATE_CODEC);

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java Sat Sep 28 04:26:55 2013
@@ -18,18 +18,18 @@
 package org.apache.hadoop.hive.ql.io.avro;
 
 
+import java.io.IOException;
+
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
 import org.apache.hadoop.io.Writable;
 
-import java.io.IOException;
-
 /**
  * Write an Avro GenericRecord to an Avro data file.
  */
-public class AvroGenericRecordWriter implements FileSinkOperator.RecordWriter{
+public class AvroGenericRecordWriter implements FSRecordWriter{
   final private DataFileWriter<GenericRecord> dfw;
 
   public AvroGenericRecordWriter(DataFileWriter<GenericRecord> dfw) {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -17,10 +17,9 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -45,7 +44,7 @@ public class OrcOutputFormat extends Fil
 
   private static class OrcRecordWriter
       implements RecordWriter<NullWritable, OrcSerdeRow>,
-                 FileSinkOperator.RecordWriter {
+                 FSRecordWriter {
     private Writer writer = null;
     private final Path path;
     private final OrcFile.WriterOptions options;
@@ -105,7 +104,7 @@ public class OrcOutputFormat extends Fil
   }
 
   @Override
-  public FileSinkOperator.RecordWriter
+  public FSRecordWriter
      getHiveRecordWriter(JobConf conf,
                          Path path,
                          Class<? extends Writable> valueClass,

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Sat Sep 28 04:26:55 2013
@@ -39,6 +39,19 @@ public interface Reader {
   long getNumberOfRows();
 
   /**
+   * Get the deserialized data size of the file
+   * @return raw data size
+   */
+  long getRawDataSize();
+
+  /**
+   * Get the deserialized data size of the specified columns
+   * @param colNames
+   * @return raw data size of columns
+   */
+  long getRawDataSizeOfColumns(List<String> colNames);
+
+  /**
    * Get the user metadata keys.
    * @return the set of metadata keys
    */

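A hedged usage sketch (not from this commit) of the extended Reader interface follows. Note that ReaderImpl in this revision still stubs both new methods to return 0, so real values appear only once an implementation lands; OrcFile.createReader(FileSystem, Path) is the factory assumed here, and the column names are hypothetical.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;

public class OrcStatsProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]);
    Reader reader = OrcFile.createReader(FileSystem.get(conf), path);
    System.out.println("rows:             " + reader.getNumberOfRows());
    System.out.println("raw size:         " + reader.getRawDataSize());
    // Per-column raw size for a hypothetical pair of columns:
    System.out.println("raw size of cols: "
        + reader.getRawDataSizeOfColumns(Arrays.asList("col1", "col2")));
  }
}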
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Sat Sep 28 04:26:55 2013
@@ -343,4 +343,14 @@ final class ReaderImpl implements Reader
         include, footer.getRowIndexStride(), sarg, columnNames);
   }
 
+  @Override
+  public long getRawDataSize() {
+    return 0;
+  }
+
+  @Override
+  public long getRawDataSizeOfColumns(List<String> colNames) {
+    return 0;
+  }
+
 }

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java Sat Sep 28 04:26:55 2013
@@ -47,4 +47,22 @@ public interface Writer {
    * @throws IOException
    */
   void close() throws IOException;
+
+  /**
+   * Return the deserialized data size. Raw data size will be computed when
+   * writing the file footer. Hence the raw data size value will be available only
+   * after closing the writer.
+   *
+   * @return raw data size
+   */
+  long getRawDataSize();
+
+  /**
+   * Return the number of rows in the file. Row count gets updated when flushing
+   * the stripes. To get an accurate row count, this method should be called after
+   * closing the writer.
+   *
+   * @return row count
+   */
+  long getNumberOfRows();
 }

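A short sketch of the contract documented above: both accessors are only meaningful after close(). This is illustrative, not commit code; it assumes the OrcFile.writerOptions/createWriter factory available on this branch and an ObjectInspector for the row type (how to obtain one is elided), and as with the Reader, WriterImpl in this revision still returns 0 from both methods.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Writer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

public class OrcWriterStatsSketch {
  public static void writeAndReport(Configuration conf, Path path,
      ObjectInspector inspector, Iterable<Object> rows) throws Exception {
    Writer writer = OrcFile.createWriter(path,
        OrcFile.writerOptions(conf).inspector(inspector));
    for (Object row : rows) {
      writer.addRow(row);
    }
    writer.close();
    // Raw data size is computed while writing the footer; row count is
    // finalized when stripes are flushed. Both are reliable only now.
    System.out.println("raw data size: " + writer.getRawDataSize());
    System.out.println("rows written:  " + writer.getNumberOfRows());
  }
}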
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Sat Sep 28 04:26:55 2013
@@ -1871,4 +1871,14 @@ class WriterImpl implements Writer, Memo
       rawWriter.close();
     }
   }
+
+  @Override
+  public long getRawDataSize() {
+    return 0;
+  }
+
+  @Override
+  public long getNumberOfRows() {
+    return 0;
+  }
 }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -41,7 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -521,7 +521,7 @@ public class TestInputOutputFormat {
     }
     SerDe serde = new OrcSerde();
     HiveOutputFormat<?, ?> outFormat = new OrcOutputFormat();
-    FileSinkOperator.RecordWriter writer =
+    FSRecordWriter writer =
         outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true,
             properties, Reporter.NULL);
     writer.write(serde.serialize(new MyRow(1,2), inspector));
@@ -686,7 +686,7 @@ public class TestInputOutputFormat {
     JobConf job = new JobConf(conf);
     Properties properties = new Properties();
     HiveOutputFormat<?, ?> outFormat = new OrcOutputFormat();
-    FileSinkOperator.RecordWriter writer =
+    FSRecordWriter writer =
         outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true,
             properties, Reporter.NULL);
     writer.close(true);
@@ -731,7 +731,7 @@ public class TestInputOutputFormat {
     }
     SerDe serde = new OrcSerde();
     HiveOutputFormat<?, ?> outFormat = new OrcOutputFormat();
-    FileSinkOperator.RecordWriter writer =
+    FSRecordWriter writer =
         outFormat.getHiveRecordWriter(conf, testFilePath, StringRow.class,
             true, properties, Reporter.NULL);
     writer.write(serde.serialize(new StringRow("owen"), inspector));

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java Sat Sep 28 04:26:55 2013
@@ -1,7 +1,10 @@
 package org.apache.hadoop.hive.ql.io.udf;
 
+import java.io.IOException;
+import java.util.Properties;
+
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -11,27 +14,24 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Progressable;
 
-import java.io.IOException;
-import java.util.Properties;
-
 public class Rot13OutputFormat
   extends HiveIgnoreKeyTextOutputFormat<LongWritable,Text> {
 
   @Override
-  public RecordWriter
+  public FSRecordWriter
     getHiveRecordWriter(JobConf jc,
                         Path outPath,
                         Class<? extends Writable> valueClass,
                         boolean isCompressed,
                         Properties tableProperties,
                         Progressable progress) throws IOException {
-    final RecordWriter result =
+    final FSRecordWriter result =
       super.getHiveRecordWriter(jc,outPath,valueClass,isCompressed,
         tableProperties,progress);
     final Reporter reporter = (Reporter) progress;
     reporter.setStatus("got here");
     System.out.println("Got a reporter " + reporter);
-    return new RecordWriter() {
+    return new FSRecordWriter() {
       @Override
       public void write(Writable w) throws IOException {
         if (w instanceof Text) {

Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java Sat Sep 28 04:26:55 2013
@@ -27,9 +27,11 @@ public class SerDeStats {
 
   // currently we support only raw data size stat
   private long rawDataSize;
+  private long rowCount;
 
   public SerDeStats() {
     rawDataSize = 0;
+    rowCount = 0;
   }
 
   /**
@@ -48,4 +50,20 @@ public class SerDeStats {
     rawDataSize = uSize;
   }
 
+  /**
+   * Return the row count
+   * @return row count
+   */
+  public long getRowCount() {
+    return rowCount;
+  }
+
+  /**
+   * Set the row count
+   * @param rowCount - count of rows
+   */
+  public void setRowCount(long rowCount) {
+    this.rowCount = rowCount;
+  }
+
 }

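Finally, a trivial illustration (not part of the patch; SerDeStatsDemo is a made-up name) of the extended carrier class: SerDeStats now transports a row count next to the raw data size, which is what lets StatsProvidingRecordWriter implementations report both through a single object.

import org.apache.hadoop.hive.serde2.SerDeStats;

public class SerDeStatsDemo {
  public static void main(String[] args) {
    SerDeStats stats = new SerDeStats();
    stats.setRawDataSize(4096L); // deserialized size, per the class contract
    stats.setRowCount(128L);     // new in this patch
    System.out.println(stats.getRawDataSize() + " bytes / "
        + stats.getRowCount() + " rows");
  }
}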
