Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/HCatTableInfo.java Fri Jul 22 23:38:07 2011
@@ -19,6 +19,7 @@
 package org.apache.hcatalog.mapreduce;
 
 import java.io.Serializable;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -66,6 +67,10 @@ public class HCatTableInfo implements Se
   /** The partition values to publish to, if used for output*/
   private Map<String, String> partitionValues;
 
+  /** List of keys for which values were not specified at write setup time, to be inferred at write time */
+  private List<String> dynamicPartitioningKeys;
+
+
   /**
    * Initializes a new HCatTableInfo instance to be used with {@link HCatInputFormat}
    * for reading data from a table.
@@ -229,6 +234,27 @@ public class HCatTableInfo implements Se
     return serverKerberosPrincipal;
   }
 
+  /**
+   * Returns whether or not dynamic partitioning is used
+   * @return whether or not dynamic partitioning is currently enabled and used
+   */
+  public boolean isDynamicPartitioningUsed() {
+    return !((dynamicPartitioningKeys == null) || (dynamicPartitioningKeys.isEmpty()));
+  }
+
+  /**
+   * Sets the list of dynamic partitioning keys used when writing without specifying all partition key values
+   * @param dynamicPartitioningKeys
+   */
+  public void setDynamicPartitioningKeys(List<String> dynamicPartitioningKeys) {
+    this.dynamicPartitioningKeys = dynamicPartitioningKeys;
+  }
+
+  public List<String> getDynamicPartitioningKeys(){
+    return this.dynamicPartitioningKeys;
+  }
+
+
   @Override
   public int hashCode() {
     int result = 17;
@@ -240,8 +266,9 @@ public class HCatTableInfo implements Se
     result = 31*result + (partitionPredicates == null ? 0 : partitionPredicates.hashCode());
     result = 31*result + tableInfoType.ordinal();
     result = 31*result + (partitionValues == null ? 0 : partitionValues.hashCode());
+    result = 31*result + (dynamicPartitioningKeys == null ? 0 : dynamicPartitioningKeys.hashCode());
     return result;
-  }
+  }
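
For readers following the change: the new dynamicPartitioningKeys field holds the partition keys whose values were not supplied when output was set up, and isDynamicPartitioningUsed() reports true only when that list is non-empty. As a minimal standalone sketch (not HCatalog API — every name below is illustrative), such a key list can be derived from the table's partition keys and a partial static partition spec, preserving key order:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative only: derive "dynamic" keys as the partition keys missing
    // from the static partition-value map, keeping the table's key order
    // (order matters when matching inferred values at write time).
    public class DynamicKeysSketch {
      static List<String> deriveDynamicKeys(List<String> partitionKeys,
                                            Map<String, String> staticValues) {
        List<String> dynamicKeys = new ArrayList<String>();
        for (String key : partitionKeys) {
          if (staticValues == null || !staticValues.containsKey(key)) {
            dynamicKeys.add(key);
          }
        }
        return dynamicKeys;
      }

      public static void main(String[] args) {
        List<String> keys = Arrays.asList("emp_country", "emp_state");
        Map<String, String> staticSpec = new HashMap<String, String>();
        staticSpec.put("emp_country", "IN"); // partial spec, as in HCatStorer('emp_country=IN')
        List<String> dynamic = deriveDynamicKeys(keys, staticSpec);
        // Mirrors the isDynamicPartitioningUsed() contract: used iff non-empty.
        System.out.println("dynamic keys = " + dynamic + ", used = " + !dynamic.isEmpty());
      }
    }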
Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/OutputJobInfo.java Fri Jul 22 23:38:07 2011
@@ -56,6 +56,11 @@ class OutputJobInfo implements Serializa
    * data contains partition columns.*/
   private List<Integer> posOfPartCols;
 
+  private List<Integer> posOfDynPartCols;
+
+  private int maxDynamicPartitions;
+
+  private boolean harRequested;
 
   /**
    * @return the posOfPartCols
@@ -65,6 +70,13 @@ class OutputJobInfo implements Serializa
   }
 
   /**
+   * @return the posOfDynPartCols
+   */
+  protected List<Integer> getPosOfDynPartCols() {
+    return posOfDynPartCols;
+  }
+
+  /**
    * @param posOfPartCols the posOfPartCols to set
    */
   protected void setPosOfPartCols(List<Integer> posOfPartCols) {
@@ -78,6 +90,14 @@ class OutputJobInfo implements Serializa
     this.posOfPartCols = posOfPartCols;
   }
 
+  /**
+   * @param posOfDynPartCols the posOfDynPartCols to set
+   */
+  protected void setPosOfDynPartCols(List<Integer> posOfDynPartCols) {
+    // Important - no sorting here! We retain order, it's used to match with values at runtime
+    this.posOfDynPartCols = posOfDynPartCols;
+  }
+
   public OutputJobInfo(HCatTableInfo tableInfo, HCatSchema outputSchema, HCatSchema tableSchema,
       StorerInfo storerInfo, String location, Table table) {
     super();
@@ -139,4 +159,36 @@ class OutputJobInfo implements Serializa
     return table;
   }
 
+  /**
+   * Set maximum number of allowable dynamic partitions
+   * @param maxDynamicPartitions
+   */
+  public void setMaximumDynamicPartitions(int maxDynamicPartitions){
+    this.maxDynamicPartitions = maxDynamicPartitions;
+  }
+
+  /**
+   * Returns maximum number of allowable dynamic partitions
+   * @return maximum number of allowable dynamic partitions
+   */
+  public int getMaxDynamicPartitions() {
+    return this.maxDynamicPartitions;
+  }
+
+  /**
+   * Sets whether or not hadoop archiving has been requested for this job
+   * @param harRequested
+   */
+  public void setHarRequested(boolean harRequested){
+    this.harRequested = harRequested;
+  }
+
+  /**
+   * Returns whether or not hadoop archiving has been requested for this job
+   * @return whether or not hadoop archiving has been requested for this job
+   */
+  public boolean getHarRequested() {
+    return this.harRequested;
+  }
+
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatEximStorer.java Fri Jul 22 23:38:07 2011
@@ -145,7 +145,7 @@ public class HCatEximStorer extends HCat
       //In local mode, mapreduce will not call HCatOutputCommitter.cleanupJob.
       //Calling it from here so that the partition publish happens.
       //This call needs to be removed after MAPREDUCE-1447 is fixed.
-      new HCatEximOutputCommitter(null).cleanupJob(job);
+      new HCatEximOutputCommitter(job,null).cleanupJob(job);
     }
   }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/HCatStorer.java Fri Jul 22 23:38:07 2011
@@ -109,34 +109,35 @@ public class HCatStorer extends HCatBase
       computedSchema = convertPigSchemaToHCatSchema(pigSchema,hcatTblSchema);
       HCatOutputFormat.setSchema(job, computedSchema);
       p.setProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO, config.get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-      if(config.get(HCatConstants.HCAT_KEY_HIVE_CONF) != null){
-        p.setProperty(HCatConstants.HCAT_KEY_HIVE_CONF, config.get(HCatConstants.HCAT_KEY_HIVE_CONF));
-      }
-      if(config.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null){
-        p.setProperty(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE,
-            config.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
-      }
+
+      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_HIVE_CONF);
+      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
+      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
+      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
+      PigHCatUtil.saveConfigIntoUDFProperties(p, config,HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+
       p.setProperty(COMPUTED_OUTPUT_SCHEMA,ObjectSerializer.serialize(computedSchema));
     }else{
       config.set(HCatConstants.HCAT_KEY_OUTPUT_INFO, p.getProperty(HCatConstants.HCAT_KEY_OUTPUT_INFO));
-      if(p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF) != null){
-        config.set(HCatConstants.HCAT_KEY_HIVE_CONF, p.getProperty(HCatConstants.HCAT_KEY_HIVE_CONF));
-      }
-      if(p.getProperty(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null){
-        config.set(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE,
-            p.getProperty(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
-      }
+
+      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_HIVE_CONF);
+      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
+      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_TOKEN_SIGNATURE);
+      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_SIGNATURE);
+      PigHCatUtil.getConfigFromUDFProperties(p, config, HCatConstants.HCAT_KEY_JOBCLIENT_TOKEN_STRFORM);
+
     }
   }
 
+
   @Override
   public void storeSchema(ResourceSchema schema, String arg1, Job job) throws IOException {
     if( job.getConfiguration().get("mapred.job.tracker", "").equalsIgnoreCase("local") ) {
       //In local mode, mapreduce will not call HCatOutputCommitter.cleanupJob.
       //Calling it from here so that the partition publish happens.
       //This call needs to be removed after MAPREDUCE-1447 is fixed.
-      new HCatOutputCommitter(null).cleanupJob(job);
+      new HCatOutputCommitter(job,null).cleanupJob(job);
     }
   }
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/pig/PigHCatUtil.java Fri Jul 22 23:38:07 2011
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
@@ -405,4 +406,16 @@ public static Object extractPigObject(Ob
 
   }
 
+  public static void getConfigFromUDFProperties(Properties p, Configuration config, String propName) {
+    if(p.getProperty(propName) != null){
+      config.set(propName, p.getProperty(propName));
+    }
+  }
+
+  public static void saveConfigIntoUDFProperties(Properties p, Configuration config, String propName) {
+    if(config.get(propName) != null){
+      p.setProperty(propName, config.get(propName));
+    }
+  }
+
 }

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileMapReduceOutputFormat.java Fri Jul 22 23:38:07 2011
@@ -19,6 +19,8 @@ package org.apache.hcatalog.rcfile;
 
 import java.io.IOException;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/rcfile/RCFileOutputDriver.java Fri Jul 22 23:38:07 2011
@@ -45,6 +45,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 import org.apache.hcatalog.data.schema.HCatFieldSchema;

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/HCatMapReduceTest.java Fri Jul 22 23:38:07 2011
@@ -243,7 +243,7 @@ public abstract class HCatMapReduceTest
 
   void runMRCreate(Map<String, String> partitionValues,
         List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
-        int writeCount) throws Exception {
+        int writeCount, boolean assertWrite) throws Exception {
 
     writeRecords = records;
     MapCreate.writeCount = 0;
@@ -275,8 +275,11 @@ public abstract class HCatMapReduceTest
 
     //new HCatOutputCommitter(null).setupJob(job);
     job.waitForCompletion(true);
-    new HCatOutputCommitter(null).cleanupJob(job);
-    Assert.assertEquals(writeCount, MapCreate.writeCount);
+    new HCatOutputCommitter(job,null).cleanupJob(job);
+    if (assertWrite){
+      // assert the write count only when the caller expects this write to succeed
+      Assert.assertEquals(writeCount, MapCreate.writeCount);
+    }
   }
 
   List<HCatRecord> runMRRead(int readCount) throws Exception {

Added: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1149763&view=auto
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (added)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatDynamicPartitioned.java Fri Jul 22 23:38:07 2011
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.mapreduce;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hcatalog.common.ErrorType;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+
+public class TestHCatDynamicPartitioned extends HCatMapReduceTest {
+
+  private List<HCatRecord> writeRecords;
+  private List<HCatFieldSchema> dataColumns;
+
+  @Override
+  protected void initialize() throws Exception {
+
+    tableName = "testHCatDynamicPartitionedTable";
+    generateWriteRecords(20,5,0);
+    generateDataColumns();
+  }
+
+  private void generateDataColumns() throws HCatException {
+    dataColumns = new ArrayList<HCatFieldSchema>();
+    dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c1", Constants.INT_TYPE_NAME, "")));
+    dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("c2", Constants.STRING_TYPE_NAME, "")));
+    dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("p1", Constants.STRING_TYPE_NAME, "")));
+  }
+
+  private void generateWriteRecords(int max, int mod,int offset) {
+    writeRecords = new ArrayList<HCatRecord>();
+
+    for(int i = 0;i < max;i++) {
+      List<Object> objList = new ArrayList<Object>();
+
+      objList.add(i);
+      objList.add("strvalue" + i);
+      objList.add(String.valueOf((i % mod)+offset));
+      writeRecords.add(new DefaultHCatRecord(objList));
+    }
+  }
+
+  @Override
+  protected List<FieldSchema> getPartitionKeys() {
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    fields.add(new FieldSchema("p1", Constants.STRING_TYPE_NAME, ""));
+    return fields;
+  }
+
+  @Override
+  protected List<FieldSchema> getTableColumns() {
+    List<FieldSchema> fields = new ArrayList<FieldSchema>();
+    fields.add(new FieldSchema("c1", Constants.INT_TYPE_NAME, ""));
+    fields.add(new FieldSchema("c2", Constants.STRING_TYPE_NAME, ""));
+    return fields;
+  }
+
+
+  public void testHCatDynamicPartitionedTable() throws Exception {
+
+    generateWriteRecords(20,5,0);
+    runMRCreate(null, dataColumns, writeRecords, 20,true);
+
+    runMRRead(20);
+
+    //Read with partition filter
+    runMRRead(4, "p1 = \"0\"");
+    runMRRead(8, "p1 = \"1\" or p1 = \"3\"");
+    runMRRead(4, "p1 = \"4\"");
+
+    // read from hive to test
+
+    String query = "select * from " + tableName;
+    int retCode = driver.run(query).getResponseCode();
+
+    if( retCode != 0 ) {
+      throw new Exception("Error " + retCode + " running query " + query);
+    }
+
+    ArrayList<String> res = new ArrayList<String>();
+    driver.getResults(res);
+    assertEquals(20, res.size());
+
+
+    //Test for duplicate publish
+    IOException exc = null;
+    try {
+      generateWriteRecords(20,5,0);
+      runMRCreate(null, dataColumns, writeRecords, 20,false);
+    } catch(IOException e) {
+      exc = e;
+    }
+
+    assertTrue(exc != null);
+    assertTrue(exc instanceof HCatException);
+    assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType());
+  }
+
+  public void testHCatDynamicPartitionMaxPartitions() throws Exception {
+    HiveConf hc = new HiveConf(this.getClass());
+
+    int maxParts = hiveConf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTS);
+    System.out.println("Max partitions allowed = " + maxParts);
+
+    IOException exc = null;
+    try {
+      generateWriteRecords(maxParts+5,maxParts+2,10);
+      runMRCreate(null,dataColumns,writeRecords,maxParts+5,false);
+    } catch(IOException e) {
+      exc = e;
+    }
+
+    if (HCatConstants.HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED){
+      assertTrue(exc != null);
+      assertTrue(exc instanceof HCatException);
+      assertEquals(ErrorType.ERROR_TOO_MANY_DYNAMIC_PTNS, ((HCatException) exc).getErrorType());
+    }else{
+      assertTrue(exc == null);
+      runMRRead(maxParts+5);
+    }
+  }
+}

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximInputFormat.java Fri Jul 22 23:38:07 2011
@@ -228,7 +228,7 @@ public class TestHCatEximInputFormat ext
         schema);
 
     job.waitForCompletion(true);
-    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null);
     committer.cleanupJob(job);
   }
 
@@ -247,7 +247,7 @@ public class TestHCatEximInputFormat ext
         schema);
 
     job.waitForCompletion(true);
-    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null);
     committer.cleanupJob(job);
   }

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatEximOutputFormat.java Fri Jul 22 23:38:07 2011
@@ -109,7 +109,7 @@ public class TestHCatEximOutputFormat ex
         schema);
 
     job.waitForCompletion(true);
-    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null);
     committer.cleanupJob(job);
 
     Path metadataPath = new Path(outputLocation, "_metadata");
@@ -165,7 +165,7 @@ public class TestHCatEximOutputFormat ex
         schema);
 
     job.waitForCompletion(true);
-    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(null);
+    HCatEximOutputCommitter committer = new HCatEximOutputCommitter(job,null);
     committer.cleanupJob(job);
     Path metadataPath = new Path(outputLocation, "_metadata");
     Map.Entry<Table, List<Partition>> rv = EximUtil.readMetaData(fs, metadataPath);

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatNonPartitioned.java Fri Jul 22 23:38:07 2011
@@ -78,12 +78,12 @@ public class TestHCatNonPartitioned exte
   public void testHCatNonPartitionedTable() throws Exception {
 
     Map<String, String> partitionMap = new HashMap<String, String>();
-    runMRCreate(null, partitionColumns, writeRecords, 10);
+    runMRCreate(null, partitionColumns, writeRecords, 10,true);
 
     //Test for duplicate publish
    IOException exc = null;
     try {
-      runMRCreate(null, partitionColumns, writeRecords, 20);
+      runMRCreate(null, partitionColumns, writeRecords, 20,true);
     } catch(IOException e) {
       exc = e;
     }
@@ -98,7 +98,7 @@ public class TestHCatNonPartitioned exte
     partitionMap.put("px", "p1value2");
 
     try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+      runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
     } catch(IOException e) {
       exc = e;
     }

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatOutputFormat.java Fri Jul 22 23:38:07 2011
@@ -160,7 +160,7 @@ public class TestHCatOutputFormat extend
   }
 
   public void publishTest(Job job) throws Exception {
-    OutputCommitter committer = new HCatOutputCommitter(null);
+    OutputCommitter committer = new HCatOutputCommitter(job,null);
     committer.cleanupJob(job);
 
     Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/mapreduce/TestHCatPartitioned.java Fri Jul 22 23:38:07 2011
@@ -80,17 +80,17 @@ public class TestHCatPartitioned extends
 
     Map<String, String> partitionMap = new HashMap<String, String>();
     partitionMap.put("part1", "p1value1");
-    runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+    runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
 
     partitionMap.clear();
     partitionMap.put("PART1", "p1value2");
-    runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+    runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
 
     //Test for duplicate publish
     IOException exc = null;
     try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+      runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
     } catch(IOException e) {
       exc = e;
     }
@@ -118,14 +118,15 @@ public class TestHCatPartitioned extends
 
     //Test for null partition value map
     exc = null;
     try {
-      runMRCreate(null, partitionColumns, writeRecords, 20);
+      runMRCreate(null, partitionColumns, writeRecords, 20,false);
     } catch(IOException e) {
       exc = e;
     }
 
-    assertTrue(exc != null);
-    assertTrue(exc instanceof HCatException);
-    assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
+    assertTrue(exc == null);
+//    assertTrue(exc instanceof HCatException);
+//    assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType());
+    // With dynamic partitioning, a null partition value map is no longer an error - unspecified key values are inferred at write time
 
     //Read should get 10 + 20 rows
     runMRRead(30);
@@ -166,7 +167,7 @@ public class TestHCatPartitioned extends
 
     Map<String, String> partitionMap = new HashMap<String, String>();
     partitionMap.put("part1", "p1value5");
-    runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+    runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
 
     tableSchema = getTableSchema();
 
@@ -187,7 +188,7 @@ public class TestHCatPartitioned extends
 
     IOException exc = null;
     try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 20);
+      runMRCreate(partitionMap, partitionColumns, writeRecords, 20,true);
     } catch(IOException e) {
       exc = e;
     }
@@ -217,7 +218,7 @@ public class TestHCatPartitioned extends
 
     exc = null;
     try {
-      runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20);
+      runMRCreate(partitionMap, partitionColumns, recordsContainingPartitionCols, 20,true);
     } catch(IOException e) {
       exc = e;
     }
@@ -266,7 +267,7 @@ public class TestHCatPartitioned extends
 
     Exception exc = null;
     try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+      runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
     } catch(IOException e) {
       exc = e;
     }
@@ -291,7 +292,7 @@ public class TestHCatPartitioned extends
       writeRecords.add(new DefaultHCatRecord(objList));
     }
 
-    runMRCreate(partitionMap, partitionColumns, writeRecords, 10);
+    runMRCreate(partitionMap, partitionColumns, writeRecords, 10,true);
 
     //Read should get 10 + 20 + 10 + 10 + 20 rows
     runMRRead(70);

Modified: incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java?rev=1149763&r1=1149762&r2=1149763&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java (original)
+++ incubator/hcatalog/trunk/src/test/org/apache/hcatalog/pig/TestHCatStorer.java Fri Jul 22 23:38:07 2011
@@ -585,4 +585,122 @@ public class TestHCatStorer extends Test
     assertFalse(itr.hasNext());
   }
 
+
+
+  public void testDynamicPartitioningMultiPartColsInDataPartialSpec() throws IOException, CommandNeedRetryException{
+
+    driver.run("drop table if exists employee");
+    String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
+        " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE " +
+        "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+        "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+
+    MiniCluster.deleteFile(cluster, fullFileName);
+    String[] inputData = {"111237\tKrishna\t01/01/1990\tM\tIN\tTN",
+        "111238\tKalpana\t01/01/2000\tF\tIN\tKA",
+        "111239\tSatya\t01/01/2001\tM\tIN\tKL",
+        "111240\tKavya\t01/01/2002\tF\tIN\tAP"};
+
+    MiniCluster.createInputFile(cluster, fullFileName, inputData);
+    PigServer pig = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    pig.setBatchOn();
+    pig.registerQuery("A = LOAD '"+fullFileName+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
+        "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
+    pig.registerQuery("IN = FILTER A BY emp_country == 'IN';");
+    pig.registerQuery("STORE IN INTO 'employee' USING "+HCatStorer.class.getName()+"('emp_country=IN');");
+    pig.executeBatch();
+    driver.run("select * from employee");
+    ArrayList<String> results = new ArrayList<String>();
+    driver.getResults(results);
+    assertEquals(4, results.size());
+    Collections.sort(results);
+    assertEquals(inputData[0], results.get(0));
+    assertEquals(inputData[1], results.get(1));
+    assertEquals(inputData[2], results.get(2));
+    assertEquals(inputData[3], results.get(3));
+    MiniCluster.deleteFile(cluster, fullFileName);
+    driver.run("drop table employee");
+  }
+
+  public void testDynamicPartitioningMultiPartColsInDataNoSpec() throws IOException, CommandNeedRetryException{
+
+    driver.run("drop table if exists employee");
+    String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
+        " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE " +
+        "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+        "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+
+    MiniCluster.deleteFile(cluster, fullFileName);
+    String[] inputData = {"111237\tKrishna\t01/01/1990\tM\tIN\tTN",
+        "111238\tKalpana\t01/01/2000\tF\tIN\tKA",
+        "111239\tSatya\t01/01/2001\tM\tIN\tKL",
+        "111240\tKavya\t01/01/2002\tF\tIN\tAP"};
+
+    MiniCluster.createInputFile(cluster, fullFileName, inputData);
+    PigServer pig = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    pig.setBatchOn();
+    pig.registerQuery("A = LOAD '"+fullFileName+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
+        "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
+    pig.registerQuery("IN = FILTER A BY emp_country == 'IN';");
+    pig.registerQuery("STORE IN INTO 'employee' USING "+HCatStorer.class.getName()+"();");
+    pig.executeBatch();
+    driver.run("select * from employee");
+    ArrayList<String> results = new ArrayList<String>();
+    driver.getResults(results);
+    assertEquals(4, results.size());
+    Collections.sort(results);
+    assertEquals(inputData[0], results.get(0));
+    assertEquals(inputData[1], results.get(1));
+    assertEquals(inputData[2], results.get(2));
+    assertEquals(inputData[3], results.get(3));
+    MiniCluster.deleteFile(cluster, fullFileName);
+    driver.run("drop table employee");
+  }
+
+  public void testDynamicPartitioningMultiPartColsNoDataInDataNoSpec() throws IOException, CommandNeedRetryException{
+
+    driver.run("drop table if exists employee");
+    String createTable = "CREATE TABLE employee (emp_id INT, emp_name STRING, emp_start_date STRING , emp_gender STRING ) " +
+        " PARTITIONED BY (emp_country STRING , emp_state STRING ) STORED AS RCFILE " +
+        "tblproperties('"+HCatConstants.HCAT_ISD_CLASS+"'='"+RCFileInputDriver.class.getName()+"'," +
+        "'"+HCatConstants.HCAT_OSD_CLASS+"'='"+RCFileOutputDriver.class.getName()+"') ";
+
+    int retCode = driver.run(createTable).getResponseCode();
+    if(retCode != 0) {
+      throw new IOException("Failed to create table.");
+    }
+
+    MiniCluster.deleteFile(cluster, fullFileName);
+    String[] inputData = {};
+
+    MiniCluster.createInputFile(cluster, fullFileName, inputData);
+    PigServer pig = new PigServer(ExecType.LOCAL, props);
+    UDFContext.getUDFContext().setClientSystemProps();
+    pig.setBatchOn();
+    pig.registerQuery("A = LOAD '"+fullFileName+"' USING PigStorage() AS (emp_id:int,emp_name:chararray,emp_start_date:chararray," +
+        "emp_gender:chararray,emp_country:chararray,emp_state:chararray);");
+    pig.registerQuery("IN = FILTER A BY emp_country == 'IN';");
+    pig.registerQuery("STORE IN INTO 'employee' USING "+HCatStorer.class.getName()+"();");
+    pig.executeBatch();
+    driver.run("select * from employee");
+    ArrayList<String> results = new ArrayList<String>();
+    driver.getResults(results);
+    assertEquals(0, results.size());
+    MiniCluster.deleteFile(cluster, fullFileName);
+    driver.run("drop table employee");
+  }
+
+
+}
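
Taken together, the new tests pin down the write-side contract: every distinct combination of dynamic partition values in the written data becomes its own partition, a duplicate publish fails with ERROR_PUBLISHING_PARTITION, and, when the max-partitions check is enabled, exceeding the configured limit fails with ERROR_TOO_MANY_DYNAMIC_PTNS. A minimal standalone sketch of that counting guard, assuming the check simply compares the number of distinct value combinations against the limit (all names below are illustrative, not the HCatalog implementation):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Illustrative only: count distinct dynamic-partition value combinations
    // and enforce a cap, as testHCatDynamicPartitionMaxPartitions expects.
    public class DynamicPartitionGuardSketch {
      static int countPartitions(List<List<String>> dynValuesPerRecord, int maxDynamicPartitions) {
        Set<List<String>> seen = new HashSet<List<String>>();
        for (List<String> values : dynValuesPerRecord) {
          seen.add(values);
          if (seen.size() > maxDynamicPartitions) {
            throw new IllegalStateException("Too many dynamic partitions: "
                + seen.size() + " > " + maxDynamicPartitions);
          }
        }
        return seen.size();
      }

      public static void main(String[] args) {
        List<List<String>> records = Arrays.asList(
            Arrays.asList("IN", "TN"),
            Arrays.asList("IN", "KA"),
            Arrays.asList("IN", "TN"));
        System.out.println(countPartitions(records, 100)); // prints 2
      }
    }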
