Author: travis
Date: Thu Sep 13 17:09:01 2012
New Revision: 1384405

URL: http://svn.apache.org/viewvc?rev=1384405&view=rev
Log:
HCATALOG-500 HCatStorer should honor user-specified path for external tables

Added:
    
incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java
    
incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    
incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java
    
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
    
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
    
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1384405&r1=1384404&r2=1384405&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Thu Sep 13 17:09:01 2012
@@ -38,6 +38,8 @@ Trunk (unreleased changes)
   HCAT-427 Document storage-based authorization (lefty via gates)
 
   IMPROVEMENTS
+  HCAT-500 HCatStorer should honor user-specified path for external tables 
(pengfeng via traviscrawford)
+
   HCAT-493 Convert classes with 2 space indentation to 4 space indentation for 
consistent style (amalakar via traviscrawford)
 
   HCAT-489 HCatalog style cleanups and readd javac debug option 
(traviscrawford)

Modified: 
incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java?rev=1384405&r1=1384404&r2=1384405&view=diff
==============================================================================
--- 
incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java
 (original)
+++ 
incubator/hcatalog/trunk/hcatalog-pig-adapter/src/main/java/org/apache/hcatalog/pig/HCatStorer.java
 Thu Sep 13 17:09:01 2012
@@ -116,6 +116,10 @@ public class HCatStorer extends HCatBase
                     "Schema for data cannot be determined.",
                     PigHCatUtil.PIG_EXCEPTION_CODE);
             }
+            String externalLocation = (String) 
udfProps.getProperty(HCatConstants.HCAT_PIG_STORER_EXTERNAL_LOCATION);
+            if (externalLocation != null) {
+                outputJobInfo.setLocation(externalLocation);
+            }
             try {
                 HCatOutputFormat.setOutput(job, outputJobInfo);
             } catch (HCatException he) {

Added: 
incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java?rev=1384405&view=auto
==============================================================================
--- 
incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java
 (added)
+++ 
incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/HCatStorerWrapper.java
 Thu Sep 13 17:09:01 2012
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.pig;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.pig.impl.util.UDFContext;
+
+/**
+ * This class is used to test the HCAT_PIG_STORER_EXTERNAL_LOCATION property 
used in HCatStorer.
+ * When this property is set, HCatStorer writes the output to the location it 
specifies. Since
+ * the property can only be set in the UDFContext, we need this simpler 
wrapper to do three things:
+ * <ol>
+ * <li> save the external dir specified in the Pig script </li>
+ * <li> set the same UDFContext signature as HCatStorer </li>
+ * <li> before {@link HCatStorer#setStoreLocation(String, Job)}, set the 
external dir in the UDFContext.</li>
+ * </ol>
+ */
+public class HCatStorerWrapper extends HCatStorer {
+
+    private String sign;
+    private String externalDir;
+
+    public HCatStorerWrapper(String partSpecs, String schema, String 
externalDir) throws Exception {
+       super(partSpecs, schema);
+       this.externalDir = externalDir;
+    }
+
+    public HCatStorerWrapper(String partSpecs, String externalDir) throws 
Exception {
+       super(partSpecs);
+       this.externalDir = externalDir;
+    }
+
+    public HCatStorerWrapper(String externalDir) throws Exception{
+       super();
+       this.externalDir = externalDir;
+    }
+
+    @Override
+    public void setStoreLocation(String location, Job job) throws IOException {
+       Properties udfProps = UDFContext.getUDFContext().getUDFProperties(
+               this.getClass(), new String[] { sign });
+       udfProps.setProperty(HCatConstants.HCAT_PIG_STORER_EXTERNAL_LOCATION, 
externalDir);
+       super.setStoreLocation(location, job);
+    }
+
+    @Override
+    public void setStoreFuncUDFContextSignature(String signature) {
+       sign = signature;
+       super.setStoreFuncUDFContextSignature(signature);
+    }
+}

Added: 
incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java?rev=1384405&view=auto
==============================================================================
--- 
incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java
 (added)
+++ 
incubator/hcatalog/trunk/hcatalog-pig-adapter/src/test/java/org/apache/hcatalog/pig/TestHCatStorerWrapper.java
 Thu Sep 13 17:09:01 2012
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.pig;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.commons.lang.SystemUtils;
+import org.apache.hadoop.hive.ql.CommandNeedRetryException;
+import org.apache.hcatalog.HcatTestUtils;
+import org.apache.hcatalog.mapreduce.HCatBaseTest;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This test checks the {@link 
HCatConstants#HCAT_PIG_STORER_EXTERNAL_LOCATION} property, which can be set in the
+ * UDFContext of {@link HCatStorer} so that it writes to the specified 
external location.
+ *
+ * Since {@link HCatStorer} does not allow extra parameters in the 
constructor, we use {@link HCatStorerWrapper}
+ * that always treats the last parameter as the external path.
+ */
+public class TestHCatStorerWrapper extends HCatBaseTest {
+
+    private static final String INPUT_FILE_NAME = TEST_DATA_DIR + 
"/input.data";
+
+    @Test
+    public void testStoreExternalTableWithExternalDir() throws IOException, 
CommandNeedRetryException{
+
+       File tmpExternalDir = new File(SystemUtils.getJavaIoTmpDir(), 
UUID.randomUUID().toString());
+       tmpExternalDir.deleteOnExit();
+
+       String part_val = "100";
+
+       driver.run("drop table junit_external");
+       String createTable = "create external table junit_external(a int, b 
string) partitioned by (c string) stored as RCFILE";
+       Assert.assertEquals(0, driver.run(createTable).getResponseCode());
+
+       int LOOP_SIZE = 3;
+       String[] inputData = new String[LOOP_SIZE*LOOP_SIZE];
+       int k = 0;
+       for(int i = 1; i <= LOOP_SIZE; i++) {
+           String si = i + "";
+           for(int j=1;j<=LOOP_SIZE;j++) {
+               inputData[k++] = si + "\t"+j;
+           }
+       }
+       HcatTestUtils.createTestDataFile(INPUT_FILE_NAME, inputData);
+       PigServer server = new PigServer(ExecType.LOCAL);
+       server.setBatchOn();
+       logAndRegister(server, "A = load '"+INPUT_FILE_NAME+"' as (a:int, 
b:chararray);");
+       logAndRegister(server, "store A into 'default.junit_external' using " + 
HCatStorerWrapper.class.getName()
+               + "('c=" + part_val + "','" + tmpExternalDir.getAbsolutePath() 
+ "');");
+       server.executeBatch();
+
+       Assert.assertTrue(tmpExternalDir.exists());
+       Assert.assertTrue(new File(tmpExternalDir.getAbsoluteFile() + "/" + 
"part-m-00000").exists());
+
+       driver.run("select * from junit_external");
+       ArrayList<String> res = new ArrayList<String>();
+       driver.getResults(res);
+       driver.run("drop table junit_external");
+       Iterator<String> itr = res.iterator();
+       for(int i = 1; i <= LOOP_SIZE; i++) {
+           String si = i + "";
+           for(int j=1;j<=LOOP_SIZE;j++) {
+               Assert.assertEquals( si + "\t" + j + "\t" + 
part_val,itr.next());
+           }
+       }
+       Assert.assertFalse(itr.hasNext());
+
+    }
+}

Modified: 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java?rev=1384405&r1=1384404&r2=1384405&view=diff
==============================================================================
--- 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java 
(original)
+++ 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/common/HCatConstants.java 
Thu Sep 13 17:09:01 2012
@@ -43,6 +43,14 @@ public final class HCatConstants {
     public static final String HCAT_PIG_INNER_FIELD_NAME = 
"hcat.pig.inner.field.name";
     public static final String HCAT_PIG_INNER_FIELD_NAME_DEFAULT = 
"innerfield";
 
+    /**
+     * {@value} (default: null)
+     * When the property is set in the UDFContext of the {@link HCatStorer}, 
{@link HCatStorer} writes
+     * to the location it specifies instead of the default HCatalog location 
format. An example can be found
+     * in {@link HCatStorerWrapper}.
+     */
+    public static final String HCAT_PIG_STORER_EXTERNAL_LOCATION = 
HCAT_PIG_STORER + ".external.location";
+
     //The keys used to store info into the job Configuration
     public static final String HCAT_KEY_BASE = "mapreduce.lib.hcat";
 

Modified: 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1384405&r1=1384404&r2=1384405&view=diff
==============================================================================
--- 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
 (original)
+++ 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FileOutputCommitterContainer.java
 Thu Sep 13 17:09:01 2012
@@ -288,7 +288,7 @@ class FileOutputCommitterContainer exten
             if (!dynamicPartitioningUsed) {
                 partitionsToAdd.add(
                     constructPartition(
-                        context,
+                        context, jobInfo,
                         tblPath.toString(), jobInfo.getPartitionValues()
                         , jobInfo.getOutputSchema(), 
getStorerParameterMap(storer)
                         , table, fs
@@ -297,7 +297,7 @@ class FileOutputCommitterContainer exten
                 for (Entry<String, Map<String, String>> entry : 
partitionsDiscoveredByPath.entrySet()) {
                     partitionsToAdd.add(
                         constructPartition(
-                            context,
+                            context, jobInfo,
                             getPartitionRootLocation(entry.getKey(), 
entry.getValue().size()), entry.getValue()
                             , jobInfo.getOutputSchema(), 
getStorerParameterMap(storer)
                             , table, fs
@@ -402,6 +402,8 @@ class FileOutputCommitterContainer exten
 
     /**
      * Generate partition metadata object to be used to add to metadata.
+     * @param context The job context.
+     * @param jobInfo The OutputJobInfo.
      * @param partLocnRoot The table-equivalent location root of the partition
      *                       (temporary dir if dynamic partition, table dir if 
static)
      * @param partKVs The keyvalue pairs that form the partition
@@ -416,7 +418,7 @@ class FileOutputCommitterContainer exten
      */
 
     private Partition constructPartition(
-        JobContext context,
+        JobContext context, OutputJobInfo jobInfo,
         String partLocnRoot, Map<String, String> partKVs,
         HCatSchema outputSchema, Map<String, String> params,
         Table table, FileSystem fs,
@@ -440,16 +442,26 @@ class FileOutputCommitterContainer exten
 
         // Sets permissions and group name on partition dirs and files.
 
-        Path partPath = new Path(partLocnRoot);
-        int i = 0;
-        for (FieldSchema partKey : table.getPartitionKeys()) {
-            if (i++ != 0) {
-                applyGroupAndPerms(fs, partPath, perms, grpName, false);
+        Path partPath;
+        if (Boolean.valueOf((String)table.getProperty("EXTERNAL"))
+               && jobInfo.getLocation() != null && 
jobInfo.getLocation().length() > 0) {
+            // honor external table that specifies the location
+            partPath = new Path(jobInfo.getLocation());
+        } else {
+            partPath = new Path(partLocnRoot);
+            int i = 0;
+            for (FieldSchema partKey : table.getPartitionKeys()) {
+                if (i++ != 0) {
+                    applyGroupAndPerms(fs, partPath, perms, grpName, false);
+                }
+                partPath = constructPartialPartPath(partPath, 
partKey.getName().toLowerCase(), partKVs);
             }
-            partPath = constructPartialPartPath(partPath, 
partKey.getName().toLowerCase(), partKVs);
         }
+
         // Apply the group and permissions to the leaf partition and files.
         applyGroupAndPerms(fs, partPath, perms, grpName, true);
+
+        // Set the location in the StorageDescriptor
         if (dynamicPartitioningUsed) {
             String dynamicPartitionDestination = 
getFinalDynamicPartitionDestination(table, partKVs);
             if (harProcessor.isEnabled()) {

Modified: 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java?rev=1384405&r1=1384404&r2=1384405&view=diff
==============================================================================
--- 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java
 (original)
+++ 
incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/FosterStorageHandler.java
 Thu Sep 13 17:09:01 2012
@@ -116,8 +116,12 @@ public class FosterStorageHandler extend
 
             String outputLocation;
 
-            // For non-partitioned tables, we send them to the temp dir
-            if (dynHash == null && jobInfo.getPartitionValues().size() == 0) {
+            if 
(Boolean.valueOf((String)tableDesc.getProperties().get("EXTERNAL"))
+                   && jobInfo.getLocation() != null && 
jobInfo.getLocation().length() > 0) {
+                // honor external table that specifies the location
+                outputLocation = jobInfo.getLocation();
+            } else if (dynHash == null && jobInfo.getPartitionValues().size() 
== 0) {
+                // For non-partitioned tables, we send them to the temp dir
                 outputLocation = TEMP_DIR_NAME;
             } else {
                 List<String> cols = new ArrayList<String>();


Reply via email to