Author: khorgath
Date: Thu Feb 13 19:01:15 2014
New Revision: 1568008
URL: http://svn.apache.org/r1568008
Log:
HIVE-6109 : Support customized location for EXTERNAL tables created by Dynamic
Partitioning (Satish Mittal via Sushanth Sowmyan)
Added:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
(original)
+++
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/common/HCatConstants.java
Thu Feb 13 19:01:15 2014
@@ -121,6 +121,7 @@ public final class HCatConstants {
public static final String HCAT_DYNAMIC_PTN_JOBID = HCAT_KEY_OUTPUT_BASE +
"dynamic.jobid";
public static final boolean HCAT_IS_DYNAMIC_MAX_PTN_CHECK_ENABLED = false;
+ public static final String HCAT_DYNAMIC_CUSTOM_PATTERN =
"hcat.dynamic.partitioning.custom.pattern";
// Message Bus related properties.
public static final String HCAT_DEFAULT_TOPIC_PREFIX = "hcat";
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
(original)
+++
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FileOutputCommitterContainer.java
Thu Feb 13 19:01:15 2014
@@ -69,10 +69,13 @@ class FileOutputCommitterContainer exten
private static final String TEMP_DIR_NAME = "_temporary";
private static final String LOGS_DIR_NAME = "_logs";
+ /** The directory under which data is initially written for a partitioned
table */
+ static final String DYNTEMP_DIR_NAME = "_DYN";
private static final Logger LOG =
LoggerFactory.getLogger(FileOutputCommitterContainer.class);
private final boolean dynamicPartitioningUsed;
private boolean partitionsDiscovered;
+ private final boolean customDynamicLocationUsed;
private Map<String, Map<String, String>> partitionsDiscoveredByPath;
private Map<String, JobContext> contextDiscoveredByPath;
@@ -97,6 +100,14 @@ class FileOutputCommitterContainer exten
this.partitionsDiscovered = !dynamicPartitioningUsed;
cachedStorageHandler =
HCatUtil.getStorageHandler(context.getConfiguration(),
jobInfo.getTableInfo().getStorerInfo());
+ Table table = new Table(jobInfo.getTableInfo().getTable());
+ if (dynamicPartitioningUsed &&
Boolean.valueOf((String)table.getProperty("EXTERNAL"))
+ && jobInfo.getCustomDynamicPath() != null
+ && jobInfo.getCustomDynamicPath().length() > 0) {
+ customDynamicLocationUsed = true;
+ } else {
+ customDynamicLocationUsed = false;
+ }
}
@Override
@@ -164,8 +175,12 @@ class FileOutputCommitterContainer exten
Path src;
OutputJobInfo jobInfo = HCatOutputFormat.getJobInfo(jobContext);
if (dynamicPartitioningUsed) {
- src = new Path(getPartitionRootLocation(jobInfo.getLocation(),
jobInfo.getTableInfo().getTable()
- .getPartitionKeysSize()));
+ if (!customDynamicLocationUsed) {
+ src = new Path(getPartitionRootLocation(jobInfo.getLocation(),
jobInfo.getTableInfo().getTable()
+ .getPartitionKeysSize()));
+ } else {
+ src = new Path(getCustomPartitionRootLocation(jobInfo,
jobContext.getConfiguration()));
+ }
} else {
src = new Path(jobInfo.getLocation());
}
@@ -235,7 +250,26 @@ class FileOutputCommitterContainer exten
throw new IOException("The method cleanupJob is deprecated and should not
be called.");
}
+  /**
+   * Returns the temporary root directory under which all custom-location dynamic
+   * partitions of this job are written. The value is computed on the first call and
+   * cached in {@code ptnRootLocation}; it is the same for every partition of the job.
+   *
+   * @param jobInfo supplies the table location and the optional custom dynamic root
+   * @param conf    supplies the dynamic partitioning job id appended to the temp dir name
+   * @return the partition root location as a string
+   */
+  private String getCustomPartitionRootLocation(OutputJobInfo jobInfo, Configuration conf) {
+    if (ptnRootLocation != null) {
+      return ptnRootLocation;
+    }
+
+    // start from the table location, descending into the custom root when one is set
+    String base = jobInfo.getTableInfo().getTableLocation();
+    String customRoot = jobInfo.getCustomDynamicRoot();
+    if (customRoot != null && customRoot.length() > 0) {
+      base = new Path(base, customRoot).toString();
+    }
+
+    String tempDirName = DYNTEMP_DIR_NAME + conf.get(HCatConstants.HCAT_DYNAMIC_PTN_JOBID);
+    ptnRootLocation = new Path(base, tempDirName).toString();
+    return ptnRootLocation;
+  }
+
private String getPartitionRootLocation(String ptnLocn, int numPtnKeys) {
+ if (customDynamicLocationUsed) {
+ return null;
+ }
+
if (ptnRootLocation == null) {
// we only need to calculate it once, it'll be the same for other
partitions in this job.
Path ptnRoot = new Path(ptnLocn);
@@ -255,6 +289,7 @@ class FileOutputCommitterContainer exten
* @param jobInfo The OutputJobInfo.
* @param partLocnRoot The table-equivalent location root of the partition
* (temporary dir if dynamic partition, table dir if
static)
+ * @param dynPartPath The path of dynamic partition which is created
* @param partKVs The keyvalue pairs that form the partition
* @param outputSchema The output schema for the partition
* @param params The parameters to store inside the partition
@@ -268,7 +303,7 @@ class FileOutputCommitterContainer exten
private Partition constructPartition(
JobContext context, OutputJobInfo jobInfo,
- String partLocnRoot, Map<String, String> partKVs,
+ String partLocnRoot, String dynPartPath, Map<String, String> partKVs,
HCatSchema outputSchema, Map<String, String> params,
Table table, FileSystem fs,
String grpName, FsPermission perms) throws IOException {
@@ -292,7 +327,10 @@ class FileOutputCommitterContainer exten
// Sets permissions and group name on partition dirs and files.
Path partPath;
- if (Boolean.valueOf((String)table.getProperty("EXTERNAL"))
+ if (customDynamicLocationUsed) {
+ partPath = new Path(dynPartPath);
+ } else if (!dynamicPartitioningUsed
+ && Boolean.valueOf((String)table.getProperty("EXTERNAL"))
&& jobInfo.getLocation() != null && jobInfo.getLocation().length() >
0) {
// honor external table that specifies the location
partPath = new Path(jobInfo.getLocation());
@@ -315,7 +353,7 @@ class FileOutputCommitterContainer exten
// Set the location in the StorageDescriptor
if (dynamicPartitioningUsed) {
- String dynamicPartitionDestination =
getFinalDynamicPartitionDestination(table, partKVs);
+ String dynamicPartitionDestination =
getFinalDynamicPartitionDestination(table, partKVs, jobInfo);
if (harProcessor.isEnabled()) {
harProcessor.exec(context, partition, partPath);
partition.getSd().setLocation(
@@ -344,14 +382,25 @@ class FileOutputCommitterContainer exten
}
}
- private String getFinalDynamicPartitionDestination(Table table, Map<String,
String> partKVs) {
- //
file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA
->
- // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA
+ private String getFinalDynamicPartitionDestination(Table table, Map<String,
String> partKVs,
+ OutputJobInfo jobInfo) {
Path partPath = new Path(table.getTTable().getSd().getLocation());
- for (FieldSchema partKey : table.getPartitionKeys()) {
- partPath = constructPartialPartPath(partPath,
partKey.getName().toLowerCase(), partKVs);
+ if (!customDynamicLocationUsed) {
+ //
file:///tmp/hcat_junit_warehouse/employee/_DYN0.7770480401313761/emp_country=IN/emp_state=KA
->
+ // file:///tmp/hcat_junit_warehouse/employee/emp_country=IN/emp_state=KA
+ for (FieldSchema partKey : table.getPartitionKeys()) {
+ partPath = constructPartialPartPath(partPath,
partKey.getName().toLowerCase(), partKVs);
+ }
+
+ return partPath.toString();
+ } else {
+ // if custom root specified, update the parent path
+ if (jobInfo.getCustomDynamicRoot() != null
+ && jobInfo.getCustomDynamicRoot().length() > 0) {
+ partPath = new Path(partPath, jobInfo.getCustomDynamicRoot());
+ }
+ return new Path(partPath, HCatFileUtil.resolveCustomPath(jobInfo,
partKVs, false)).toString();
}
- return partPath.toString();
}
private Map<String, String> getStorerParameterMap(StorerInfo storer) {
@@ -480,8 +529,11 @@ class FileOutputCommitterContainer exten
if (LOG.isDebugEnabled()) {
LOG.debug("Moving directory: " + file + " to " + parentDir);
}
- if (!fs.rename(file, parentDir)) {
- final String msg = "Failed to move file: " + file + " to " +
parentDir;
+
+ // If custom dynamic location provided, need to rename to final
output path
+ Path dstPath = !customDynamicLocationUsed ? parentDir :
finalOutputPath;
+ if (!fs.rename(file, dstPath)) {
+ final String msg = "Failed to move file: " + file + " to " +
dstPath;
LOG.error(msg);
throw new HCatException(ErrorType.ERROR_MOVE_FAILED, msg);
}
@@ -576,7 +628,12 @@ class FileOutputCommitterContainer exten
for (FileStatus st : status) {
LinkedHashMap<String, String> fullPartSpec = new
LinkedHashMap<String, String>();
- Warehouse.makeSpecFromName(fullPartSpec, st.getPath());
+ if (!customDynamicLocationUsed) {
+ Warehouse.makeSpecFromName(fullPartSpec, st.getPath());
+ } else {
+ HCatFileUtil.getPartKeyValuesForCustomLocation(fullPartSpec,
jobInfo,
+ st.getPath().toString());
+ }
partitionsDiscoveredByPath.put(st.getPath().toString(),
fullPartSpec);
JobConf jobConf = (JobConf)context.getConfiguration();
JobContext currContext = HCatMapRedUtil.createJobContext(
@@ -636,7 +693,7 @@ class FileOutputCommitterContainer exten
partitionsToAdd.add(
constructPartition(
context,jobInfo,
- tblPath.toString(), jobInfo.getPartitionValues()
+ tblPath.toString(), null, jobInfo.getPartitionValues()
,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
,table, fs
,grpName,perms));
@@ -645,7 +702,8 @@ class FileOutputCommitterContainer exten
partitionsToAdd.add(
constructPartition(
context,jobInfo,
-
getPartitionRootLocation(entry.getKey(),entry.getValue().size()),
entry.getValue()
+
getPartitionRootLocation(entry.getKey(),entry.getValue().size())
+ ,entry.getKey(), entry.getValue()
,jobInfo.getOutputSchema(), getStorerParameterMap(storer)
,table, fs
,grpName,perms));
@@ -659,13 +717,16 @@ class FileOutputCommitterContainer exten
//Publish the new partition(s)
if (dynamicPartitioningUsed && harProcessor.isEnabled() &&
(!partitionsToAdd.isEmpty())){
-
- Path src = new Path(ptnRootLocation);
- // check here for each dir we're copying out, to see if it
- // already exists, error out if so
- moveTaskOutputs(fs, src, src, tblPath, true);
- moveTaskOutputs(fs, src, src, tblPath, false);
- fs.delete(src, true);
+ if (!customDynamicLocationUsed) {
+ Path src = new Path(ptnRootLocation);
+ // check here for each dir we're copying out, to see if it
+ // already exists, error out if so
+ moveTaskOutputs(fs, src, src, tblPath, true);
+ moveTaskOutputs(fs, src, src, tblPath, false);
+ fs.delete(src, true);
+ } else {
+ moveCustomLocationTaskOutputs(fs, table, hiveConf);
+ }
try {
updateTableSchema(client, table, jobInfo.getOutputSchema());
LOG.info("HAR is being used. The table {} has new partitions {}.",
table.getTableName(), ptnInfos);
@@ -687,10 +748,14 @@ class FileOutputCommitterContainer exten
updateTableSchema(client, table, jobInfo.getOutputSchema());
LOG.info("HAR not is not being used. The table {} has new partitions
{}.", table.getTableName(), ptnInfos);
if (dynamicPartitioningUsed && (partitionsToAdd.size()>0)){
- Path src = new Path(ptnRootLocation);
- moveTaskOutputs(fs, src, src, tblPath, true);
- moveTaskOutputs(fs, src, src, tblPath, false);
- fs.delete(src, true);
+ if (!customDynamicLocationUsed) {
+ Path src = new Path(ptnRootLocation);
+ moveTaskOutputs(fs, src, src, tblPath, true);
+ moveTaskOutputs(fs, src, src, tblPath, false);
+ fs.delete(src, true);
+ } else {
+ moveCustomLocationTaskOutputs(fs, table, hiveConf);
+ }
}
client.add_partitions(partitionsToAdd);
partitionsAdded = partitionsToAdd;
@@ -720,6 +785,24 @@ class FileOutputCommitterContainer exten
}
}
+  /**
+   * Moves the output of every discovered custom-location dynamic partition to its
+   * final destination and then removes the job's temporary partition root.
+   *
+   * With custom dynamic partitions the partition locations contain the regex pattern,
+   * so the sub-tree of the partition root cannot be moved in one step; each partition's
+   * final destination has to be resolved and its output moved individually.
+   *
+   * @param fs    file system holding the task outputs
+   * @param table table whose partitions are being published
+   * @param conf  configuration used to locate the temporary partition root
+   * @throws IOException if moving or deleting fails
+   */
+  private void moveCustomLocationTaskOutputs(FileSystem fs, Table table, Configuration conf)
+    throws IOException {
+    for (Entry<String, Map<String, String>> discovered : partitionsDiscoveredByPath.entrySet()) {
+      Path partitionSrc = new Path(discovered.getKey());
+      String finalLocation =
+        getFinalDynamicPartitionDestination(table, discovered.getValue(), jobInfo);
+      Path partitionDest = new Path(finalLocation);
+      // two passes, in this order: flag=true first, then flag=false
+      // NOTE(review): presumably a validation (dry-run) pass followed by the real move - confirm
+      moveTaskOutputs(fs, partitionSrc, partitionSrc, partitionDest, true);
+      moveTaskOutputs(fs, partitionSrc, partitionSrc, partitionDest, false);
+    }
+
+    // delete the parent temp directory of all custom dynamic partitions
+    Path tempRoot = new Path(getCustomPartitionRootLocation(jobInfo, conf));
+    if (fs.exists(tempRoot)) {
+      fs.delete(tempRoot, true);
+    }
+  }
+
private void cancelDelegationTokens(JobContext context) throws IOException{
LOG.info("Cancelling deletgation token for the job.");
HiveMetaStoreClient client = null;
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
(original)
+++
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/FosterStorageHandler.java
Thu Feb 13 19:01:15 2014
@@ -49,9 +49,6 @@ import java.util.Map;
public class FosterStorageHandler extends DefaultStorageHandler {
public Configuration conf;
- /** The directory under which data is initially written for a partitioned
table */
- protected static final String DYNTEMP_DIR_NAME = "_DYN";
-
/** The directory under which data is initially written for a non
partitioned table */
protected static final String TEMP_DIR_NAME = "_TEMP";
@@ -118,17 +115,28 @@ public class FosterStorageHandler extend
// For dynamic partitioned writes without all keyvalues specified,
// we create a temp dir for the associated write job
if (dynHash != null) {
- parentPath = new Path(parentPath,
- DYNTEMP_DIR_NAME + dynHash).toString();
+ // if external table and custom root specified, update the parent path
+ if (Boolean.valueOf((String)tableDesc.getProperties().get("EXTERNAL"))
+ && jobInfo.getCustomDynamicRoot() != null
+ && jobInfo.getCustomDynamicRoot().length() > 0) {
+ parentPath = new Path(parentPath,
jobInfo.getCustomDynamicRoot()).toString();
+ }
+ parentPath = new Path(parentPath,
FileOutputCommitterContainer.DYNTEMP_DIR_NAME + dynHash).toString();
}
String outputLocation;
- if ((dynHash == null)
+ if ((dynHash != null)
+ && Boolean.valueOf((String)tableDesc.getProperties().get("EXTERNAL"))
+ && jobInfo.getCustomDynamicPath() != null
+ && jobInfo.getCustomDynamicPath().length() > 0) {
+ // dynamic partitioning with custom path; resolve the custom path
+ // using partition column values
+ outputLocation = HCatFileUtil.resolveCustomPath(jobInfo, null, true);
+ } else if ((dynHash == null)
&&
Boolean.valueOf((String)tableDesc.getProperties().get("EXTERNAL"))
&& jobInfo.getLocation() != null && jobInfo.getLocation().length()
> 0) {
// honor custom location for external table apart from what metadata
specifies
- // only if we're not using dynamic partitioning - see HIVE-5011
outputLocation = jobInfo.getLocation();
} else if (dynHash == null && jobInfo.getPartitionValues().size() == 0) {
// For non-partitioned tables, we send them to the temp dir
Added:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java?rev=1568008&view=auto
==============================================================================
---
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java
(added)
+++
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatFileUtil.java
Thu Feb 13 19:01:15 2014
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hive.hcatalog.mapreduce;
+
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.Path;
+
+public class HCatFileUtil {
+
+  // regex of the form: ${column name}. Following characters are not allowed in column name:
+  // whitespace characters, /, {, }, \
+  private static final Pattern customPathPattern =
+    Pattern.compile("(\\$\\{)([^\\s/\\{\\}\\\\]+)(\\})");
+
+  /**
+   * Resolves the job's custom dynamic path by replacing each <code>${column}</code>
+   * placeholder with the value of that partition column. A column with no value is
+   * replaced by <code>__HIVE_DEFAULT_PARTITION__</code>.
+   *
+   * @param jobInfo         supplies the custom dynamic path, the fallback partition values
+   *                        and the list of dynamic partitioning keys
+   * @param dynPartKVs      partition key/value pairs to substitute; when null,
+   *                        {@code jobInfo.getPartitionValues()} is used instead
+   * @param createRegexPath when true the <code>${</code> and <code>}</code> delimiters are
+   *                        kept around every substituted value, so the values can later be
+   *                        read back by {@link #getPartKeyValuesForCustomLocation}
+   * @return the resolved path
+   * @throws IllegalArgumentException if the number of distinct columns in the custom path
+   *         differs from the number of dynamic partitioning keys
+   */
+  public static String resolveCustomPath(OutputJobInfo jobInfo,
+    Map<String, String> dynPartKVs, boolean createRegexPath) {
+    String customPath = jobInfo.getCustomDynamicPath();
+    Matcher matcher = customPathPattern.matcher(customPath);
+    // distinct partition columns referenced by the custom path
+    HashSet<String> partColumns = new HashSet<String>();
+    Map<String, String> partKVs = dynPartKVs != null ? dynPartKVs :
+      jobInfo.getPartitionValues();
+
+    StringBuilder sb = new StringBuilder();
+    int previousEndIndex = 0;
+    while (matcher.find()) {
+      // copy the literal text between the previous placeholder and this one
+      sb.append(customPath, previousEndIndex, matcher.start());
+      if (createRegexPath) {
+        // first group within pattern: "${"
+        sb.append(matcher.group(1));
+      }
+
+      // column name is the second group; Locale.ROOT keeps the lookup key independent
+      // of the JVM default locale (e.g. the Turkish dotless i)
+      String columnName = matcher.group(2).toLowerCase(java.util.Locale.ROOT);
+      partColumns.add(columnName);
+
+      String columnValue = partKVs.get(columnName);
+      sb.append(columnValue != null ? columnValue : "__HIVE_DEFAULT_PARTITION__");
+
+      if (createRegexPath) {
+        // third group within pattern: "}"
+        sb.append(matcher.group(3));
+      }
+
+      previousEndIndex = matcher.end();
+    }
+
+    // append the trailing path string, if any
+    sb.append(customPath, previousEndIndex, customPath.length());
+
+    // validate that the set of partition columns found in custom path must match
+    // the set of dynamic partitions
+    if (partColumns.size() != jobInfo.getDynamicPartitioningKeys().size()) {
+      throw new IllegalArgumentException("Unable to configure custom dynamic location, "
+        + " mismatch between number of dynamic partition columns obtained[" + partColumns.size()
+        + "] and number of dynamic partition columns required["
+        + jobInfo.getDynamicPartitioningKeys().size() + "]");
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Recovers partition key/value pairs from a partition path that was produced by
+   * {@link #resolveCustomPath} with {@code createRegexPath == true}. The n-th placeholder
+   * of the custom path supplies the column name and the n-th <code>${...}</code> group of
+   * {@code partitionPath} supplies its value. Partition values carried by {@code jobInfo}
+   * are added afterwards and override extracted entries with the same key.
+   *
+   * @param partSpec      map that receives the key/value pairs
+   * @param jobInfo       supplies the custom dynamic path and the job's partition values
+   * @param partitionPath the actual dynamic partition path that was created
+   */
+  public static void getPartKeyValuesForCustomLocation(Map<String, String> partSpec,
+    OutputJobInfo jobInfo, String partitionPath) {
+    Matcher customPathMatcher = customPathPattern.matcher(jobInfo.getCustomDynamicPath());
+    Matcher dynamicPathMatcher = customPathPattern.matcher(partitionPath);
+
+    while (customPathMatcher.find() && dynamicPathMatcher.find()) {
+      // lower-case the column name exactly as resolveCustomPath does; otherwise a
+      // mixed-case placeholder yields a key that resolveCustomPath can never look up
+      String columnName = customPathMatcher.group(2).toLowerCase(java.util.Locale.ROOT);
+      partSpec.put(columnName, dynamicPathMatcher.group(2));
+    }
+
+    // add any partition key values provided as part of job info
+    partSpec.putAll(jobInfo.getPartitionValues());
+  }
+
+  /**
+   * Splits a custom path pattern into a root and a relative path and stores both on
+   * {@code jobInfo} via {@code setCustomDynamicLocation}. The root is the largest
+   * prefix of the pattern that contains no <code>${column}</code> placeholder; the
+   * relative path is the remainder of the pattern below that root.
+   *
+   * @param customPathFormat the custom path pattern
+   * @param jobInfo          the job info that receives the root and the relative path
+   * @throws IllegalArgumentException if no placeholder-free root can be found
+   */
+  public static void setCustomPath(String customPathFormat, OutputJobInfo jobInfo) {
+    Path customPath = new Path(customPathFormat);
+    URI customURI = customPath.toUri();
+    // walk up until the remaining prefix no longer contains a placeholder
+    while (customPath != null && !customPath.toString().isEmpty()) {
+      Matcher m = customPathPattern.matcher(customPath.toString());
+      if (!m.find()) {
+        break;
+      }
+      customPath = customPath.getParent();
+    }
+
+    // getParent() returns null once the top of the path is passed; fail with a clear
+    // message instead of a NullPointerException on the dereference below
+    if (customPath == null) {
+      throw new IllegalArgumentException("Unable to configure custom dynamic location, "
+        + "no root directory without partition columns found in pattern["
+        + customPathFormat + "]");
+    }
+
+    URI rootURI = customPath.toUri();
+    URI childURI = rootURI.relativize(customURI);
+    jobInfo.setCustomDynamicLocation(rootURI.getPath(), childURI.getPath());
+  }
+}
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
(original)
+++
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/HCatOutputFormat.java
Thu Feb 13 19:01:15 2014
@@ -159,6 +159,11 @@ public class HCatOutputFormat extends HC
}
conf.set(HCatConstants.HCAT_DYNAMIC_PTN_JOBID, dynHash);
+ // if custom pattern is set in case of dynamic partitioning,
configure custom path
+ String customPattern =
conf.get(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN);
+ if (customPattern != null) {
+ HCatFileUtil.setCustomPath(customPattern, outputJobInfo);
+ }
}
outputJobInfo.setPartitionValues(valueMap);
Modified:
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java
(original)
+++
hive/trunk/hcatalog/core/src/main/java/org/apache/hive/hcatalog/mapreduce/OutputJobInfo.java
Thu Feb 13 19:01:15 2014
@@ -50,6 +50,12 @@ public class OutputJobInfo implements Se
/** The location of the partition being written */
private String location;
+ /** The root location of custom dynamic partitions being written */
+ private String customDynamicRoot;
+
+ /** The relative path of custom dynamic partitions being written */
+ private String customDynamicPath;
+
/** The partition values to publish to, if used for output*/
private Map<String, String> partitionValues;
@@ -163,6 +169,28 @@ public class OutputJobInfo implements Se
}
/**
+ * Sets the custom location for dynamic partitions.
+ * @param customDynamicRoot the root location of custom dynamic partitions
+ * @param customDynamicPath the path of custom dynamic partitions, relative to that root
+ */
+ void setCustomDynamicLocation(String customDynamicRoot, String
customDynamicPath) {
+ this.customDynamicRoot = customDynamicRoot;
+ this.customDynamicPath = customDynamicPath;
+ }
+
+ /**
+ * @return the root location for custom dynamic partitions, as stored by
+ * setCustomDynamicLocation
+ */
+ String getCustomDynamicRoot() {
+ return customDynamicRoot;
+ }
+
+ /**
+ * @return the relative path of custom dynamic partitions, as stored by
+ * setCustomDynamicLocation
+ */
+ String getCustomDynamicPath() {
+ return customDynamicPath;
+ }
+
+ /**
* Sets the value of partitionValues
* @param partitionValues the partition values to set
*/
Modified:
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
(original)
+++
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/HCatMapReduceTest.java
Thu Feb 13 19:01:15 2014
@@ -84,6 +84,7 @@ public abstract class HCatMapReduceTest
protected abstract List<FieldSchema> getTableColumns();
private static FileSystem fs;
+ private String externalTableLocation = null;
protected Boolean isTableExternal() {
return false;
@@ -123,6 +124,12 @@ public abstract class HCatMapReduceTest
String databaseName = (dbName == null) ?
MetaStoreUtils.DEFAULT_DATABASE_NAME : dbName;
client.dropTable(databaseName, tableName);
+ // in case of external table, drop the table contents as well
+ if (isTableExternal() && (externalTableLocation != null)) {
+ if (fs.exists(new Path(externalTableLocation))) {
+ fs.delete(new Path(externalTableLocation), true);
+ }
+ }
} catch (Exception e) {
e.printStackTrace();
throw e;
@@ -167,6 +174,9 @@ public abstract class HCatMapReduceTest
sd.setOutputFormat(outputFormat());
Map<String, String> tableParams = new HashMap<String, String>();
+ if (isTableExternal()) {
+ tableParams.put("EXTERNAL", "TRUE");
+ }
tbl.setParameters(tableParams);
client.createTable(tbl);
@@ -234,7 +244,8 @@ public abstract class HCatMapReduceTest
Job runMRCreate(Map<String, String> partitionValues,
List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
int writeCount, boolean assertWrite) throws Exception {
- return runMRCreate(partitionValues, partitionColumns, records, writeCount,
assertWrite, true);
+ return runMRCreate(partitionValues, partitionColumns, records, writeCount,
assertWrite,
+ true, null);
}
/**
@@ -250,7 +261,8 @@ public abstract class HCatMapReduceTest
*/
Job runMRCreate(Map<String, String> partitionValues,
List<HCatFieldSchema> partitionColumns, List<HCatRecord> records,
- int writeCount, boolean assertWrite, boolean asSingleMapTask) throws
Exception {
+ int writeCount, boolean assertWrite, boolean asSingleMapTask,
+ String customDynamicPathPattern) throws Exception {
writeRecords = records;
MapCreate.writeCount = 0;
@@ -283,6 +295,9 @@ public abstract class HCatMapReduceTest
job.setOutputFormatClass(HCatOutputFormat.class);
OutputJobInfo outputJobInfo = OutputJobInfo.create(dbName, tableName,
partitionValues);
+ if (customDynamicPathPattern != null) {
+ job.getConfiguration().set(HCatConstants.HCAT_DYNAMIC_CUSTOM_PATTERN,
customDynamicPathPattern);
+ }
HCatOutputFormat.setOutput(job, outputJobInfo);
job.setMapOutputKeyClass(BytesWritable.class);
@@ -313,6 +328,10 @@ public abstract class HCatMapReduceTest
Assert.assertEquals(writeCount, MapCreate.writeCount);
}
+ if (isTableExternal()) {
+ externalTableLocation = outputJobInfo.getTableInfo().getTableLocation();
+ }
+
return job;
}
Modified:
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
(original)
+++
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatDynamicPartitioned.java
Thu Feb 13 19:01:15 2014
@@ -101,7 +101,7 @@ public class TestHCatDynamicPartitioned
*/
@Test
public void testHCatDynamicPartitionedTable() throws Exception {
- runHCatDynamicPartitionedTable(true);
+ runHCatDynamicPartitionedTable(true, null);
}
/**
@@ -110,12 +110,13 @@ public class TestHCatDynamicPartitioned
*/
@Test
public void testHCatDynamicPartitionedTableMultipleTask() throws Exception {
- runHCatDynamicPartitionedTable(false);
+ runHCatDynamicPartitionedTable(false, null);
}
- protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask)
throws Exception {
+ protected void runHCatDynamicPartitionedTable(boolean asSingleMapTask,
+ String customDynamicPathPattern) throws Exception {
generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
- runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true,
asSingleMapTask);
+ runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS, true,
asSingleMapTask, customDynamicPathPattern);
runMRRead(NUM_RECORDS);
@@ -142,7 +143,8 @@ public class TestHCatDynamicPartitioned
IOException exc = null;
try {
generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
- Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS,
false);
+ Job job = runMRCreate(null, dataColumns, writeRecords, NUM_RECORDS,
false,
+ true, customDynamicPathPattern);
if (HCatUtil.isHadoop23()) {
Assert.assertTrue(job.isSuccessful()==false);
Modified:
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
URL:
http://svn.apache.org/viewvc/hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java?rev=1568008&r1=1568007&r2=1568008&view=diff
==============================================================================
---
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
(original)
+++
hive/trunk/hcatalog/core/src/test/java/org/apache/hive/hcatalog/mapreduce/TestHCatExternalDynamicPartitioned.java
Thu Feb 13 19:01:15 2014
@@ -19,6 +19,9 @@
package org.apache.hive.hcatalog.mapreduce;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
public class TestHCatExternalDynamicPartitioned extends
TestHCatDynamicPartitioned {
@Override
@@ -26,4 +29,20 @@ public class TestHCatExternalDynamicPart
return true;
}
+ /**
+ * Sets the table name used by this test class, then generates the write
+ * records and the data columns once before the tests run.
+ * @throws Exception
+ */
+ @BeforeClass
+ public static void generateInputData() throws Exception {
+ tableName = "testHCatExternalDynamicPartitionedTable";
+ generateWriteRecords(NUM_RECORDS, NUM_PARTITIONS, 0);
+ generateDataColumns();
+ }
+
+ /**
+ * Run the external dynamic partitioning test with a single map task and a
+ * custom dynamic partition path pattern ("mapred/externalDynamicOutput/${p1}").
+ * @throws Exception
+ */
+ @Test
+ public void testHCatExternalDynamicCustomLocation() throws Exception {
+ runHCatDynamicPartitionedTable(true, "mapred/externalDynamicOutput/${p1}");
+ }
+
}