Repository: hive Updated Branches: refs/heads/master e759bbaf2 -> 4c7f2d93c
HIVE-15882: HS2 generating high memory pressure with many partitions and concurrent queries (Misha Dmitriev, reviewed by Sahil, Mohit, Vihang and Rui) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4c7f2d93 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4c7f2d93 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4c7f2d93 Branch: refs/heads/master Commit: 4c7f2d93ca97504ba9ec6fb2f7a7f607ceb35eb5 Parents: e759bba Author: Misha Dmitriev <[email protected]> Authored: Thu Mar 2 13:26:12 2017 +0800 Committer: Rui Li <[email protected]> Committed: Thu Mar 2 13:26:12 2017 +0800 ---------------------------------------------------------------------- .../hadoop/hive/common/StringInternUtils.java | 144 +++++++++++++++++++ .../apache/hadoop/hive/ql/exec/Utilities.java | 11 +- .../org/apache/hadoop/hive/ql/hooks/Entity.java | 4 + .../hive/ql/io/CombineHiveInputFormat.java | 4 +- .../hadoop/hive/ql/io/HiveInputFormat.java | 3 +- .../hadoop/hive/ql/io/SymbolicInputFormat.java | 8 +- .../hadoop/hive/ql/lockmgr/HiveLockObject.java | 14 +- .../hadoop/hive/ql/metadata/Partition.java | 6 +- .../apache/hadoop/hive/ql/metadata/Table.java | 2 +- .../hive/ql/optimizer/GenMapRedUtils.java | 20 +-- .../physical/GenMRSkewJoinProcessor.java | 16 ++- .../physical/NullScanTaskDispatcher.java | 7 +- .../ql/plan/ConditionalResolverMergeFiles.java | 5 +- .../org/apache/hadoop/hive/ql/plan/MapWork.java | 16 ++- .../apache/hadoop/hive/ql/plan/MsckDesc.java | 4 +- .../hadoop/hive/ql/plan/PartitionDesc.java | 19 +-- 16 files changed, 227 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/common/src/java/org/apache/hadoop/hive/common/StringInternUtils.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/common/StringInternUtils.java b/common/src/java/org/apache/hadoop/hive/common/StringInternUtils.java new file mode 100644 index 0000000..c729991 --- /dev/null +++ b/common/src/java/org/apache/hadoop/hive/common/StringInternUtils.java @@ -0,0 +1,144 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.common; + +import org.apache.hadoop.fs.Path; + +import java.lang.reflect.Field; +import java.net.URI; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; + +/** + * Collection of utilities for string interning, common across Hive. + * We use the standard String.intern() call, that performs very well + * (no problems with PermGen overflowing, etc.) starting from JDK 7. + */ +public class StringInternUtils { + + // When a URI instance is initialized, it creates a bunch of private String + // fields, never bothering about their possible duplication. It would be + // best if we could tell URI constructor to intern these strings right away. + // Without this option, we can only use reflection to "fix" strings in these + // fields after a URI has been created. + private static Class uriClass = URI.class; + private static Field stringField, schemeField, authorityField, hostField, pathField, + fragmentField, schemeSpecificPartField; + + static { + try { + stringField = uriClass.getDeclaredField("string"); + schemeField = uriClass.getDeclaredField("scheme"); + authorityField = uriClass.getDeclaredField("authority"); + hostField = uriClass.getDeclaredField("host"); + pathField = uriClass.getDeclaredField("path"); + fragmentField = uriClass.getDeclaredField("fragment"); + schemeSpecificPartField = uriClass.getDeclaredField("schemeSpecificPart"); + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } + + // Note that the calls below will throw an exception if a Java SecurityManager + // is installed and configured to forbid invoking setAccessible(). In practice + // this is not a problem in Hive. + stringField.setAccessible(true); + schemeField.setAccessible(true); + authorityField.setAccessible(true); + hostField.setAccessible(true); + pathField.setAccessible(true); + fragmentField.setAccessible(true); + schemeSpecificPartField.setAccessible(true); + } + + public static URI internStringsInUri(URI uri) { + if (uri == null) return null; + try { + String string = (String) stringField.get(uri); + if (string != null) stringField.set(uri, string.intern()); + String scheme = (String) schemeField.get(uri); + if (scheme != null) schemeField.set(uri, scheme.intern()); + String authority = (String) authorityField.get(uri); + if (authority != null) authorityField.set(uri, authority.intern()); + String host = (String) hostField.get(uri); + if (host != null) hostField.set(uri, host.intern()); + String path = (String) pathField.get(uri); + if (path != null) pathField.set(uri, path.intern()); + String fragment = (String) fragmentField.get(uri); + if (fragment != null) fragmentField.set(uri, fragment.intern()); + String schemeSpecificPart = (String) schemeSpecificPartField.get(uri); + if (schemeSpecificPart != null) schemeSpecificPartField.set(uri, schemeSpecificPart.intern()); + } catch (Exception e) { + throw new RuntimeException(e); + } + return uri; + } + + public static Path internUriStringsInPath(Path path) { + if (path != null) internStringsInUri(path.toUri()); + return path; + } + + public static Path[] internUriStringsInPathArray(Path[] paths) { + if (paths != null) { + for (Path path : paths) { + internUriStringsInPath(path); + } + } + return paths; + } + + /** + * This method interns all the strings in the given list in place. That is, + * it iterates over the list, replaces each element with the interned copy + * and eventually returns the same list. + */ + public static List<String> internStringsInList(List<String> list) { + if (list != null) { + ListIterator<String> it = list.listIterator(); + while (it.hasNext()) { + it.set(it.next().intern()); + } + } + return list; + } + + /** Interns all the strings in the given array in place, returning the same array */ + public static String[] internStringsInArray(String[] strings) { + for (int i = 0; i < strings.length; i++) { + if (strings[i] != null) { + strings[i] = strings[i].intern(); + } + } + return strings; + } + + public static <K> Map<K, String> internValuesInMap(Map<K, String> map) { + if (map != null) { + for (K key : map.keySet()) { + String value = map.get(key); + if (value != null) { + map.put(key, value.intern()); + } + } + } + return map; + } + + public static String internIfNotNull(String s) { + if (s != null) s = s.intern(); + return s; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 906b4db..6693134 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -43,6 +43,7 @@ import org.apache.hadoop.hive.common.HiveInterruptUtils; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StatsSetupConst; +import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.MetaStoreUtils; @@ -3042,6 +3043,7 @@ public final class Utilities { continue; } + StringInternUtils.internUriStringsInPath(file); pathsProcessed.add(file); if (LOG.isDebugEnabled()) { @@ -3150,7 +3152,7 @@ public final class Utilities { } recWriter.close(false); - return newPath; + return StringInternUtils.internUriStringsInPath(newPath); } @SuppressWarnings("rawtypes") @@ -3173,15 +3175,13 @@ public final class Utilities { boolean oneRow = partDesc.getInputFileFormatClass() == OneNullRowInputFormat.class; - Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job, - props, oneRow); + Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job, props, oneRow); if (LOG.isInfoEnabled()) { LOG.info("Changed input file " + strPath + " to empty file " + newPath + " (" + oneRow + ")"); } // update the work - String strNewPath = newPath.toString(); work.addPathToAlias(newPath, work.getPathToAliases().get(path)); work.removePathToAlias(path); @@ -3206,8 +3206,7 @@ public final class Utilities { Properties props = tableDesc.getProperties(); HiveOutputFormat outFileFormat = HiveFileFormatUtils.getHiveOutputFormat(job, tableDesc); - Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job, - props, false); + Path newPath = createEmptyFile(hiveScratchDir, outFileFormat, job, props, false); if (LOG.isInfoEnabled()) { LOG.info("Changed input file for alias " + alias + " to " + newPath); http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java index 0842066..131c1e1 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java @@ -327,6 +327,10 @@ public class Entity implements Serializable { } private String computeName() { + return doComputeName().intern(); + } + + private String doComputeName() { switch (typ) { case DATABASE: return "database:" + database.getName(); http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java index e91064b..7a113bf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveInputFormat.java @@ -35,6 +35,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hive.common.StringInternUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -340,7 +341,7 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ // combine splits only from same tables and same partitions. Do not combine splits from multiple // tables or multiple partitions. - Path[] paths = combine.getInputPathsShim(job); + Path[] paths = StringInternUtils.internUriStringsInPathArray(combine.getInputPathsShim(job)); List<Path> inpDirs = new ArrayList<Path>(); List<Path> inpFiles = new ArrayList<Path>(); @@ -660,6 +661,7 @@ public class CombineHiveInputFormat<K extends WritableComparable, V extends Writ Map<Path, ArrayList<String>> result = new HashMap<>(); for (Map.Entry <Path, ArrayList<String>> entry : pathToAliases.entrySet()) { Path newKey = Path.getPathWithoutSchemeAndAuthority(entry.getKey()); + StringInternUtils.internUriStringsInPath(newKey); result.put(newKey, entry.getValue()); } return result; http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 51530ac..1cb9557 100755 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.Map.Entry; +import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.ql.exec.SerializationUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +58,6 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc; -import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorDeserializeType; import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorMapOperatorReadType; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; @@ -454,6 +454,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable> } } } + StringInternUtils.internUriStringsInPathArray(dirs); return dirs; } http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java index 55b3b55..b534e35 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/SymbolicInputFormat.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; +import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; @@ -75,10 +76,11 @@ public class SymbolicInputFormat implements ReworkMapredInputFormat { // no check for the line? How to check? // if the line is invalid for any reason, the job will fail. FileStatus[] matches = fileSystem.globStatus(new Path(line)); - for(FileStatus fileStatus :matches) { + for (FileStatus fileStatus : matches) { Path schemaLessPath = Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath()); - toAddPathToPart.put(schemaLessPath, partDesc); - pathToAliases.put(schemaLessPath, aliases); + StringInternUtils.internUriStringsInPath(schemaLessPath); + toAddPathToPart.put(schemaLessPath, partDesc); + pathToAliases.put(schemaLessPath, aliases); } } } finally { http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java index 82dc898..fff03df 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveLockObject.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.ql.metadata.DummyPartition; import org.apache.hadoop.hive.ql.metadata.Hive; @@ -53,9 +54,10 @@ public class HiveLockObject { String lockMode, String queryStr) { this.queryId = removeDelimiter(queryId); - this.lockTime = removeDelimiter(lockTime); + this.lockTime = StringInternUtils.internIfNotNull(removeDelimiter(lockTime)); this.lockMode = removeDelimiter(lockMode); - this.queryStr = removeDelimiter(queryStr == null ? null : queryStr.trim()); + this.queryStr = StringInternUtils.internIfNotNull( + removeDelimiter(queryStr == null ? null : queryStr.trim())); } /** @@ -71,9 +73,9 @@ public class HiveLockObject { String[] elem = data.split(":"); queryId = elem[0]; - lockTime = elem[1]; + lockTime = StringInternUtils.internIfNotNull(elem[1]); lockMode = elem[2]; - queryStr = elem[3]; + queryStr = StringInternUtils.internIfNotNull(elem[3]); if (elem.length >= 5) { clientIp = elem[4]; } @@ -178,12 +180,12 @@ public class HiveLockObject { public HiveLockObject(String path, HiveLockObjectData lockData) { this.pathNames = new String[1]; - this.pathNames[0] = path; + this.pathNames[0] = StringInternUtils.internIfNotNull(path); this.data = lockData; } public HiveLockObject(String[] paths, HiveLockObjectData lockData) { - this.pathNames = paths; + this.pathNames = StringInternUtils.internStringsInArray(paths); this.data = lockData; } http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java index c0edde9..bff1688 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import org.apache.hadoop.hive.common.StringInternUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FileStatus; @@ -156,7 +157,7 @@ public class Partition implements Serializable { org.apache.hadoop.hive.metastore.api.Partition tPartition) throws HiveException { this.table = table; - this.tPartition = tPartition; + setTPartition(tPartition); if (table.isView()) { return; @@ -458,6 +459,7 @@ public class Partition implements Serializable { */ public void setTPartition( org.apache.hadoop.hive.metastore.api.Partition partition) { + StringInternUtils.internStringsInList(partition.getValues()); tPartition = partition; } @@ -522,7 +524,7 @@ public class Partition implements Serializable { throw new HiveException( "partition spec is invalid. field.getName() does not exist in input."); } - pvals.add(val); + pvals.add(val.intern()); } tPartition.setValues(pvals); } http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java index c6ae6f2..3e771ad 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java @@ -865,7 +865,7 @@ public class Table implements Serializable { List<FieldSchema> fsl = getPartCols(); List<String> tpl = tp.getValues(); - LinkedHashMap<String, String> spec = new LinkedHashMap<String, String>(); + LinkedHashMap<String, String> spec = new LinkedHashMap<String, String>(fsl.size()); for (int i = 0; i < fsl.size(); i++) { FieldSchema fs = fsl.get(i); String value = tpl.get(i); http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 24d1681..0e67ea6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -38,6 +38,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.BlobStorageUtils; +import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Warehouse; @@ -249,9 +250,11 @@ public final class GenMapRedUtils { TableDesc tt_desc = tt_descLst.get(pos); MapWork mWork = plan.getMapWork(); if (mWork.getPathToAliases().get(taskTmpDir) == null) { - mWork.removePathToAlias(new Path(taskTmpDir)); - mWork.addPathToAlias(new Path(taskTmpDir),taskTmpDir); - mWork.addPathToPartitionInfo(new Path(taskTmpDir), new PartitionDesc(tt_desc, null)); + taskTmpDir = taskTmpDir.intern(); + Path taskTmpDirPath = StringInternUtils.internUriStringsInPath(new Path(taskTmpDir)); + mWork.removePathToAlias(taskTmpDirPath); + mWork.addPathToAlias(taskTmpDirPath, taskTmpDir); + mWork.addPathToPartitionInfo(taskTmpDirPath, new PartitionDesc(tt_desc, null)); mWork.getAliasToWork().put(taskTmpDir, topOperators.get(pos)); } } @@ -771,7 +774,7 @@ public final class GenMapRedUtils { if (topOp instanceof TableScanOperator) { try { - Utilities.addSchemaEvolutionToTableScanOperator( + Utilities.addSchemaEvolutionToTableScanOperator( (StructObjectInspector) tt_desc.getDeserializer().getObjectInspector(), (TableScanOperator) topOp); } catch (Exception e) { @@ -780,7 +783,7 @@ public final class GenMapRedUtils { } if (!local) { - plan.addPathToAlias(path,alias); + plan.addPathToAlias(path, alias); plan.addPathToPartitionInfo(path, new PartitionDesc(tt_desc, null)); plan.getAliasToWork().put(alias, topOp); } else { @@ -1543,16 +1546,17 @@ public final class GenMapRedUtils { TableScanOperator topOp, FileSinkDesc fsDesc) { ArrayList<String> aliases = new ArrayList<String>(); - Path inputDir = fsDesc.getFinalDirName(); + Path inputDir = StringInternUtils.internUriStringsInPath(fsDesc.getFinalDirName()); + String inputDirStr = inputDir.toString().intern(); TableDesc tblDesc = fsDesc.getTableInfo(); - aliases.add(inputDir.toString()); // dummy alias: just use the input path + aliases.add(inputDirStr); // dummy alias: just use the input path // constructing the default MapredWork MapredWork cMrPlan = GenMapRedUtils.getMapRedWorkFromConf(conf); MapWork cplan = cMrPlan.getMapWork(); cplan.addPathToAlias(inputDir, aliases); cplan.addPathToPartitionInfo(inputDir, new PartitionDesc(tblDesc, null)); - cplan.getAliasToWork().put(inputDir.toString(), topOp); + cplan.getAliasToWork().put(inputDirStr, topOp); cplan.setMapperCannotSpanPartns(true); return cplan; http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java index ede4fcb..93202c3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/GenMRSkewJoinProcessor.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.ColumnInfo; import org.apache.hadoop.hive.ql.exec.ConditionalTask; @@ -261,7 +262,7 @@ public final class GenMRSkewJoinProcessor { Operator<? extends OperatorDesc> tblScan_op = parentOps[i]; ArrayList<String> aliases = new ArrayList<String>(); - String alias = src.toString(); + String alias = src.toString().intern(); aliases.add(alias); Path bigKeyDirPath = bigKeysDirMap.get(src); newPlan.addPathToAlias(bigKeyDirPath, aliases); @@ -389,18 +390,21 @@ public final class GenMRSkewJoinProcessor { private static String RESULTS = "results"; static Path getBigKeysDir(Path baseDir, Byte srcTbl) { - return new Path(baseDir, skewJoinPrefix + UNDERLINE + BIGKEYS + UNDERLINE + srcTbl); + return StringInternUtils.internUriStringsInPath( + new Path(baseDir, skewJoinPrefix + UNDERLINE + BIGKEYS + UNDERLINE + srcTbl)); } static Path getBigKeysSkewJoinResultDir(Path baseDir, Byte srcTbl) { - return new Path(baseDir, skewJoinPrefix + UNDERLINE + BIGKEYS - + UNDERLINE + RESULTS + UNDERLINE + srcTbl); + return StringInternUtils.internUriStringsInPath( + new Path(baseDir, skewJoinPrefix + UNDERLINE + BIGKEYS + + UNDERLINE + RESULTS + UNDERLINE + srcTbl)); } static Path getSmallKeysDir(Path baseDir, Byte srcTblBigTbl, Byte srcTblSmallTbl) { - return new Path(baseDir, skewJoinPrefix + UNDERLINE + SMALLKEYS - + UNDERLINE + srcTblBigTbl + UNDERLINE + srcTblSmallTbl); + return StringInternUtils.internUriStringsInPath( + new Path(baseDir, skewJoinPrefix + UNDERLINE + SMALLKEYS + + UNDERLINE + srcTblBigTbl + UNDERLINE + srcTblSmallTbl)); } } http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java index 0882ae2..fb7e18e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/NullScanTaskDispatcher.java @@ -18,10 +18,9 @@ package org.apache.hadoop.hive.ql.optimizer.physical; -import org.apache.hadoop.mapred.InputFormat; - import java.io.IOException; +import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.ZeroRowsInputFormat; @@ -35,7 +34,6 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.ServiceLoader; import java.util.Map.Entry; import java.util.Stack; @@ -125,8 +123,9 @@ public class NullScanTaskDispatcher implements Dispatcher { // Prefix partition with something to avoid it being a hidden file. Path fakePath = new Path(NullScanFileSystem.getBase() + newPartition.getTableName() + "/part" + encode(newPartition.getPartSpec())); + StringInternUtils.internUriStringsInPath(fakePath); work.addPathToPartitionInfo(fakePath, newPartition); - work.addPathToAlias(fakePath, new ArrayList<String>(allowed)); + work.addPathToAlias(fakePath, new ArrayList<>(allowed)); aliasesAffected.removeAll(allowed); if (aliasesAffected.isEmpty()) { work.removePathToAlias(path); http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java index 68b0ad9..4266569 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ConditionalResolverMergeFiles.java @@ -317,10 +317,9 @@ public class ConditionalResolverMergeFiles implements ConditionalResolver, private PartitionDesc generateDPFullPartSpec(DynamicPartitionCtx dpCtx, FileStatus[] status, TableDesc tblDesc, int i) { - Map<String, String> fullPartSpec = new LinkedHashMap<String, String>( - dpCtx.getPartSpec()); + LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<>(dpCtx.getPartSpec()); Warehouse.makeSpecFromName(fullPartSpec, status[i].getPath()); - PartitionDesc pDesc = new PartitionDesc(tblDesc, (LinkedHashMap) fullPartSpec); + PartitionDesc pDesc = new PartitionDesc(tblDesc, fullPartSpec); return pDesc; } http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java index d4bdd96..2120400 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.plan; +import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -169,6 +170,9 @@ public class MapWork extends BaseWork { } public void setPathToAliases(final LinkedHashMap<Path, ArrayList<String>> pathToAliases) { + for (Path p : pathToAliases.keySet()) { + StringInternUtils.internUriStringsInPath(p); + } this.pathToAliases = pathToAliases; } @@ -179,10 +183,10 @@ public class MapWork extends BaseWork { public void addPathToAlias(Path path, String newAlias){ ArrayList<String> aliases = pathToAliases.get(path); if (aliases == null) { - aliases=new ArrayList<String>(); + aliases = new ArrayList<>(); pathToAliases.put(path, aliases); } - aliases.add(newAlias); + aliases.add(newAlias.intern()); } @@ -391,10 +395,11 @@ public class MapWork extends BaseWork { @SuppressWarnings("nls") public void addMapWork(Path path, String alias, Operator<?> work, PartitionDesc pd) { + StringInternUtils.internUriStringsInPath(path); ArrayList<String> curAliases = pathToAliases.get(path); if (curAliases == null) { assert (pathToPartitionInfo.get(path) == null); - curAliases = new ArrayList<String>(); + curAliases = new ArrayList<>(); pathToAliases.put(path, curAliases); pathToPartitionInfo.put(path, pd); } else { @@ -425,6 +430,7 @@ public class MapWork extends BaseWork { public void resolveDynamicPartitionStoredAsSubDirsMerge(HiveConf conf, Path path, TableDesc tblDesc, ArrayList<String> aliases, PartitionDesc partDesc) { + StringInternUtils.internUriStringsInPath(path); pathToAliases.put(path, aliases); pathToPartitionInfo.put(path, partDesc); } @@ -491,9 +497,11 @@ public class MapWork extends BaseWork { } public void mergeAliasedInput(String alias, Path pathDir, PartitionDesc partitionInfo) { + StringInternUtils.internUriStringsInPath(pathDir); + alias = alias.intern(); ArrayList<String> aliases = pathToAliases.get(pathDir); if (aliases == null) { - aliases = new ArrayList<String>(Arrays.asList(alias)); + aliases = new ArrayList<>(Arrays.asList(alias)); pathToAliases.put(pathDir, aliases); pathToPartitionInfo.put(pathDir, partitionInfo); } else { http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/ql/src/java/org/apache/hadoop/hive/ql/plan/MsckDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MsckDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MsckDesc.java index b7a7e4b..68a0164 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MsckDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MsckDesc.java @@ -59,8 +59,8 @@ public class MsckDesc extends DDLWork implements Serializable { super(); this.tableName = tableName; this.partSpecs = new ArrayList<LinkedHashMap<String, String>>(partSpecs.size()); - for (int i = 0; i < partSpecs.size(); i++) { - this.partSpecs.add(new LinkedHashMap<String, String>(partSpecs.get(i))); + for (Map<String, String> partSpec : partSpecs) { + this.partSpecs.add(new LinkedHashMap<>(partSpec)); } this.resFile = resFile.toString(); this.repairPartitions = repairPartitions; http://git-wip-us.apache.org/repos/asf/hive/blob/4c7f2d93/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java index 73981e8..d05c1c6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java @@ -29,6 +29,7 @@ import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.StringInternUtils; import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -73,7 +74,7 @@ public class PartitionDesc implements Serializable, Cloneable { private VectorPartitionDesc vectorPartitionDesc; public void setBaseFileName(String baseFileName) { - this.baseFileName = baseFileName; + this.baseFileName = baseFileName.intern(); } public PartitionDesc() { @@ -81,12 +82,12 @@ public class PartitionDesc implements Serializable, Cloneable { public PartitionDesc(final TableDesc table, final LinkedHashMap<String, String> partSpec) { this.tableDesc = table; - this.partSpec = partSpec; + setPartSpec(partSpec); } public PartitionDesc(final Partition part) throws HiveException { PartitionDescConstructorHelper(part, getTableDesc(part.getTable()), true); - if(Utilities.isInputFileFormatSelfDescribing(this)) { + if (Utilities.isInputFileFormatSelfDescribing(this)) { // if IF is self describing no need to send column info per partition, since its not used anyway. Table tbl = part.getTable(); setProperties(MetaStoreUtils.getSchemaWithoutCols(part.getTPartition().getSd(), part.getTPartition().getSd(), @@ -107,7 +108,7 @@ public class PartitionDesc implements Serializable, Cloneable { public PartitionDesc(final Partition part,final TableDesc tblDesc, boolean usePartSchemaProperties) throws HiveException { - PartitionDescConstructorHelper(part,tblDesc, usePartSchemaProperties); + PartitionDescConstructorHelper(part, tblDesc, usePartSchemaProperties); //We use partition schema properties to set the partition descriptor properties // if usePartSchemaProperties is set to true. if (usePartSchemaProperties) { @@ -121,7 +122,7 @@ public class PartitionDesc implements Serializable, Cloneable { private void PartitionDescConstructorHelper(final Partition part,final TableDesc tblDesc, boolean setInputFileFormat) throws HiveException { this.tableDesc = tblDesc; - this.partSpec = part.getSpec(); + setPartSpec(part.getSpec()); if (setInputFileFormat) { setInputFileFormatClass(part.getInputFormatClass()); } else { @@ -145,10 +146,11 @@ public class PartitionDesc implements Serializable, Cloneable { } public void setPartSpec(final LinkedHashMap<String, String> partSpec) { + StringInternUtils.internValuesInMap(partSpec); this.partSpec = partSpec; } - public Class<? extends InputFormat> getInputFileFormatClass() { + public Class<? extends InputFormat> getInputFileFormatClass() { if (inputFileFormatClass == null && tableDesc != null) { setInputFileFormatClass(tableDesc.getInputFileFormatClass()); } @@ -289,8 +291,7 @@ public class PartitionDesc implements Serializable, Cloneable { ret.tableDesc = (TableDesc) tableDesc.clone(); // The partition spec is not present if (partSpec != null) { - ret.partSpec = new java.util.LinkedHashMap<String, String>(); - ret.partSpec.putAll(partSpec); + ret.partSpec = new LinkedHashMap<>(partSpec); } if (vectorPartitionDesc != null) { ret.vectorPartitionDesc = vectorPartitionDesc.clone(); @@ -379,7 +380,7 @@ public class PartitionDesc implements Serializable, Cloneable { if (path == null) { return; } - baseFileName = path.getName(); + baseFileName = path.getName().intern(); } public void intern(Interner<TableDesc> interner) {
