This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 69b38fde32f HIVE-27944: Possible deadlock in HIVE-LLAP when reading location-based Iceberg tables (#5934)
69b38fde32f is described below

commit 69b38fde32f467621803ae1fe701547aabeca571
Author: PLASH SPEED <plashsp...@foxmail.com>
AuthorDate: Fri Jul 11 06:26:30 2025 +0800

    HIVE-27944: Possible deadlock in HIVE-LLAP when reading location-based Iceberg tables (#5934)
---
 .../hadoop/hive/ql/exec/tez/SplitGrouper.java      | 40 ++++-------
 .../hadoop/hive/ql/io/CombineHiveRecordReader.java | 15 ++--
 .../hadoop/hive/ql/io/HiveFileFormatUtils.java     | 79 ++++++++--------------
 .../apache/hadoop/hive/ql/io/IOPrepareCache.java   | 27 +++-----
 4 files changed, 63 insertions(+), 98 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
index 2ce7ed88c8b..d091f0390a7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -29,11 +29,10 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.collect.Maps;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -66,15 +65,10 @@
  * also enforces restrictions around schema, file format and bucketing.
  */
 public class SplitGrouper {
-
   private static final Logger LOG = LoggerFactory.getLogger(SplitGrouper.class);
 
-  // TODO This needs to be looked at. Map of Map to Map... Made concurrent for now since split generation
-  // can happen in parallel.
-  private static final Map<Map<Path, PartitionDesc>, Map<Path, PartitionDesc>> cache =
-      new ConcurrentHashMap<>();
-
   private final TezMapredSplitsGrouper tezGrouper = new TezMapredSplitsGrouper();
+  private final Map<Path, Path> cache = Maps.newHashMap();
 
   /**
    * group splits for each bucket separately - while evenly filling all the
@@ -91,7 +85,7 @@ public Multimap<Integer, InputSplit> group(Configuration conf,
 
     // allocate map bucket id to grouped splits
     Multimap<Integer, InputSplit> bucketGroupedSplitMultimap =
-        ArrayListMultimap.<Integer, InputSplit> create();
+        ArrayListMultimap.create();
 
     // use the tez grouper to combine splits once per bucket
     for (int bucketId : bucketSplitMultimap.keySet()) {
@@ -137,9 +131,8 @@ public List<TaskLocationHint> createTaskLocationHints(InputSplit[] splits, boole
       String [] locations = split.getLocations();
       if (locations != null && locations.length > 0) {
         // Worthwhile only if more than 1 split, consistentGroupingEnabled and is a FileSplit
-        if (consistentLocations && locations.length > 1 && split instanceof FileSplit) {
+        if (consistentLocations && locations.length > 1 && split instanceof FileSplit fileSplit) {
           Arrays.sort(locations);
-          FileSplit fileSplit = (FileSplit) split;
           Path path = fileSplit.getPath();
           long startLocation = fileSplit.getStart();
           int hashCode = Objects.hash(path, startLocation);
@@ -153,8 +146,8 @@ public List<TaskLocationHint> createTaskLocationHints(InputSplit[] splits, boole
           locationHints.add(TaskLocationHint.createTaskLocationHint(locationSet, null));
         } else {
           locationHints.add(TaskLocationHint
-              .createTaskLocationHint(new LinkedHashSet<String>(Arrays.asList(split
-                  .getLocations())), null));
+              .createTaskLocationHint(new LinkedHashSet<>(Arrays.asList(split
+                  .getLocations())), null));
         }
       } else {
         locationHints.add(TaskLocationHint.createTaskLocationHint(null, null));
@@ -189,16 +182,15 @@ public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf, Conf
     boolean isMinorCompaction = true;
     MapWork mapWork = populateMapWork(jobConf, inputName);
     // ArrayListMultimap is important here to retain the ordering for the splits.
-    Multimap<Integer, InputSplit> schemaGroupedSplitMultiMap = ArrayListMultimap.<Integer, InputSplit> create();
+    Multimap<Integer, InputSplit> schemaGroupedSplitMultiMap = ArrayListMultimap.create();
     if (HiveConf.getVar(jobConf, HiveConf.ConfVars.SPLIT_GROUPING_MODE).equalsIgnoreCase("compactor")) {
       List<Path> paths = Utilities.getInputPathsTez(jobConf, mapWork);
       for (Path path : paths) {
         List<String> aliases = mapWork.getPathToAliases().get(path);
         if ((aliases != null) && (aliases.size() == 1)) {
-          Operator<? extends OperatorDesc> op = mapWork.getAliasToWork().get(aliases.get(0));
-          if ((op != null) && (op instanceof TableScanOperator)) {
-            TableScanOperator tableScan = (TableScanOperator) op;
-            PartitionDesc partitionDesc = mapWork.getAliasToPartnInfo().get(aliases.get(0));
+          Operator<? extends OperatorDesc> op = mapWork.getAliasToWork().get(aliases.getFirst());
+          if (op instanceof TableScanOperator tableScan) {
+            PartitionDesc partitionDesc = mapWork.getAliasToPartnInfo().get(aliases.getFirst());
             isMinorCompaction &= AcidUtils.isCompactionTable(partitionDesc.getTableDesc().getProperties());
             if (!tableScan.getConf().isTranscationalTable() && !isMinorCompaction) {
               String splitPath = getFirstSplitPath(splits);
@@ -260,7 +252,7 @@ private String getFirstSplitPath(InputSplit[] splits) {
   Multimap<Integer, InputSplit> getCompactorSplitGroups(InputSplit[] rawSplits, Configuration conf,
       boolean isMinorCompaction) {
     // Note: For our case, this multimap will essentially contain one value (one TezGroupedSplit) per key
-    Multimap<Integer, InputSplit> bucketSplitMultiMap = ArrayListMultimap.<Integer, InputSplit> create();
+    Multimap<Integer, InputSplit> bucketSplitMultiMap = ArrayListMultimap.create();
     HiveInputFormat.HiveInputSplit[] splits = new HiveInputFormat.HiveInputSplit[rawSplits.length];
     int i = 0;
     for (InputSplit is : rawSplits) {
@@ -345,10 +337,10 @@ private Map<Integer, Integer> estimateBucketSizes(int availableSlots, float wave
       Map<Integer, Collection<InputSplit>> bucketSplitMap) {
 
     // mapping of bucket id to size of all splits in bucket in bytes
-    Map<Integer, Long> bucketSizeMap = new HashMap<Integer, Long>();
+    Map<Integer, Long> bucketSizeMap = new HashMap<>();
 
     // mapping of bucket id to number of required tasks to run
-    Map<Integer, Integer> bucketTaskMap = new HashMap<Integer, Integer>();
+    Map<Integer, Integer> bucketTaskMap = new HashMap<>();
 
     // TODO HIVE-12255. Make use of SplitSizeEstimator.
     // The actual task computation needs to be looked at as well.
@@ -362,12 +354,11 @@ private Map<Integer, Integer> estimateBucketSizes(int availableSlots, float wave
       // the case of SMB join. So in this case, we can do an early exit by not doing the
       // calculation for bucketSizeMap. Each bucket will assume it can fill availableSlots * waves
      // (preset to 0.5) for SMB join.
-      if (!(s instanceof FileSplit)) {
+      if (!(s instanceof FileSplit fsplit)) {
        bucketTaskMap.put(bucketId, (int) (availableSlots * waves));
         earlyExit = true;
         continue;
       }
-      FileSplit fsplit = (FileSplit) s;
       size += fsplit.getLength();
       totalSize += fsplit.getLength();
     }
@@ -413,7 +404,7 @@ private static MapWork populateMapWork(JobConf jobConf, String inputName) {
   }
 
   private boolean schemaEvolved(InputSplit s, InputSplit prevSplit, boolean groupAcrossFiles,
-                                MapWork work) throws IOException {
+      MapWork work) throws IOException {
     boolean retval = false;
     Path path = ((FileSplit) s).getPath();
     PartitionDesc pd = HiveFileFormatUtils.getFromPathRecursively(
@@ -434,7 +425,6 @@ private boolean schemaEvolved(InputSplit s, InputSplit prevSplit, boolean groupA
       previousDeserializerClass = prevPD.getDeserializerClassName();
       previousInputFormatClass = prevPD.getInputFileFormatClass();
     }
-
     if ((currentInputFormatClass != previousInputFormatClass)
         || (!currentDeserializerClass.equals(previousDeserializerClass))) {
       retval = true;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
index c0118d5e3d8..d483646d724 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/CombineHiveRecordReader.java
@@ -42,6 +42,8 @@
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.lib.CombineFileSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * CombineHiveRecordReader.
@@ -51,7 +53,7 @@
  */
 public class CombineHiveRecordReader<K extends WritableComparable, V extends Writable>
     extends HiveContextAwareRecordReader<K, V> {
 
-  private org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(CombineHiveRecordReader.class);
+  private static final Logger LOG = LoggerFactory.getLogger(CombineHiveRecordReader.class);
 
   private Map<Path, PartitionDesc> pathToPartInfo;
@@ -104,17 +106,16 @@ public CombineHiveRecordReader(InputSplit split, Configuration conf,
     //If current split is from the same file as preceding split and the preceding split has footerbuffer,
     //the current split should use the preceding split's footerbuffer in order to skip footer correctly.
-    if (preReader != null && preReader instanceof CombineHiveRecordReader
-        && ((CombineHiveRecordReader)preReader).getFooterBuffer() != null) {
+    if (preReader instanceof CombineHiveRecordReader
+        && ((CombineHiveRecordReader) preReader).getFooterBuffer() != null) {
       if (partition != 0 && hsplit.getPaths()[partition -1].equals(hsplit.getPaths()[partition]))
         this.setFooterBuffer(((CombineHiveRecordReader)preReader).getFooterBuffer());
     }
-
   }
 
   private PartitionDesc extractSinglePartSpec(CombineHiveInputSplit hsplit) throws IOException {
     PartitionDesc part = null;
-    Map<Map<Path,PartitionDesc>, Map<Path,PartitionDesc>> cache = new HashMap<>();
+    Map<Path, Path> cache = new HashMap<>();
     for (Path path : hsplit.getPaths()) {
       PartitionDesc otherPart = HiveFileFormatUtils.getFromPathRecursively(
           pathToPartInfo, path, cache);
@@ -147,12 +148,12 @@ public void doClose() throws IOException {
 
   @Override
   public K createKey() {
-    return (K) recordReader.createKey();
+    return recordReader.createKey();
   }
 
   @Override
   public V createValue() {
-    return (V) recordReader.createValue();
+    return recordReader.createValue();
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
index 20fde98c264..bd3970f8b7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
@@ -130,10 +130,6 @@ private FileChecker() {
           .build();
     }
 
-    public Set<Class<? extends InputFormat>> registeredClasses() {
-      return inputFormatCheckerMap.keySet();
-    }
-
     public Set<Class<? extends InputFormat>> registeredTextClasses() {
       return textInputFormatCheckerMap.keySet();
     }
@@ -179,7 +175,6 @@ public static Class<? extends OutputFormat> getOutputFormatSubstitute(
    *
    * Note: an empty set of files is considered compliant.
    */
-  @SuppressWarnings("unchecked")
  public static boolean checkInputFormat(FileSystem fs, HiveConf conf,
       Class<? extends InputFormat> inputFormatCls, List<FileStatus> files)
       throws HiveException {
@@ -202,7 +197,7 @@ public static boolean checkInputFormat(FileSystem fs, HiveConf conf,
           .getInputFormatCheckerInstance(checkerCls);
       try {
         if (checkerInstance == null) {
-          checkerInstance = checkerCls.newInstance();
+          checkerInstance = checkerCls.getDeclaredConstructor().newInstance();
           FileChecker.getInstance().putInputFormatCheckerInstance(checkerCls, checkerInstance);
         }
         return checkerInstance.validateInput(fs, conf, files);
@@ -213,7 +208,6 @@ public static boolean checkInputFormat(FileSystem fs, HiveConf conf,
     return true;
   }
 
-  @SuppressWarnings("unchecked")
   private static boolean checkTextInputFormat(FileSystem fs, HiveConf conf,
       List<FileStatus> files) throws HiveException {
     List<FileStatus> files2 = new LinkedList<>(files);
@@ -270,13 +264,13 @@ public static RecordWriter getHiveRecordWriter(JobConf jc,
     if (isCompressed) {
       jc_output = new JobConf(jc);
       String codecStr = conf.getCompressCodec();
-      if (codecStr != null && !codecStr.trim().equals("")) {
+      if (codecStr != null && !codecStr.trim().isEmpty()) {
         Class<? extends CompressionCodec> codec = JavaUtils.loadClass(codecStr);
         FileOutputFormat.setOutputCompressorClass(jc_output, codec);
       }
       String type = conf.getCompressType();
-      if (type != null && !type.trim().equals("")) {
+      if (type != null && !type.trim().isEmpty()) {
         CompressionType style = CompressionType.valueOf(type);
         SequenceFileOutputFormat.setOutputCompressionType(jc, style);
       }
@@ -288,18 +282,16 @@ public static RecordWriter getHiveRecordWriter(JobConf jc,
     }
   }
 
-  public static HiveOutputFormat<?, ?> getHiveOutputFormat(Configuration conf, TableDesc tableDesc)
-      throws HiveException {
+  public static HiveOutputFormat<?, ?> getHiveOutputFormat(Configuration conf, TableDesc tableDesc) {
     return getHiveOutputFormat(conf, tableDesc.getOutputFileFormatClass());
   }
 
-  public static HiveOutputFormat<?, ?> getHiveOutputFormat(Configuration conf, PartitionDesc partDesc)
-      throws HiveException {
+  public static HiveOutputFormat<?, ?> getHiveOutputFormat(Configuration conf, PartitionDesc partDesc) {
     return getHiveOutputFormat(conf, partDesc.getOutputFileFormatClass());
   }
 
   private static HiveOutputFormat<?, ?> getHiveOutputFormat(
-      Configuration conf, Class<? extends OutputFormat> outputClass) throws HiveException {
+      Configuration conf, Class<? extends OutputFormat> outputClass) {
     OutputFormat<?, ?> outputFormat = ReflectionUtil.newInstance(outputClass, conf);
     if (!(outputFormat instanceof HiveOutputFormat)) {
       outputFormat = new HivePassThroughOutputFormat(outputFormat);
@@ -307,11 +299,6 @@ public static RecordWriter getHiveRecordWriter(JobConf jc,
     return (HiveOutputFormat<?, ?>) outputFormat;
   }
 
-  public static RecordUpdater getAcidRecordUpdater(JobConf jc, TableDesc tableInfo, int bucket, FileSinkDesc conf,
-      Path outPath, ObjectInspector inspector, Reporter reporter, int rowIdColNum) throws HiveException, IOException {
-    return getAcidRecordUpdater(jc, tableInfo, bucket, conf, outPath, inspector, reporter, rowIdColNum, null);
-  }
-
   public static RecordUpdater getAcidRecordUpdater(JobConf jc, TableDesc tableInfo, int bucket, FileSinkDesc conf,
       Path outPath, ObjectInspector inspector,
@@ -320,7 +307,7 @@ public static RecordUpdater getAcidRecordUpdater(JobConf jc, TableDesc tableInfo
     HiveOutputFormat<?, ?> hiveOutputFormat = getHiveOutputFormat(jc, tableInfo);
     AcidOutputFormat<?, ?> acidOutputFormat = null;
     if (hiveOutputFormat instanceof AcidOutputFormat) {
-      acidOutputFormat = (AcidOutputFormat)hiveOutputFormat;
+      acidOutputFormat = (AcidOutputFormat) hiveOutputFormat;
     } else {
       throw new HiveException("Unable to create RecordUpdater for HiveOutputFormat that does not " +
           "implement AcidOutputFormat");
@@ -360,37 +347,26 @@ private static RecordUpdater getRecordUpdater(JobConf jc,
   }
 
   public static <T> T getFromPathRecursively(Map<Path, T> pathToPartitionInfo, Path dir,
-      Map<Map<Path, T>, Map<Path, T>> cacheMap) throws IOException {
+      Map<Path, Path> cacheMap) throws IOException {
     return getFromPathRecursively(pathToPartitionInfo, dir, cacheMap, false);
   }
 
   public static <T> T getFromPathRecursively(Map<Path, T> pathToPartitionInfo, Path dir,
-      Map<Map<Path, T>, Map<Path, T>> cacheMap, boolean ignoreSchema) throws IOException {
+      Map<Path, Path> cacheMap, boolean ignoreSchema) throws IOException {
     return getFromPathRecursively(pathToPartitionInfo, dir, cacheMap, ignoreSchema, false);
   }
 
   public static <T> T getFromPathRecursively(Map<Path, T> pathToPartitionInfo, Path dir,
-      Map<Map<Path, T>, Map<Path, T>> cacheMap, boolean ignoreSchema, boolean ifPresent)
+      Map<Path, Path> cacheMap, boolean ignoreSchema, boolean ifPresent)
       throws IOException {
     T part = getFromPath(pathToPartitionInfo, dir);
     if (part == null && (ignoreSchema
-        || (dir.toUri().getScheme() == null || dir.toUri().getScheme().trim().equals(""))
+        || (dir.toUri().getScheme() == null || dir.toUri().getScheme().trim().isEmpty())
        || FileUtils.pathsContainNoScheme(pathToPartitionInfo.keySet()))) {
-      Map<Path, T> newPathToPartitionInfo = null;
-      if (cacheMap != null) {
-        newPathToPartitionInfo = cacheMap.get(pathToPartitionInfo);
-      }
-
-      if (newPathToPartitionInfo == null) { // still null
-        newPathToPartitionInfo = populateNewT(pathToPartitionInfo);
-
-        if (cacheMap != null) {
-          cacheMap.put(pathToPartitionInfo, newPathToPartitionInfo);
-        }
-      }
+      Map<Path, T> newPathToPartitionInfo = populateNewT(pathToPartitionInfo, cacheMap);
       part = getFromPath(newPathToPartitionInfo, dir);
     }
     if (part != null || ifPresent) {
@@ -401,13 +377,21 @@ public static <T> T getFromPathRecursively(Map<Path, T> pathToPartitionInfo, Pat
     }
   }
 
-  private static <T> Map<Path, T> populateNewT(Map<Path, T> pathToPartitionInfo) {
-    Map<Path, T> newPathToPartitionInfo = new HashMap<>();
-    for (Map.Entry<Path, T> entry: pathToPartitionInfo.entrySet()) {
-      T partDesc = entry.getValue();
-      Path pathOnly = Path.getPathWithoutSchemeAndAuthority(entry.getKey());
+  private static <T> Map<Path, T> populateNewT(Map<Path, T> pathToPartitionInfo,
+      Map<Path, Path> cacheMap) {
+    Map<Path, T> newPathToPartitionInfo = new HashMap<>(pathToPartitionInfo.size());
+
+    pathToPartitionInfo.forEach((originalPath, partDesc) -> {
+      Path pathOnly = cacheMap != null ?
+          cacheMap.get(originalPath) : null;
+      if (pathOnly == null) {
+        pathOnly = Path.getPathWithoutSchemeAndAuthority(originalPath);
+        if (cacheMap != null) {
+          cacheMap.put(originalPath, pathOnly);
+        }
+      }
       newPathToPartitionInfo.put(pathOnly, partDesc);
-    }
+    });
     return newPathToPartitionInfo;
   }
@@ -416,9 +400,9 @@ private static <T> T getFromPath(
 
     // We first do exact match, and then do prefix matching. The latter is due to input dir
     // could be /dir/ds='2001-02-21'/part-03 where part-03 is not part of partition
-    Path path = FileUtils.getParentRegardlessOfScheme(dir,pathToPartitionInfo.keySet());
+    Path path = FileUtils.getParentRegardlessOfScheme(dir, pathToPartitionInfo.keySet());
 
-    if(path == null) {
+    if (path == null) {
       // FIXME: old implementation returned null; exception maybe?
       return null;
     }
@@ -427,10 +411,7 @@ private static <T> T getFromPath(
 
   private static boolean foundAlias(Map<Path, List<String>> pathToAliases, Path path) {
     List<String> aliases = pathToAliases.get(path);
-    if ((aliases == null) || (aliases.isEmpty())) {
-      return false;
-    }
-    return true;
+    return (aliases != null) && !aliases.isEmpty();
   }
 
   private static Path getMatchingPath(Map<Path, List<String>> pathToAliases, Path dir) {
@@ -467,7 +448,7 @@ private static Path getMatchingPath(Map<Path, List<String>> pathToAliases, Path
    **/
   public static List<Operator<? extends OperatorDesc>> doGetWorksFromPath(Map<Path, List<String>> pathToAliases,
       Map<String, Operator<? extends OperatorDesc>> aliasToWork, Path dir) {
-    List<Operator<? extends OperatorDesc>> opList = new ArrayList<Operator<? extends OperatorDesc>>();
+    List<Operator<? extends OperatorDesc>> opList = new ArrayList<>();
 
     List<String> aliases = doGetAliasesFromPath(pathToAliases, dir);
     for (String alias : aliases) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/IOPrepareCache.java b/ql/src/java/org/apache/hadoop/hive/ql/io/IOPrepareCache.java
index 312c4b02850..c5f0f8f7100 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/IOPrepareCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/IOPrepareCache.java
@@ -22,7 +22,6 @@
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 
 /**
  * IOPrepareCache is used to cache pre-query io-related objects.
@@ -31,7 +30,8 @@
  */
 public class IOPrepareCache {
 
-  private static ThreadLocal<IOPrepareCache> threadLocalIOPrepareCache = new ThreadLocal<IOPrepareCache>();
+  private static final ThreadLocal<IOPrepareCache> threadLocalIOPrepareCache =
+      new ThreadLocal<>();
 
   public static IOPrepareCache get() {
     IOPrepareCache cache = IOPrepareCache.threadLocalIOPrepareCache.get();
@@ -39,32 +39,25 @@ public static IOPrepareCache get() {
       threadLocalIOPrepareCache.set(new IOPrepareCache());
       cache = IOPrepareCache.threadLocalIOPrepareCache.get();
     }
-
     return cache;
   }
 
-  public void clear() {
-    if(partitionDescMap != null) {
-      partitionDescMap.clear();
-    }
-  }
-
-  private Map<Map<Path, PartitionDesc>, Map<Path, PartitionDesc>> partitionDescMap;
+  private Map<Path, Path> partitionDescMap;
 
-  public Map<Map<Path, PartitionDesc>, Map<Path, PartitionDesc>> allocatePartitionDescMap() {
+  public Map<Path, Path> allocatePartitionDescMap() {
     if (partitionDescMap == null) {
       partitionDescMap = new HashMap<>();
     }
     return partitionDescMap;
   }
 
-  public Map<Map<Path, PartitionDesc>, Map<Path, PartitionDesc>> getPartitionDescMap() {
+  public Map<Path, Path> getPartitionDescMap() {
     return partitionDescMap;
   }
 
-  public void setPartitionDescMap(
-      Map<Map<Path, PartitionDesc>, Map<Path, PartitionDesc>> partitionDescMap) {
-    this.partitionDescMap = partitionDescMap;
-  }
-
+  public void clear() {
+    if (partitionDescMap != null) {
+      partitionDescMap.clear();
+    }
+  }
 }