Author: gates
Date: Thu Apr 12 18:20:21 2012
New Revision: 1325423
URL: http://svn.apache.org/viewvc?rev=1325423&view=rev
Log:
HCATALOG-367 Pig writes data but partition information isn't getting updated
Modified:
incubator/hcatalog/trunk/CHANGES.txt
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1325423&r1=1325422&r2=1325423&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Thu Apr 12 18:20:21 2012
@@ -110,6 +110,8 @@ Release 0.4.0 - Unreleased
OPTIMIZATIONS
BUG FIXES
+ HCAT-367 Pig writes data but partition information isn't getting updated
(rohini via gates)
+
HCAT-365 HCatUtil.getStorageHandler should set the configuration
(traviscrawford via gates)
HCAT-371 A few javadoc warnings have crept into HCat (gates)
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java?rev=1325423&r1=1325422&r2=1325423&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java Thu Apr 12 18:20:21 2012
@@ -19,10 +19,7 @@
package org.apache.hcatalog.mapreduce;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.metastore.HiveMetaHook;
import org.apache.hadoop.hive.ql.io.RCFile;
@@ -32,13 +29,11 @@ import org.apache.hadoop.hive.ql.securit
import
org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.hcatalog.common.HCatUtil;
import java.io.IOException;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -95,13 +90,13 @@ public class FosterStorageHandler extend
}
@Override
- public void configureInputJobProperties(TableDesc tableDesc,
+ public void configureInputJobProperties(TableDesc tableDesc,
Map<String, String> jobProperties)
{
}
@Override
- public void configureOutputJobProperties(TableDesc tableDesc,
+ public void configureOutputJobProperties(TableDesc tableDesc,
Map<String, String> jobProperties) {
try {
OutputJobInfo jobInfo = (OutputJobInfo)
@@ -114,7 +109,7 @@ public class FosterStorageHandler extend
// For dynamic partitioned writes without all keyvalues specified,
// we create a temp dir for the associated write job
if (dynHash != null){
- parentPath = new Path(parentPath,
+ parentPath = new Path(parentPath,
DYNTEMP_DIR_NAME+dynHash).toString();
}
@@ -128,16 +123,13 @@ public class FosterStorageHandler extend
List<String> cols = new ArrayList<String>();
List<String> values = new ArrayList<String>();
- //sort the cols and vals
- for(String name:
+ //Get the output location in the order partition keys are
defined for the table.
+ for(String name:
jobInfo.getTableInfo().
getPartitionColumns().getFieldNames()) {
String value = jobInfo.getPartitionValues().get(name);
- int i=0;
- while(i <cols.size() && name.compareTo(cols.get(i)) > 0)
- i++;
- cols.add(i,name);
- values.add(i,value);
+ cols.add(name);
+ values.add(value);
}
outputLocation = FileUtils.makePartName(cols, values);
}
@@ -145,7 +137,7 @@ public class FosterStorageHandler extend
jobInfo.setLocation(new
Path(parentPath,outputLocation).toString());
//only set output dir if partition is fully materialized
- if(jobInfo.getPartitionValues().size()
+ if(jobInfo.getPartitionValues().size()
== jobInfo.getTableInfo().getPartitionColumns().size()) {
jobProperties.put("mapred.output.dir", jobInfo.getLocation());
}
@@ -179,7 +171,7 @@ public class FosterStorageHandler extend
}
@Override
- public HiveAuthorizationProvider getAuthorizationProvider()
+ public HiveAuthorizationProvider getAuthorizationProvider()
throws HiveException {
return new DefaultHiveAuthorizationProvider();
}
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1325423&r1=1325422&r2=1325423&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java Thu Apr 12 18:20:21 2012
@@ -50,6 +50,7 @@ import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
@@ -262,8 +263,12 @@ public abstract class HCatMapReduceTest
HCatOutputFormat.setSchema(job, new HCatSchema(partitionColumns));
- job.waitForCompletion(true);
- new FileOutputCommitterContainer(job,null).cleanupJob(job);
+ boolean success = job.waitForCompletion(true);
+ if (success) {
+ new FileOutputCommitterContainer(job,null).commitJob(job);
+ } else {
+ new FileOutputCommitterContainer(job,null).abortJob(job,
JobStatus.State.FAILED);
+ }
if (assertWrite){
// we assert only if we expected to assert with this call.
Assert.assertEquals(writeCount, MapCreate.writeCount);
Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1325423&r1=1325422&r2=1325423&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java Thu Apr 12 18:20:21 2012
@@ -62,7 +62,9 @@ public class TestHCatPartitioned extends
@Override
protected List<FieldSchema> getPartitionKeys() {
List<FieldSchema> fields = new ArrayList<FieldSchema>();
+ //Defining partition names in unsorted order
fields.add(new FieldSchema("PaRT1", Constants.STRING_TYPE_NAME, ""));
+ fields.add(new FieldSchema("part0", Constants.STRING_TYPE_NAME, ""));
return fields;
}
@@ -79,11 +81,13 @@ public class TestHCatPartitioned extends
Map<String, String> partitionMap = new HashMap<String, String>();
partitionMap.put("part1", "p1value1");
+ partitionMap.put("part0", "p0value1");
runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
partitionMap.clear();
partitionMap.put("PART1", "p1value2");
+ partitionMap.put("PART0", "p0value2");
runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
@@ -102,7 +106,8 @@ public class TestHCatPartitioned extends
//Test for publish with invalid partition key name
exc = null;
partitionMap.clear();
- partitionMap.put("px", "p1value2");
+ partitionMap.put("px1", "p1value2");
+ partitionMap.put("px0", "p0value2");
try {
runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
@@ -114,6 +119,21 @@ public class TestHCatPartitioned extends
assertTrue(exc instanceof HCatException);
assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException)
exc).getErrorType());
+ //Test for publish with missing partition key values
+ exc = null;
+ partitionMap.clear();
+ partitionMap.put("px", "p1value2");
+
+ try {
+ runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
+ } catch(IOException e) {
+ exc = e;
+ }
+
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException)
exc).getErrorType());
+
//Test for null partition value map
exc = null;
@@ -135,6 +155,9 @@ public class TestHCatPartitioned extends
runMRRead(10, "part1 = \"p1value1\"");
runMRRead(20, "part1 = \"p1value2\"");
runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\"");
+ runMRRead(10, "part0 = \"p0value1\"");
+ runMRRead(20, "part0 = \"p0value2\"");
+ runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\"");
tableSchemaTest();
columnOrderChangeTest();
@@ -147,7 +170,7 @@ public class TestHCatPartitioned extends
HCatSchema tableSchema = getTableSchema();
- assertEquals(3, tableSchema.getFields().size());
+ assertEquals(4, tableSchema.getFields().size());
//Update partition schema to have 3 fields
partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new
FieldSchema("c3", Constants.STRING_TYPE_NAME, "")));
@@ -166,21 +189,24 @@ public class TestHCatPartitioned extends
Map<String, String> partitionMap = new HashMap<String, String>();
partitionMap.put("part1", "p1value5");
+ partitionMap.put("part0", "p0value5");
runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
tableSchema = getTableSchema();
//assert that c3 has got added to table schema
- assertEquals(4, tableSchema.getFields().size());
+ assertEquals(5, tableSchema.getFields().size());
assertEquals("c1", tableSchema.getFields().get(0).getName());
assertEquals("c2", tableSchema.getFields().get(1).getName());
assertEquals("c3", tableSchema.getFields().get(2).getName());
assertEquals("part1", tableSchema.getFields().get(3).getName());
+ assertEquals("part0", tableSchema.getFields().get(4).getName());
//Test that changing column data type fails
partitionMap.clear();
partitionMap.put("part1", "p1value6");
+ partitionMap.put("part0", "p0value6");
partitionColumns = new ArrayList<HCatFieldSchema>();
partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new
FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
@@ -225,13 +251,16 @@ public class TestHCatPartitioned extends
List<HCatRecord> records= runMRRead(20,"part1 = \"p1value6\"");
assertEquals(20, records.size());
+ records= runMRRead(20,"part0 = \"p0value6\"");
+ assertEquals(20, records.size());
Integer i =0;
for(HCatRecord rec : records){
- assertEquals(4, rec.size());
+ assertEquals(5, rec.size());
assertTrue(rec.get(0).equals(i));
assertTrue(rec.get(1).equals("c2value"+i));
assertTrue(rec.get(2).equals("c3value"+i));
assertTrue(rec.get(3).equals("p1value6"));
+ assertTrue(rec.get(4).equals("p0value6"));
i++;
}
}
@@ -241,7 +270,7 @@ public class TestHCatPartitioned extends
HCatSchema tableSchema = getTableSchema();
- assertEquals(4, tableSchema.getFields().size());
+ assertEquals(5, tableSchema.getFields().size());
partitionColumns = new ArrayList<HCatFieldSchema>();
partitionColumns.add(HCatSchemaUtils.getHCatFieldSchema(new
FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
@@ -263,7 +292,7 @@ public class TestHCatPartitioned extends
Map<String, String> partitionMap = new HashMap<String, String>();
partitionMap.put("part1", "p1value8");
-
+ partitionMap.put("part0", "p0value8");
Exception exc = null;
try {