Author: khorgath
Date: Fri Feb 28 23:04:36 2014
New Revision: 1573107

URL: http://svn.apache.org/r1573107
Log:
HIVE-6475 : Implement support for appending to mutable tables in HCatalog 
(Sushanth Sowmyan, reviewed by Daniel Dai)

Added:
    hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java
    hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java
    hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java
    hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java
Removed:
    hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalHCatNonPartitioned.java
Modified:
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
    hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
    hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
    hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
    hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java
    hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
    hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java
    hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerWrapper.java

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java Fri Feb 28 23:04:36 2014
@@ -119,7 +119,9 @@ public final class HCatConstants {
   public static final String HCAT_MSGBUS_TOPIC_NAMING_POLICY = 
"hcat.msgbus.topic.naming.policy";
   public static final String HCAT_MSGBUS_TOPIC_PREFIX = 
"hcat.msgbus.topic.prefix";
 
-  public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + 
"dynamic.jobid";
+  public static final String HCAT_OUTPUT_ID_HASH = HCAT_KEY_OUTPUT_BASE + 
".id";
+
+  public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE + 
".dynamic.jobid";
   public static final boolean HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED = false;
   public static final String HCAT_DYNAMIC_CUSTOM_PATTERN = 
"hcat.dynamic.partitioning.custom.pattern";
 

Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java Fri Feb 28 23:04:36 2014
@@ -35,14 +35,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.JobConf;
@@ -69,8 +71,10 @@ class FileOutputCommitterContainer exten
 
   private static final String TEMP_DIR_NAME = "_temporary";
   private static final String LOGS_DIR_NAME = "_logs";
-  /** The directory under which data is initially written for a partitioned 
table */
+
   static final String DYNTEMP_DIR_NAME = "_DYN";
+  static final String SCRATCH_DIR_NAME = "_SCRATCH";
+  private static final String APPEND_SUFFIX = "_a_";
 
   private static final Logger LOG = 
LoggerFactory.getLogger(FileOutputCommitterContainer.class);
   private final boolean dynamicPartitioningUsed;
@@ -174,6 +178,7 @@ class FileOutputCommitterContainer exten
       }
       Path src;
       OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
+      Path tblPath = new Path(jobInfo.getTableInfo().getTableLocation());
       if (dynamicPartitioningUsed) {
         if (!customDynamicLocationUsed) {
           src = new Path(getPartitionRootLocation(jobInfo.getLocation(), 
jobInfo.getTableInfo().getTable()
@@ -191,7 +196,9 @@ class FileOutputCommitterContainer exten
       // directory containing open files. So on Windows, we will leave output 
directory
       // behind when job fail. User needs to remove the output directory 
manually
       LOG.info("Job failed. Try cleaning up temporary directory [{}].", src);
-      fs.delete(src, true);
+      if (!src.equals(tblPath)){
+        fs.delete(src, true);
+      }
     } finally {
       cancelDelegationTokens(jobContext);
     }
@@ -332,13 +339,17 @@ class FileOutputCommitterContainer exten
     } else if (!dynamicPartitioningUsed
          && Boolean.valueOf((String)table.getProperty("EXTERNAL"))
          && jobInfo.getLocation() != null && jobInfo.getLocation().length() > 
0) {
-      // honor external table that specifies the location
-      partPath = new Path(jobInfo.getLocation());
+      // Now, we need to de-scratchify this location - i.e., get rid of any
+      // _SCRATCH[\d].?[\d]+ from the location.
+      String jobLocation = jobInfo.getLocation();
+      String finalLocn = jobLocation.replaceAll(Path.SEPARATOR + 
SCRATCH_DIR_NAME + "\\d\\.?\\d+","");
+      partPath = new Path(finalLocn);
     } else {
       partPath = new Path(partLocnRoot);
       int i = 0;
       for (FieldSchema partKey : table.getPartitionKeys()) {
         if (i++ != 0) {
+          fs.mkdirs(partPath); // Attempt to make the path in case it does not 
exist before we check
           applyGroupAndPerms(fs, partPath, perms, grpName, false);
         }
         partPath = constructPartialPartPath(partPath, 
partKey.getName().toLowerCase(), partKVs);
@@ -347,6 +358,7 @@ class FileOutputCommitterContainer exten
 
     // Apply the group and permissions to the leaf partition and files.
     // Need not bother in case of HDFS as permission is taken care of by 
setting UMask
+    fs.mkdirs(partPath); // Attempt to make the path in case it does not exist 
before we check
     if (!ShimLoader.getHadoopShims().getHCatShim().isFileInHDFS(fs, partPath)) 
{
       applyGroupAndPerms(fs, partPath, perms, grpName, true);
     }
@@ -370,6 +382,11 @@ class FileOutputCommitterContainer exten
   private void applyGroupAndPerms(FileSystem fs, Path dir, FsPermission 
permission,
                   String group, boolean recursive)
     throws IOException {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("applyGroupAndPerms : " + dir +
+          " perms: " + permission +
+          " group: " + group + " recursive: " + recursive);
+    }
     fs.setPermission(dir, permission);
     if (recursive) {
       for (FileStatus fileStatus : fs.listStatus(dir)) {
@@ -457,24 +474,38 @@ class FileOutputCommitterContainer exten
    *                 on whether other files exist where we're trying to copy
    * @throws java.io.IOException
    */
-  private void moveTaskOutputs(FileSystem fs,
-                 Path file,
-                 Path srcDir,
-                 Path destDir, final boolean dryRun) throws IOException {
+  private void moveTaskOutputs(FileSystem fs, Path file, Path srcDir,
+                 Path destDir, final boolean dryRun, boolean immutable
+      ) throws IOException {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("moveTaskOutputs "
+          + file + " from: " + srcDir + " to: " + destDir
+          + " dry: " + dryRun + " immutable: " + immutable);
+    }
+
+    if (dynamicPartitioningUsed) {
+      immutable = true; // Making sure we treat dynamic partitioning jobs as 
if they were immutable.
+    }
 
     if (file.getName().equals(TEMP_DIR_NAME) || 
file.getName().equals(LOGS_DIR_NAME) || 
file.getName().equals(SUCCEEDED_FILE_NAME)) {
       return;
     }
-    final Path finalOutputPath = getFinalPath(file, srcDir, destDir);
+
+    final Path finalOutputPath = getFinalPath(fs, file, srcDir, destDir, 
immutable);
+
     if (fs.isFile(file)) {
       if (dryRun){
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("Testing if moving file: [" + file + "] to ["
-              + finalOutputPath + "] would cause a problem");
-        }
-        if (fs.exists(finalOutputPath)) {
-          throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already 
exists in " + finalOutputPath
-              + ", duplicate publish not possible.");
+        if (immutable){
+          // Dryrun checks are meaningless for mutable table - we should 
always succeed
+          // unless there is a runtime IOException.
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("Testing if moving file: [" + file + "] to ["
+                + finalOutputPath + "] would cause a problem");
+          }
+          if (fs.exists(finalOutputPath)) {
+            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already 
exists in "
+                + finalOutputPath + ", duplicate publish not possible.");
+          }
         }
       } else {
         if(LOG.isDebugEnabled()) {
@@ -493,12 +524,15 @@ class FileOutputCommitterContainer exten
         }
       }
     } else if(fs.getFileStatus(file).isDir()) {
+
       FileStatus[] children = fs.listStatus(file);
       FileStatus firstChild = null;
       if (children != null) {
         int index=0;
         while (index < children.length) {
-          if (!children[index].getPath().getName().equals(TEMP_DIR_NAME) && 
!children[index].getPath().getName().equals(LOGS_DIR_NAME) && 
!children[index].getPath().getName().equals(SUCCEEDED_FILE_NAME)) {
+          if ( !children[index].getPath().getName().equals(TEMP_DIR_NAME)
+              && !children[index].getPath().getName().equals(LOGS_DIR_NAME)
+              && 
!children[index].getPath().getName().equals(SUCCEEDED_FILE_NAME)) {
             firstChild = children[index];
             break;
           }
@@ -509,14 +543,17 @@ class FileOutputCommitterContainer exten
         // If the first child is directory, then rest would be directory too 
according to HCatalog dir structure
         // recurse in that case
         for (FileStatus child : children) {
-          moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+          moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun, 
immutable);
         }
       } else {
 
         if (!dryRun) {
           if (dynamicPartitioningUsed) {
+
             // Optimization: if the first child is file, we have reached the 
leaf directory, move the parent directory itself
             // instead of moving each file under the directory. See 
HCATALOG-538
+            // Note for future Append implementation : This optimization is 
another reason dynamic
+            // partitioning is currently incompatible with append on mutable 
tables.
 
             final Path parentDir = finalOutputPath.getParent();
             // Create the directory
@@ -539,16 +576,21 @@ class FileOutputCommitterContainer exten
             }
             fs.delete(placeholder, false);
           } else {
+
             // In case of no partition we have to move each file
             for (FileStatus child : children) {
-              moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun);
+              moveTaskOutputs(fs, child.getPath(), srcDir, destDir, dryRun, 
immutable);
             }
+
           }
+
         } else {
-          if(fs.exists(finalOutputPath)) {
-            throw new HCatException(ErrorType.ERROR_MOVE_FAILED, "Data already 
exists in " + finalOutputPath
+          if(immutable && fs.exists(finalOutputPath) && 
!MetaStoreUtils.isDirEmpty(fs, finalOutputPath)) {
+
+            throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION, "Data 
already exists in " + finalOutputPath
                 + ", duplicate publish not possible.");
           }
+
         }
       }
     } else {
@@ -560,15 +602,16 @@ class FileOutputCommitterContainer exten
 
   /**
    * Find the final name of a given output file, given the output directory
-   * and the work directory.
+   * and the work directory. If immutable, attempt to create file of name
+   * _aN till we find an item that does not exist.
    * @param file the file to move
    * @param src the source directory
    * @param dest the target directory
    * @return the final path for the specific output file
    * @throws java.io.IOException
    */
-  private Path getFinalPath(Path file, Path src,
-                Path dest) throws IOException {
+  private Path getFinalPath(FileSystem fs, Path file, Path src,
+                Path dest, final boolean immutable) throws IOException {
     URI taskOutputUri = file.toUri();
     URI relativePath = src.toUri().relativize(taskOutputUri);
     if (taskOutputUri == relativePath) {
@@ -576,8 +619,43 @@ class FileOutputCommitterContainer exten
         src + " child = " + file);
     }
     if (relativePath.getPath().length() > 0) {
-      return new Path(dest, relativePath.getPath());
+
+      Path itemDest = new Path(dest, relativePath.getPath());
+      if (!immutable){
+        String name = relativePath.getPath();
+        String filetype;
+        int index = name.lastIndexOf('.');
+        if (index >= 0) {
+          filetype = name.substring(index);
+          name = name.substring(0, index);
+        } else {
+          filetype = "";
+        }
+
+        // Attempt to find COUNTER_MAX possible alternatives to a filename by
+        // appending _a_N and seeing if that destination also clashes. If we're
+        // still clashing after that, give up.
+        final int COUNTER_MAX = 1000;
+        int counter = 1;
+        for (; fs.exists(itemDest) && counter < COUNTER_MAX ; counter++) {
+          itemDest = new Path(dest, name + (APPEND_SUFFIX + counter) + 
filetype);
+        }
+
+        if (counter == COUNTER_MAX){
+          throw new HCatException(ErrorType.ERROR_MOVE_FAILED,
+              "Could not find a unique destination path for move: file = "
+                  + file + " , src = " + src + ", dest = " + dest);
+        }
+
+      }
+
+      if (LOG.isDebugEnabled()){
+        LOG.debug("FinalPath(file:"+file+":"+src+"->"+dest+"="+itemDest);
+      }
+
+      return itemDest;
     } else {
+
       return dest;
     }
   }
@@ -671,8 +749,10 @@ class FileOutputCommitterContainer exten
       //Move data from temp directory the actual table directory
       //No metastore operation required.
       Path src = new Path(jobInfo.getLocation());
-      moveTaskOutputs(fs, src, src, tblPath, false);
-      fs.delete(src, true);
+      moveTaskOutputs(fs, src, src, tblPath, false, table.isImmutable());
+      if (!src.equals(tblPath)){
+        fs.delete(src, true);
+      }
       return;
     }
 
@@ -715,15 +795,38 @@ class FileOutputCommitterContainer exten
         ptnInfos.add(InternalUtil.createPtnKeyValueMap(new 
Table(tableInfo.getTable()), ptn));
       }
 
+      /**
+       * Dynamic partitioning & Append incompatibility note:
+       *
+       * Currently, we do not support mixing dynamic partitioning and append 
in the
+       * same job. One reason is that we need exhaustive testing of corner 
cases
+       * for that, and a second reason is the behaviour of add_partitions. To 
support
+       * dynamic partitioning with append, we'd have to have a 
add_partitions_if_not_exist
+       * call, rather than an add_partitions call. Thus far, we've tried to 
keep the
+       * implementation of append jobtype-agnostic, but here, in code, we 
assume that
+       * a table is considered immutable if dynamic partitioning is enabled on 
the job.
+       *
+       * This does not mean that we can check before the job begins that this 
is going
+       * to be a dynamic partition job on an immutable table and thus fail the 
job, since
+       * it is quite possible to have a dynamic partitioning job run on an 
unpopulated
+       * immutable table. It simply means that at the end of the job, as far 
as copying
+       * in data is concerned, we will pretend that the table is immutable 
irrespective
+       * of what table.isImmutable() tells us.
+       */
+
       //Publish the new partition(s)
       if (dynamicPartitioningUsed && harProcessor.isEnabled() && 
(!partitionsToAdd.isEmpty())){
+
         if (!customDynamicLocationUsed) {
           Path src = new Path(ptnRootLocation);
           // check here for each dir we're copying out, to see if it
-          // already exists, error out if so
-          moveTaskOutputs(fs, src, src, tblPath, true);
-          moveTaskOutputs(fs, src, src, tblPath, false);
-          fs.delete(src, true);
+          // already exists, error out if so.
+          // Also, treat dyn-writes as writes to immutable tables.
+          moveTaskOutputs(fs, src, src, tblPath, true, true); // dryRun = 
true, immutable = true
+          moveTaskOutputs(fs, src, src, tblPath, false, true);
+          if (!src.equals(tblPath)){
+            fs.delete(src, true);
+          }
         } else {
           moveCustomLocationTaskOutputs(fs, table, hiveConf);
         }
@@ -744,21 +847,84 @@ class FileOutputCommitterContainer exten
         }
 
       }else{
+
         // no harProcessor, regular operation
         updateTableSchema(client, table, jobInfo.getOutputSchema());
         LOG.info("HAR not is not being used. The table {} has new partitions 
{}.", table.getTableName(), ptnInfos);
-        if (dynamicPartitioningUsed && (partitionsToAdd.size()>0)){
-          if (!customDynamicLocationUsed) {
-            Path src = new Path(ptnRootLocation);
-            moveTaskOutputs(fs, src, src, tblPath, true);
-            moveTaskOutputs(fs, src, src, tblPath, false);
-            fs.delete(src, true);
+        if (partitionsToAdd.size() > 0){
+          if (!dynamicPartitioningUsed ) {
+
+            // regular single-partition write into a partitioned table.
+            //Move data from temp directory the actual table directory
+            if (partitionsToAdd.size() > 1){
+              throw new HCatException(ErrorType.ERROR_PUBLISHING_PARTITION,
+                  "More than one partition to publish in non-dynamic 
partitioning job");
+            }
+            Partition p = partitionsToAdd.get(0);
+            Path src = new Path(jobInfo.getLocation());
+            Path dest = new Path(p.getSd().getLocation());
+            moveTaskOutputs(fs, src, src, dest, true, table.isImmutable());
+            moveTaskOutputs(fs,src,src,dest,false,table.isImmutable());
+            if (!src.equals(dest)){
+              fs.delete(src, true);
+            }
+
+            // Now, we check if the partition already exists. If not, we go 
ahead.
+            // If so, we error out if immutable, and if mutable, check that 
the partition's IF
+            // matches our current job's IF (table's IF) to check for 
compatibility. If compatible, we
+            // ignore and do not add. If incompatible, we error out again.
+
+            boolean publishRequired = false;
+            try {
+              Partition existingP = 
client.getPartition(p.getDbName(),p.getTableName(),p.getValues());
+              if (existingP != null){
+                if (table.isImmutable()){
+                  throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION,
+                      "Attempted duplicate partition publish on to immutable 
table");
+                } else {
+                  if (! 
existingP.getSd().getInputFormat().equals(table.getInputFormatClass().getName())){
+                    throw new 
HCatException(ErrorType.ERROR_PUBLISHING_PARTITION,
+                        "Attempted partition append, where old partition 
format was "
+                            + existingP.getSd().getInputFormat()
+                            + " and table format was "
+                            + table.getInputFormatClass().getName());
+                  }
+                }
+              } else {
+                publishRequired = true;
+              }
+            } catch (NoSuchObjectException e){
+              // All good, no such partition exists, move on.
+              publishRequired = true;
+            }
+            if (publishRequired){
+              client.add_partitions(partitionsToAdd);
+              partitionsAdded = partitionsToAdd;
+            }
+
           } else {
-            moveCustomLocationTaskOutputs(fs, table, hiveConf);
+            // Dynamic partitioning usecase
+            if (!customDynamicLocationUsed) {
+              Path src = new Path(ptnRootLocation);
+              moveTaskOutputs(fs, src, src, tblPath, true, true); // dryRun = 
true, immutable = true
+              moveTaskOutputs(fs, src, src, tblPath, false, true);
+              if (!src.equals(tblPath)){
+                fs.delete(src, true);
+              }
+            } else {
+              moveCustomLocationTaskOutputs(fs, table, hiveConf);
+            }
+            client.add_partitions(partitionsToAdd);
+            partitionsAdded = partitionsToAdd;
           }
         }
-        client.add_partitions(partitionsToAdd);
-        partitionsAdded = partitionsToAdd;
+
+        // Set permissions appropriately for each of the partitions we just 
created
+        // so as to have their permissions mimic the table permissions
+        for (Partition p : partitionsAdded){
+          applyGroupAndPerms(fs,new 
Path(p.getSd().getLocation()),tblStat.getPermission(),tblStat.getGroup(),true);
+        }
+
       }
     } catch (Exception e) {
       if (partitionsAdded.size() > 0) {
@@ -793,8 +959,8 @@ class FileOutputCommitterContainer exten
     for (Entry<String, Map<String, String>> entry : 
partitionsDiscoveredByPath.entrySet()) {
       Path src = new Path(entry.getKey());
       Path destPath = new Path(getFinalDynamicPartitionDestination(table, 
entry.getValue(), jobInfo));
-      moveTaskOutputs(fs, src, src, destPath, true);
-      moveTaskOutputs(fs, src, src, destPath, false);
+      moveTaskOutputs(fs, src, src, destPath, true, true); // dryRun = true, 
immutable = true
+      moveTaskOutputs(fs, src, src, destPath, false, true);
     }
     // delete the parent temp directory of all custom dynamic partitions
     Path parentPath = new Path(getCustomPartitionRootLocation(jobInfo, conf));

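Note on the getFinalPath() change above: the heart of the append support is the collision-avoiding file name. On a mutable table, a task output file whose destination already exists is retried as <name>_a_1<ext>, <name>_a_2<ext>, and so on up to a fixed cap (COUNTER_MAX = 1000 in the patch), after which the move fails. A minimal standalone sketch of that naming loop, using java.io.File existence checks in place of the Hadoop FileSystem purely for illustration (the class and method names below are invented, not from the patch):

import java.io.File;
import java.io.IOException;

public class AppendNameSketch {
  private static final String APPEND_SUFFIX = "_a_";
  private static final int COUNTER_MAX = 1000;

  /** Return destDir/name, or destDir/name_a_N for the smallest N whose target does not exist. */
  static File uniqueDestination(File destDir, String relativeName) throws IOException {
    // Split "part-r-00000.rc" into base name and extension so the suffix lands before the dot.
    String name = relativeName;
    String filetype = "";
    int dot = relativeName.lastIndexOf('.');
    if (dot >= 0) {
      name = relativeName.substring(0, dot);
      filetype = relativeName.substring(dot);
    }
    File candidate = new File(destDir, relativeName);
    int counter = 1;
    for (; candidate.exists() && counter < COUNTER_MAX; counter++) {
      candidate = new File(destDir, name + APPEND_SUFFIX + counter + filetype);
    }
    if (counter == COUNTER_MAX) {
      // Mirrors the patch's conservative behaviour: give up once the cap is reached.
      throw new IOException("Could not find a unique destination for " + relativeName);
    }
    return candidate;
  }

  public static void main(String[] args) throws IOException {
    // If /tmp/tbl/part-r-00000.rc already exists, a second publish lands in part-r-00000_a_1.rc.
    System.out.println(uniqueDestination(new File("/tmp/tbl"), "part-r-00000.rc"));
  }
}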
Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputFormatContainer.java Fri Feb 28 23:04:36 2014
@@ -149,7 +149,10 @@ class FileOutputFormatContainer extends 
   }
 
   /**
-   * Handles duplicate publish of partition. Fails if partition already exists.
+   * Handles duplicate publish of partition or data into an unpartitioned table
+   * if the table is immutable
+   *
+   * For partitioned tables, fails if partition already exists.
    * For non partitioned tables, fails if files are present in table directory.
    * For dynamic partitioned publish, does nothing - check would need to be 
done at recordwriter time
    * @param context the job
@@ -161,17 +164,21 @@ class FileOutputFormatContainer extends 
    * @throws org.apache.thrift.TException
    */
   private static void handleDuplicatePublish(JobContext context, OutputJobInfo 
outputInfo,
-                         HiveMetaStoreClient client, Table table) throws 
IOException, MetaException, TException, NoSuchObjectException {
+      HiveMetaStoreClient client, Table table)
+      throws IOException, MetaException, TException, NoSuchObjectException {
 
     /*
-    * For fully specified ptn, follow strict checks for existence of 
partitions in metadata
-    * For unpartitioned tables, follow filechecks
-    * For partially specified tables:
-    *    This would then need filechecks at the start of a ptn write,
-    *    Doing metadata checks can get potentially very expensive (fat conf) if
-    *    there are a large number of partitions that match the partial 
specifications
-    */
+     * For fully specified ptn, follow strict checks for existence of 
partitions in metadata
+     * For unpartitioned tables, follow filechecks
+     * For partially specified tables:
+     *    This would then need filechecks at the start of a ptn write,
+     *    Doing metadata checks can get potentially very expensive (fat conf) 
if
+     *    there are a large number of partitions that match the partial 
specifications
+     */
 
+    if (!table.isImmutable()){
+      return;
+    }
     if (table.getPartitionKeys().size() > 0) {
       if (!outputInfo.isDynamicPartitioningUsed()) {
         List<String> partitionValues = getPartitionValueList(
@@ -181,6 +188,9 @@ class FileOutputFormatContainer extends 
           outputInfo.getTableName(), partitionValues, (short) 1);
 
         if (currentParts.size() > 0) {
+          // If a table is partitioned and immutable, then the presence
+          // of the partition alone is enough to throw an error - we do
+          // not need to check for emptiness to decide to throw an error
           throw new HCatException(ErrorType.ERROR_DUPLICATE_PARTITION);
         }
       }

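Note on handleDuplicatePublish() above: with this change, all duplicate-publish checks are skipped for mutable tables, and only immutable tables keep the old strictness. For an immutable partitioned table the mere existence of the target partition is an error, and for an immutable unpartitioned table any file in the table directory is; dynamic-partition writes are checked later, at commit time. A condensed sketch of that decision flow (the MetaStore interface and helper names below are illustrative stand-ins, not the real HiveMetaStoreClient API):

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class DuplicatePublishCheckSketch {
  interface MetaStore {
    List<String> listPartitionNames(String db, String table, List<String> partVals, short max);
    boolean tableDirHasFiles(String db, String table) throws IOException;
  }

  static void checkDuplicatePublish(MetaStore ms, String db, String table,
      boolean immutable, boolean partitioned, boolean dynamicPartitioning,
      List<String> partitionValues) throws IOException {
    if (!immutable) {
      return; // mutable tables accept appends; nothing to verify up front
    }
    if (partitioned) {
      if (!dynamicPartitioning
          && !ms.listPartitionNames(db, table, partitionValues, (short) 1).isEmpty()) {
        // Immutable table + partition already present: presence alone is an error,
        // without looking at whether the partition directory is empty.
        throw new IOException("ERROR_DUPLICATE_PARTITION: " + partitionValues);
      }
      // Dynamic partitioning: the corresponding check happens at commit time instead.
    } else if (ms.tableDirHasFiles(db, table)) {
      throw new IOException("ERROR_NON_EMPTY_TABLE: " + db + "." + table);
    }
  }

  public static void main(String[] args) throws IOException {
    MetaStore empty = new MetaStore() {
      public List<String> listPartitionNames(String db, String table, List<String> vals, short max) {
        return Collections.emptyList();
      }
      public boolean tableDirHasFiles(String db, String table) { return false; }
    };
    // Immutable table, partition not yet published: passes silently.
    checkDuplicatePublish(empty, "default", "t1", true, true, false, Arrays.asList("2014-02-28"));
    System.out.println("no duplicate detected");
  }
}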
Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java Fri Feb 28 23:04:36 2014
@@ -111,6 +111,8 @@ public class FosterStorageHandler extend
       String parentPath = jobInfo.getTableInfo().getTableLocation();
       String dynHash = tableDesc.getJobProperties().get(
         HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
+      String idHash = tableDesc.getJobProperties().get(
+          HCatConstants.HCAT_OUTPUT_ID_HASH);
 
       // For dynamic partitioned writes without all keyvalues specified,
       // we create a temp dir for the associated write job
@@ -122,6 +124,8 @@ public class FosterStorageHandler extend
           parentPath = new Path(parentPath, 
jobInfo.getCustomDynamicRoot()).toString();
         }
         parentPath = new Path(parentPath, 
FileOutputCommitterContainer.DYNTEMP_DIR_NAME + dynHash).toString();
+      } else {
+        parentPath = new 
Path(parentPath,FileOutputCommitterContainer.SCRATCH_DIR_NAME + 
idHash).toString();
       }
 
       String outputLocation;
@@ -139,8 +143,8 @@ public class FosterStorageHandler extend
         // honor custom location for external table apart from what metadata 
specifies
         outputLocation = jobInfo.getLocation();
       } else if (dynHash == null && jobInfo.getPartitionValues().size() == 0) {
-        // For non-partitioned tables, we send them to the temp dir
-        outputLocation = TEMP_DIR_NAME;
+        // Unpartitioned table, writing to the scratch dir directly is good 
enough.
+        outputLocation = "";
       } else {
         List<String> cols = new ArrayList<String>();
         List<String> values = new ArrayList<String>();
@@ -156,11 +160,15 @@ public class FosterStorageHandler extend
         outputLocation = FileUtils.makePartName(cols, values);
       }
 
-      jobInfo.setLocation(new Path(parentPath, outputLocation).toString());
+      if (outputLocation!= null && !outputLocation.isEmpty()){
+        jobInfo.setLocation(new Path(parentPath, outputLocation).toString());
+      } else {
+        jobInfo.setLocation(new Path(parentPath).toString());
+      }
 
       //only set output dir if partition is fully materialized
-      if (jobInfo.getPartitionValues().size()
-        == jobInfo.getTableInfo().getPartitionColumns().size()) {
+      if (jobInfo.getPartitionValues().size() ==
+          jobInfo.getTableInfo().getPartitionColumns().size()) {
         jobProperties.put("mapred.output.dir", jobInfo.getLocation());
       }
 

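Note on the FosterStorageHandler change above: non-dynamic writes no longer go to a _temporary directory; they land in a per-job scratch directory named _SCRATCH<idHash> under the table (or partition) location, and the committer later strips that component back out of external-table locations with the replaceAll shown in FileOutputCommitterContainer. A small sketch of both halves of that round trip (the Hadoop Path API is real; the example locations are invented):

import org.apache.hadoop.fs.Path;

public class ScratchDirSketch {
  static final String SCRATCH_DIR_NAME = "_SCRATCH";

  /** Where a non-dynamic, unpartitioned write lands while the job is running. */
  static Path scratchLocation(String tableLocation, String idHash) {
    return new Path(tableLocation, SCRATCH_DIR_NAME + idHash);
  }

  /** Commit-time "de-scratchify": remove the /_SCRATCH<hash> component again. */
  static String descratchify(String jobLocation) {
    return jobLocation.replaceAll(Path.SEPARATOR + SCRATCH_DIR_NAME + "\\d\\.?\\d+", "");
  }

  public static void main(String[] args) {
    String idHash = String.valueOf(Math.random());          // e.g. "0.7318..."
    Path scratch = scratchLocation("hdfs://nn/warehouse/t1", idHash);
    System.out.println(scratch);                            // .../t1/_SCRATCH0.7318...
    System.out.println(descratchify(scratch.toString()));   // .../t1
  }
}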
Modified: hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java (original)
+++ hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java Fri Feb 28 23:04:36 2014
@@ -113,6 +113,14 @@ public class HCatOutputFormat extends HC
         throw new HCatException(ErrorType.ERROR_NOT_SUPPORTED, "Store into a 
partition with sorted column definition from Pig/Mapreduce is not supported");
       }
 
+      // Set up a common id hash for this job, so that when we create any 
temporary directory
+      // later on, it is guaranteed to be unique.
+      String idHash;
+      if ((idHash = conf.get(HCatConstants.HCAT_OUTPUT_ID_HASH)) == null) {
+        idHash = String.valueOf(Math.random());
+      }
+      conf.set(HCatConstants.HCAT_OUTPUT_ID_HASH,idHash);
+
       if (table.getTTable().getPartitionKeysSize() == 0) {
         if ((outputJobInfo.getPartitionValues() != null) && 
(!outputJobInfo.getPartitionValues().isEmpty())) {
           // attempt made to save partition values in non-partitioned table - 
throw error.
@@ -153,9 +161,6 @@ public class HCatOutputFormat extends HC
           String dynHash;
           if ((dynHash = conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID)) == 
null) {
             dynHash = String.valueOf(Math.random());
-//              LOG.info("New dynHash : ["+dynHash+"]");
-//            }else{
-//              LOG.info("Old dynHash : ["+dynHash+"]");
           }
           conf.set(HCatConstants.HCAT_DYNAMIC_PTN_JOBID, dynHash);
 

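Note on the HCatOutputFormat.setOutput() change above: a single random id is reserved per job and reused on repeated calls, so the _SCRATCH directory derived from it stays stable within a job but differs across jobs. The get-or-create idiom in isolation (Configuration is the real Hadoop class; the key string below is only a placeholder for HCatConstants.HCAT_OUTPUT_ID_HASH):

import org.apache.hadoop.conf.Configuration;

public class OutputIdHashSketch {
  // Placeholder key; the patch uses HCatConstants.HCAT_OUTPUT_ID_HASH.
  static final String OUTPUT_ID_HASH_KEY = "example.hcat.output.id.hash";

  /** Get the job-wide id hash, creating and pinning it in the conf on first use. */
  static String getOrCreateIdHash(Configuration conf) {
    String idHash = conf.get(OUTPUT_ID_HASH_KEY);
    if (idHash == null) {
      idHash = String.valueOf(Math.random());
    }
    conf.set(OUTPUT_ID_HASH_KEY, idHash);  // re-set unconditionally, as the patch does
    return idHash;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    String first = getOrCreateIdHash(conf);
    String second = getOrCreateIdHash(conf);
    System.out.println(first.equals(second));  // true: the same hash is reused within a job
  }
}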
Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java Fri Feb 28 23:04:36 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
@@ -90,7 +91,11 @@ public abstract class HCatMapReduceTest 
     return false;
   }
 
-  protected String inputFormat() { 
+  protected boolean isTableImmutable() {
+    return true;
+  }
+
+  protected String inputFormat() {
     return RCFileInputFormat.class.getName();
   }
 
@@ -177,6 +182,9 @@ public abstract class HCatMapReduceTest 
     if (isTableExternal()) {
       tableParams.put("EXTERNAL", "TRUE");
     }
+    if (isTableImmutable()){
+      tableParams.put(hive_metastoreConstants.IS_IMMUTABLE,"true");
+    }
     tbl.setParameters(tableParams);
 
     client.createTable(tbl);

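Note on the HCatMapReduceTest change above: immutability is driven entirely by one table parameter, hive_metastoreConstants.IS_IMMUTABLE, which the base test now sets to "true" and which the new TestHCatMutable* subclasses leave unset by overriding isTableImmutable(). A minimal sketch of toggling that property on a metastore Table object (the helper method is illustrative; the property key and Table API are the real ones used by the test):

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;

public class ImmutableTablePropertySketch {
  /** Tag a metastore Table as immutable (duplicate publish rejected) or mutable (append allowed). */
  static void setImmutability(Table tbl, boolean immutable) {
    Map<String, String> params = tbl.getParameters();
    if (params == null) {
      params = new HashMap<String, String>();
    }
    if (immutable) {
      params.put(hive_metastoreConstants.IS_IMMUTABLE, "true");
    } else {
      // Absent (or not "true") means mutable; HCatalog's append path only engages for mutable tables.
      params.remove(hive_metastoreConstants.IS_IMMUTABLE);
    }
    tbl.setParameters(params);
  }

  public static void main(String[] args) {
    Table t = new Table();
    setImmutability(t, true);
    System.out.println(t.getParameters());  // prints the IS_IMMUTABLE entry set to "true"
  }
}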
Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java Fri Feb 28 23:04:36 2014
@@ -157,9 +157,11 @@ public class TestHCatDynamicPartitioned 
       assertTrue(exc != null);
       assertTrue(exc instanceof HCatException);
       assertTrue("Got exception of type [" + ((HCatException) 
exc).getErrorType().toString()
-          + "] Expected ERROR_PUBLISHING_PARTITION or ERROR_MOVE_FAILED",
+          + "] Expected ERROR_PUBLISHING_PARTITION or ERROR_MOVE_FAILED "
+          + "or ERROR_DUPLICATE_PARTITION",
           (ErrorType.ERROR_PUBLISHING_PARTITION == ((HCatException) 
exc).getErrorType())
               || (ErrorType.ERROR_MOVE_FAILED == ((HCatException) 
exc).getErrorType())
+              || (ErrorType.ERROR_DUPLICATE_PARTITION == ((HCatException) 
exc).getErrorType())
       );
     }
 

Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java?rev=1573107&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalNonPartitioned.java Fri Feb 28 23:04:36 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+public class TestHCatExternalNonPartitioned extends TestHCatNonPartitioned {
+
+  @Override
+  protected Boolean isTableExternal() {
+    return true;
+  }
+
+}

Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java?rev=1573107&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableDynamicPartitioned.java Fri Feb 28 23:04:36 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+public class TestHCatMutableDynamicPartitioned extends 
TestHCatDynamicPartitioned {
+
+  @Override
+  protected boolean isTableImmutable() {
+    return false;
+  }
+
+}

Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java?rev=1573107&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutableNonPartitioned.java Fri Feb 28 23:04:36 2014
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+public class TestHCatMutableNonPartitioned extends TestHCatNonPartitioned {
+
+
+  @Override
+  protected boolean isTableImmutable() {
+    return false;
+  }
+
+}

Added: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java?rev=1573107&view=auto
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java (added)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatMutablePartitioned.java Fri Feb 28 23:04:36 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+public class TestHCatMutablePartitioned extends TestHCatPartitioned {
+
+  @Override
+  protected boolean isTableImmutable() {
+    return false;
+  }
+
+}

Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatNonPartitioned.java Fri Feb 28 23:04:36 2014
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.common.ErrorType;
 import org.apache.hive.hcatalog.common.HCatException;
 import org.apache.hive.hcatalog.data.DefaultHCatRecord;
@@ -38,6 +39,8 @@ import org.junit.Test;
 
 import static junit.framework.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 
 public class TestHCatNonPartitioned extends HCatMapReduceTest {
 
@@ -87,17 +90,21 @@ public class TestHCatNonPartitioned exte
     Map<String, String> partitionMap = new HashMap<String, String>();
     runMRCreate(null, partitionColumns, writeRecords, 10, true);
 
-    //Test for duplicate publish
+    //Test for duplicate publish -- this will either fail on job creation time
+    // and throw an exception, or will fail at runtime, and fail the job.
+
     IOException exc = null;
     try {
-      runMRCreate(null, partitionColumns, writeRecords, 20, true);
+      Job j = runMRCreate(null, partitionColumns, writeRecords, 20, true);
+      assertEquals(!isTableImmutable(),j.isSuccessful());
     } catch (IOException e) {
       exc = e;
+      assertTrue(exc instanceof HCatException);
+      assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) 
exc).getErrorType());
+    }
+    if (!isTableImmutable()){
+      assertNull(exc);
     }
-
-    assertTrue(exc != null);
-    assertTrue(exc instanceof HCatException);
-    assertEquals(ErrorType.ERROR_NON_EMPTY_TABLE, ((HCatException) 
exc).getErrorType());
 
     //Test for publish with invalid partition key name
     exc = null;
@@ -105,17 +112,21 @@ public class TestHCatNonPartitioned exte
     partitionMap.put("px", "p1value2");
 
     try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+      Job j = runMRCreate(partitionMap, partitionColumns, writeRecords, 20, 
true);
+      assertFalse(j.isSuccessful());
     } catch (IOException e) {
       exc = e;
+      assertTrue(exc != null);
+      assertTrue(exc instanceof HCatException);
+      assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) 
exc).getErrorType());
     }
 
-    assertTrue(exc != null);
-    assertTrue(exc instanceof HCatException);
-    assertEquals(ErrorType.ERROR_INVALID_PARTITION_VALUES, ((HCatException) 
exc).getErrorType());
-
-    //Read should get 10 rows
-    runMRRead(10);
+    //Read should get 10 rows if immutable, 30 if mutable
+    if (isTableImmutable()){
+      runMRRead(10);
+    } else {
+      runMRRead(30);
+    }
 
     hiveReadTest();
   }
@@ -132,6 +143,10 @@ public class TestHCatNonPartitioned exte
 
     ArrayList<String> res = new ArrayList<String>();
     driver.getResults(res);
-    assertEquals(10, res.size());
+    if (isTableImmutable()){
+      assertEquals(10, res.size());
+    }else {
+      assertEquals(30, res.size());
+    }
   }
 }

Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatOutputFormat.java Fri Feb 28 23:04:36 2014
@@ -40,8 +40,11 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -154,7 +157,13 @@ public class TestHCatOutputFormat extend
   }
 
   public void publishTest(Job job) throws Exception {
-    OutputCommitter committer = new FileOutputCommitterContainer(job, null);
+    HCatOutputFormat hcof = new HCatOutputFormat();
+    TaskAttemptContext tac = 
ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptContext(
+        job.getConfiguration(), 
ShimLoader.getHadoopShims().getHCatShim().createTaskAttemptID());
+    OutputCommitter committer = hcof.getOutputCommitter(tac);
+    committer.setupJob(job);
+    committer.setupTask(tac);
+    committer.commitTask(tac);
     committer.commitJob(job);
 
     Partition part = client.getPartition(dbName, tblName, Arrays.asList("p1"));

Modified: hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java (original)
+++ hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatPartitioned.java Fri Feb 28 23:04:36 2014
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.common.ErrorType;
 import org.apache.hive.hcatalog.common.HCatException;
 import org.apache.hive.hcatalog.data.DefaultHCatRecord;
@@ -39,7 +40,9 @@ import org.junit.Test;
 
 import static junit.framework.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 
 public class TestHCatPartitioned extends HCatMapReduceTest {
 
@@ -99,17 +102,21 @@ public class TestHCatPartitioned extends
 
     runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
 
-    //Test for duplicate publish
+    //Test for duplicate publish -- this will either fail on job creation time
+    // and throw an exception, or will fail at runtime, and fail the job.
+
     IOException exc = null;
     try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+      Job j = runMRCreate(partitionMap, partitionColumns, writeRecords, 20, 
true);
+      assertEquals(!isTableImmutable(),j.isSuccessful());
     } catch (IOException e) {
       exc = e;
+      assertTrue(exc instanceof HCatException);
+      assertTrue(ErrorType.ERROR_DUPLICATE_PARTITION.equals(((HCatException) 
exc).getErrorType()));
+    }
+    if (!isTableImmutable()){
+      assertNull(exc);
     }
-
-    assertNotNull(exc);
-    assertTrue(exc instanceof HCatException);
-    assertEquals(ErrorType.ERROR_DUPLICATE_PARTITION, ((HCatException) 
exc).getErrorType());
 
     //Test for publish with invalid partition key name
     exc = null;
@@ -118,15 +125,15 @@ public class TestHCatPartitioned extends
     partitionMap.put("px0", "p0value2");
 
     try {
-      runMRCreate(partitionMap, partitionColumns, writeRecords, 20, true);
+      Job j = runMRCreate(partitionMap, partitionColumns, writeRecords, 20, 
true);
+      assertFalse(j.isSuccessful());
     } catch (IOException e) {
       exc = e;
+      assertNotNull(exc);
+      assertTrue(exc instanceof HCatException);
+      assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) 
exc).getErrorType());
     }
 
-    assertNotNull(exc);
-    assertTrue(exc instanceof HCatException);
-    assertEquals(ErrorType.ERROR_MISSING_PARTITION_KEY, ((HCatException) 
exc).getErrorType());
-
     //Test for publish with missing partition key values
     exc = null;
     partitionMap.clear();
@@ -156,16 +163,27 @@ public class TestHCatPartitioned extends
 //    assertEquals(ErrorType.ERROR_PUBLISHING_PARTITION, ((HCatException) 
exc).getErrorType());
     // With Dynamic partitioning, this isn't an error that the keyValues 
specified didn't values
 
-    //Read should get 10 + 20 rows
-    runMRRead(30);
+    //Read should get 10 + 20 rows if immutable, 50 (10+20+20) if mutable
+    if (isTableImmutable()){
+      runMRRead(30);
+    } else {
+      runMRRead(50);
+    }
 
     //Read with partition filter
     runMRRead(10, "part1 = \"p1value1\"");
-    runMRRead(20, "part1 = \"p1value2\"");
-    runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\"");
     runMRRead(10, "part0 = \"p0value1\"");
-    runMRRead(20, "part0 = \"p0value2\"");
-    runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\"");
+    if (isTableImmutable()){
+      runMRRead(20, "part1 = \"p1value2\"");
+      runMRRead(30, "part1 = \"p1value1\" or part1 = \"p1value2\"");
+      runMRRead(20, "part0 = \"p0value2\"");
+      runMRRead(30, "part0 = \"p0value1\" or part0 = \"p0value2\"");
+    } else {
+      runMRRead(40, "part1 = \"p1value2\"");
+      runMRRead(50, "part1 = \"p1value1\" or part1 = \"p1value2\"");
+      runMRRead(40, "part0 = \"p0value2\"");
+      runMRRead(50, "part0 = \"p0value1\" or part0 = \"p0value2\"");
+    }
 
     tableSchemaTest();
     columnOrderChangeTest();
@@ -331,8 +349,12 @@ public class TestHCatPartitioned extends
 
     runMRCreate(partitionMap, partitionColumns, writeRecords, 10, true);
 
-    //Read should get 10 + 20 + 10 + 10 + 20 rows
-    runMRRead(70);
+    if (isTableImmutable()){
+      //Read should get 10 + 20 + 10 + 10 + 20 rows
+      runMRRead(70);
+    } else {
+      runMRRead(90); // +20 from the duplicate publish
+    }
   }
 
   //Test that data inserted through hcatoutputformat is readable from hive
@@ -347,6 +369,12 @@ public class TestHCatPartitioned extends
 
     ArrayList<String> res = new ArrayList<String>();
     driver.getResults(res);
-    assertEquals(70, res.size());
+    if (isTableImmutable()){
+      //Read should get 10 + 20 + 10 + 10 + 20 rows
+      assertEquals(70, res.size());
+    } else {
+      assertEquals(90, res.size()); // +20 from the duplicate publish
+    }
+
   }
 }

Modified: hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerWrapper.java?rev=1573107&r1=1573106&r2=1573107&view=diff
==============================================================================
--- hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerWrapper.java (original)
+++ hive/trunk/hcatalog/hcatalog-pig-adapter/src/test/java/org/apache/hive/hcatalog/pig/TestHCatStorerWrapper.java Fri Feb 28 23:04:36 2014
@@ -73,7 +73,18 @@ public class TestHCatStorerWrapper exten
     server.executeBatch();
 
     Assert.assertTrue(tmpExternalDir.exists());
-    Assert.assertTrue(new File(tmpExternalDir.getPath().replaceAll("\\\\", 
"/") + "/" + "part-m-00000").exists());
+
+    boolean found = false;
+    File[] f = tmpExternalDir.listFiles();
+    if (f != null) {
+      for (File fin : f){
+        if (fin.getPath().contains("part-m-00000")){
+          found = true;
+        }
+      }
+    }
+
+    Assert.assertTrue(found);
 
     driver.run("select * from junit_external");
     ArrayList<String> res = new ArrayList<String>();

