Author: gates
Date: Tue Feb 28 01:50:50 2012
New Revision: 1294442
URL: http://svn.apache.org/viewvc?rev=1294442&view=rev
Log:
HCATALOG-278 When outputSchema doesn't match the table schema, the wrong
columns are returned to the user
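
For reference, the essence of the fix as a minimal standalone sketch
(the class and helper below are hypothetical illustrations, not part of
HCatalog): each field requested in the output schema is looked up by
name in the table schema instead of being copied positionally, which is
what handed back the wrong columns whenever the two schemas differed in
order or width.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Hypothetical illustration only; these names do not exist in HCatalog.
    public class ProjectionSketch {

        // Map each requested output column to its value in the full row by
        // looking up the column's position in the table schema by name.
        static List<Object> project(List<String> tableSchema, List<Object> row,
                                    List<String> outputSchema) {
            List<Object> out = new ArrayList<Object>(outputSchema.size());
            for (String col : outputSchema) {
                out.add(row.get(tableSchema.indexOf(col)));
            }
            return out;
        }

        public static void main(String[] args) {
            List<String> tableSchema = Arrays.asList("a", "b", "c");
            List<Object> row = Arrays.<Object>asList(1, 2, 3);
            // Requesting (c, a) must yield [3, 1]; a positional copy of the
            // first two fields would wrongly yield [1, 2].
            System.out.println(project(tableSchema, row, Arrays.asList("c", "a")));
        }
    }
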
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1294442&r1=1294441&r2=1294442&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Feb 28 01:50:50 2012
@@ -52,6 +52,8 @@ Trunk (unreleased changes)
OPTIMIZATIONS
BUG FIXES
+ HCAT-278 When outputSchema doesn't match the table schema, the wrong
+   columns are returned to the user (gates)
+
HCAT-276 After merging in HCATALOG-237 related changes Pig scripts with more
than one store fail (daijy via gates)
HCAT-257 e2e harness not working properly after file location change (gates)
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java?rev=1294442&r1=1294441&r2=1294442&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatRecordReader.java Tue Feb 28 01:50:50 2012
@@ -36,10 +36,13 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import org.apache.hcatalog.data.DefaultHCatRecord;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.LazyHCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
/** The HCat wrapper for the underlying RecordReader,
* this ensures that the initialize on
@@ -64,6 +67,9 @@ class HCatRecordReader extends RecordRea
private Map<Integer,Object> partCols;
+ private HCatSchema outputSchema = null;
+ private HCatSchema tableSchema = null;
+
/**
* Instantiates a new hcat record reader.
* @param baseRecordReader the base record reader
@@ -89,6 +95,10 @@ class HCatRecordReader extends RecordRea
TaskAttemptContext taskContext)
throws IOException, InterruptedException {
org.apache.hadoop.mapred.InputSplit baseSplit;
+
+ // Pull the output schema out of the TaskAttemptContext
+ outputSchema = (HCatSchema)HCatUtil.deserialize(
+     taskContext.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_SCHEMA));
if( split instanceof HCatSplit ) {
baseSplit = ((HCatSplit) split).getBaseSplit();
@@ -96,6 +106,10 @@ class HCatRecordReader extends RecordRea
throw new IOException("Not a HCatSplit");
}
+ // Pull the table schema out of the Split info
+ // TODO This should be passed in the TaskAttemptContext instead
+ tableSchema = ((HCatSplit)split).getTableSchema();
+
Properties properties = new Properties();
for (Map.Entry<String, String>param :
((HCatSplit)split).getPartitionInfo()
@@ -122,14 +136,32 @@ class HCatRecordReader extends RecordRea
HCatRecord r;
try {
- r = new DefaultHCatRecord((new LazyHCatRecord(
+ /*
+ return new DefaultHCatRecord((new LazyHCatRecord(
serde.deserialize(currentValue),
serde.getObjectInspector(),
partCols)).getAll());
+ */
+ r = new LazyHCatRecord(serde.deserialize(currentValue),
+ serde.getObjectInspector(), partCols);
+ if (outputSchema == null) {
+ // there's no projection being done
+ return new DefaultHCatRecord(r.getAll());
+ } else {
+ // For each field in the outputSchema, do the mapping
+ DefaultHCatRecord dr = new DefaultHCatRecord(outputSchema.size());
+ for (int i = 0; i < outputSchema.size(); i++) {
+ // Figure out the field to read
+ HCatFieldSchema ofs = outputSchema.get(i);
+ dr.set(i, r.get(ofs.getName(), tableSchema));
+ }
+ return dr;
+ }
+
+
} catch (Exception e) {
throw new IOException("Failed to create HCatRecord " + e);
}
- return r;
}
/* (non-Javadoc)
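
A hedged usage sketch of the consumer side: assuming HCatInputFormat
exposes a setOutputSchema(Job, HCatSchema) helper that serializes the
projection under HCatConstants.HCAT_KEY_OUTPUT_SCHEMA (the key the
patched initialize() reads above), a job requesting a subset or
reordering of the table's columns now gets back the named columns
rather than whatever happened to sit in those positions.

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.HCatInputFormat;

    public class ProjectionUsageSketch {
        // setOutputSchema is assumed to store the serialized schema in the
        // job configuration under HCatConstants.HCAT_KEY_OUTPUT_SCHEMA,
        // where the patched HCatRecordReader.initialize() picks it up.
        public static void requestProjection(Job job, HCatSchema projection)
                throws Exception {
            HCatInputFormat.setOutputSchema(job, projection);
        }
    }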