Author: hashutosh
Date: Sat Sep 28 04:26:55 2013
New Revision: 1527149
URL: http://svn.apache.org/r1527149
Log:
HIVE-5324 : Extend record writer and ORC reader/writer interfaces to provide
statistics (Prasanth J via Ashutosh Chauhan)
Added:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java
Modified:
hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java
hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java
Modified: hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java (original)
+++ hive/trunk/contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base64/Base64TextOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -24,7 +24,7 @@ import java.util.Properties;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
@@ -53,13 +53,13 @@ public class Base64TextOutputFormat<K ex
* Base64RecordWriter.
*
*/
- public static class Base64RecordWriter implements RecordWriter,
+ public static class Base64RecordWriter implements FSRecordWriter,
JobConfigurable {
- RecordWriter writer;
+ FSRecordWriter writer;
BytesWritable bytesWritable;
- public Base64RecordWriter(RecordWriter writer) {
+ public Base64RecordWriter(FSRecordWriter writer) {
this.writer = writer;
bytesWritable = new BytesWritable();
}
@@ -119,7 +119,7 @@ public class Base64TextOutputFormat<K ex
}
@Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException {
Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java (original)
+++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHFileOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.Text;
@@ -71,7 +71,7 @@ public class HiveHFileOutputFormat exten
}
@Override
- public RecordWriter getHiveRecordWriter(
+ public FSRecordWriter getHiveRecordWriter(
final JobConf jc,
final Path finalOutPath,
Class<? extends Writable> valueClass,
@@ -120,7 +120,7 @@ public class HiveHFileOutputFormat exten
++i;
}
- return new RecordWriter() {
+ return new FSRecordWriter() {
@Override
public void close(boolean abort) throws IOException {
Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hcatalog/cli/DummyStorageHandler.java Sat Sep 28 04:26:55 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.metadata.AuthorizationException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -285,7 +286,7 @@ class DummyStorageHandler extends HCatSt
* org.apache.hadoop.util.Progressable)
*/
@Override
- public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+ public FSRecordWriter getHiveRecordWriter(
JobConf jc, Path finalOutPath,
Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress)
Modified: hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java (original)
+++ hive/trunk/hcatalog/storage-handlers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -25,6 +25,7 @@ import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -40,7 +41,7 @@ public class HBaseBaseOutputFormat imple
HiveOutputFormat<WritableComparable<?>, Put> {
@Override
- public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+ public FSRecordWriter getHiveRecordWriter(
JobConf jc, Path finalOutPath,
Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress)
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Sat Sep 28 04:26:55 2013
@@ -35,11 +35,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
import org.apache.hadoop.hive.ql.io.HivePartitioner;
+import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -84,11 +86,13 @@ public class FileSinkOperator extends Te
protected transient int dpStartCol; // start column # for DP columns
protected transient List<String> dpVals; // array of values corresponding to DP columns
protected transient List<Object> dpWritables;
- protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters
+ protected transient FSRecordWriter[] rowOutWriters; // row specific RecordWriters
protected transient int maxPartitions;
protected transient ListBucketingCtx lbCtx;
protected transient boolean isSkewedStoredAsSubDirectories;
private transient boolean statsCollectRawDataSize;
+ private transient boolean[] statsFromRecordWriter;
+ private transient boolean isCollectRWStats;
private static final transient String[] FATAL_ERR_MSG = {
@@ -96,22 +100,12 @@ public class FileSinkOperator extends Te
"Number of dynamic partitions exceeded
hive.exec.max.dynamic.partitions.pernode."
};
- /**
- * RecordWriter.
- *
- */
- public static interface RecordWriter {
- void write(Writable w) throws IOException;
-
- void close(boolean abort) throws IOException;
- }
-
public class FSPaths implements Cloneable {
Path tmpPath;
Path taskOutputTempPath;
Path[] outPaths;
Path[] finalPaths;
- RecordWriter[] outWriters;
+ FSRecordWriter[] outWriters;
Stat stat;
public FSPaths() {
@@ -122,7 +116,7 @@ public class FileSinkOperator extends Te
taskOutputTempPath = Utilities.toTaskTempPath(specPath);
outPaths = new Path[numFiles];
finalPaths = new Path[numFiles];
- outWriters = new RecordWriter[numFiles];
+ outWriters = new FSRecordWriter[numFiles];
stat = new Stat();
}
@@ -166,11 +160,11 @@ public class FileSinkOperator extends Te
}
}
- public void setOutWriters(RecordWriter[] out) {
+ public void setOutWriters(FSRecordWriter[] out) {
outWriters = out;
}
- public RecordWriter[] getOutWriters() {
+ public FSRecordWriter[] getOutWriters() {
return outWriters;
}
@@ -324,6 +318,7 @@ public class FileSinkOperator extends Te
isCompressed = conf.getCompressed();
parent = Utilities.toTempPath(conf.getDirName());
statsCollectRawDataSize = conf.isStatsCollectRawDataSize();
+ statsFromRecordWriter = new boolean[numFiles];
serializer = (Serializer)
conf.getTableInfo().getDeserializerClass().newInstance();
serializer.initialize(null, conf.getTableInfo().getProperties());
@@ -516,6 +511,8 @@ public class FileSinkOperator extends Te
fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx],
reporter);
+ // If the record writer provides stats, get it from there instead of the serde
+ statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter;
// increment the CREATED_FILES counter
if (reporter != null) {
reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
@@ -619,7 +616,11 @@ public class FileSinkOperator extends Te
}
rowOutWriters = fpaths.outWriters;
- if (conf.isGatherStats()) {
+ // check if all record writers implement statistics. if atleast one RW
+ // doesn't implement stats interface we will fallback to conventional way
+ // of gathering stats
+ isCollectRWStats = areAllTrue(statsFromRecordWriter);
+ if (conf.isGatherStats() && !isCollectRWStats) {
if (statsCollectRawDataSize) {
SerDeStats stats = serializer.getSerDeStats();
if (stats != null) {
@@ -630,12 +631,14 @@ public class FileSinkOperator extends Te
}
+ FSRecordWriter rowOutWriter = null;
+
if (row_count != null) {
row_count.set(row_count.get() + 1);
}
if (!multiFileSpray) {
- rowOutWriters[0].write(recordValue);
+ rowOutWriter = rowOutWriters[0];
} else {
int keyHashCode = 0;
for (int i = 0; i < partitionEval.length; i++) {
@@ -646,8 +649,9 @@ public class FileSinkOperator extends Te
key.setHashCode(keyHashCode);
int bucketNum = prtner.getBucket(key, null, totalFiles);
int idx = bucketMap.get(bucketNum);
- rowOutWriters[idx].write(recordValue);
+ rowOutWriter = rowOutWriters[idx];
}
+ rowOutWriter.write(recordValue);
} catch (IOException e) {
throw new HiveException(e);
} catch (SerDeException e) {
@@ -655,6 +659,15 @@ public class FileSinkOperator extends Te
}
}
+ private boolean areAllTrue(boolean[] statsFromRW) {
+ for(boolean b : statsFromRW) {
+ if (!b) {
+ return false;
+ }
+ }
+ return true;
+ }
+
/**
* Lookup list bucketing path.
* @param lbDirName
@@ -864,6 +877,27 @@ public class FileSinkOperator extends Te
if (!abort) {
for (FSPaths fsp : valToPaths.values()) {
fsp.closeWriters(abort);
+
+ // before closing the operator check if statistics gathering is requested
+ // and is provided by record writer. this is different from the statistics
+ // gathering done in processOp(). In processOp(), for each row added
+ // serde statistics about the row is gathered and accumulated in hashmap.
+ // this adds more overhead to the actual processing of row. But if the
+ // record writer already gathers the statistics, it can simply return the
+ // accumulated statistics which will be aggregated in case of spray writers
+ if (conf.isGatherStats() && isCollectRWStats) {
+ for (int idx = 0; idx < fsp.outWriters.length; idx++) {
+ FSRecordWriter outWriter = fsp.outWriters[idx];
+ if (outWriter != null) {
+ SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
+ if (stats != null) {
+ fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+ fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+ }
+ }
+ }
+ }
+
if (isNativeTable) {
fsp.commit(fs);
}
@@ -934,7 +968,7 @@ public class FileSinkOperator extends Te
hiveOutputFormat = ReflectionUtils.newInstance(conf.getTableInfo().getOutputFileFormatClass(),job);
}
else {
- hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+ hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
}
}
else {
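
The net effect of the FileSinkOperator changes above: when every open writer
implements StatsProvidingRecordWriter, the per-row serde statistics gathering
in processOp() is skipped, and aggregate statistics are pulled from the
writers once at close time. A minimal self-contained sketch of that
consumer-side pattern (WriterStatsCollector and its method names are
illustrative, not part of this commit):

    import org.apache.hadoop.hive.ql.io.FSRecordWriter;
    import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
    import org.apache.hadoop.hive.serde2.SerDeStats;

    public class WriterStatsCollector {

      // Mirrors areAllTrue(): writer-side stats can replace per-row serde
      // stats only if every writer implements the stats interface.
      static boolean allProvideStats(FSRecordWriter[] writers) {
        for (FSRecordWriter w : writers) {
          if (!(w instanceof StatsProvidingRecordWriter)) {
            return false;
          }
        }
        return true;
      }

      // Mirrors the closeOp() loop: aggregate row count and raw data size
      // across writers, e.g. one writer per bucket when spraying.
      static SerDeStats aggregate(FSRecordWriter[] writers) {
        long rows = 0;
        long rawSize = 0;
        for (FSRecordWriter w : writers) {
          SerDeStats s = ((StatsProvidingRecordWriter) w).getStats();
          if (s != null) {
            rows += s.getRowCount();
            rawSize += s.getRawDataSize();
          }
        }
        SerDeStats total = new SerDeStats();
        total.setRowCount(rows);
        total.setRawDataSize(rawSize);
        return total;
      }
    }
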
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Sat Sep 28 04:26:55 2013
@@ -102,12 +102,12 @@ import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -1694,7 +1694,7 @@ public final class Utilities {
for (String p : paths) {
Path path = new Path(p);
- RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
+ FSRecordWriter writer = HiveFileFormatUtils.getRecordWriter(
jc, hiveOutputFormat, outputClass, isCompressed,
tableInfo.getProperties(), path, reporter);
writer.close(false);
@@ -2853,7 +2853,7 @@ public final class Utilities {
Path newFilePath = new Path(newFile);
String onefile = newPath.toString();
- RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
+ FSRecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
Text.class, false, props, null);
if (dummyRow) {
// empty files are omitted at CombineHiveInputFormat.
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java Sat Sep 28 04:26:55 2013
@@ -28,8 +28,8 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
@@ -240,7 +240,7 @@ public class PTFRowContainer<Row extends
}
- private static class PTFRecordWriter implements RecordWriter {
+ private static class PTFRecordWriter implements FSRecordWriter {
BytesWritable EMPTY_KEY = new BytesWritable();
SequenceFile.Writer outStream;
@@ -262,7 +262,7 @@ public class PTFRowContainer<Row extends
extends HiveSequenceFileOutputFormat<K,V> {
@Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Sat Sep 28 04:26:55 2013
@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -105,7 +105,7 @@ public class RowContainer<ROW extends Li
int acutalSplitNum = 0;
int currentSplitPointer = 0;
org.apache.hadoop.mapred.RecordReader rr = null; // record reader
- RecordWriter rw = null;
+ FSRecordWriter rw = null;
InputFormat<WritableComparable, Writable> inputFormat = null;
InputSplit[] inputSplits = null;
private ROW dummyRow = null;
@@ -531,7 +531,7 @@ public class RowContainer<ROW extends Li
}
- protected RecordWriter getRecordWriter() {
+ protected FSRecordWriter getRecordWriter() {
return rw;
}
Added: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java?rev=1527149&view=auto
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java (added)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/FSRecordWriter.java Sat Sep 28 04:26:55 2013
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Record writer used by file sink operator.
+ *
+ * FSRecordWriter.
+ *
+ */
+public interface FSRecordWriter {
+ void write(Writable w) throws IOException;
+
+ void close(boolean abort) throws IOException;
+
+ /**
+ * If a file format internally gathers statistics (like ORC) while writing then
+ * it can expose the statistics through this record writer interface. Writer side
+ * statistics is useful for updating the metastore with table/partition level
+ * statistics.
+ * StatsProvidingRecordWriter.
+ *
+ */
+ public interface StatsProvidingRecordWriter extends FSRecordWriter{
+ /**
+ * Returns the statistics information
+ * @return SerDeStats
+ */
+ SerDeStats getStats();
+ }
+
+}
\ No newline at end of file
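
For context, here is what a format-side implementation of the new nested
interface might look like: a writer that counts rows and bytes as a side
effect of writing can hand the totals back for free, which is the ORC case
this commit targets. CountingTextWriter is a hypothetical sketch, not part of
this commit.

    import java.io.IOException;
    import java.io.OutputStream;

    import org.apache.hadoop.hive.ql.io.FSRecordWriter;
    import org.apache.hadoop.hive.serde2.SerDeStats;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.Writable;

    public class CountingTextWriter
        implements FSRecordWriter.StatsProvidingRecordWriter {

      private final OutputStream out;
      private long rowCount;
      private long rawDataSize;

      public CountingTextWriter(OutputStream out) {
        this.out = out;
      }

      @Override
      public void write(Writable w) throws IOException {
        Text t = (Text) w; // assumes Text rows, for brevity
        out.write(t.getBytes(), 0, t.getLength());
        out.write('\n');
        rowCount++;
        rawDataSize += t.getLength();
      }

      @Override
      public void close(boolean abort) throws IOException {
        out.close();
      }

      @Override
      public SerDeStats getStats() {
        // Statistics were gathered as a side effect of writing, so this is
        // cheap compared to per-row serde stats in FileSinkOperator.
        SerDeStats stats = new SerDeStats();
        stats.setRowCount(rowCount);
        stats.setRawDataSize(rawDataSize);
        return stats;
      }
    }
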
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -24,7 +24,6 @@ import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -43,7 +42,7 @@ public class HiveBinaryOutputFormat<K ex
/**
* create the final out file, and output row by row. After one row is
* appended, a configured row separator is appended
- *
+ *
* @param jc
* the job configuration file
* @param outPath
@@ -59,14 +58,14 @@ public class HiveBinaryOutputFormat<K ex
* @return the RecordWriter
*/
@Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException {
FileSystem fs = outPath.getFileSystem(jc);
final OutputStream outStream = fs.create(outPath);
- return new RecordWriter() {
+ return new FSRecordWriter() {
public void write(Writable r) throws IOException {
if (r instanceof Text) {
Text tr = (Text) r;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Sat Sep 28 04:26:55 2013
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
@@ -246,7 +245,7 @@ public final class HiveFileFormatUtils {
return true;
}
- public static RecordWriter getHiveRecordWriter(JobConf jc,
+ public static FSRecordWriter getHiveRecordWriter(JobConf jc,
TableDesc tableInfo, Class<? extends Writable> outputClass,
FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
boolean storagehandlerofhivepassthru = false;
@@ -287,7 +286,7 @@ public final class HiveFileFormatUtils {
}
}
- public static RecordWriter getRecordWriter(JobConf jc,
+ public static FSRecordWriter getRecordWriter(JobConf jc,
HiveOutputFormat<?, ?> hiveOutputFormat,
final Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProp, Path outPath, Reporter reporter
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -25,7 +25,6 @@ import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
@@ -39,7 +38,7 @@ import org.apache.hadoop.util.Progressab
/**
* HiveIgnoreKeyTextOutputFormat replaces key with null before feeding the <key,
* value> to TextOutputFormat.RecordWriter.
- *
+ *
*/
public class HiveIgnoreKeyTextOutputFormat<K extends WritableComparable, V extends Writable>
extends TextOutputFormat<K, V> implements HiveOutputFormat<K, V> {
@@ -47,7 +46,7 @@ public class HiveIgnoreKeyTextOutputForm
/**
* create the final out file, and output row by row. After one row is
* appended, a configured row separator is appended
- *
+ *
* @param jc
* the job configuration file
* @param outPath
@@ -63,7 +62,7 @@ public class HiveIgnoreKeyTextOutputForm
* @return the RecordWriter
*/
@Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException {
int rowSeparator = 0;
@@ -79,7 +78,7 @@ public class HiveIgnoreKeyTextOutputForm
FileSystem fs = outPath.getFileSystem(jc);
final OutputStream outStream = Utilities.createCompressedStream(jc, fs
.create(outPath), isCompressed);
- return new RecordWriter() {
+ return new FSRecordWriter() {
public void write(Writable r) throws IOException {
if (r instanceof Text) {
Text tr = (Text) r;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -23,7 +23,6 @@ import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
@@ -48,7 +47,7 @@ public class HiveNullValueSequenceFileOu
private boolean keyIsText;
@Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException {
@@ -58,7 +57,7 @@ public class HiveNullValueSequenceFileOu
keyWritable = new HiveKey();
keyIsText = valueClass.equals(Text.class);
- return new RecordWriter() {
+ return new FSRecordWriter() {
public void write(Writable r) throws IOException {
if (keyIsText) {
Text text = (Text) r;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.Properties;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
@@ -58,7 +57,7 @@ public interface HiveOutputFormat<K, V>
* progress used for status report
* @return the RecordWriter for the output file
*/
- RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
final Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -49,7 +49,7 @@ public class HivePassThroughOutputFormat
"org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat";
public static final String HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY =
- "hive.passthrough.storagehandler.of";
+ "hive.passthrough.storagehandler.of";
public HivePassThroughOutputFormat() {
//construct this class through ReflectionUtils from FileSinkOperator
@@ -99,7 +99,7 @@ public class HivePassThroughOutputFormat
}
@Override
- public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+ public FSRecordWriter getHiveRecordWriter(
JobConf jc, Path finalOutPath, Class<? extends Writable> valueClass,
boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException {
if (this.initialized == false) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java Sat Sep 28 04:26:55 2013
@@ -20,13 +20,12 @@ package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
public class HivePassThroughRecordWriter <K extends WritableComparable<?>, V extends Writable>
-implements RecordWriter {
+implements FSRecordWriter {
private final org.apache.hadoop.mapred.RecordWriter<K, V> mWriter;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -23,7 +23,6 @@ import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
@@ -56,7 +55,7 @@ public class HiveSequenceFileOutputForma
* @return the RecordWriter for the output file
*/
@Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException {
@@ -64,7 +63,7 @@ public class HiveSequenceFileOutputForma
final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
fs, finalOutPath, BytesWritable.class, valueClass, isCompressed);
- return new RecordWriter() {
+ return new FSRecordWriter() {
public void write(Writable r) throws IOException {
outStream.append(EMPTY_KEY, r);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -118,7 +118,7 @@ public class RCFileOutputFormat extends
* @throws IOException
*/
@Override
- public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+ public FSRecordWriter getHiveRecordWriter(
JobConf jc, Path finalOutPath, Class<? extends Writable> valueClass,
boolean isCompressed, Properties tableProperties, Progressable progress)
throws IOException {
@@ -135,7 +135,7 @@ public class RCFileOutputFormat extends
(jc, finalOutPath.getFileSystem(jc),
finalOutPath, isCompressed);
- return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
+ return new FSRecordWriter() {
public void write(Writable r) throws IOException {
outWriter.append(r);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -17,6 +17,14 @@
*/
package org.apache.hadoop.hive.ql.io.avro;
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
+import static org.apache.avro.mapred.AvroJob.OUTPUT_CODEC;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
+
+import java.io.IOException;
+import java.util.Properties;
+
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
@@ -24,7 +32,7 @@ import org.apache.avro.generic.GenericDa
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
@@ -36,14 +44,6 @@ import org.apache.hadoop.mapred.RecordWr
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
-import static org.apache.avro.mapred.AvroJob.OUTPUT_CODEC;
-import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
-import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
-
/**
* Write to an Avro file from a Hive process.
*/
@@ -51,7 +51,7 @@ public class AvroContainerOutputFormat
implements HiveOutputFormat<LongWritable, AvroGenericRecordWritable> {
@Override
- public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf,
+ public FSRecordWriter getHiveRecordWriter(JobConf jobConf,
Path path, Class<? extends Writable> valueClass, boolean isCompressed,
Properties properties, Progressable progressable) throws IOException {
Schema schema;
@@ -62,7 +62,7 @@ public class AvroContainerOutputFormat
}
GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dfw = new DataFileWriter<GenericRecord>(gdw);
-
+
if (isCompressed) {
int level = jobConf.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
String codecName = jobConf.get(OUTPUT_CODEC, DEFLATE_CODEC);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java Sat Sep 28 04:26:55 2013
@@ -18,18 +18,18 @@
package org.apache.hadoop.hive.ql.io.avro;
+import java.io.IOException;
+
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
import org.apache.hadoop.io.Writable;
-import java.io.IOException;
-
/**
* Write an Avro GenericRecord to an Avro data file.
*/
-public class AvroGenericRecordWriter implements FileSinkOperator.RecordWriter{
+public class AvroGenericRecordWriter implements FSRecordWriter{
final private DataFileWriter<GenericRecord> dfw;
public AvroGenericRecordWriter(DataFileWriter<GenericRecord> dfw) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -17,10 +17,9 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde.OrcSerdeRow;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -45,7 +44,7 @@ public class OrcOutputFormat extends Fil
private static class OrcRecordWriter
implements RecordWriter<NullWritable, OrcSerdeRow>,
- FileSinkOperator.RecordWriter {
+ FSRecordWriter {
private Writer writer = null;
private final Path path;
private final OrcFile.WriterOptions options;
@@ -105,7 +104,7 @@ public class OrcOutputFormat extends Fil
}
@Override
- public FileSinkOperator.RecordWriter
+ public FSRecordWriter
getHiveRecordWriter(JobConf conf,
Path path,
Class<? extends Writable> valueClass,
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Sat Sep 28 04:26:55 2013
@@ -39,6 +39,19 @@ public interface Reader {
long getNumberOfRows();
/**
+ * Get the deserialized data size of the file
+ * @return raw data size
+ */
+ long getRawDataSize();
+
+ /**
+ * Get the deserialized data size of the specified columns
+ * @param colNames
+ * @return raw data size of columns
+ */
+ long getRawDataSizeOfColumns(List<String> colNames);
+
+ /**
* Get the user metadata keys.
* @return the set of metadata keys
*/
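
A hypothetical usage sketch for the two new Reader accessors (the column
names "a" and "b" are placeholders); note that ReaderImpl below still stubs
both methods to return 0 in this commit:

    import java.util.Arrays;

    import org.apache.hadoop.hive.ql.io.orc.Reader;

    public class OrcReaderStatsExample {
      static void report(Reader reader) {
        // Deserialized size of the whole file vs. a projected column subset.
        long total = reader.getRawDataSize();
        long subset = reader.getRawDataSizeOfColumns(Arrays.asList("a", "b"));
        System.out.println("total=" + total + ", columns(a,b)=" + subset);
      }
    }
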
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Sat Sep 28 04:26:55 2013
@@ -343,4 +343,14 @@ final class ReaderImpl implements Reader
include, footer.getRowIndexStride(), sarg, columnNames);
}
+ @Override
+ public long getRawDataSize() {
+ return 0;
+ }
+
+ @Override
+ public long getRawDataSizeOfColumns(List<String> colNames) {
+ return 0;
+ }
+
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Writer.java Sat Sep 28 04:26:55 2013
@@ -47,4 +47,22 @@ public interface Writer {
* @throws IOException
*/
void close() throws IOException;
+
+ /**
+ * Return the deserialized data size. Raw data size will be compute when
+ * writing the file footer. Hence raw data size value will be available only
+ * after closing the writer.
+ *
+ * @return raw data size
+ */
+ long getRawDataSize();
+
+ /**
+ * Return the number of rows in file. Row count gets updated when flushing
+ * the stripes. To get accurate row count this method should be called after
+ * closing the writer.
+ *
+ * @return row count
+ */
+ long getNumberOfRows();
}
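
The javadoc above pins down an ordering contract: raw data size is computed
while writing the file footer and row count is finalized as stripes flush, so
both values are reliable only after close(). A hypothetical helper showing
the intended call order (addRow() is the ORC Writer's existing append
method; writeAndReport() is not Hive code):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hive.ql.io.orc.Writer;

    public class OrcWriterStatsExample {
      static void writeAndReport(Writer writer, List<Object> rows)
          throws IOException {
        for (Object row : rows) {
          writer.addRow(row);
        }
        // Footer is written here; only now are both statistics final.
        writer.close();
        System.out.println("rows=" + writer.getNumberOfRows()
            + ", rawDataSize=" + writer.getRawDataSize());
      }
    }
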
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Sat Sep 28 04:26:55 2013
@@ -1871,4 +1871,14 @@ class WriterImpl implements Writer, Memo
rawWriter.close();
}
}
+
+ @Override
+ public long getRawDataSize() {
+ return 0;
+ }
+
+ @Override
+ public long getNumberOfRows() {
+ return 0;
+ }
}
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java Sat Sep 28 04:26:55 2013
@@ -41,7 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -521,7 +521,7 @@ public class TestInputOutputFormat {
}
SerDe serde = new OrcSerde();
HiveOutputFormat<?, ?> outFormat = new OrcOutputFormat();
- FileSinkOperator.RecordWriter writer =
+ FSRecordWriter writer =
outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true,
properties, Reporter.NULL);
writer.write(serde.serialize(new MyRow(1,2), inspector));
@@ -686,7 +686,7 @@ public class TestInputOutputFormat {
JobConf job = new JobConf(conf);
Properties properties = new Properties();
HiveOutputFormat<?, ?> outFormat = new OrcOutputFormat();
- FileSinkOperator.RecordWriter writer =
+ FSRecordWriter writer =
outFormat.getHiveRecordWriter(conf, testFilePath, MyRow.class, true,
properties, Reporter.NULL);
writer.close(true);
@@ -731,7 +731,7 @@ public class TestInputOutputFormat {
}
SerDe serde = new OrcSerde();
HiveOutputFormat<?, ?> outFormat = new OrcOutputFormat();
- FileSinkOperator.RecordWriter writer =
+ FSRecordWriter writer =
outFormat.getHiveRecordWriter(conf, testFilePath, StringRow.class,
true, properties, Reporter.NULL);
writer.write(serde.serialize(new StringRow("owen"), inspector));
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/udf/Rot13OutputFormat.java Sat Sep 28 04:26:55 2013
@@ -1,7 +1,10 @@
package org.apache.hadoop.hive.ql.io.udf;
+import java.io.IOException;
+import java.util.Properties;
+
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
@@ -11,27 +14,24 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
-import java.io.IOException;
-import java.util.Properties;
-
public class Rot13OutputFormat
extends HiveIgnoreKeyTextOutputFormat<LongWritable,Text> {
@Override
- public RecordWriter
+ public FSRecordWriter
getHiveRecordWriter(JobConf jc,
Path outPath,
Class<? extends Writable> valueClass,
boolean isCompressed,
Properties tableProperties,
Progressable progress) throws IOException {
- final RecordWriter result =
+ final FSRecordWriter result =
super.getHiveRecordWriter(jc,outPath,valueClass,isCompressed,
tableProperties,progress);
final Reporter reporter = (Reporter) progress;
reporter.setStatus("got here");
System.out.println("Got a reporter " + reporter);
- return new RecordWriter() {
+ return new FSRecordWriter() {
@Override
public void write(Writable w) throws IOException {
if (w instanceof Text) {
Modified: hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java
URL: http://svn.apache.org/viewvc/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java?rev=1527149&r1=1527148&r2=1527149&view=diff
==============================================================================
--- hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java (original)
+++ hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/SerDeStats.java Sat Sep 28 04:26:55 2013
@@ -27,9 +27,11 @@ public class SerDeStats {
// currently we support only raw data size stat
private long rawDataSize;
+ private long rowCount;
public SerDeStats() {
rawDataSize = 0;
+ rowCount = 0;
}
/**
@@ -48,4 +50,20 @@ public class SerDeStats {
rawDataSize = uSize;
}
+ /**
+ * Return the row count
+ * @return row count
+ */
+ public long getRowCount() {
+ return rowCount;
+ }
+
+ /**
+ * Set the row count
+ * @param rowCount - count of rows
+ */
+ public void setRowCount(long rowCount) {
+ this.rowCount = rowCount;
+ }
+
}