Author: khorgath
Date: Mon Mar 3 22:36:48 2014
New Revision: 1573788
URL: http://svn.apache.org/r1573788
Log:
HIVE-5193 : Columnar Pushdown for RC/ORC File not happening in HCatLoader (Viraj Bhat via Sushanth Sowmyan)
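In short: HCatLoader now forwards Pig's pushProjection column list to Hive's ColumnProjectionUtils, so the RCFile/ORC readers only deserialize the projected columns instead of every column in the row. The projection travels through the job Configuration. A minimal standalone sketch of that mechanism (illustrative only; the class name ProjectionSketch is made up, and the hard-coded column ids mirror the new test's expectation):

    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;

    public class ProjectionSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Declare that only columns 0 and 1 need to be read.
        ColumnProjectionUtils.appendReadColumns(conf, Arrays.asList(0, 1));
        // The ids are recorded under ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR
        // ("hive.io.file.readcolumn.ids"); expected to print "0,1".
        System.out.println(conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
      }
    }

The new test below asserts exactly this property on the job configuration of the Pig job's output.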
Modified:
hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java
hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
Modified:
hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java?rev=1573788&r1=1573787&r2=1573788&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/main/java/org/apache/hive/hcatalog/pig/HCatLoader.java Mon Mar 3 22:36:48 2014
@@ -19,6 +19,7 @@
package org.apache.hive.hcatalog.pig;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hive.common.cla
import org.apache.hadoop.hive.common.classification.InterfaceStability;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.security.Credentials;
@@ -155,6 +157,12 @@ public class HCatLoader extends HCatBase
     if (requiredFieldsInfo != null) {
       // convert to hcatschema and pass to HCatInputFormat
       try {
+        //push down projections to columnar store works for RCFile and ORCFile
+        ArrayList<Integer> list = new ArrayList<Integer>(requiredFieldsInfo.getFields().size());
+        for (RequiredField rf : requiredFieldsInfo.getFields()) {
+          list.add(rf.getIndex());
+        }
+        ColumnProjectionUtils.appendReadColumns(job.getConfiguration(), list);
         outputSchema = phutil.getHCatSchema(requiredFieldsInfo.getFields(), signature, this.getClass());
         HCatInputFormat.setOutputSchema(job, outputSchema);
       } catch (Exception e) {
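For context (not part of this change): the columnar readers consume the ids appended above when deciding which columns to materialize. A rough consumer-side sketch, assuming the getReadColumnIDs accessor on the same utility class (ReadColumnsSketch and isProjected are made-up names, and real readers also handle the read-all-columns case):

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;

    public class ReadColumnsSketch {
      // Roughly what a columnar reader checks before materializing a column.
      static boolean isProjected(Configuration conf, int columnId) {
        List<Integer> ids = ColumnProjectionUtils.getReadColumnIDs(conf);
        return ids.isEmpty() || ids.contains(columnId);
      }
    }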
Modified:
hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java?rev=1573788&r1=1573787&r2=1573788&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatLoader.java Mon Mar 3 22:36:48 2014
@@ -19,6 +19,8 @@
package org.apache.hive.hcatalog.pig;
import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.sql.Date;
@@ -32,13 +34,16 @@ import java.util.Map;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.HcatTestUtils;
import org.apache.hive.hcatalog.common.HCatUtil;
@@ -52,6 +57,9 @@ import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.PigRunner;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Before;
@@ -435,6 +443,39 @@ public class TestHCatLoader {
   }
 
   @Test
+  public void testColumnarStorePushdown() throws Exception {
+    String PIGOUTPUT_DIR = TEST_DATA_DIR+ "/colpushdownop";
+    String PIG_FILE = "test.pig";
+    String expectedCols = "0,1";
+    PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+    w.println("A = load '" + COMPLEX_TABLE + "' using org.apache.hive.hcatalog.pig.HCatLoader();");
+    w.println("B = foreach A generate name,studentid;");
+    w.println("C = filter B by name is not null;");
+    w.println("store C into '" + PIGOUTPUT_DIR + "' using PigStorage();");
+    w.close();
+
+    try {
+      String[] args = { "-x", "local", PIG_FILE };
+      PigStats stats = PigRunner.run(args, null);
+      //Pig script was successful
+      assertTrue(stats.isSuccessful());
+      //Single MapReduce job is launched
+      OutputStats outstats = stats.getOutputStats().get(0);
+      assertTrue(outstats!= null);
+      assertEquals(expectedCols,outstats.getConf()
+        .get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
+      //delete output file on exit
+      FileSystem fs = FileSystem.get(outstats.getConf());
+      if (fs.exists(new Path(PIGOUTPUT_DIR)))
+      {
+        fs.delete(new Path(PIGOUTPUT_DIR), true);
+      }
+    }finally {
+      new File(PIG_FILE).delete();
+    }
+  }
+
+  @Test
   public void testGetInputBytes() throws Exception {
     File file = new File(TEST_WAREHOUSE_DIR + "/" + SPECIFIC_SIZE_TABLE + "/part-m-00000");
     file.deleteOnExit();