This is an automated email from the ASF dual-hosted git repository.

nsivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0915f7776be2 fix: Isolate classloader-aware parallel execution in HoodiePreCommitValidatorEngineContext (#18585)
0915f7776be2 is described below

commit 0915f7776be2a7d08ea0f23000deeceff7caa8c6
Author: ashokkumar-allu <[email protected]>
AuthorDate: Fri Jun 12 21:09:16 2026 -0500

    fix: Isolate classloader-aware parallel execution in HoodiePreCommitValidatorEngineContext (#18585)
    
    Move the classloader-aware parallel execution logic out of
    HoodieLocalEngineContext into a new dedicated
    ExecutorServiceBasedEngineContext that extends HoodieEngineContext and
    implements map() and the other parallel operations on an
    ExecutorService-backed thread pool. This avoids polluting
    HoodieLocalEngineContext with pre-commit-specific concerns and follows
    the reviewer's suggestion.
    
    Co-authored-by: gallu <[email protected]>
---
 .../hudi/client/utils/SparkValidatorUtils.java     |  38 +--
 .../client/validator/SparkPreCommitValidator.java  |  12 +
 .../hudi/client/utils/TestSparkValidatorUtils.java | 209 +++++++++++++++
 .../validator/TestSparkPreCommitValidator.java     | 143 ++++++++++
 .../engine/ExecutorServiceBasedEngineContext.java  | 290 +++++++++++++++++++++
 .../TestExecutorServiceBasedEngineContext.java     | 112 ++++++++
 .../engine/TestHoodieLocalEngineContext.java       |  12 +-
 7 files changed, 792 insertions(+), 24 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
index 40ee7d8cefff..233f622beff6 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
@@ -23,6 +23,7 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.validator.SparkPreCommitValidator;
 import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.ExecutorServiceBasedEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.BaseFile;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -50,7 +51,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -83,7 +83,7 @@ public class SparkValidatorUtils {
       Dataset<Row> afterState = getRecordsFromPendingCommits(sqlContext, 
partitionsModified, writeMetadata, table, instantTime);
       Dataset<Row> beforeState = getRecordsFromCommittedFiles(sqlContext, 
partitionsModified, table, afterState.schema());
 
-      Stream<SparkPreCommitValidator> validators = 
Arrays.stream(config.getPreCommitValidators().split(","))
+      List<SparkPreCommitValidator> validators = 
Arrays.stream(config.getPreCommitValidators().split(","))
           .map(String::trim)
           .filter(s -> !s.isEmpty())
           .flatMap(validatorClass -> {
@@ -105,10 +105,12 @@ public class SparkValidatorUtils {
             } catch (ReflectiveOperationException e) {
               throw new HoodieValidationException("Failed to instantiate 
validator: " + validatorClass, e);
             }
-          });
+          })
+          .collect(Collectors.toList());
 
-      boolean allSuccess = validators.map(v -> runValidatorAsync(v, 
writeMetadata, beforeState, afterState, 
instantTime)).map(CompletableFuture::join)
-          .reduce(true, Boolean::logicalAnd);
+      boolean allSuccess = new 
ExecutorServiceBasedEngineContext(context.getStorageConf())
+          .map(validators, v -> runValidator(v, writeMetadata, beforeState, 
afterState, instantTime), validators.size())
+          .stream().reduce(true, Boolean::logicalAnd);
 
       if (allSuccess) {
         LOG.info("All validations succeeded");
@@ -120,20 +122,20 @@ public class SparkValidatorUtils {
   }
 
   /**
-   * Run validators in a separate thread pool for parallelism. Each of 
validator can submit a distributed spark job if needed.
+   * Run a single validator synchronously in the calling thread; parallelism 
across validators is
+   * provided by the {@link ExecutorServiceBasedEngineContext#map} call site. 
Each validator may submit a distributed Spark
+   * job if needed.
    */
-  private static CompletableFuture<Boolean> 
runValidatorAsync(SparkPreCommitValidator validator, HoodieWriteMetadata<?> 
writeMetadata,
-                                                              Dataset<Row> 
beforeState, Dataset<Row> afterState, String instantTime) {
-    return CompletableFuture.supplyAsync(() -> {
-      try {
-        validator.validate(instantTime, writeMetadata, beforeState, 
afterState);
-        LOG.info("validation complete for {}", validator.getClass().getName());
-        return true;
-      } catch (HoodieValidationException e) {
-        LOG.error("validation failed for {}", validator.getClass().getName(), 
e);
-        return false;
-      }
-    });
+  private static boolean runValidator(SparkPreCommitValidator validator, 
HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata,
+                                      Dataset<Row> beforeState, Dataset<Row> 
afterState, String instantTime) {
+    try {
+      validator.validate(instantTime, writeMetadata, beforeState, afterState);
+      LOG.info("validation complete for {}", validator.getClass().getName());
+      return true;
+    } catch (HoodieValidationException e) {
+      LOG.error("validation failed for {}", validator.getClass().getName(), e);
+      return false;
+    }
   }
 
   /**
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
index 7e1da34c2442..b06e80bf6392 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/validator/SparkPreCommitValidator.java
@@ -82,6 +82,18 @@ public abstract class SparkPreCommitValidator<T, I, K, O 
extends HoodieData<Writ
     HoodieTimer timer = HoodieTimer.start();
     try {
       validateRecordsBeforeAndAfter(before, after, 
getPartitionsModified(writeResult));
+    } catch (HoodieValidationException e) {
+      throw e;
+    } catch (RuntimeException e) {
+      // Unexpected bug (NPE, ClassCastException, etc.) — re-throw as-is so it 
propagates
+      // crash-loud with the original stack trace instead of being silently 
swallowed as a
+      // generic "validation failed" message.
+      log.error("Validator {} threw unexpected exception for instant {}", 
getClass().getName(), instantTime, e);
+      throw e;
+    } catch (Exception e) {
+      // Checked exception — promote to RuntimeException so it propagates 
crash-loud.
+      log.error("Validator {} threw unexpected checked exception for instant 
{}", getClass().getName(), instantTime, e);
+      throw new RuntimeException(e);
     } finally {
       long duration = timer.endTimer();
       log.info(getClass() + " validator took " + duration + " ms" + ", metrics 
on? " + getWriteConfig().isMetricsOn());
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkValidatorUtils.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkValidatorUtils.java
index c69756f63903..ac54baea9c34 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkValidatorUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkValidatorUtils.java
@@ -20,18 +20,30 @@ package org.apache.hudi.client.utils;
 
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.SparkPreCommitValidator;
 import org.apache.hudi.client.validator.SqlQuerySingleResultPreCommitValidator;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /**
  * Tests for {@link SparkValidatorUtils}.
@@ -95,4 +107,201 @@ public class TestSparkValidatorUtils extends 
HoodieClientTestBase {
           "Should have 2 commits (one with data, one empty)");
     }
   }
+
+  /**
+   * Verifies that two custom validators are both invoked in parallel via 
ExecutorServiceBasedEngineContext
+   * without ClassNotFoundException, confirming that classloader-loaded user 
validator classes execute correctly.
+   */
+  @Test
+  public void testTwoValidatorsBothInvoked() throws Exception {
+    CountingValidator.INVOCATION_COUNT.set(0);
+
+    HoodieWriteConfig configWithTwoValidators = getConfigBuilder()
+        .withPreCommitValidatorConfig(
+            HoodiePreCommitValidatorConfig.newBuilder()
+                .withPreCommitValidator(
+                    CountingValidator.class.getName() + "," + 
CountingValidator.class.getName())
+                .build())
+        .build();
+
+    try (SparkRDDWriteClient writeClient = 
getHoodieWriteClient(configWithTwoValidators)) {
+      String commit = "001";
+      writeBatch(
+          writeClient,
+          commit,
+          "000",
+          Option.empty(),
+          "000",
+          5,
+          generateWrapRecordsFn(false, configWithTwoValidators, 
dataGen::generateInserts),
+          SparkRDDWriteClient::bulkInsert,
+          true,
+          5,
+          5,
+          1,
+          false,
+          INSTANT_GENERATOR);
+    }
+
+    Assertions.assertEquals(2, CountingValidator.INVOCATION_COUNT.get(),
+        "Both configured validators must have been invoked in parallel");
+  }
+
+  /**
+   * Verifies that when a validator throws {@link HoodieValidationException}, 
it surfaces somewhere
+   * in the exception cause chain of the write operation.
+   * The write client wraps validator exceptions in a {@code 
HoodieInsertException}, so we walk
+   * the cause chain rather than asserting the top-level type.
+   */
+  @Test
+  public void testValidatorFailurePropagatesException() throws Exception {
+    HoodieWriteConfig configWithFailingValidator = getConfigBuilder()
+        .withPreCommitValidatorConfig(
+            HoodiePreCommitValidatorConfig.newBuilder()
+                .withPreCommitValidator(FailingValidator.class.getName())
+                .build())
+        .build();
+
+    try (SparkRDDWriteClient writeClient = 
getHoodieWriteClient(configWithFailingValidator)) {
+      String commit = "001";
+      Exception thrown = assertThrows(Exception.class, () ->
+          writeBatch(
+              writeClient,
+              commit,
+              "000",
+              Option.empty(),
+              "000",
+              5,
+              generateWrapRecordsFn(false, configWithFailingValidator, 
dataGen::generateInserts),
+              SparkRDDWriteClient::bulkInsert,
+              true,
+              5,
+              5,
+              1,
+              false,
+              INSTANT_GENERATOR),
+          "A failing validator must cause the write operation to throw");
+
+      // Walk the cause chain: bulkInsert wraps the HoodieValidationException 
in HoodieInsertException.
+      Throwable cause = thrown;
+      while (cause != null && !(cause instanceof HoodieValidationException)) {
+        cause = cause.getCause();
+      }
+      Assertions.assertNotNull(cause,
+          "HoodieValidationException must appear somewhere in the exception 
cause chain");
+      Assertions.assertInstanceOf(HoodieValidationException.class, cause);
+    }
+  }
+
+  /**
+   * Verifies that when a validator throws an unexpected RuntimeException 
(e.g. NPE or
+   * IllegalStateException — a validator bug), the exception is NOT silently 
swallowed as a
+   * generic "validation failed" message. The original exception must appear 
in the cause chain
+   * so operators can diagnose the real problem.
+   */
+  @Test
+  public void testUnexpectedValidatorExceptionIsNotSilenced() throws Exception 
{
+    HoodieWriteConfig configWithBuggyValidator = getConfigBuilder()
+        .withPreCommitValidatorConfig(
+            HoodiePreCommitValidatorConfig.newBuilder()
+                .withPreCommitValidator(BuggyValidator.class.getName())
+                .build())
+        .build();
+
+    try (SparkRDDWriteClient writeClient = 
getHoodieWriteClient(configWithBuggyValidator)) {
+      String commit = "001";
+      Exception thrown = assertThrows(Exception.class, () ->
+          writeBatch(
+              writeClient,
+              commit,
+              "000",
+              Option.empty(),
+              "000",
+              5,
+              generateWrapRecordsFn(false, configWithBuggyValidator, 
dataGen::generateInserts),
+              SparkRDDWriteClient::bulkInsert,
+              true,
+              5,
+              5,
+              1,
+              false,
+              INSTANT_GENERATOR),
+          "A buggy validator must still cause the write to fail");
+
+      // The original IllegalStateException must be visible somewhere in the 
cause chain.
+      // It must NOT be silently converted into a plain "At least one 
pre-commit validation failed".
+      Throwable cause = thrown;
+      boolean foundOriginal = false;
+      while (cause != null) {
+        if (cause instanceof IllegalStateException
+            && "simulated bug in validator".equals(cause.getMessage())) {
+          foundOriginal = true;
+          break;
+        }
+        cause = cause.getCause();
+      }
+      Assertions.assertTrue(foundOriginal,
+          "The original IllegalStateException from the buggy validator must 
appear in the cause chain, "
+              + "not be buried under a generic 'validation failed' message. 
Full exception: " + thrown);
+    }
+  }
+
+  /**
+   * Minimal validator that records each invocation. Must be a public static 
class so that
+   * ReflectionUtils can instantiate it by name during runValidators.
+   */
+  public static class CountingValidator<T, I, K, O extends 
HoodieData<WriteStatus>>
+      extends SparkPreCommitValidator<T, I, K, O> {
+
+    static final AtomicInteger INVOCATION_COUNT = new AtomicInteger(0);
+
+    public CountingValidator(HoodieSparkTable<T> table, HoodieEngineContext 
context,
+                             HoodieWriteConfig config) {
+      super(table, context, config);
+    }
+
+    @Override
+    protected void validateRecordsBeforeAndAfter(Dataset<Row> before, 
Dataset<Row> after,
+                                                 Set<String> 
partitionsAffected) {
+      INVOCATION_COUNT.incrementAndGet();
+    }
+  }
+
+  /**
+   * Validator that always fails with {@link HoodieValidationException}. Must 
be a public static
+   * class so that ReflectionUtils can instantiate it by name during 
runValidators.
+   */
+  public static class FailingValidator<T, I, K, O extends 
HoodieData<WriteStatus>>
+      extends SparkPreCommitValidator<T, I, K, O> {
+
+    public FailingValidator(HoodieSparkTable<T> table, HoodieEngineContext 
context,
+                            HoodieWriteConfig config) {
+      super(table, context, config);
+    }
+
+    @Override
+    protected void validateRecordsBeforeAndAfter(Dataset<Row> before, 
Dataset<Row> after,
+                                                 Set<String> 
partitionsAffected) {
+      throw new HoodieValidationException("intentional failure from 
FailingValidator");
+    }
+  }
+
+  /**
+   * Validator that throws an unexpected RuntimeException (simulates a 
validator bug such as NPE).
+   * Must be a public static class so that ReflectionUtils can instantiate it 
by name.
+   */
+  public static class BuggyValidator<T, I, K, O extends 
HoodieData<WriteStatus>>
+      extends SparkPreCommitValidator<T, I, K, O> {
+
+    public BuggyValidator(HoodieSparkTable<T> table, HoodieEngineContext 
context,
+                          HoodieWriteConfig config) {
+      super(table, context, config);
+    }
+
+    @Override
+    protected void validateRecordsBeforeAndAfter(Dataset<Row> before, 
Dataset<Row> after,
+                                                 Set<String> 
partitionsAffected) {
+      throw new IllegalStateException("simulated bug in validator");
+    }
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/validator/TestSparkPreCommitValidator.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/validator/TestSparkPreCommitValidator.java
new file mode 100644
index 000000000000..83ee85302cbe
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/validator/TestSparkPreCommitValidator.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collections;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for exception-handling behavior in {@link 
SparkPreCommitValidator#validate}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestSparkPreCommitValidator {
+
+  @Mock
+  @SuppressWarnings("rawtypes")
+  private HoodieSparkTable table;
+
+  @Mock
+  private HoodieEngineContext engineContext;
+
+  @Mock
+  private HoodieWriteConfig writeConfig;
+
+  @Mock
+  @SuppressWarnings("rawtypes")
+  private HoodieWriteMetadata writeMetadata;
+
+  @BeforeEach
+  @SuppressWarnings("unchecked")
+  void setUp() {
+    when(writeConfig.getTableName()).thenReturn("test-table");
+    when(writeConfig.isMetricsOn()).thenReturn(false);
+    
when(writeMetadata.getWriteStats()).thenReturn(Option.of(Collections.emptyList()));
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  void testValidateSucceeds() {
+    SparkPreCommitValidator<?, ?, ?, HoodieData<WriteStatus>> validator =
+        new NoOpValidator(table, engineContext, writeConfig);
+    assertDoesNotThrow(() -> validator.validate("001", writeMetadata, null, 
null),
+        "validate must not throw when validateRecordsBeforeAndAfter completes 
normally");
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  void testValidateRethrowsUnexpectedRuntimeException() {
+    RuntimeException cause = new RuntimeException("disk full");
+    SparkPreCommitValidator<?, ?, ?, HoodieData<WriteStatus>> validator =
+        new ThrowingValidator(table, engineContext, writeConfig, cause);
+
+    RuntimeException ex = assertThrows(RuntimeException.class,
+        () -> validator.validate("001", writeMetadata, null, null));
+    assertSame(cause, ex,
+        "unexpected RuntimeException must propagate as-is so the operator sees 
the original stack trace, "
+            + "not a generic 'validation failed' message");
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  void testValidateReThrowsValidationException() {
+    HoodieValidationException original = new HoodieValidationException("bad 
data");
+    SparkPreCommitValidator<?, ?, ?, HoodieData<WriteStatus>> validator =
+        new ThrowingValidator(table, engineContext, writeConfig, original);
+
+    HoodieValidationException ex = 
assertThrows(HoodieValidationException.class,
+        () -> validator.validate("001", writeMetadata, null, null));
+    assertSame(original, ex,
+        "HoodieValidationException must be rethrown as-is without additional 
wrapping");
+  }
+
+  /** Minimal concrete validator that completes normally. */
+  private static class NoOpValidator<T, I, K, O extends 
HoodieData<WriteStatus>>
+      extends SparkPreCommitValidator<T, I, K, O> {
+
+    NoOpValidator(HoodieSparkTable<T> table, HoodieEngineContext context, 
HoodieWriteConfig config) {
+      super(table, context, config);
+    }
+
+    @Override
+    protected void validateRecordsBeforeAndAfter(Dataset<Row> before, 
Dataset<Row> after,
+                                                 Set<String> 
partitionsAffected) {
+      // no-op — validation always passes
+    }
+  }
+
+  /** Minimal concrete validator that throws a fixed exception from 
validateRecordsBeforeAndAfter. */
+  private static class ThrowingValidator<T, I, K, O extends 
HoodieData<WriteStatus>>
+      extends SparkPreCommitValidator<T, I, K, O> {
+
+    private final RuntimeException toThrow;
+
+    ThrowingValidator(HoodieSparkTable<T> table, HoodieEngineContext context,
+                      HoodieWriteConfig config, RuntimeException toThrow) {
+      super(table, context, config);
+      this.toThrow = toThrow;
+    }
+
+    @Override
+    protected void validateRecordsBeforeAndAfter(Dataset<Row> before, 
Dataset<Row> after,
+                                                 Set<String> 
partitionsAffected) {
+      throw toThrow;
+    }
+  }
+}
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/ExecutorServiceBasedEngineContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/ExecutorServiceBasedEngineContext.java
new file mode 100644
index 000000000000..c6ba0d736acf
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/ExecutorServiceBasedEngineContext.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.engine;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.data.HoodieAccumulator;
+import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.data.HoodieListPairData;
+import org.apache.hudi.common.data.HoodiePairData;
+import org.apache.hudi.common.function.FunctionWrapper;
+import org.apache.hudi.common.function.SerializableBiFunction;
+import org.apache.hudi.common.function.SerializableConsumer;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.function.SerializablePairFlatMapFunction;
+import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.ImmutablePair;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.keygen.KeyGenerator;
+import org.apache.hudi.storage.StorageConfiguration;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ForkJoinPool;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A general-purpose {@link HoodieEngineContext} that executes all parallel 
operations on a
+ * dedicated classloader-aware {@link ExecutorService}, so classes resolved by 
the application
+ * classloader remain visible to worker threads on Java 11+.
+ *
+ * <p>The pool is lazily created once per JVM (JLS §12.4.2) and shared across 
all instances.
+ * Worker threads are daemon threads and carry the classloader of this class, 
preventing
+ * {@code ClassNotFoundException} that occurs when the common {@link 
ForkJoinPool} is used
+ * because its workers do not inherit the submitting thread's context 
classloader.
+ */
+public class ExecutorServiceBasedEngineContext extends HoodieEngineContext {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(ExecutorServiceBasedEngineContext.class);
+
+  // Lazy-initialized, daemon, fixed thread pool whose workers carry the 
correct classloader.
+  // JLS §12.4.2 guarantees thread-safe initialization via class-loading locks.
+  private static class PoolHolder {
+    static final ExecutorService INSTANCE = createExecutorService();
+  }
+
+  private static ExecutorService createExecutorService() {
+    int parallelism = ForkJoinPool.commonPool().getParallelism();
+    ClassLoader cl = ExecutorServiceBasedEngineContext.class.getClassLoader();
+    ExecutorService executor = Executors.newFixedThreadPool(parallelism, r -> {
+      Thread t = Executors.defaultThreadFactory().newThread(r);
+      t.setContextClassLoader(cl);
+      t.setDaemon(true);
+      return t;
+    });
+    LOG.info("Created ExecutorServiceBasedEngineContext pool with {} threads", 
parallelism);
+    return executor;
+  }
+
+  public ExecutorServiceBasedEngineContext(StorageConfiguration<?> conf) {
+    super(conf, new LocalTaskContextSupplier());
+  }
+
+  // ---- Core parallel helpers ----
+
+  /**
+   * Submits each element to the executor pool and collects results in input 
order.
+   * RuntimeExceptions / Errors from {@code func} are re-thrown as-is after 
unwrapping
+   * the {@link CompletionException} wrapper.
+   */
+  private <I, O> List<O> mapAsync(List<I> data, Function<I, O> func) {
+    List<CompletableFuture<O>> futures = data.stream()
+        .map(item -> CompletableFuture.supplyAsync(() -> func.apply(item), 
PoolHolder.INSTANCE))
+        .collect(Collectors.toList());
+    try {
+      CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[0])).join();
+    } catch (CompletionException e) {
+      throw rethrowUnwrapped(e);
+    }
+    return 
futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
+  }
+
+  private static RuntimeException rethrowUnwrapped(CompletionException e) {
+    Throwable cause = e.getCause();
+    if (cause instanceof RuntimeException) {
+      throw (RuntimeException) cause;
+    }
+    if (cause instanceof Error) {
+      throw (Error) cause;
+    }
+    throw e;
+  }
+
+  // ---- HoodieEngineContext implementations ----
+
+  @Override
+  public HoodieAccumulator newAccumulator() {
+    return HoodieAtomicLongAccumulator.create();
+  }
+
+  @Override
+  public <T> HoodieData<T> emptyHoodieData() {
+    return HoodieListData.eager(Collections.emptyList());
+  }
+
+  @Override
+  public <K, V> HoodiePairData<K, V> emptyHoodiePairData() {
+    return HoodieListPairData.eager(Collections.emptyList());
+  }
+
+  @Override
+  public <T> HoodieData<T> parallelize(List<T> data, int parallelism) {
+    return HoodieListData.eager(data);
+  }
+
+  @Override
+  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int 
parallelism) {
+    return mapAsync(data, FunctionWrapper.throwingMapWrapper(func));
+  }
+
+  @Override
+  public <I, K, V> List<V> mapToPairAndReduceByKey(List<I> data, 
SerializablePairFunction<I, K, V> mapToPairFunc,
+                                                    SerializableBiFunction<V, 
V, V> reduceFunc, int parallelism) {
+    List<Pair<K, V>> pairs = mapAsync(data, 
FunctionWrapper.throwingMapToPairWrapper(mapToPairFunc));
+    return pairs.stream()
+        .collect(Collectors.groupingBy(Pair::getKey)).values().stream()
+        .map(list -> list.stream().map(Pair::getValue)
+            
.reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null))
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Runs {@code flatMapToPairFunc} over the whole input as a single partition, groups the
+   * produced pairs by key and reduces each group's values with {@code reduceFunc}.
+   *
+   * <p>The complete pipeline, including the per-key reduction, is evaluated inside the task
+   * submitted to {@code PoolHolder.INSTANCE} and collected into a list before this method
+   * returns. Returning a lazy stream instead would defer {@code reduceFunc} to whichever
+   * thread consumes the result, and its failures would bypass {@code rethrowUnwrapped}.
+   *
+   * @param data              input elements, consumed through a single iterator
+   * @param flatMapToPairFunc turns the input iterator into a stream of key/value pairs
+   * @param reduceFunc        combines two values that share a key
+   * @param parallelism       not used by this implementation
+   * @return a stream over one {@link ImmutablePair} per distinct key
+   */
+  @Override
+  public <I, K, V> Stream<ImmutablePair<K, V>> mapPartitionsToPairAndReduceByKey(
+      Stream<I> data, SerializablePairFlatMapFunction<Iterator<I>, K, V> flatMapToPairFunc,
+      SerializableBiFunction<V, V, V> reduceFunc, int parallelism) {
+    try {
+      CompletableFuture<List<ImmutablePair<K, V>>> future = CompletableFuture.supplyAsync(() ->
+          FunctionWrapper.throwingFlatMapToPairWrapper(flatMapToPairFunc).apply(data.iterator())
+              .collect(Collectors.groupingBy(Pair::getKey)).entrySet().stream()
+              .map(entry -> new ImmutablePair<>(entry.getKey(),
+                  entry.getValue().stream().map(Pair::getValue)
+                      .reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null)))
+              // Materialize here so the reduction above executes on the pool thread.
+              // No null filter is needed: every element is a freshly constructed pair.
+              .collect(Collectors.toList()),
+          PoolHolder.INSTANCE);
+      return future.join().stream();
+    } catch (CompletionException e) {
+      throw rethrowUnwrapped(e);
+    }
+  }
+
+  /**
+   * Groups the given pairs by key on the calling thread, then reduces the values of every
+   * group with {@code reduceFunc} through {@code mapAsync}.
+   *
+   * @return one reduced value per key; {@code null} results are left out
+   */
+  @Override
+  public <I, K, V> List<V> reduceByKey(List<Pair<K, V>> data, SerializableBiFunction<V, V, V> reduceFunc,
+                                        int parallelism) {
+    // Sequential grouping step: key -> all values recorded for that key.
+    Map<K, List<V>> valuesByKey = data.stream()
+        .collect(Collectors.groupingBy(Pair::getKey,
+            Collectors.mapping(Pair::getValue, Collectors.toList())));
+
+    // Per-group reduction is handed to mapAsync.
+    List<V> reducedPerKey = mapAsync(new ArrayList<>(valuesByKey.entrySet()),
+        group -> group.getValue().stream()
+            .reduce(FunctionWrapper.throwingReduceWrapper(reduceFunc)).orElse(null));
+
+    List<V> nonNullResults = new ArrayList<>();
+    for (V reduced : reducedPerKey) {
+      if (reduced != null) {
+        nonNullResults.add(reduced);
+      }
+    }
+    return nonNullResults;
+  }
+
+  /**
+   * Applies {@code func} to each element through {@code mapAsync} and concatenates the
+   * resulting streams into a single list. The {@code parallelism} argument is not used.
+   */
+  @Override
+  public <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism) {
+    List<Stream<O>> perElementStreams = mapAsync(data, FunctionWrapper.throwingFlatMapWrapper(func));
+    return perElementStreams.stream()
+        .flatMap(elementStream -> elementStream)
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Submits one task per element to {@code PoolHolder.INSTANCE}, each invoking
+   * {@code consumer} on its element, and blocks until all tasks have completed.
+   * A {@link CompletionException} raised while waiting is passed to {@code rethrowUnwrapped}.
+   */
+  @Override
+  public <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism) {
+    List<CompletableFuture<Void>> pending = new ArrayList<>();
+    for (I element : data) {
+      pending.add(CompletableFuture.runAsync(
+          () -> FunctionWrapper.throwingForeachWrapper(consumer).accept(element),
+          PoolHolder.INSTANCE));
+    }
+
+    CompletableFuture<Void> allDone = CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]));
+    try {
+      allDone.join();
+    } catch (CompletionException e) {
+      throw rethrowUnwrapped(e);
+    }
+  }
+
+  @Override
+  public <I, K, V> Map<K, V> mapToPair(List<I> data, 
SerializablePairFunction<I, K, V> func, Integer parallelism) {
+    return mapAsync(data, FunctionWrapper.throwingMapToPairWrapper(func))
+        .stream().collect(Collectors.toMap(Pair::getLeft, Pair::getRight, 
(oldVal, newVal) -> newVal));
+  }
+
+  /**
+   * Does nothing: the supplied key and value are discarded.
+   */
+  @Override
+  public void setProperty(EngineProperty key, String value) {
+    // no operation
+  }
+
+  /**
+   * Always returns {@code Option.empty()}, regardless of the requested key.
+   */
+  @Override
+  public Option<String> getProperty(EngineProperty key) {
+    return Option.empty();
+  }
+
+  /**
+   * Does nothing: both arguments are ignored.
+   */
+  @Override
+  public void setJobStatus(String activeModule, String activityDescription) {
+    // no operation
+  }
+
+  /**
+   * Does nothing in this implementation.
+   */
+  @Override
+  public void clearJobStatus() {
+    // no operation
+  }
+
+  /**
+   * Does nothing: the cache key and ids are not recorded.
+   */
+  @Override
+  public void putCachedDataIds(HoodieDataCacheKey cacheKey, int... ids) {
+    // no operation
+  }
+
+  /**
+   * Always returns an empty list, regardless of the cache key.
+   */
+  @Override
+  public List<Integer> getCachedDataIds(HoodieDataCacheKey cacheKey) {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Removes nothing and always returns an empty list, regardless of the cache key.
+   */
+  @Override
+  public List<Integer> removeCachedDataIds(HoodieDataCacheKey cacheKey) {
+    return Collections.emptyList();
+  }
+
+  /**
+   * Does nothing: the job id is ignored.
+   */
+  @Override
+  public void cancelJob(String jobId) {
+    // no operation
+  }
+
+  /**
+   * Does nothing in this implementation.
+   */
+  @Override
+  public void cancelAllJobs() {
+    // no operation
+  }
+
+  /**
+   * Folds the collected elements of {@code data} into a single value, starting from
+   * {@code zeroValue} and applying {@code seqOp} to the running result and each element in
+   * list order. The elements are processed sequentially, so {@code combOp} is never invoked.
+   */
+  @Override
+  public <I, O> O aggregate(HoodieData<I> data, O zeroValue, Functions.Function2<O, I, O> seqOp,
+                             Functions.Function2<O, O, O> combOp) {
+    O accumulated = zeroValue;
+    for (I element : data.collectAsList()) {
+      accumulated = seqOp.apply(accumulated, element);
+    }
+    return accumulated;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> ReaderContextFactory<T> 
getReaderContextFactory(HoodieTableMetaClient metaClient) {
+    return (ReaderContextFactory<T>) getEngineReaderContextFactory(metaClient);
+  }
+
+  @Override
+  public ReaderContextFactory<?> 
getEngineReaderContextFactory(HoodieTableMetaClient metaClient) {
+    return new AvroReaderContextFactory(metaClient, new TypedProperties());
+  }
+
+  /**
+   * Not supported by this engine context.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public KeyGenerator createKeyGenerator(TypedProperties props) throws 
IOException {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+}
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/engine/TestExecutorServiceBasedEngineContext.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/engine/TestExecutorServiceBasedEngineContext.java
new file mode 100644
index 000000000000..475aa4c9d2dc
--- /dev/null
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/engine/TestExecutorServiceBasedEngineContext.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.engine;
+
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.storage.HoodieStorage;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorage;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestExecutorServiceBasedEngineContext {
+
+  private ExecutorServiceBasedEngineContext context;
+
+  @BeforeEach
+  void setUp() {
+    HoodieStorage storage = getDefaultStorage();
+    context = new ExecutorServiceBasedEngineContext(storage.getConf());
+  }
+
+  @Test
+  void testMapHappyPath() {
+    List<Integer> result = context.map(Arrays.asList(1, 2, 3), x -> x * 2, 3);
+    assertEquals(Arrays.asList(2, 4, 6), 
result.stream().sorted().collect(Collectors.toList()));
+  }
+
+  @Test
+  void testMapEmptyList() {
+    List<Integer> result = 
context.map(java.util.Collections.<Integer>emptyList(), x -> x * 2, 0);
+    assertTrue(result.isEmpty(), "map over empty list must return empty list");
+  }
+
+  @Test
+  void testMapPreservesInputOrder() {
+    List<Integer> input = IntStream.range(0, 
100).boxed().collect(Collectors.toList());
+    List<Integer> result = context.map(input, x -> x, 100);
+    assertEquals(input, result, "map must return results in the same order as 
the input list");
+  }
+
+  @Test
+  void testMapPropagatesRuntimeException() {
+    RuntimeException original = new RuntimeException("boom");
+    RuntimeException thrown = assertThrows(RuntimeException.class, () ->
+        context.map(java.util.Collections.singletonList(1), x -> {
+          throw original;
+        }, 1));
+    // throwingMapWrapper wraps ALL exceptions in HoodieException; original is 
the direct cause
+    assertInstanceOf(HoodieException.class, thrown);
+    assertSame(original, thrown.getCause());
+  }
+
+  @Test
+  void testMapPropagatesHoodieException() {
+    HoodieException original = new HoodieException("hoodie-boom");
+    RuntimeException thrown = assertThrows(RuntimeException.class, () ->
+        context.map(java.util.Collections.singletonList(1), x -> {
+          throw original;
+        }, 1));
+    assertInstanceOf(HoodieException.class, thrown);
+    assertSame(original, thrown.getCause());
+  }
+
+  @Test
+  void testMapWrapsCheckedException() {
+    Exception checkedCause = new Exception("checked!");
+    RuntimeException thrown = assertThrows(RuntimeException.class, () ->
+        context.map(java.util.Collections.singletonList(1), x -> {
+          throw checkedCause;
+        }, 1));
+    assertInstanceOf(HoodieException.class, thrown);
+    assertSame(checkedCause, thrown.getCause());
+  }
+
+  @Test
+  void testWorkerThreadClassloader() {
+    ClassLoader[] captured = new ClassLoader[1];
+    context.map(java.util.Collections.singletonList(1), x -> {
+      captured[0] = Thread.currentThread().getContextClassLoader();
+      return x;
+    }, 1);
+    assertEquals(ExecutorServiceBasedEngineContext.class.getClassLoader(), 
captured[0],
+        "Worker threads must use ExecutorServiceBasedEngineContext classloader 
to avoid ClassNotFoundException on Java 11+");
+  }
+}
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/engine/TestHoodieLocalEngineContext.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/engine/TestHoodieLocalEngineContext.java
index bdb858c763e4..c4fc1e285f7c 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/engine/TestHoodieLocalEngineContext.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/engine/TestHoodieLocalEngineContext.java
@@ -150,24 +150,24 @@ public class TestHoodieLocalEngineContext {
         ImmutablePair.of("key1", 42),
         ImmutablePair.of("key2", 17)
     );
-    
+
     HoodiePairData<String, Integer> pairData = 
HoodieListPairData.lazy(singleValuePairs);
-    
+
     // Create a function that just returns the values
     SerializableFunction<Iterator<Integer>, Iterator<Integer>> func = iterator 
-> {
       List<Integer> values = new ArrayList<>();
       iterator.forEachRemaining(values::add);
       return values.iterator();
     };
-    
+
     List<String> shardIndices = Arrays.asList("key1", "key2");
     HoodieData<Integer> result = context.mapGroupsByKey(pairData, func, 
shardIndices, false);
-    
+
     List<Integer> resultList = result.collectAsList();
-    
+
     // Verify the results
     assertEquals(2, resultList.size());
     assertTrue(resultList.contains(42));
     assertTrue(resultList.contains(17));
   }
-} 
\ No newline at end of file
+}


Reply via email to