Author: gates
Date: Mon Mar 26 16:53:56 2012
New Revision: 1305444
URL: http://svn.apache.org/viewvc?rev=1305444&view=rev
Log:
HCATALOG-328 HCatLoader should report its input size so pig can estimate the
number of reducers
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseLoader.java
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1305444&r1=1305443&r2=1305444&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Mon Mar 26 16:53:56 2012
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
INCOMPATIBLE CHANGES
NEW FEATURES
+ HCAT-328 HCatLoader should report its input size so pig can estimate the
number of reducers (traviscrawford via gates)
+
HCAT-287 Add data api to HCatalog (hashutosh)
IMPROVEMENTS
Modified:
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseLoader.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseLoader.java?rev=1305444&r1=1305443&r2=1305444&view=diff
==============================================================================
---
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseLoader.java
(original)
+++
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatBaseLoader.java
Mon Mar 26 16:53:56 2012
@@ -22,10 +22,15 @@ import java.util.Arrays;
import java.util.List;
import java.util.Properties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.apache.hcatalog.mapreduce.PartInfo;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadMetadata;
import org.apache.pig.LoadPushDown;
@@ -114,4 +119,31 @@ public abstract class HCatBaseLoader ext
props.put(key, value);
}
+ /**
+  * A utility method to get the size of inputs. This is accomplished by summing the
+  * size of all input paths on supported FileSystems. A location that is a plain file
+  * contributes its own length; any other location contributes the lengths of its
+  * immediate children (the listing is not recursive). Locations whose size cannot be
+  * determined are ignored. Note non-FileSystem and unpartitioned locations will not
+  * report their input size by default.
+  *
+  * @param inputJobInfo job info whose partition locations are summed
+  * @return the total number of bytes that could be determined; 0 if none could
+  * @throws IOException declared for callers; I/O failures on an individual location
+  *         are skipped rather than propagated
+  */
+ protected static long getSizeInBytes(InputJobInfo inputJobInfo) throws IOException {
+     Configuration conf = new Configuration();
+     long sizeInBytes = 0;
+
+     for (PartInfo partInfo : inputJobInfo.getPartitions()) {
+         try {
+             Path p = new Path(partInfo.getLocation());
+             // Resolve the FileSystem once per location rather than once per call.
+             // Fully qualified so this block needs no additional import.
+             org.apache.hadoop.fs.FileSystem fs = p.getFileSystem(conf);
+             if (fs.isFile(p)) {
+                 sizeInBytes += fs.getFileStatus(p).getLen();
+             } else {
+                 FileStatus[] children = fs.listStatus(p);
+                 // Some FileSystem implementations return null instead of throwing
+                 // when the path does not exist; treat that as "size unknown" so a
+                 // NullPointerException does not escape the IOException handler.
+                 if (children != null) {
+                     for (FileStatus child : children) {
+                         sizeInBytes += child.getLen();
+                     }
+                 }
+             }
+         } catch (IOException e) {
+             // Deliberate best effort: a location that cannot be inspected is
+             // skipped so that the size is reported to the extent possible.
+         }
+     }
+
+     return sizeInBytes;
+ }
}
Modified:
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java?rev=1305444&r1=1305443&r2=1305444&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
(original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatLoader.java
Mon Mar 26 16:53:56 2012
@@ -37,6 +37,7 @@ import org.apache.pig.Expression.BinaryE
import org.apache.pig.LoadFunc;
import org.apache.pig.PigException;
import org.apache.pig.ResourceSchema;
+import org.apache.pig.ResourceStatistics;
import org.apache.pig.impl.util.UDFContext;
/**
@@ -177,6 +178,22 @@ public class HCatLoader extends HCatBase
PARTITION_FILTER, partitionFilterString);
}
+ /**
+  * Get statistics about the data to be loaded. Only input data size is implemented at
+  * this time.
+  *
+  * @param location the load location; not consulted by this implementation, which reads
+  *        the input description from the job configuration instead
+  * @param job the job whose configuration holds the serialized {@code InputJobInfo}
+  *        under {@code HCatConstants.HCAT_KEY_JOB_INFO}
+  * @return statistics whose size is the input size in whole megabytes
+  *         (bytes / 1024 / 1024, using integer division)
+  * @throws IOException if the job info is missing from the configuration, or if reading
+  *         or deserializing it fails
+  */
+ @Override
+ public ResourceStatistics getStatistics(String location, Job job) throws IOException {
+     try {
+         String serializedJobInfo =
+             job.getConfiguration().get(HCatConstants.HCAT_KEY_JOB_INFO);
+         if (serializedJobInfo == null) {
+             // Fail with a clear message instead of an opaque wrapped exception later.
+             throw new IOException("No input job info found in the job configuration under key "
+                 + HCatConstants.HCAT_KEY_JOB_INFO);
+         }
+         InputJobInfo inputJobInfo = (InputJobInfo) HCatUtil.deserialize(serializedJobInfo);
+         ResourceStatistics stats = new ResourceStatistics();
+         stats.setmBytes(getSizeInBytes(inputJobInfo) / 1024 / 1024);
+         return stats;
+     } catch (IOException e) {
+         // Already the declared type; do not wrap it in a second IOException.
+         throw e;
+     } catch (Exception e) {
+         throw new IOException(e);
+     }
+ }
+
private String getPartitionFilterString() {
if(partitionFilterString == null) {
Properties props = UDFContext.getUDFContext().getUDFProperties(
Modified:
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java
URL:
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java?rev=1305444&r1=1305443&r2=1305444&view=diff
==============================================================================
---
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java
(original)
+++
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatLoader.java
Mon Mar 26 16:53:56 2012
@@ -19,6 +19,7 @@ package org.apache.hcatalog.pig;
import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -34,10 +35,12 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.ql.CommandNeedRetryException;
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.HcatTestUtils;
import org.apache.hcatalog.data.Pair;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
+import org.apache.pig.ResourceStatistics;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -53,9 +56,10 @@ public class TestHCatLoader extends Test
private static final String BASIC_TABLE = "junit_unparted_basic";
private static final String COMPLEX_TABLE = "junit_unparted_complex";
private static final String PARTITIONED_TABLE = "junit_parted_basic";
+ private static final String SPECIFIC_SIZE_TABLE = "junit_specific_size";
private static Driver driver;
- private static int guardTestCount = 5; // ugh, instantiate using
introspection in guardedSetupBeforeClass
+ private static int guardTestCount = 6; // ugh, instantiate using
introspection in guardedSetupBeforeClass
private static boolean setupHasRun = false;
private static Map<Integer,Pair<Integer,String>> basicInputData;
@@ -113,7 +117,7 @@ public class TestHCatLoader extends Test
+ "phnos array<struct<phno:string,type:string>>");
createTable(PARTITIONED_TABLE,"a int, b string","bkt string");
-
+ createTable(SPECIFIC_SIZE_TABLE, "a int, b string");
int LOOP_SIZE = 3;
String[] input = new String[LOOP_SIZE*LOOP_SIZE];
@@ -141,6 +145,7 @@ public class TestHCatLoader extends Test
server.registerQuery("A = load '"+BASIC_FILE_NAME+"' as (a:int,
b:chararray);");
server.registerQuery("store A into '"+BASIC_TABLE+"' using
org.apache.hcatalog.pig.HCatStorer();");
+ server.registerQuery("store A into '" + SPECIFIC_SIZE_TABLE + "' using
org.apache.hcatalog.pig.HCatStorer();");
server.registerQuery("B = foreach A generate a,b;");
server.registerQuery("B2 = filter B by a < 2;");
server.registerQuery("store B2 into '"+PARTITIONED_TABLE+"' using
org.apache.hcatalog.pig.HCatStorer('bkt=0');");
@@ -158,6 +163,7 @@ public class TestHCatLoader extends Test
dropTable(BASIC_TABLE);
dropTable(COMPLEX_TABLE);
dropTable(PARTITIONED_TABLE);
+ dropTable(SPECIFIC_SIZE_TABLE);
}
protected void guardedTearDownAfterClass() throws Exception {
@@ -376,4 +382,18 @@ public class TestHCatLoader extends Test
}
assertEquals(basicInputData.size(),numTuplesRead);
}
+
+ /**
+  * Checks that {@code HCatLoader.getStatistics} reports the input size in megabytes:
+  * the length of {@code part-m-00000} under the SPECIFIC_SIZE_TABLE warehouse directory
+  * is set to 2 GiB and the reported size is expected to be 2048.
+  */
+ public void testGetInputBytes() throws Exception {
+     File file = new File(TEST_WAREHOUSE_DIR + "/" + SPECIFIC_SIZE_TABLE + "/part-m-00000");
+     file.deleteOnExit();
+     RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw");
+     try {
+         randomAccessFile.setLength(2L * 1024 * 1024 * 1024);
+     } finally {
+         // Release the file descriptor even if setLength fails.
+         randomAccessFile.close();
+     }
+
+     Job job = new Job();
+     HCatLoader hCatLoader = new HCatLoader();
+     hCatLoader.setUDFContextSignature(this.getName());
+     hCatLoader.setLocation(SPECIFIC_SIZE_TABLE, job);
+     ResourceStatistics statistics = hCatLoader.getStatistics(file.getAbsolutePath(), job);
+     assertEquals(2048, (long) statistics.getmBytes());
+ }
}