This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 86c29d74667a439398d8a7b015b1ff1d97abe138 Author: Asish Kumar <[email protected]> AuthorDate: Fri May 15 06:50:35 2026 +0530 fix: clear Hive work map after combine split failures (#18719) * fix: clear Hive work map after combine split failures HoodieCombineHiveInputFormat cleared Hive's work map only after successful split generation, leaving ThreadLocal work state behind when split classification or generation failed. Move the cleanup into a finally block and add a regression test that forces getSplits to fail before verifying Utilities.clearWorkMapForConf is still invoked. --- .../hadoop/hive/HoodieCombineHiveInputFormat.java | 174 +++++++++++---------- .../hive/TestHoodieCombineHiveInputFormat.java | 32 ++++ 2 files changed, 120 insertions(+), 86 deletions(-) diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java index a71695be427e..9634b7f6b097 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java @@ -324,111 +324,113 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { PerfLogger perfLogger = SessionState.getPerfLogger(); perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS); - init(job); - List<InputSplit> result = new ArrayList<>(); + try { + init(job); + List<InputSplit> result = new ArrayList<>(); - Path[] paths = getInputPaths(job); + Path[] paths = getInputPaths(job); - List<Path> nonCombinablePaths = new ArrayList<>(paths.length / 2); - List<Path> combinablePaths = new ArrayList<>(paths.length / 2); + List<Path> nonCombinablePaths = new ArrayList<>(paths.length / 2); + List<Path> combinablePaths = new ArrayList<>(paths.length / 2); - int numThreads = 
Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM, - (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD)); + int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM, + (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD)); - // This check is necessary because for Spark branch, the result array from - // getInputPaths() above could be empty, and therefore numThreads could be 0. - // In that case, Executors.newFixedThreadPool will fail. - if (numThreads > 0) { - try { - Set<Integer> nonCombinablePathIndices = getNonCombinablePathIndices(job, paths, numThreads); - for (int i = 0; i < paths.length; i++) { - if (nonCombinablePathIndices.contains(i)) { - nonCombinablePaths.add(paths[i]); - } else { - combinablePaths.add(paths[i]); + // This check is necessary because for Spark branch, the result array from + // getInputPaths() above could be empty, and therefore numThreads could be 0. + // In that case, Executors.newFixedThreadPool will fail. + if (numThreads > 0) { + try { + Set<Integer> nonCombinablePathIndices = getNonCombinablePathIndices(job, paths, numThreads); + for (int i = 0; i < paths.length; i++) { + if (nonCombinablePathIndices.contains(i)) { + nonCombinablePaths.add(paths[i]); + } else { + combinablePaths.add(paths[i]); + } } + } catch (Exception e) { + LOG.error("Error checking non-combinable path", e); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); + throw new IOException(e); } - } catch (Exception e) { - LOG.error("Error checking non-combinable path", e); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); - throw new IOException(e); } - } - // Store the previous value for the path specification - String oldPaths = job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR); - LOG.debug("The received input paths are: [{}] against the property {}", oldPaths, - org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR); - - // Process the normal splits - if (nonCombinablePaths.size() > 
0) { - FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray(new Path[0])); - InputSplit[] splits = super.getSplits(job, numSplits); - Collections.addAll(result, splits); - } + // Store the previous value for the path specification + String oldPaths = job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR); + LOG.debug("The received input paths are: [{}] against the property {}", oldPaths, + org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR); - // Process the combine splits - if (combinablePaths.size() > 0) { - FileInputFormat.setInputPaths(job, combinablePaths.toArray(new Path[0])); - Map<Path, PartitionDesc> pathToPartitionInfo = this.pathToPartitionInfo != null ? this.pathToPartitionInfo - : Utilities.getMapWork(job).getPathToPartitionInfo(); - InputSplit[] splits = getCombineSplits(job, numSplits, pathToPartitionInfo); - Collections.addAll(result, splits); - } + // Process the normal splits + if (nonCombinablePaths.size() > 0) { + FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray(new Path[0])); + InputSplit[] splits = super.getSplits(job, numSplits); + Collections.addAll(result, splits); + } - // Restore the old path information back - // This is just to prevent incompatibilities with previous versions Hive - // if some application depends on the original value being set. - if (oldPaths != null) { - job.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, oldPaths); - } + // Process the combine splits + if (combinablePaths.size() > 0) { + FileInputFormat.setInputPaths(job, combinablePaths.toArray(new Path[0])); + Map<Path, PartitionDesc> pathToPartitionInfo = this.pathToPartitionInfo != null ? this.pathToPartitionInfo + : Utilities.getMapWork(job).getPathToPartitionInfo(); + InputSplit[] splits = getCombineSplits(job, numSplits, pathToPartitionInfo); + Collections.addAll(result, splits); + } - // clear work from ThreadLocal after splits generated in case of thread is reused in pool. 
- Utilities.clearWorkMapForConf(job); + // Restore the old path information back + // This is just to prevent incompatibilities with previous versions Hive + // if some application depends on the original value being set. + if (oldPaths != null) { + job.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, oldPaths); + } - // build internal schema for the query - if (!result.isEmpty()) { - ArrayList<String> uniqTablePaths = new ArrayList<>(); - Arrays.stream(paths).forEach(path -> { - final HoodieStorage storage; - try { - FileSystem fs = path.getFileSystem(job); - storage = HoodieStorageUtils.getStorage( - HadoopFSUtils.convertToStoragePath(path), HadoopFSUtils.getStorageConf(fs.getConf())); - Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path)); - if (tablePath.isPresent()) { - uniqTablePaths.add(tablePath.get().toUri().toString()); + // build internal schema for the query + if (!result.isEmpty()) { + ArrayList<String> uniqTablePaths = new ArrayList<>(); + Arrays.stream(paths).forEach(path -> { + final HoodieStorage storage; + try { + FileSystem fs = path.getFileSystem(job); + storage = HoodieStorageUtils.getStorage( + HadoopFSUtils.convertToStoragePath(path), HadoopFSUtils.getStorageConf(fs.getConf())); + Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path)); + if (tablePath.isPresent()) { + uniqTablePaths.add(tablePath.get().toUri().toString()); + } + } catch (IOException e) { + throw new RuntimeException(e); } - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + }); - try { - for (String path : uniqTablePaths) { - HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(new HadoopStorageConfiguration(job)).build(); - TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); - String avroSchema = schemaUtil.getTableSchema().toString(); - Option<InternalSchema> 
internalSchema = schemaUtil.getTableInternalSchemaFromCommitMetadata(); - if (internalSchema.isPresent()) { - LOG.info("Set internal and avro schema cache with path: {}", path); - job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, avroSchema); - job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, SerDeHelper.toJson(internalSchema.get())); - } else { - // always sets up the cache so that we can distinguish with the scenario where the cache was never set(e.g. in tests). - job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, ""); - job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, ""); + try { + for (String path : uniqTablePaths) { + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(path).setConf(new HadoopStorageConfiguration(job)).build(); + TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient); + String avroSchema = schemaUtil.getTableSchema().toString(); + Option<InternalSchema> internalSchema = schemaUtil.getTableInternalSchemaFromCommitMetadata(); + if (internalSchema.isPresent()) { + LOG.info("Set internal and avro schema cache with path: {}", path); + job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, avroSchema); + job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, SerDeHelper.toJson(internalSchema.get())); + } else { + // always sets up the cache so that we can distinguish with the scenario where the cache was never set(e.g. in tests). + job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, ""); + job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." 
+ path, ""); + } } + } catch (Exception e) { + LOG.warn("Failed to set schema cache", e); } - } catch (Exception e) { - LOG.warn("Failed to set schema cache", e); } - } - LOG.info("Number of all splits {}", result.size()); - perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); - return result.toArray(new InputSplit[result.size()]); + LOG.info("Number of all splits {}", result.size()); + perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS); + return result.toArray(new InputSplit[result.size()]); + } finally { + // Clear work from ThreadLocal after each getSplits attempt, in case the thread is reused in a pool. + Utilities.clearWorkMapForConf(job); + } } private void processPaths(JobConf job, CombineFileInputFormatShim combine, List<CombineFileSplit> iss, Path... path) diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java index bf331eb55c84..44ceac534004 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java @@ -67,6 +67,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; import java.io.File; import java.io.IOException; @@ -80,7 +82,13 @@ import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_MAP_WORK; import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_MAPPER_CLASS; import static org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static 
org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { @@ -119,6 +127,30 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness { } } + @Test + public void testClearWorkMapForConfOnGetSplitsFailure() throws Exception { + StorageConfiguration<Configuration> conf = HoodieTestUtils.getDefaultStorageConf(); + File inputDir = tempDir.resolve("input").toFile(); + assertTrue(inputDir.mkdirs()); + + MapredWork mrwork = new MapredWork(); + Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString()); + Utilities.setMapRedWork(conf.unwrap(), mrwork, mapWorkPath); + JobConf jobConf = new JobConf(conf.unwrap()); + FileInputFormat.setInputPaths(jobConf, inputDir.getPath()); + jobConf.set(HAS_MAP_WORK, "true"); + jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName()); + + HoodieCombineHiveInputFormat combineHiveInputFormat = spy(new HoodieCombineHiveInputFormat()); + doThrow(new RuntimeException("path classification failed")).when(combineHiveInputFormat) + .getNonCombinablePathIndices(eq(jobConf), any(Path[].class), anyInt()); + + try (MockedStatic<Utilities> utilities = Mockito.mockStatic(Utilities.class, Mockito.CALLS_REAL_METHODS)) { + assertThrows(IOException.class, () -> combineHiveInputFormat.getSplits(jobConf, 1)); + utilities.verify(() -> Utilities.clearWorkMapForConf(jobConf)); + } + } + @Test public void testInternalSchemaCacheForMR() throws Exception { // test for HUDI-8182
