This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 071b3f125b41 fix: clear Hive work map after combine split failures (#18719)
071b3f125b41 is described below
commit 071b3f125b410b3feb8ee90a2ee8073eec67e254
Author: Asish Kumar <[email protected]>
AuthorDate: Fri May 15 06:50:35 2026 +0530
fix: clear Hive work map after combine split failures (#18719)
* fix: clear Hive work map after combine split failures
HoodieCombineHiveInputFormat cleared Hive's work map only after split
generation succeeded, leaving ThreadLocal work state behind whenever split
classification or generation failed.
Move the cleanup into a finally block so it runs on every getSplits attempt,
and add a regression test that makes getSplits fail (by stubbing
getNonCombinablePathIndices to throw) and then verifies that
Utilities.clearWorkMapForConf is still invoked.
---
.../hadoop/hive/HoodieCombineHiveInputFormat.java | 174 +++++++++++----------
.../hive/TestHoodieCombineHiveInputFormat.java | 32 ++++
2 files changed, 120 insertions(+), 86 deletions(-)
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
index a71695be427e..9634b7f6b097 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/hive/HoodieCombineHiveInputFormat.java
@@ -324,111 +324,113 @@ public class HoodieCombineHiveInputFormat<K extends WritableComparable, V extend
public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException
{
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
- init(job);
- List<InputSplit> result = new ArrayList<>();
+ try {
+ init(job);
+ List<InputSplit> result = new ArrayList<>();
- Path[] paths = getInputPaths(job);
+ Path[] paths = getInputPaths(job);
- List<Path> nonCombinablePaths = new ArrayList<>(paths.length / 2);
- List<Path> combinablePaths = new ArrayList<>(paths.length / 2);
+ List<Path> nonCombinablePaths = new ArrayList<>(paths.length / 2);
+ List<Path> combinablePaths = new ArrayList<>(paths.length / 2);
- int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM,
- (int) Math.ceil((double) paths.length / DEFAULT_NUM_PATH_PER_THREAD));
+ int numThreads = Math.min(MAX_CHECK_NONCOMBINABLE_THREAD_NUM,
+ (int) Math.ceil((double) paths.length /
DEFAULT_NUM_PATH_PER_THREAD));
- // This check is necessary because for Spark branch, the result array from
- // getInputPaths() above could be empty, and therefore numThreads could be
0.
- // In that case, Executors.newFixedThreadPool will fail.
- if (numThreads > 0) {
- try {
- Set<Integer> nonCombinablePathIndices =
getNonCombinablePathIndices(job, paths, numThreads);
- for (int i = 0; i < paths.length; i++) {
- if (nonCombinablePathIndices.contains(i)) {
- nonCombinablePaths.add(paths[i]);
- } else {
- combinablePaths.add(paths[i]);
+ // This check is necessary because for Spark branch, the result array
from
+ // getInputPaths() above could be empty, and therefore numThreads could
be 0.
+ // In that case, Executors.newFixedThreadPool will fail.
+ if (numThreads > 0) {
+ try {
+ Set<Integer> nonCombinablePathIndices =
getNonCombinablePathIndices(job, paths, numThreads);
+ for (int i = 0; i < paths.length; i++) {
+ if (nonCombinablePathIndices.contains(i)) {
+ nonCombinablePaths.add(paths[i]);
+ } else {
+ combinablePaths.add(paths[i]);
+ }
}
+ } catch (Exception e) {
+ LOG.error("Error checking non-combinable path", e);
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
+ throw new IOException(e);
}
- } catch (Exception e) {
- LOG.error("Error checking non-combinable path", e);
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
- throw new IOException(e);
}
- }
- // Store the previous value for the path specification
- String oldPaths =
job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
- LOG.debug("The received input paths are: [{}] against the property {}",
oldPaths,
- org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
-
- // Process the normal splits
- if (nonCombinablePaths.size() > 0) {
- FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray(new
Path[0]));
- InputSplit[] splits = super.getSplits(job, numSplits);
- Collections.addAll(result, splits);
- }
+ // Store the previous value for the path specification
+ String oldPaths =
job.get(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
+ LOG.debug("The received input paths are: [{}] against the property {}",
oldPaths,
+ org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR);
- // Process the combine splits
- if (combinablePaths.size() > 0) {
- FileInputFormat.setInputPaths(job, combinablePaths.toArray(new Path[0]));
- Map<Path, PartitionDesc> pathToPartitionInfo = this.pathToPartitionInfo
!= null ? this.pathToPartitionInfo
- : Utilities.getMapWork(job).getPathToPartitionInfo();
- InputSplit[] splits = getCombineSplits(job, numSplits,
pathToPartitionInfo);
- Collections.addAll(result, splits);
- }
+ // Process the normal splits
+ if (nonCombinablePaths.size() > 0) {
+ FileInputFormat.setInputPaths(job, nonCombinablePaths.toArray(new
Path[0]));
+ InputSplit[] splits = super.getSplits(job, numSplits);
+ Collections.addAll(result, splits);
+ }
- // Restore the old path information back
- // This is just to prevent incompatibilities with previous versions Hive
- // if some application depends on the original value being set.
- if (oldPaths != null) {
- job.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
oldPaths);
- }
+ // Process the combine splits
+ if (combinablePaths.size() > 0) {
+ FileInputFormat.setInputPaths(job, combinablePaths.toArray(new
Path[0]));
+ Map<Path, PartitionDesc> pathToPartitionInfo =
this.pathToPartitionInfo != null ? this.pathToPartitionInfo
+ : Utilities.getMapWork(job).getPathToPartitionInfo();
+ InputSplit[] splits = getCombineSplits(job, numSplits,
pathToPartitionInfo);
+ Collections.addAll(result, splits);
+ }
- // clear work from ThreadLocal after splits generated in case of thread is
reused in pool.
- Utilities.clearWorkMapForConf(job);
+ // Restore the old path information back
+ // This is just to prevent incompatibilities with previous versions Hive
+ // if some application depends on the original value being set.
+ if (oldPaths != null) {
+
job.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR,
oldPaths);
+ }
- // build internal schema for the query
- if (!result.isEmpty()) {
- ArrayList<String> uniqTablePaths = new ArrayList<>();
- Arrays.stream(paths).forEach(path -> {
- final HoodieStorage storage;
- try {
- FileSystem fs = path.getFileSystem(job);
- storage = HoodieStorageUtils.getStorage(
- HadoopFSUtils.convertToStoragePath(path),
HadoopFSUtils.getStorageConf(fs.getConf()));
- Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage,
HadoopFSUtils.convertToStoragePath(path));
- if (tablePath.isPresent()) {
- uniqTablePaths.add(tablePath.get().toUri().toString());
+ // build internal schema for the query
+ if (!result.isEmpty()) {
+ ArrayList<String> uniqTablePaths = new ArrayList<>();
+ Arrays.stream(paths).forEach(path -> {
+ final HoodieStorage storage;
+ try {
+ FileSystem fs = path.getFileSystem(job);
+ storage = HoodieStorageUtils.getStorage(
+ HadoopFSUtils.convertToStoragePath(path),
HadoopFSUtils.getStorageConf(fs.getConf()));
+ Option<StoragePath> tablePath =
TablePathUtils.getTablePath(storage, HadoopFSUtils.convertToStoragePath(path));
+ if (tablePath.isPresent()) {
+ uniqTablePaths.add(tablePath.get().toUri().toString());
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
}
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- });
+ });
- try {
- for (String path : uniqTablePaths) {
- HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setBasePath(path).setConf(new
HadoopStorageConfiguration(job)).build();
- TableSchemaResolver schemaUtil = new TableSchemaResolver(metaClient);
- String avroSchema = schemaUtil.getTableSchema().toString();
- Option<InternalSchema> internalSchema =
schemaUtil.getTableInternalSchemaFromCommitMetadata();
- if (internalSchema.isPresent()) {
- LOG.info("Set internal and avro schema cache with path: {}", path);
- job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, avroSchema);
- job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path,
SerDeHelper.toJson(internalSchema.get()));
- } else {
- // always sets up the cache so that we can distinguish with the
scenario where the cache was never set(e.g. in tests).
- job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
- job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
+ try {
+ for (String path : uniqTablePaths) {
+ HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setBasePath(path).setConf(new
HadoopStorageConfiguration(job)).build();
+ TableSchemaResolver schemaUtil = new
TableSchemaResolver(metaClient);
+ String avroSchema = schemaUtil.getTableSchema().toString();
+ Option<InternalSchema> internalSchema =
schemaUtil.getTableInternalSchemaFromCommitMetadata();
+ if (internalSchema.isPresent()) {
+ LOG.info("Set internal and avro schema cache with path: {}",
path);
+ job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, avroSchema);
+ job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path,
SerDeHelper.toJson(internalSchema.get()));
+ } else {
+ // always sets up the cache so that we can distinguish with the
scenario where the cache was never set(e.g. in tests).
+ job.set(SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
+ job.set(INTERNAL_SCHEMA_CACHE_KEY_PREFIX + "." + path, "");
+ }
}
+ } catch (Exception e) {
+ LOG.warn("Failed to set schema cache", e);
}
- } catch (Exception e) {
- LOG.warn("Failed to set schema cache", e);
}
- }
- LOG.info("Number of all splits {}", result.size());
- perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
- return result.toArray(new InputSplit[result.size()]);
+ LOG.info("Number of all splits {}", result.size());
+ perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
+ return result.toArray(new InputSplit[result.size()]);
+ } finally {
+ // Clear work from ThreadLocal after each getSplits attempt, in case the
thread is reused in a pool.
+ Utilities.clearWorkMapForConf(job);
+ }
}
private void processPaths(JobConf job, CombineFileInputFormatShim combine,
List<CombineFileSplit> iss, Path... path)
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
index bf331eb55c84..44ceac534004 100644
--- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
@@ -67,6 +67,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
@@ -80,7 +82,13 @@ import static org.apache.hadoop.hive.ql.exec.Utilities.HAS_MAP_WORK;
import static org.apache.hadoop.hive.ql.exec.Utilities.MAPRED_MAPPER_CLASS;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
@@ -119,6 +127,30 @@ public class TestHoodieCombineHiveInputFormat extends HoodieCommonTestHarness {
}
}
+ @Test
+ public void testClearWorkMapForConfOnGetSplitsFailure() throws Exception {
+ StorageConfiguration<Configuration> conf =
HoodieTestUtils.getDefaultStorageConf();
+ File inputDir = tempDir.resolve("input").toFile();
+ assertTrue(inputDir.mkdirs());
+
+ MapredWork mrwork = new MapredWork();
+ Path mapWorkPath = new Path(tempDir.toAbsolutePath().toString());
+ Utilities.setMapRedWork(conf.unwrap(), mrwork, mapWorkPath);
+ JobConf jobConf = new JobConf(conf.unwrap());
+ FileInputFormat.setInputPaths(jobConf, inputDir.getPath());
+ jobConf.set(HAS_MAP_WORK, "true");
+ jobConf.set(MAPRED_MAPPER_CLASS, ExecMapper.class.getName());
+
+ HoodieCombineHiveInputFormat combineHiveInputFormat = spy(new
HoodieCombineHiveInputFormat());
+ doThrow(new RuntimeException("path classification
failed")).when(combineHiveInputFormat)
+ .getNonCombinablePathIndices(eq(jobConf), any(Path[].class), anyInt());
+
+ try (MockedStatic<Utilities> utilities =
Mockito.mockStatic(Utilities.class, Mockito.CALLS_REAL_METHODS)) {
+ assertThrows(IOException.class, () ->
combineHiveInputFormat.getSplits(jobConf, 1));
+ utilities.verify(() -> Utilities.clearWorkMapForConf(jobConf));
+ }
+ }
+
@Test
public void testInternalSchemaCacheForMR() throws Exception {
// test for HUDI-8182