Author: khorgath
Date: Fri Feb 28 23:04:36 2014
New Revision: 1573107
URL: http://svn.apache.org/r1573107
Log:
HIVE-6475 : Implement support for appending to mutable tables in HCatalog
(Sushanth Sowmyan, reviewed by Daniel Dai)
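For illustration only (not part of the commit): when a table carries the metastore property "immutable"="false", a second HCatOutputFormat write into the same unpartitioned table or the same partition is now appended alongside the existing files (the committer suffixes clashing filenames with _a_N) instead of failing with ERROR_DUPLICATE_PARTITION / ERROR_NON_EMPTY_TABLE. The sketch below uses the standard HCatOutputFormat MapReduce API of this era; the database name, table name and the omitted mapper setup are placeholders.

    // Hypothetical driver for an append write to a mutable (non-immutable) table.
    // "default" / "mutable_tbl" are placeholder names.
    import java.util.HashMap;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hive.hcatalog.data.schema.HCatSchema;
    import org.apache.hive.hcatalog.mapreduce.HCatOutputFormat;
    import org.apache.hive.hcatalog.mapreduce.OutputJobInfo;

    public class AppendToMutableTableExample {
      public static void configure(Configuration conf) throws Exception {
        Job job = new Job(conf, "append-to-mutable-table");
        // Re-running this job against the same table/partition succeeds when the
        // table is mutable; on an immutable table the duplicate-publish checks
        // in this commit still reject it.
        HCatOutputFormat.setOutput(job,
            OutputJobInfo.create("default", "mutable_tbl", new HashMap<String, String>()));
        // getTableSchema has accepted a JobContext in some releases and a Configuration in others.
        HCatSchema schema = HCatOutputFormat.getTableSchema(job.getConfiguration());
        HCatOutputFormat.setSchema(job, schema);
        job.setOutputFormatClass(HCatOutputFormat.class);
        // ... set mapper/reducer and input, then submit the job as usual.
      }
    }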
Added:
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java
Removed:
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalHCatNonPartitioned.java
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java
hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerWrapper.java
Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java Fri Feb 28 23:04:36 2014
@@ -119,7 +119,9 @@ public final class HCatConstants {
public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = "hcat.msgbus.topic.naming.policy";
public static final String HCAT_MSGBUS_TOPIC_PREFIX = "hcat.msgbus.topic.prefix";
- public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + "dynamic.jobid";
+ public static final String HCAT_OUTPUT_ID_HASH = HCAT_KEY_OUTPUT_BASE + ".id";
+
+ public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + ".dynamic.jobid";
public static final boolean HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED = false;
public static final String HCAT_DYNAMIC_CUSTOM_PATTERN = "hcat.dynamic.partitioning.custom.pattern";
Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java Fri Feb 28 23:04:36 2014
@@ -35,14 +35,16 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapred.JobConf;
@@ -69,8 +71,10 @@ class FileOutputCommitterContainer exten
private static final String TEMP_DIR_NAME = "_temporary";
private static final String LOGS_DIR_NAME = "_logs";
- /** The directory under which data is initially written for a partitioned table */
+
static final String DYNTEMP_DIR_NAME = "_DYN";
+ static final String SCRATCH_DIR_NAME = "_SCRATCH";
+ private static final String APPEND_SUFFIX = "_a_";
private static final Logger LOG = LoggerFactory.getLogger(FileOutputCommitterContainer.class);
private final boolean dynamicPartitioningUsed;
@@ -174,6 +178,7 @@ class FileOutputCommitterContainer exten
}
Path src;
OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+ Path tblPath = new Path(jobInfo.getTableInfo().getTableLocation());
if (dynamicPartitioningUsed) {
if (!customDynamicLocationUsed) {
src = new Path(getPartitionRootLocation(jobInfo.getLocation(), jobInfo.getTableInfo().getTable()
@@ -191,7 +196,9 @@ class FileOutputCommitterContainer exten
// directory containing open files. So on Windows, we will leave output directory
// behind when job fail. User needs to remove the output directory manually
LOG.info("Job failed. Try cleaning up temporary directory [{}].", src);
- fs.delete(src, true);
+ if (!src.equals(tblPath)){
+ fs.delete(src, true);
+ }
} finally {
cancelDelegationTokens(jobContext);
}
@@ -332,13 +339,17 @@ class FileOutputCommitterContainer exten
} else if (!dynamicPartitioningUsed
&& Boolean.valueOf((String)table.getProperty("EXTERNAL"))
&& jobInfo.getLocation() != null && jobInfo.getLocation().length() > 0) {
- // honor external table that specifies the location
- partPath = new Path(jobInfo.getLocation());
+ // Now, we need to de-scratchify this location - i.e., get rid of any
+ // _SCRATCH[\d].?[\d]+ from the location.
+ String jobLocation = jobInfo.getLocation();
+ String finalLocn = jobLocation.replaceAll(Path.SEPARATOR + SCRATCH_DIR_NAME + "\\d\\.?\\d+","");
+ partPath = new Path(finalLocn);
} else {
partPath = new Path(partLocnRoot);
int i = 0;
for (FieldSchema partKey : table.getPartitionKeys()) {
if (i++ != 0) {
+ fs.mkdirs(partPath); // Attempt to make the path in case it does not exist before we check
applyGroupAndPerms(fs, partPath, perms, grpName, false);
}
partPath = constructPartialPartPath(partPath, partKey.getName().toLowerCase(), partKVs);
@@ -347,6 +358,7 @@ class FileOutputCommitterContainer exten
// Apply the group and permissions to the leaf partition and files.
// Need not bother in case of HDFS as permission is taken care of by setting UMask
+ fs.mkdirs(partPath); // Attempt to make the path in case it does not exist before we check
if (!ShimLoader.getHadoopShims().getHCatShim().isFileInHDFS(fs, partPath)) {
applyGroupAndPerms(fs, partPath, perms, grpName, true);
}
@@ -370,6 +382,11 @@ class FileOutputCommitterContainer exten
private void applyGroupAndPerms(FileSystem fs, Path dir, FsPermission permission,
String group, boolean recursive)
throws IOException {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("applyGroupAndPerms : " + dir +
+ " perms: " + permission +
+ " group: " + group + " recursive: " + recursive);
+ }
fs.setPermission(dir, permission);
if (recursive) {
for (FileStatus fileStatus : fs.listStatus(dir)) {
@@ -457,24 +474,38 @@ class FileOutputCommitterContainer exten
* on whether other files exist where we're trying to copy
* @throws java.io.IOException
*/
- private void moveTaskOutputs(FileSystem fs,
- Path file,
- Path srcDir,
- Path destDir, final boolean dryRun) throws IOException {
+ private void moveTaskOutputs(FileSystem fs, Path file, Path srcDir,
+ Path destDir, final boolean dryRun, boolean immutable
+ ) throws IOException {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("moveTaskOutputs "
+ + file + " from: " + srcDir + " to: " + destDir
+ + " dry: " + dryRun + " immutable: " + immutable);
+ }
+
+ if (dynamicPartitioningUsed) {
+ immutable = true; // Making sure we treat dynamic partitioning jobs as if they were immutable.
+ }
if (file.getName().equals(TEMP_DIR_NAME) ||
file.getName().equals(LOGS_DIR_NAME) ||
file.getName().equals(SUCCEEDED_FILE_NAME)) {
return;
}
- final Path finalOutputPath = getFinalPath(file, srcDir, destDir);
+
+ final Path finalOutputPath = getFinalPath(fs, file, srcDir, destDir, immutable);
+
if (fs.isFile(file)) {
if (dryRun){
- if(LOG.isDebugEnabled()) {
- LOG.debug("Testing if moving file: [" + file + "] to ["
- + finalOutputPath + "] would cause a problem");
- }
- if (fs.exists(finalOutputPath)) {
- throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath
- + ", duplicate publish not possible.");
+ if (immutable){
+ // Dryrun checks are meaningless for mutable table - we should always succeed
+ // unless there is a runtime IOException.
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Testing if moving file: [" + file + "] to ["
+ + finalOutputPath + "] would cause a problem");
+ }
+ if (fs.exists(finalOutputPath)) {
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in "
+ + finalOutputPath + ", duplicate publish not possible.");
+ }
}
} else {
if(LOG.isDebugEnabled()) {
@@ -493,12 +524,15 @@ class FileOutputCommitterContainer exten
}
}
} else if(fs.getFileStatus(file).isDir()) {
+
FileStatus[] children = fs.listStatus(file);
FileStatus firstChild = null;
if (children != null) {
int index=0;
while (index < children.length) {
- if (!children[index].getPath().getName().equals(TEMP_DIR_NAME) && !children[index].getPath().getName().equals(LOGS_DIR_NAME) && !children[index].getPath().getName().equals(SUCCEEDED_FILE_NAME)) {
+ if ( !children[index].getPath().getName().equals(TEMP_DIR_NAME)
+ && !children[index].getPath().getName().equals(LOGS_DIR_NAME)
+ && !children[index].getPath().getName().equals(SUCCEEDED_FILE_NAME)) {
firstChild = children[index];
break;
}
@@ -509,14 +543,17 @@ class FileOutputCommitterContainer exten
// If the first child is directory, then rest would be directory too according to HCatalog dir structure
// recurse in that case
for (FileStatus child : children) {
- moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+ moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun, immutable);
}
} else {
if (!dryRun) {
if (dynamicPartitioningUsed) {
+
// Optimization: if the first child is file, we have reached the leaf directory, move the parent directory itself
// instead of moving each file under the directory. See HCATALOG-538
+ // Note for future Append implementation : This optimization is another reason dynamic
+ // partitioning is currently incompatible with append on mutable tables.
final Path parentDir = finalOutputPath.getParent();
// Create the directory
@@ -539,16 +576,21 @@ class FileOutputCommitterContainer exten
}
fs.delete(placeholder, false);
} else {
+
// In case of no partition we have to move each file
for (FileStatus child : children) {
- moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+ moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun, immutable);
}
+
}
+
} else {
- if(fs.exists(finalOutputPath)) {
- throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already exists in " + finalOutputPath
+ if(immutable && fs.exists(finalOutputPath) && !MetaStoreUtils.isDirEmpty(fs, finalOutputPath)) {
+
+ throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION, "Data already exists in " + finalOutputPath
+ ", duplicate publish not possible.");
}
+
}
}
} else {
@@ -560,15 +602,16 @@ class FileOutputCommitterContainer exten
/**
* Find the final name of a given output file, given the output directory
- * and the work directory.
+ * and the work directory. If the table is mutable, attempt to create a file of name
+ * _a_N till we find an item that does not exist.
* @param file the file to move
* @param src the source directory
* @param dest the target directory
* @return the final path for the specific output file
* @throws java.io.IOException
*/
- private Path getFinalPath(Path file, Path src,
- Path dest) throws IOException {
+ private Path getFinalPath(FileSystem fs, Path file, Path src,
+ Path dest, final boolean immutable) throws IOException {
URI taskOutputUri = file.toUri();
URI relativePath = src.toUri().relativize(taskOutputUri);
if (taskOutputUri == relativePath) {
@@ -576,8 +619,43 @@ class FileOutputCommitterContainer exten
src + " child = " + file);
}
if (relativePath.getPath().length() > 0) {
- return new Path(dest, relativePath.getPath());
+
+ Path itemDest = new Path(dest, relativePath.getPath());
+ if (!immutable){
+ String name = relativePath.getPath();
+ String filetype;
+ int index = name.lastIndexOf('.');
+ if (index >= 0) {
+ filetype = name.substring(index);
+ name = name.substring(0, index);
+ } else {
+ filetype = "";
+ }
+
+ // Attempt to find COUNTER_MAX possible alternatives to a filename by
+ // appending _a_N and seeing if that destination also clashes. If we're
+ // still clashing after that, give up.
+ final int COUNTER_MAX = 1000;
+ int counter = 1;
+ for (; fs.exists(itemDest) && counter < COUNTER_MAX ; counter++) {
+ itemDest = new Path(dest, name + (APPEND_SUFFIX + counter) + filetype);
+ }
+
+ if (counter == COUNTER_MAX){
+ throw new HCatException(ErrorType.ERROR_MOVE_FAILED,
+ "Could not find a unique destination path for move: file = "
+ + file + " , src = " + src + ", dest = " + dest);
+ }
+
+ }
+
+ if (LOG.isDebugEnabled()){
+ LOG.debug("FinalPath(file:"+file+":"+src+"->"+dest+"="+itemDest);
+ }
+
+ return itemDest;
} else {
+
return dest;
}
}
@@ -671,8 +749,10 @@ class FileOutputCommitterContainer exten
//Move data from temp directory the actual table directory
//No metastore operation required.
Path src = new Path(jobInfo.getLocation());
- moveTaskOutputs(fs, src, src, tblPath, false);
- fs.delete(src, true);
+ moveTaskOutputs(fs, src, src, tblPath, false, table.isImmutable());
+ if (!src.equals(tblPath)){
+ fs.delete(src, true);
+ }
return;
}
@@ -715,15 +795,38 @@ class FileOutputCommitterContainer exten
ptnInfos.add(InternalUtil.createPtnKeyValueMap(new Table(tableInfo.getTable()), ptn));
}
+ /**
+ * Dynamic partitioning & Append incompatibility note:
+ *
+ * Currently, we do not support mixing dynamic partitioning and append in the
+ * same job. One reason is that we need exhaustive testing of corner cases
+ * for that, and a second reason is the behaviour of add_partitions. To support
+ * dynamic partitioning with append, we'd have to have an add_partitions_if_not_exist
+ * call, rather than an add_partitions call. Thus far, we've tried to keep the
+ * implementation of append jobtype-agnostic, but here, in code, we assume that
+ * a table is considered immutable if dynamic partitioning is enabled on the job.
+ *
+ * This does not mean that we can check before the job begins that this is going
+ * to be a dynamic partition job on an immutable table and thus fail the job, since
+ * it is quite possible to have a dynamic partitioning job run on an unpopulated
+ * immutable table. It simply means that at the end of the job, as far as copying
+ * in data is concerned, we will pretend that the table is immutable irrespective
+ * of what table.isImmutable() tells us.
+ */
+
//Publish the new partition(s)
if (dynamicPartitioningUsed && harProcessor.isEnabled() && (!partitionsToAdd.isEmpty())){
+
if (!customDynamicLocationUsed) {
Path src = new Path(ptnRootLocation);
// check here for each dir we're copying out, to see if it
- // already exists, error out if so
- moveTaskOutputs(fs, src, src, tblPath, true);
- moveTaskOutputs(fs, src, src, tblPath, false);
- fs.delete(src, true);
+ // already exists, error out if so.
+ // Also, treat dyn-writes as writes to immutable tables.
+ moveTaskOutputs(fs, src, src, tblPath, true, true); // dryRun = true, immutable = true
+ moveTaskOutputs(fs, src, src, tblPath, false, true);
+ if (!src.equals(tblPath)){
+ fs.delete(src, true);
+ }
} else {
moveCustomLocationTaskOutputs(fs, table, hiveConf);
}
@@ -744,21 +847,84 @@ class FileOutputCommitterContainer exten
}
}else{
+
// no harProcessor, regular operation
updateTableSchema(client, table, jobInfo.getOutputSchema());
LOG.info("HAR not is not being used. The table {} has new partitions
{}.", table.getTableName(), ptnInfos);
- if (dynamicPartitioningUsed && (partitionsToAdd.size()>0)){
- if (!customDynamicLocationUsed) {
- Path src = new Path(ptnRootLocation);
- moveTaskOutputs(fs, src, src, tblPath, true);
- moveTaskOutputs(fs, src, src, tblPath, false);
- fs.delete(src, true);
+ if (partitionsToAdd.size() > 0){
+ if (!dynamicPartitioningUsed ) {
+
+ // regular single-partition write into a partitioned table.
+ //Move data from temp directory the actual table directory
+ if (partitionsToAdd.size() > 1){
+ throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION,
+ "More than one partition to publish in non-dynamic
partitioning job");
+ }
+ Partition p = partitionsToAdd.get(0);
+ Path src = new Path(jobInfo.getLocation());
+ Path dest = new Path(p.getSd().getLocation());
+ moveTaskOutputs(fs, src, src, dest, true, table.isImmutable());
+ moveTaskOutputs(fs,src,src,dest,false,table.isImmutable());
+ if (!src.equals(dest)){
+ fs.delete(src, true);
+ }
+
+ // Now, we check if the partition already exists. If not, we go ahead.
+ // If so, we error out if immutable, and if mutable, check that the partition's IF
+ // matches our current job's IF (table's IF) to check for compatibility. If compatible, we
+ // ignore and do not add. If incompatible, we error out again.
+
+ boolean publishRequired = false;
+ try {
+ Partition existingP = client.getPartition(p.getDbName(),p.getTableName(),p.getValues());
+ if (existingP != null){
+ if (table.isImmutable()){
+ throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
+ "Attempted duplicate partition publish on to immutable table");
+ } else {
+ if (! existingP.getSd().getInputFormat().equals(table.getInputFormatClass().getName())){
+ throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION,
+ "Attempted partition append, where old partition format was "
+ + existingP.getSd().getInputFormat()
+ + " and table format was "
+ + table.getInputFormatClass().getName());
+ }
+ }
+ } else {
+ publishRequired = true;
+ }
+ } catch (NoSuchObjectException e){
+ // All good, no such partition exists, move on.
+ publishRequired = true;
+ }
+ if (publishRequired){
+ client.add_partitions(partitionsToAdd);
+ partitionsAdded = partitionsToAdd;
+ }
+
} else {
- moveCustomLocationTaskOutputs(fs, table, hiveConf);
+ // Dynamic partitioning usecase
+ if (!customDynamicLocationUsed) {
+ Path src = new Path(ptnRootLocation);
+ moveTaskOutputs(fs, src, src, tblPath, true, true); // dryRun = true, immutable = true
+ moveTaskOutputs(fs, src, src, tblPath, false, true);
+ if (!src.equals(tblPath)){
+ fs.delete(src, true);
+ }
+ } else {
+ moveCustomLocationTaskOutputs(fs, table, hiveConf);
+ }
+ client.add_partitions(partitionsToAdd);
+ partitionsAdded = partitionsToAdd;
}
}
- client.add_partitions(partitionsToAdd);
- partitionsAdded = partitionsToAdd;
+
+ // Set permissions appropriately for each of the partitions we just created
+ // so as to have their permissions mimic the table permissions
+ for (Partition p : partitionsAdded){
+ applyGroupAndPerms(fs,new Path(p.getSd().getLocation()),tblStat.getPermission(),tblStat.getGroup(),true);
+ }
+
}
} catch (Exception e) {
if (partitionsAdded.size() > 0) {
@@ -793,8 +959,8 @@ class FileOutputCommitterContainer exten
for (Entry<String, Map<String, String>> entry :
partitionsDiscoveredByPath.entrySet()) {
Path src = new Path(entry.getKey());
Path destPath = new Path(getFinalDynamicPartitionDestination(table,
entry.getValue(), jobInfo));
- moveTaskOutputs(fs, src, src, destPath, true);
- moveTaskOutputs(fs, src, src, destPath, false);
+ moveTaskOutputs(fs, src, src, destPath, true, true); // dryRun = true,
immutable = true
+ moveTaskOutputs(fs, src, src, destPath, false, true);
}
// delete the parent temp directory of all custom dynamic partitions
Path parentPath = new Path(getCustomPartitionRootLocation(jobInfo, conf));
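The rename scheme that getFinalPath() now applies for mutable tables can be read in isolation in the sketch below. It is a simplified restatement of the committed logic, not additional code in the commit: for an existing dest/name.ext the committer tries dest/name_a_1.ext, name_a_2.ext, and so on, up to a fixed cap (APPEND_SUFFIX is "_a_" and the cap is 1000 in the commit).

    // Simplified sketch of the append-collision handling in getFinalPath().
    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class AppendPathSketch {
      private static final String APPEND_SUFFIX = "_a_";
      private static final int COUNTER_MAX = 1000;

      static Path nextFreePath(FileSystem fs, Path dest, String relative) throws IOException {
        int dot = relative.lastIndexOf('.');
        String name = (dot >= 0) ? relative.substring(0, dot) : relative;
        String filetype = (dot >= 0) ? relative.substring(dot) : "";
        Path candidate = new Path(dest, relative);
        int counter = 1;
        // Keep appending _a_<counter> until the destination does not exist yet.
        for (; fs.exists(candidate) && counter < COUNTER_MAX; counter++) {
          candidate = new Path(dest, name + APPEND_SUFFIX + counter + filetype);
        }
        if (counter == COUNTER_MAX) {
          throw new IOException("Could not find a unique destination path under " + dest);
        }
        return candidate;   // e.g. part-r-00000, then part-r-00000_a_1, part-r-00000_a_2, ...
      }
    }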
Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java Fri Feb 28 23:04:36 2014
@@ -149,7 +149,10 @@ class FileOutputFormatContainer extends
}
/**
- * Handles duplicate publish of partition. Fails if partition already exists.
+ * Handles duplicate publish of partition or data into an unpartitioned table
+ * if the table is immutable
+ *
+ * For partitioned tables, fails if partition already exists.
* For non partitioned tables, fails if files are present in table directory.
* For dynamic partitioned publish, does nothing - check would need to be done at recordwriter time
* @param context the job
@@ -161,17 +164,21 @@ class FileOutputFormatContainer extends
* @throws org.apache.thrift.TException
*/
private static void handleDuplicatePublish(JobContext context, OutputJobInfo outputInfo,
- HiveMetaStoreClient client, Table table) throws IOException, MetaException, TException, NoSuchObjectException {
+ HiveMetaStoreClient client, Table table)
+ throws IOException, MetaException, TException, NoSuchObjectException {
/*
- * For fully specified ptn, follow strict checks for existence of partitions in metadata
- * For unpartitioned tables, follow filechecks
- * For partially specified tables:
- * This would then need filechecks at the start of a ptn write,
- * Doing metadata checks can get potentially very expensive (fat conf) if
- * there are a large number of partitions that match the partial specifications
- */
+ * For fully specified ptn, follow strict checks for existence of partitions in metadata
+ * For unpartitioned tables, follow filechecks
+ * For partially specified tables:
+ * This would then need filechecks at the start of a ptn write,
+ * Doing metadata checks can get potentially very expensive (fat conf) if
+ * there are a large number of partitions that match the partial specifications
+ */
+ if (!table.isImmutable()){
+ return;
+ }
if (table.getPartitionKeys().size() > 0) {
if (!outputInfo.isDynamicPartitioningUsed()) {
List<String> partitionValues = getPartitionValueList(
@@ -181,6 +188,9 @@ class FileOutputFormatContainer extends
outputInfo.getTableName(), partitionValues, (short) 1);
if (currentParts.size() > 0) {
+ // If a table is partitioned and immutable, then the presence
+ // of the partition alone is enough to throw an error - we do
+ // not need to check for emptiness to decide to throw an error
throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION);
}
}
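The early return added above makes the duplicate-publish validation conditional on table immutability. A hedged sketch of the resulting policy follows; the method name, exception type and helper arguments are illustrative, not the committed ones.

    // Sketch of the duplicate-publish policy after this change:
    // mutable tables skip the check entirely.
    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
    import org.apache.hadoop.hive.ql.metadata.Table;

    final class DuplicatePublishPolicySketch {
      static void check(HiveMetaStoreClient client, Table table, FileSystem fs,
          List<String> partitionValues, Path tableDir) throws Exception {
        if (!table.isImmutable()) {
          return;                       // mutable: appends are allowed, nothing to verify
        }
        if (table.getPartitionKeys().size() > 0) {
          // immutable + partitioned: the mere presence of the partition is an error
          List<String> existing = client.listPartitionNames(table.getDbName(),
              table.getTableName(), partitionValues, (short) 1);
          if (!existing.isEmpty()) {
            throw new IOException("duplicate partition publish on immutable table");
          }
        } else if (fs.exists(tableDir) && fs.listStatus(tableDir).length > 0) {
          // immutable + unpartitioned: pre-existing files in the table dir are an error
          throw new IOException("non-empty immutable table");
        }
      }
    }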
Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java Fri Feb 28 23:04:36 2014
@@ -111,6 +111,8 @@ public class FosterStorageHandler extend
String parentPath = jobInfo.getTableInfo().getTableLocation();
String dynHash = tableDesc.getJobProperties().get(
HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
+ String idHash = tableDesc.getJobProperties().get(
+ HCatConstants.HCAT_OUTPUT_ID_HASH);
// For dynamic partitioned writes without all keyvalues specified,
// we create a temp dir for the associated write job
@@ -122,6 +124,8 @@ public class FosterStorageHandler extend
parentPath = new Path(parentPath, jobInfo.getCustomDynamicRoot()).toString();
}
parentPath = new Path(parentPath, FileOutputCommitterContainer.DYNTEMP_DIR_NAME + dynHash).toString();
+ } else {
+ parentPath = new Path(parentPath,FileOutputCommitterContainer.SCRATCH_DIR_NAME + idHash).toString();
}
String outputLocation;
@@ -139,8 +143,8 @@ public class FosterStorageHandler extend
// honor custom location for external table apart from what metadata specifies
outputLocation = jobInfo.getLocation();
} else if (dynHash == null && jobInfo.getPartitionValues().size() == 0) {
- // For non-partitioned tables, we send them to the temp dir
- outputLocation = TEMP_DIR_NAME;
+ // Unpartitioned table, writing to the scratch dir directly is good enough.
+ outputLocation = "";
} else {
List<String> cols = new ArrayList<String>();
List<String> values = new ArrayList<String>();
@@ -156,11 +160,15 @@ public class FosterStorageHandler extend
outputLocation = FileUtils.makePartName(cols, values);
}
- jobInfo.setLocation(new Path(parentPath, outputLocation).toString());
+ if (outputLocation!= null && !outputLocation.isEmpty()){
+ jobInfo.setLocation(new Path(parentPath, outputLocation).toString());
+ } else {
+ jobInfo.setLocation(new Path(parentPath).toString());
+ }
//only set output dir if partition is fully materialized
- if (jobInfo.getPartitionValues().size()
- == jobInfo.getTableInfo().getPartitionColumns().size()) {
+ if (jobInfo.getPartitionValues().size() ==
+ jobInfo.getTableInfo().getPartitionColumns().size()) {
jobProperties.put("mapred.output.dir", jobInfo.getLocation());
}
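Taken together with the committer change, the effect of the FosterStorageHandler edit is that non-dynamic writes first land under a per-job _SCRATCH<idHash> directory beneath the table location (dynamic writes keep using _DYN<dynHash>), and the committer later strips that component out again. A small illustration of the path layout, with a made-up hash value and warehouse location:

    // Illustration only: how the temporary write location is derived and undone.
    import org.apache.hadoop.fs.Path;

    final class ScratchPathSketch {
      public static void main(String[] args) {
        String tableLocation = "hdfs://nn:8020/warehouse/mydb.db/mytable";   // placeholder
        String idHash = "0.8370243289995435";                                // placeholder hash
        // Non-dynamic write: data is first written under the scratch dir ...
        Path scratch = new Path(tableLocation, "_SCRATCH" + idHash);
        // ... and the committer later removes the "_SCRATCH<hash>" component to get
        // the final table/partition path (see the replaceAll in FileOutputCommitterContainer).
        String finalLocation =
            scratch.toString().replaceAll(Path.SEPARATOR + "_SCRATCH" + "\\d\\.?\\d+", "");
        System.out.println(scratch + " -> " + finalLocation);
      }
    }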
Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java Fri Feb 28 23:04:36 2014
@@ -113,6 +113,14 @@ public class HCatOutputFormat extends HC
throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a partition with sorted column definition from Pig/Mapreduce is not supported");
}
+ // Set up a common id hash for this job, so that when we create any temporary directory
+ // later on, it is guaranteed to be unique.
+ String idHash;
+ if ((idHash = conf.get(HCatConstants.HCAT_OUTPUT_ID_HASH)) == null) {
+ idHash = String.valueOf(Math.random());
+ }
+ conf.set(HCatConstants.HCAT_OUTPUT_ID_HASH,idHash);
+
if (table.getTTable().getPartitionKeysSize() == 0) {
if ((outputJobInfo.getPartitionValues() != null) && (!outputJobInfo.getPartitionValues().isEmpty())) {
// attempt made to save partition values in non-partitioned table - throw error.
@@ -153,9 +161,6 @@ public class HCatOutputFormat extends HC
String dynHash;
if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == null) {
dynHash = String.valueOf(Math.random());
-// LOG.info("New dynHash : ["+dynHash+"]");
-// }else{
-// LOG.info("Old dynHash : ["+dynHash+"]");
}
conf.set(HCatConstants.HCAT_DYNAMIC_PTN_JOBID, dynHash);
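The id-hash setup added to setOutput() above is intentionally idempotent: the first call stores a random value under HCAT_OUTPUT_ID_HASH and later calls reuse it, so every output container in the job agrees on the _SCRATCH directory name. A minimal standalone sketch of the same pattern; the literal key value below is an assumption, the real one is HCAT_KEY_OUTPUT_BASE + ".id".

    // Minimal sketch of the reuse-or-create pattern used for the job id hash.
    import org.apache.hadoop.conf.Configuration;

    final class IdHashSketch {
      static final String HCAT_OUTPUT_ID_HASH = "mapreduce.lib.hcatoutput.id"; // assumed key value

      static String ensureIdHash(Configuration conf) {
        String idHash = conf.get(HCAT_OUTPUT_ID_HASH);
        if (idHash == null) {
          idHash = String.valueOf(Math.random());  // any job-unique token would do
        }
        conf.set(HCAT_OUTPUT_ID_HASH, idHash);     // re-setting the same value is harmless
        return idHash;
      }
    }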
Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java Fri Feb 28 23:04:36 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
@@ -90,7 +91,11 @@ public abstract class HCatMapReduceTest
return false;
}
- protected String inputFormat() {
+ protected boolean isTableImmutable() {
+ return true;
+ }
+
+ protected String inputFormat() {
return RCFileInputFormat.class.getName();
}
@@ -177,6 +182,9 @@ public abstract class HCatMapReduceTest
if (isTableExternal()) {
tableParams.put("EXTERNAL", "TRUE");
}
+ if (isTableImmutable()){
+ tableParams.put(hive_metastoreConstants.IS_IMMUTABLE,"true");
+ }
tbl.setParameters(tableParams);
client.createTable(tbl);
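For reference, the flag the test scaffolding toggles above is an ordinary metastore table parameter; a table with it absent or set to "false" is mutable. A hedged sketch of the parameter handling on a metastore api.Table, mirroring the test change (the helper name is made up):

    // Sketch: marking a metastore Table as immutable or mutable via its parameters.
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.hive.metastore.api.Table;
    import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;

    final class ImmutabilityParamSketch {
      static void markImmutability(Table tbl, boolean immutable) {
        Map<String, String> params = tbl.getParameters();
        if (params == null) {
          params = new HashMap<String, String>();
        }
        if (immutable) {
          params.put(hive_metastoreConstants.IS_IMMUTABLE, "true");
        } else {
          // absent or "false" means the table accepts appends (mutable)
          params.remove(hive_metastoreConstants.IS_IMMUTABLE);
        }
        tbl.setParameters(params);
      }
    }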
Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java Fri Feb 28 23:04:36 2014
@@ -157,9 +157,11 @@ public class TestHCatDynamicPartitioned
assertTrue(exc != null);
assertTrue(exc instanceof HCatException);
assertTrue("Got exception of type [" + ((HCatException)
exc).getErrorType().toString()
- + "] Expected ERROR_PUBLISHING_PARTITION or ERROR_MOVE_FAILED",
+ + "] Expected ERROR_PUBLISHING_PARTITION or ERROR_MOVE_FAILED "
+ + "or ERROR_DUPLICATE_PARTITION",
(ErrorType.ERROR_PUBLISHING_PARTITION == ((HCatException)
exc).getErrorType())
|| (ErrorType.ERROR_MOVE_FAILED == ((HCatException)
exc).getErrorType())
+ || (ErrorType.ERROR_DUPLICATE_PARTITION == ((HCatException)
exc).getErrorType())
);
}
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java?rev=1573107&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java Fri Feb 28 23:04:36 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+public class TestHCatExternalNonPartitioned extends TestHCatNonPartitioned {
+
+ @Override
+ protected Boolean isTableExternal() {
+ return true;
+ }
+
+}
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java?rev=1573107&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java Fri Feb 28 23:04:36 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+public class TestHCatMutableDynamicPartitioned extends TestHCatDynamicPartitioned {
+
+ @Override
+ protected boolean isTableImmutable() {
+ return false;
+ }
+
+}
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java?rev=1573107&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java Fri Feb 28 23:04:36 2014
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+public class TestHCatMutableNonPartitioned extends TestHCatNonPartitioned {
+
+
+ @Override
+ protected boolean isTableImmutable() {
+ return false;
+ }
+
+}
Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java?rev=1573107&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java Fri Feb 28 23:04:36 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+public class TestHCatMutablePartitioned extends TestHCatPartitioned {
+
+ @Override
+ protected boolean isTableImmutable() {
+ return false;
+ }
+
+}
Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java Fri Feb 28 23:04:36 2014
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.common.ErrorType;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
@@ -38,6 +39,8 @@ import org.junit.Test;
import static junit.framework.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
public class TestHCatNonPartitioned extends HCatMapReduceTest {
@@ -87,17 +90,21 @@ public class TestHCatNonPartitioned exte
Map<String, String> partitionMap = new HashMap<String, String>();
runMRCreate(null, partitionColumns, writeRecords, 10, true);
- //Test for duplicate publish
+ //Test for duplicate publish -- this will either fail on job creation time
+ // and throw an exception, or will fail at runtime, and fail the job.
+
IOException exc = null;
try {
- runMRCreate(null, partitionColumns, writeRecords, 20, true);
+ Job j = runMRCreate(null, partitionColumns, writeRecords, 20, true);
+ assertEquals(!isTableImmutable(),j.isSuccessful());
} catch (IOException e) {
exc = e;
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType());
+ }
+ if (!isTableImmutable()){
+ assertNull(exc);
}
-
- assertTrue(exc != null);
- assertTrue(exc instanceof HCatException);
- assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) exc).getErrorType());
//Test for publish with invalid partition key name
exc = null;
@@ -105,17 +112,21 @@ public class TestHCatNonPartitioned exte
partitionMap.put("px", "p1value2");
try {
- runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ Job j = runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ assertFalse(j.isSuccessful());
} catch (IOException e) {
exc = e;
+ assertTrue(exc != null);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
}
- assertTrue(exc != null);
- assertTrue(exc instanceof HCatException);
- assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) exc).getErrorType());
-
- //Read should get 10 rows
- runMRRead(10);
+ //Read should get 10 rows if immutable, 30 if mutable
+ if (isTableImmutable()){
+ runMRRead(10);
+ } else {
+ runMRRead(30);
+ }
hiveReadTest();
}
@@ -132,6 +143,10 @@ public class TestHCatNonPartitioned exte
ArrayList<String> res = new ArrayList<String>();
driver.getResults(res);
- assertEquals(10, res.size());
+ if (isTableImmutable()){
+ assertEquals(10, res.size());
+ }else {
+ assertEquals(30, res.size());
+ }
}
}
Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java Fri Feb 28 23:04:36 2014
@@ -40,8 +40,11 @@ import org.apache.hadoop.hive.metastore.
import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -154,7 +157,13 @@ public class TestHCatOutputFormat extend
}
public void publishTest(Job job) throws Exception {
- OutputCommitter committer = new FileOutputCommitterContainer(job, null);
+ HCatOutputFormat hcof = new HCatOutputFormat();
+ TaskAttemptContext tac = ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
+ job.getConfiguration(), ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID());
+ OutputCommitter committer = hcof.getOutputCommitter(tac);
+ committer.setupJob(job);
+ committer.setupTask(tac);
+ committer.commitTask(tac);
committer.commitJob(job);
Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));
Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java Fri Feb 28 23:04:36 2014
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.hive.hcatalog.common.ErrorType;
import org.apache.hive.hcatalog.common.HCatException;
import org.apache.hive.hcatalog.data.DefaultHCatRecord;
@@ -39,7 +40,9 @@ import org.junit.Test;
import static junit.framework.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
public class TestHCatPartitioned extends HCatMapReduceTest {
@@ -99,17 +102,21 @@ public class TestHCatPartitioned extends
runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
- //Test for duplicate publish
+ //Test for duplicate publish -- this will either fail on job creation time
+ // and throw an exception, or will fail at runtime, and fail the job.
+
IOException exc = null;
try {
- runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ Job j = runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ assertEquals(!isTableImmutable(),j.isSuccessful());
} catch (IOException e) {
exc = e;
+ assertTrue(exc instanceof HCatException);
+ assertTrue(ErrorType.ERROR_DUPLICATE_PARTITION.equals(((HCatException) exc).getErrorType()));
+ }
+ if (!isTableImmutable()){
+ assertNull(exc);
}
-
- assertNotNull(exc);
- assertTrue(exc instanceof HCatException);
- assertEquals(ErrorType.ERROR_DUPLICATE_PARTITION, ((HCatException) exc).getErrorType());
//Test for publish with invalid partition key name
exc = null;
@@ -118,15 +125,15 @@ public class TestHCatPartitioned extends
partitionMap.put("px0", "p0value2");
try {
- runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ Job j = runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+ assertFalse(j.isSuccessful());
} catch (IOException e) {
exc = e;
+ assertNotNull(exc);
+ assertTrue(exc instanceof HCatException);
+ assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType());
}
- assertNotNull(exc);
- assertTrue(exc instanceof HCatException);
- assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) exc).getErrorType());
-
//Test for publish with missing partition key values
exc = null;
partitionMap.clear();
@@ -156,16 +163,27 @@ public class TestHCatPartitioned extends
// assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) exc).getErrorType());
// With Dynamic partitioning, this isn't an error that the keyValues specified didn't values
- //Read should get 10 + 20 rows
- runMRRead(30);
+ //Read should get 10 + 20 rows if immutable, 50 (10+20+20) if mutable
+ if (isTableImmutable()){
+ runMRRead(30);
+ } else {
+ runMRRead(50);
+ }
//Read with partition filter
runMRRead(10, "part1 = \"p1value1\"");
- runMRRead(20, "part1 = \"p1value2\"");
- runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\"");
runMRRead(10, "part0 = \"p0value1\"");
- runMRRead(20, "part0 = \"p0value2\"");
- runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\"");
+ if (isTableImmutable()){
+ runMRRead(20, "part1 = \"p1value2\"");
+ runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\"");
+ runMRRead(20, "part0 = \"p0value2\"");
+ runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\"");
+ } else {
+ runMRRead(40, "part1 = \"p1value2\"");
+ runMRRead(50, "part1 = \"p1value1\" or part1 = \"p1value2\"");
+ runMRRead(40, "part0 = \"p0value2\"");
+ runMRRead(50, "part0 = \"p0value1\" or part0 = \"p0value2\"");
+ }
tableSchemaTest();
columnOrderChangeTest();
@@ -331,8 +349,12 @@ public class TestHCatPartitioned extends
runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
- //Read should get 10 + 20 + 10 + 10 + 20 rows
- runMRRead(70);
+ if (isTableImmutable()){
+ //Read should get 10 + 20 + 10 + 10 + 20 rows
+ runMRRead(70);
+ } else {
+ runMRRead(90); // +20 from the duplicate publish
+ }
}
//Test that data inserted through hcatoutputformat is readable from hive
@@ -347,6 +369,12 @@ public class TestHCatPartitioned extends
ArrayList<String> res = new ArrayList<String>();
driver.getResults(res);
- assertEquals(70, res.size());
+ if (isTableImmutable()){
+ //Read should get 10 + 20 + 10 + 10 + 20 rows
+ assertEquals(70, res.size());
+ } else {
+ assertEquals(90, res.size()); // +20 from the duplicate publish
+ }
+
}
}
Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerWrapper.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerWrapper.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerWrapper.java Fri Feb 28 23:04:36 2014
@@ -73,7 +73,18 @@ public class TestHCatStorerWrapper exten
server.executeBatch();
Assert.assertTrue(tmpExternalDir.exists());
- Assert.assertTrue(new File(tmpExternalDir.getPath().replaceAll("\\\\", "/") + "/" + "part-m-00000").exists());
+
+ boolean found = false;
+ File[] f = tmpExternalDir.listFiles();
+ if (f != null) {
+ for (File fin : f){
+ if (fin.getPath().contains("part-m-00000")){
+ found = true;
+ }
+ }
+ }
+
+ Assert.assertTrue(found);
driver.run("select * from junit_external");
ArrayList<String> res = new ArrayList<String>();