Author: khorgath Date: Fri Feb 28 23:04:36 2014 New Revision: 1573107 URL: http://svn.apache.org/r1573107 Log: HIVE-6475 : Implement support for appending to mutable tables in HCatalog (Sushanth Sowmyan, reviewed by Daniel Dai)
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java Removed: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalHCatNonPartitioned.java Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerWrapper.java Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java?rev=1573107&r1=1573106&r2=1573107&view=diff 
============================================================================== --- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java (original) +++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java Fri Feb 28 23:04:36 2014 @@ -119,7 +119,9 @@ public final class HCatConstants { public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = "hcat.msgbus.topic.naming.policy"; public static final String HCAT_MSGBUS_TOPIC_PREFIX = "hcat.msgbus.topic.prefix"; - public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + "dynamic.jobid"; + public static final String HCAT_OUTPUT_ID_HASH = HCAT_KEY_OUTPUT_BASE + ".id"; + + public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + ".dynamic.jobid"; public static final boolean HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED = false; public static final String HCAT_DYNAMIC_CUSTOM_PATTERN = "hcat.dynamic.partitioning.custom.pattern"; Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1573107&r1=1573106&r2=1573107&view=diff ============================================================================== --- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java (original) +++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java Fri Feb 28 23:04:36 2014 @@ -35,14 +35,16 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import 
org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.InvalidOperationException; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapred.JobConf; @@ -69,8 +71,10 @@ class FileOutputCommitterContainer exten private static final String TEMP_DIR_NAME = "_temporary"; private static final String LOGS_DIR_NAME = "_logs"; - /** The directory under which data is initially written for a partitioned table */ + static final String DYNTEMP_DIR_NAME = "_DYN"; + static final String SCRATCH_DIR_NAME = "_SCRATCH"; + private static final String APPEND_SUFFIX = "_a_"; private static final Logger LOG = LoggerFactory.getLogger(FileOutputCommitterContainer.class); private final boolean dynamicPartitioningUsed; @@ -174,6 +178,7 @@ class FileOutputCommitterContainer exten } Path src; OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext); + Path tblPath = new Path(jobInfo.getTableInfo().getTableLocation()); if (dynamicPartitioningUsed) { if (!customDynamicLocationUsed) { src = new Path(getPartitionRootLocation(jobInfo.getLocation(), jobInfo.getTableInfo().getTable() @@ -191,7 +196,9 @@ class FileOutputCommitterContainer exten // directory containing open files. So on Windows, we will leave output directory // behind when job fail. User needs to remove the output directory manually LOG.info("Job failed. 
Try cleaning up temporary directory [{}].", src); - fs.delete(src, true); + if (!src.equals(tblPath)){ + fs.delete(src, true); + } } finally { cancelDelegationTokens(jobContext); } @@ -332,13 +339,17 @@ class FileOutputCommitterContainer exten } else if (!dynamicPartitioningUsed && Boolean.valueOf((String)table.getProperty("EXTERNAL")) && jobInfo.getLocation() != null && jobInfo.getLocation().length() > 0) { - // honor external table that specifies the location - partPath = new Path(jobInfo.getLocation()); + // Now, we need to de-scratchify this location - i.e., get rid of any + // _SCRATCH[\d].?[\d]+ from the location. + String jobLocation = jobInfo.getLocation(); + String finalLocn = jobLocation.replaceAll(Path.SEPARATOR + SCRATCH_DIR_NAME + "\\d\\.?\\d+",""); + partPath = new Path(finalLocn); } else { partPath = new Path(partLocnRoot); int i = 0; for (FieldSchema partKey : table.getPartitionKeys()) { if (i++ != 0) { + fs.mkdirs(partPath); // Attempt to make the path in case it does not exist before we check applyGroupAndPerms(fs, partPath, perms, grpName, false); } partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs); @@ -347,6 +358,7 @@ class FileOutputCommitterContainer exten // Apply the group and permissions to the leaf partition and files. 
// Need not bother in case of HDFS as permission is taken care of by setting UMask + fs.mkdirs(partPath); // Attempt to make the path in case it does not exist before we check if (!ShimLoader.getHadoopShims().getHCatShim().isFileInHDFS(fs, partPath)) { applyGroupAndPerms(fs, partPath, perms, grpName, true); } @@ -370,6 +382,11 @@ class FileOutputCommitterContainer exten private void applyGroupAndPerms(FileSystem fs, Path dir, FsPermission permission, String group, boolean recursive) throws IOException { + if(LOG.isDebugEnabled()) { + LOG.debug("applyGroupAndPerms : " + dir + + " perms: " + permission + + " group: " + group + " recursive: " + recursive); + } fs.setPermission(dir, permission); if (recursive) { for (FileStatus fileStatus : fs.listStatus(dir)) { @@ -457,24 +474,38 @@ class FileOutputCommitterContainer exten * on whether other files exist where we're trying to copy * @throws java.io.IOException */ - private void moveTaskOutputs(FileSystem fs, - Path file, - Path srcDir, - Path destDir, final boolean dryRun) throws IOException { + private void moveTaskOutputs(FileSystem fs, Path file, Path srcDir, + Path destDir, final boolean dryRun, boolean immutable + ) throws IOException { + if(LOG.isDebugEnabled()) { + LOG.debug("moveTaskOutputs " + + file + " from: " + srcDir + " to: " + destDir + + " dry: " + dryRun + " immutable: " + immutable); + } + + if (dynamicPartitioningUsed) { + immutable = true; // Making sure we treat dynamic partitioning jobs as if they were immutable. 
+ } if (file.getName().equals(TEMP_DIR_NAME) || file.getName().equals(LOGS_DIR_NAME) || file.getName().equals(SUCCEEDED_FILE_NAME)) { return; } - final Path finalOutputPath = getFinalPath(file, srcDir, destDir); + + final Path finalOutputPath = getFinalPath(fs, file, srcDir, destDir, immutable); + if (fs.isFile(file)) { if (dryRun){ - if(LOG.isDebugEnabled()) { - LOG.debug("Testing if moving file: [" + file + "] to [" - + finalOutputPath + "] would cause a problem"); - } - if (fs.exists(finalOutputPath)) { - throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath - + ", duplicate publish not possible."); + if (immutable){ + // Dryrun checks are meaningless for mutable table - we should always succeed + // unless there is a runtime IOException. + if(LOG.isDebugEnabled()) { + LOG.debug("Testing if moving file: [" + file + "] to [" + + finalOutputPath + "] would cause a problem"); + } + if (fs.exists(finalOutputPath)) { + throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + + finalOutputPath + ", duplicate publish not possible."); + } } } else { if(LOG.isDebugEnabled()) { @@ -493,12 +524,15 @@ class FileOutputCommitterContainer exten } } } else if(fs.getFileStatus(file).isDir()) { + FileStatus[] children = fs.listStatus(file); FileStatus firstChild = null; if (children != null) { int index=0; while (index < children.length) { - if (!children[index].getPath().getName().equals(TEMP_DIR_NAME) && !children[index].getPath().getName().equals(LOGS_DIR_NAME) && !children[index].getPath().getName().equals(SUCCEEDED_FILE_NAME)) { + if ( !children[index].getPath().getName().equals(TEMP_DIR_NAME) + && !children[index].getPath().getName().equals(LOGS_DIR_NAME) + && !children[index].getPath().getName().equals(SUCCEEDED_FILE_NAME)) { firstChild = children[index]; break; } @@ -509,14 +543,17 @@ class FileOutputCommitterContainer exten // If the first child is directory, then rest would be directory too according to 
HCatalog dir structure // recurse in that case for (FileStatus child : children) { - moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun); + moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun, immutable); } } else { if (!dryRun) { if (dynamicPartitioningUsed) { + // Optimization: if the first child is file, we have reached the leaf directory, move the parent directory itself // instead of moving each file under the directory. See HCATALOG-538 + // Note for future Append implementation : This optimization is another reason dynamic + // partitioning is currently incompatible with append on mutable tables. final Path parentDir = finalOutputPath.getParent(); // Create the directory @@ -539,16 +576,21 @@ class FileOutputCommitterContainer exten } fs.delete(placeholder, false); } else { + // In case of no partition we have to move each file for (FileStatus child : children) { - moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun); + moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun, immutable); } + } + } else { - if(fs.exists(finalOutputPath)) { - throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath + if(immutable && fs.exists(finalOutputPath) && !MetaStoreUtils.isDirEmpty(fs, finalOutputPath)) { + + throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION, "Data already exists in " + finalOutputPath + ", duplicate publish not possible."); } + } } } else { @@ -560,15 +602,16 @@ class FileOutputCommitterContainer exten /** * Find the final name of a given output file, given the output directory - * and the work directory. + * and the work directory. If not immutable (mutable table), attempt to create file of name + * _a_N till we find an item that does not exist. 
* @param file the file to move * @param src the source directory * @param dest the target directory * @return the final path for the specific output file * @throws java.io.IOException */ - private Path getFinalPath(Path file, Path src, - Path dest) throws IOException { + private Path getFinalPath(FileSystem fs, Path file, Path src, + Path dest, final boolean immutable) throws IOException { URI taskOutputUri = file.toUri(); URI relativePath = src.toUri().relativize(taskOutputUri); if (taskOutputUri == relativePath) { @@ -576,8 +619,43 @@ class FileOutputCommitterContainer exten src + " child = " + file); } if (relativePath.getPath().length() > 0) { - return new Path(dest, relativePath.getPath()); + + Path itemDest = new Path(dest, relativePath.getPath()); + if (!immutable){ + String name = relativePath.getPath(); + String filetype; + int index = name.lastIndexOf('.'); + if (index >= 0) { + filetype = name.substring(index); + name = name.substring(0, index); + } else { + filetype = ""; + } + + // Attempt to find COUNTER_MAX possible alternatives to a filename by + // appending _a_N and seeing if that destination also clashes. If we're + // still clashing after that, give up. + final int COUNTER_MAX = 1000; + int counter = 1; + for (; fs.exists(itemDest) && counter < COUNTER_MAX ; counter++) { + itemDest = new Path(dest, name + (APPEND_SUFFIX + counter) + filetype); + } + + if (counter == COUNTER_MAX){ + throw new HCatException(ErrorType.ERROR_MOVE_FAILED, + "Could not find a unique destination path for move: file = " + + file + " , src = " + src + ", dest = " + dest); + } + + } + + if (LOG.isDebugEnabled()){ + LOG.debug("FinalPath(file:"+file+":"+src+"->"+dest+"="+itemDest); + } + + return itemDest; } else { + return dest; } } @@ -671,8 +749,10 @@ class FileOutputCommitterContainer exten //Move data from temp directory the actual table directory //No metastore operation required. 
Path src = new Path(jobInfo.getLocation()); - moveTaskOutputs(fs, src, src, tblPath, false); - fs.delete(src, true); + moveTaskOutputs(fs, src, src, tblPath, false, table.isImmutable()); + if (!src.equals(tblPath)){ + fs.delete(src, true); + } return; } @@ -715,15 +795,38 @@ class FileOutputCommitterContainer exten ptnInfos.add(InternalUtil.createPtnKeyValueMap(new Table(tableInfo.getTable()), ptn)); } + /** + * Dynamic partitioning & Append incompatibility note: + * + * Currently, we do not support mixing dynamic partitioning and append in the + * same job. One reason is that we need exhaustive testing of corner cases + * for that, and a second reason is the behaviour of add_partitions. To support + * dynamic partitioning with append, we'd have to have a add_partitions_if_not_exist + * call, rather than an add_partitions call. Thus far, we've tried to keep the + * implementation of append jobtype-agnostic, but here, in code, we assume that + * a table is considered immutable if dynamic partitioning is enabled on the job. + * + * This does not mean that we can check before the job begins that this is going + * to be a dynamic partition job on an immutable table and thus fail the job, since + * it is quite possible to have a dynamic partitioning job run on an unpopulated + * immutable table. It simply means that at the end of the job, as far as copying + * in data is concerned, we will pretend that the table is immutable irrespective + * of what table.isImmutable() tells us. + */ + //Publish the new partition(s) if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){ + if (!customDynamicLocationUsed) { Path src = new Path(ptnRootLocation); // check here for each dir we're copying out, to see if it - // already exists, error out if so - moveTaskOutputs(fs, src, src, tblPath, true); - moveTaskOutputs(fs, src, src, tblPath, false); - fs.delete(src, true); + // already exists, error out if so. 
+ // Also, treat dyn-writes as writes to immutable tables. + moveTaskOutputs(fs, src, src, tblPath, true, true); // dryRun = true, immutable = true + moveTaskOutputs(fs, src, src, tblPath, false, true); + if (!src.equals(tblPath)){ + fs.delete(src, true); + } } else { moveCustomLocationTaskOutputs(fs, table, hiveConf); } @@ -744,21 +847,84 @@ class FileOutputCommitterContainer exten } }else{ + // no harProcessor, regular operation updateTableSchema(client, table, jobInfo.getOutputSchema()); LOG.info("HAR not is not being used. The table {} has new partitions {}.", table.getTableName(), ptnInfos); - if (dynamicPartitioningUsed && (partitionsToAdd.size()>0)){ - if (!customDynamicLocationUsed) { - Path src = new Path(ptnRootLocation); - moveTaskOutputs(fs, src, src, tblPath, true); - moveTaskOutputs(fs, src, src, tblPath, false); - fs.delete(src, true); + if (partitionsToAdd.size() > 0){ + if (!dynamicPartitioningUsed ) { + + // regular single-partition write into a partitioned table. + //Move data from temp directory the actual table directory + if (partitionsToAdd.size() > 1){ + throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, + "More than one partition to publish in non-dynamic partitioning job"); + } + Partition p = partitionsToAdd.get(0); + Path src = new Path(jobInfo.getLocation()); + Path dest = new Path(p.getSd().getLocation()); + moveTaskOutputs(fs, src, src, dest, true, table.isImmutable()); + moveTaskOutputs(fs,src,src,dest,false,table.isImmutable()); + if (!src.equals(dest)){ + fs.delete(src, true); + } + + // Now, we check if the partition already exists. If not, we go ahead. + // If so, we error out if immutable, and if mutable, check that the partition's IF + // matches our current job's IF (table's IF) to check for compatibility. If compatible, we + // ignore and do not add. If incompatible, we error out again. 
+ + boolean publishRequired = false; + try { + Partition existingP = client.getPartition(p.getDbName(),p.getTableName(),p.getValues()); + if (existingP != null){ + if (table.isImmutable()){ + throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION, + "Attempted duplicate partition publish on to immutable table"); + } else { + if (! existingP.getSd().getInputFormat().equals(table.getInputFormatClass().getName())){ + throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION, + "Attempted partition append, where old partition format was " + + existingP.getSd().getInputFormat() + + " and table format was " + + table.getInputFormatClass().getName()); + } + } + } else { + publishRequired = true; + } + } catch (NoSuchObjectException e){ + // All good, no such partition exists, move on. + publishRequired = true; + } + if (publishRequired){ + client.add_partitions(partitionsToAdd); + partitionsAdded = partitionsToAdd; + } + } else { - moveCustomLocationTaskOutputs(fs, table, hiveConf); + // Dynamic partitioning usecase + if (!customDynamicLocationUsed) { + Path src = new Path(ptnRootLocation); + moveTaskOutputs(fs, src, src, tblPath, true, true); // dryRun = true, immutable = true + moveTaskOutputs(fs, src, src, tblPath, false, true); + if (!src.equals(tblPath)){ + fs.delete(src, true); + } + } else { + moveCustomLocationTaskOutputs(fs, table, hiveConf); + } + client.add_partitions(partitionsToAdd); + partitionsAdded = partitionsToAdd; } } - client.add_partitions(partitionsToAdd); - partitionsAdded = partitionsToAdd; + + // Set permissions appropriately for each of the partitions we just created + // so as to have their permissions mimic the table permissions + for (Partition p : partitionsAdded){ + applyGroupAndPerms(fs,new Path(p.getSd().getLocation()),tblStat.getPermission(),tblStat.getGroup(),true); + } + } } catch (Exception e) { if (partitionsAdded.size() > 0) { @@ -793,8 +959,8 @@ class FileOutputCommitterContainer exten for (Entry<String, Map<String, String>> 
entry : partitionsDiscoveredByPath.entrySet()) { Path src = new Path(entry.getKey()); Path destPath = new Path(getFinalDynamicPartitionDestination(table, entry.getValue(), jobInfo)); - moveTaskOutputs(fs, src, src, destPath, true); - moveTaskOutputs(fs, src, src, destPath, false); + moveTaskOutputs(fs, src, src, destPath, true, true); // dryRun = true, immutable = true + moveTaskOutputs(fs, src, src, destPath, false, true); } // delete the parent temp directory of all custom dynamic partitions Path parentPath = new Path(getCustomPartitionRootLocation(jobInfo, conf)); Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java?rev=1573107&r1=1573106&r2=1573107&view=diff ============================================================================== --- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java (original) +++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java Fri Feb 28 23:04:36 2014 @@ -149,7 +149,10 @@ class FileOutputFormatContainer extends } /** - * Handles duplicate publish of partition. Fails if partition already exists. + * Handles duplicate publish of partition or data into an unpartitioned table + * if the table is immutable + * + * For partitioned tables, fails if partition already exists. * For non partitioned tables, fails if files are present in table directory. 
* For dynamic partitioned publish, does nothing - check would need to be done at recordwriter time * @param context the job @@ -161,17 +164,21 @@ class FileOutputFormatContainer extends * @throws org.apache.thrift.TException */ private static void handleDuplicatePublish(JobContext context, OutputJobInfo outputInfo, - HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException, NoSuchObjectException { + HiveMetaStoreClient client, Table table) + throws IOException, MetaException, TException, NoSuchObjectException { /* - * For fully specified ptn, follow strict checks for existence of partitions in metadata - * For unpartitioned tables, follow filechecks - * For partially specified tables: - * This would then need filechecks at the start of a ptn write, - * Doing metadata checks can get potentially very expensive (fat conf) if - * there are a large number of partitions that match the partial specifications - */ + * For fully specified ptn, follow strict checks for existence of partitions in metadata + * For unpartitioned tables, follow filechecks + * For partially specified tables: + * This would then need filechecks at the start of a ptn write, + * Doing metadata checks can get potentially very expensive (fat conf) if + * there are a large number of partitions that match the partial specifications + */ + if (!table.isImmutable()){ + return; + } if (table.getPartitionKeys().size() > 0) { if (!outputInfo.isDynamicPartitioningUsed()) { List<String> partitionValues = getPartitionValueList( @@ -181,6 +188,9 @@ class FileOutputFormatContainer extends outputInfo.getTableName(), partitionValues, (short) 1); if (currentParts.size() > 0) { + // If a table is partitioned and immutable, then the presence + // of the partition alone is enough to throw an error - we do + // not need to check for emptiness to decide to throw an error throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION); } } Modified: 
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java?rev=1573107&r1=1573106&r2=1573107&view=diff ============================================================================== --- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java (original) +++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java Fri Feb 28 23:04:36 2014 @@ -111,6 +111,8 @@ public class FosterStorageHandler extend String parentPath = jobInfo.getTableInfo().getTableLocation(); String dynHash = tableDesc.getJobProperties().get( HCatConstants.HCAT_DYNAMIC_PTN_JOBID); + String idHash = tableDesc.getJobProperties().get( + HCatConstants.HCAT_OUTPUT_ID_HASH); // For dynamic partitioned writes without all keyvalues specified, // we create a temp dir for the associated write job @@ -122,6 +124,8 @@ public class FosterStorageHandler extend parentPath = new Path(parentPath, jobInfo.getCustomDynamicRoot()).toString(); } parentPath = new Path(parentPath, FileOutputCommitterContainer.DYNTEMP_DIR_NAME + dynHash).toString(); + } else { + parentPath = new Path(parentPath,FileOutputCommitterContainer.SCRATCH_DIR_NAME + idHash).toString(); } String outputLocation; @@ -139,8 +143,8 @@ public class FosterStorageHandler extend // honor custom location for external table apart from what metadata specifies outputLocation = jobInfo.getLocation(); } else if (dynHash == null && jobInfo.getPartitionValues().size() == 0) { - // For non-partitioned tables, we send them to the temp dir - outputLocation = TEMP_DIR_NAME; + // Unpartitioned table, writing to the scratch dir directly is good enough. 
+ outputLocation = ""; } else { List<String> cols = new ArrayList<String>(); List<String> values = new ArrayList<String>(); @@ -156,11 +160,15 @@ public class FosterStorageHandler extend outputLocation = FileUtils.makePartName(cols, values); } - jobInfo.setLocation(new Path(parentPath, outputLocation).toString()); + if (outputLocation!= null && !outputLocation.isEmpty()){ + jobInfo.setLocation(new Path(parentPath, outputLocation).toString()); + } else { + jobInfo.setLocation(new Path(parentPath).toString()); + } //only set output dir if partition is fully materialized - if (jobInfo.getPartitionValues().size() - == jobInfo.getTableInfo().getPartitionColumns().size()) { + if (jobInfo.getPartitionValues().size() == + jobInfo.getTableInfo().getPartitionColumns().size()) { jobProperties.put("mapred.output.dir", jobInfo.getLocation()); } Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java?rev=1573107&r1=1573106&r2=1573107&view=diff ============================================================================== --- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java (original) +++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java Fri Feb 28 23:04:36 2014 @@ -113,6 +113,14 @@ public class HCatOutputFormat extends HC throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a partition with sorted column definition from Pig/Mapreduce is not supported"); } + // Set up a common id hash for this job, so that when we create any temporary directory + // later on, it is guaranteed to be unique. 
+ String idHash; + if ((idHash = conf.get(HCatConstants.HCAT_OUTPUT_ID_HASH)) == null) { + idHash = String.valueOf(Math.random()); + } + conf.set(HCatConstants.HCAT_OUTPUT_ID_HASH,idHash); + if (table.getTTable().getPartitionKeysSize() == 0) { if ((outputJobInfo.getPartitionValues() != null) && (!outputJobInfo.getPartitionValues().isEmpty())) { // attempt made to save partition values in non-partitioned table - throw error. @@ -153,9 +161,6 @@ public class HCatOutputFormat extends HC String dynHash; if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null) { dynHash = String.valueOf(Math.random()); -// LOG.info("New dynHash : ["+dynHash+"]"); -// }else{ -// LOG.info("Old dynHash : ["+dynHash+"]"); } conf.set(HCatConstants.HCAT_DYNAMIC_PTN_JOBID, dynHash); Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1573107&r1=1573106&r2=1573107&view=diff ============================================================================== --- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java (original) +++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java Fri Feb 28 23:04:36 2014 @@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore. 
import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; @@ -90,7 +91,11 @@ public abstract class HCatMapReduceTest return false; } - protected String inputFormat() { + protected boolean isTableImmutable() { + return true; + } + + protected String inputFormat() { return RCFileInputFormat.class.getName(); } @@ -177,6 +182,9 @@ public abstract class HCatMapReduceTest if (isTableExternal()) { tableParams.put("EXTERNAL", "TRUE"); } + if (isTableImmutable()){ + tableParams.put(hive_metastoreConstants.IS_IMMUTABLE,"true"); + } tbl.setParameters(tableParams); client.createTable(tbl); Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1573107&r1=1573106&r2=1573107&view=diff ============================================================================== --- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (original) +++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java Fri Feb 28 23:04:36 2014 @@ -157,9 +157,11 @@ public class TestHCatDynamicPartitioned assertTrue(exc != null); assertTrue(exc instanceof HCatException); assertTrue("Got exception of type [" + ((HCatException) exc).getErrorType().toString() - + "] Expected ERROR_PUBLISHING_PARTITION or ERROR_MOVE_FAILED", + + "] Expected ERROR_PUBLISHING_PARTITION or ERROR_MOVE_FAILED " + + "or ERROR_DUPLICATE_PARTITION", (ErrorType.ERROR_PUBLISHING_PARTITION == 
((HCatException) exc).getErrorType()) || (ErrorType.ERROR_MOVE_FAILED == ((HCatException) exc).getErrorType()) + || (ErrorType.ERROR_DUPLICATE_PARTITION == ((HCatException) exc).getErrorType()) ); } Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java?rev=1573107&view=auto ============================================================================== --- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java (added) +++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java Fri Feb 28 23:04:36 2014 @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hive.hcatalog.mapreduce; + +public class TestHCatExternalNonPartitioned extends TestHCatNonPartitioned { + + @Override + protected Boolean isTableExternal() { + return true; + } + +} Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java?rev=1573107&view=auto ============================================================================== --- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java (added) +++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java Fri Feb 28 23:04:36 2014 @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hive.hcatalog.mapreduce; + +public class TestHCatMutableDynamicPartitioned extends TestHCatDynamicPartitioned { + + @Override + protected boolean isTableImmutable() { + return false; + } + +} Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java?rev=1573107&view=auto ============================================================================== --- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java (added) +++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java Fri Feb 28 23:04:36 2014 @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hive.hcatalog.mapreduce; + +public class TestHCatMutableNonPartitioned extends TestHCatNonPartitioned { + + + @Override + protected boolean isTableImmutable() { + return false; + } + +} Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java?rev=1573107&view=auto ============================================================================== --- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java (added) +++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java Fri Feb 28 23:04:36 2014 @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hive.hcatalog.mapreduce; + +public class TestHCatMutablePartitioned extends TestHCatPartitioned { + + @Override + protected boolean isTableImmutable() { + return false; + } + +} Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java?rev=1573107&r1=1573106&r2=1573107&view=diff ============================================================================== --- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java (original) +++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java Fri Feb 28 23:04:36 2014 @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.mapreduce.Job; import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.data.DefaultHCatRecord; @@ -38,6 +39,8 @@ import org.junit.Test; import static junit.framework.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; public class TestHCatNonPartitioned extends HCatMapReduceTest { @@ -87,17 +90,21 @@ public class TestHCatNonPartitioned exte Map<String, String> partitionMap = new HashMap<String, String>(); runMRCreate(null, partitionColumns, writeRecords, 10, true); - //Test for duplicate publish + //Test for duplicate publish -- this will either fail on job creation time + // and throw an exception, or will fail at runtime, and fail the job. 
+ IOException exc = null; try { - runMRCreate(null, partitionColumns, writeRecords, 20, true); + Job j = runMRCreate(null, partitionColumns, writeRecords, 20, true); + assertEquals(!isTableImmutable(),j.isSuccessful()); } catch (IOException e) { exc = e; + assertTrue(exc instanceof HCatException); + assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType()); + } + if (!isTableImmutable()){ + assertNull(exc); } - - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType()); //Test for publish with invalid partition key name exc = null; @@ -105,17 +112,21 @@ public class TestHCatNonPartitioned exte partitionMap.put("px", "p1value2"); try { - runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true); + Job j = runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true); + assertFalse(j.isSuccessful()); } catch (IOException e) { exc = e; + assertTrue(exc != null); + assertTrue(exc instanceof HCatException); + assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType()); } - assertTrue(exc != null); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType()); - - //Read should get 10 rows - runMRRead(10); + //Read should get 10 rows if immutable, 30 if mutable + if (isTableImmutable()){ + runMRRead(10); + } else { + runMRRead(30); + } hiveReadTest(); } @@ -132,6 +143,10 @@ public class TestHCatNonPartitioned exte ArrayList<String> res = new ArrayList<String>(); driver.getResults(res); - assertEquals(10, res.size()); + if (isTableImmutable()){ + assertEquals(10, res.size()); + }else { + assertEquals(30, res.size()); + } } } Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java URL: 
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1573107&r1=1573106&r2=1573107&view=diff ============================================================================== --- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java (original) +++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java Fri Feb 28 23:04:36 2014 @@ -40,8 +40,11 @@ import org.apache.hadoop.hive.metastore. import org.apache.hadoop.hive.ql.io.RCFileInputFormat; import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.shims.ShimLoader; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,7 +157,13 @@ public class TestHCatOutputFormat extend } public void publishTest(Job job) throws Exception { - OutputCommitter committer = new FileOutputCommitterContainer(job, null); + HCatOutputFormat hcof = new HCatOutputFormat(); + TaskAttemptContext tac = ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext( + job.getConfiguration(), ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID()); + OutputCommitter committer = hcof.getOutputCommitter(tac); + committer.setupJob(job); + committer.setupTask(tac); + committer.commitTask(tac); committer.commitJob(job); Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1")); Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1573107&r1=1573106&r2=1573107&view=diff 
============================================================================== --- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java (original) +++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java Fri Feb 28 23:04:36 2014 @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.mapreduce.Job; import org.apache.hive.hcatalog.common.ErrorType; import org.apache.hive.hcatalog.common.HCatException; import org.apache.hive.hcatalog.data.DefaultHCatRecord; @@ -39,7 +40,9 @@ import org.junit.Test; import static junit.framework.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; public class TestHCatPartitioned extends HCatMapReduceTest { @@ -99,17 +102,21 @@ public class TestHCatPartitioned extends runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true); - //Test for duplicate publish + //Test for duplicate publish -- this will either fail on job creation time + // and throw an exception, or will fail at runtime, and fail the job. 
+ IOException exc = null; try { - runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true); + Job j = runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true); + assertEquals(!isTableImmutable(),j.isSuccessful()); } catch (IOException e) { exc = e; + assertTrue(exc instanceof HCatException); + assertTrue(ErrorType.ERROR_DUPLICATE_PARTITION.equals(((HCatException) exc).getErrorType())); + } + if (!isTableImmutable()){ + assertNull(exc); } - - assertNotNull(exc); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_DUPLICATE_PARTITION, ((HCatException) exc).getErrorType()); //Test for publish with invalid partition key name exc = null; @@ -118,15 +125,15 @@ public class TestHCatPartitioned extends partitionMap.put("px0", "p0value2"); try { - runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true); + Job j = runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true); + assertFalse(j.isSuccessful()); } catch (IOException e) { exc = e; + assertNotNull(exc); + assertTrue(exc instanceof HCatException); + assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType()); } - assertNotNull(exc); - assertTrue(exc instanceof HCatException); - assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType()); - //Test for publish with missing partition key values exc = null; partitionMap.clear(); @@ -156,16 +163,27 @@ public class TestHCatPartitioned extends // assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType()); // With Dynamic partitioning, this isn't an error that the keyValues specified didn't values - //Read should get 10 + 20 rows - runMRRead(30); + //Read should get 10 + 20 rows if immutable, 50 (10+20+20) if mutable + if (isTableImmutable()){ + runMRRead(30); + } else { + runMRRead(50); + } //Read with partition filter runMRRead(10, "part1 = \"p1value1\""); - runMRRead(20, "part1 = \"p1value2\""); - runMRRead(30, "part1 = 
\"p1value1\" or part1 = \"p1value2\""); runMRRead(10, "part0 = \"p0value1\""); - runMRRead(20, "part0 = \"p0value2\""); - runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\""); + if (isTableImmutable()){ + runMRRead(20, "part1 = \"p1value2\""); + runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\""); + runMRRead(20, "part0 = \"p0value2\""); + runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\""); + } else { + runMRRead(40, "part1 = \"p1value2\""); + runMRRead(50, "part1 = \"p1value1\" or part1 = \"p1value2\""); + runMRRead(40, "part0 = \"p0value2\""); + runMRRead(50, "part0 = \"p0value1\" or part0 = \"p0value2\""); + } tableSchemaTest(); columnOrderChangeTest(); @@ -331,8 +349,12 @@ public class TestHCatPartitioned extends runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true); - //Read should get 10 + 20 + 10 + 10 + 20 rows - runMRRead(70); + if (isTableImmutable()){ + //Read should get 10 + 20 + 10 + 10 + 20 rows + runMRRead(70); + } else { + runMRRead(90); // +20 from the duplicate publish + } } //Test that data inserted through hcatoutputformat is readable from hive @@ -347,6 +369,12 @@ public class TestHCatPartitioned extends ArrayList<String> res = new ArrayList<String>(); driver.getResults(res); - assertEquals(70, res.size()); + if (isTableImmutable()){ + //Read should get 10 + 20 + 10 + 10 + 20 rows + assertEquals(70, res.size()); + } else { + assertEquals(90, res.size()); // +20 from the duplicate publish + } + } } Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerWrapper.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerWrapper.java?rev=1573107&r1=1573106&r2=1573107&view=diff ============================================================================== --- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerWrapper.java (original) 
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerWrapper.java Fri Feb 28 23:04:36 2014 @@ -73,7 +73,18 @@ public class TestHCatStorerWrapper exten server.executeBatch(); Assert.assertTrue(tmpExternalDir.exists()); - Assert.assertTrue(new File(tmpExternalDir.getPath().replaceAll("\\\\", "/") + "/" + "part-m-00000").exists()); + + boolean found = false; + File[] f = tmpExternalDir.listFiles(); + if (f != null) { + for (File fin : f){ + if (fin.getPath().contains("part-m-00000")){ + found = true; + } + } + } + + Assert.assertTrue(found); driver.run("select * from junit_external"); ArrayList<String> res = new ArrayList<String>();