This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c290aace53 [Improve][Zeta] Fix JobMaster reset app classloader twice
(#7063)
c290aace53 is described below
commit c290aace5329cacfeb3bdf4fbd2d7d53ab3ebc76
Author: Jia Fan <[email protected]>
AuthorDate: Wed Jun 26 20:42:17 2024 +0800
[Improve][Zeta] Fix JobMaster reset app classloader twice (#7063)
---
.../seatunnel/engine/server/master/JobMaster.java | 63 +++++++++++-----------
1 file changed, 31 insertions(+), 32 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 1b7bf6bdad..ae1f1f0de3 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -222,45 +222,44 @@ public class JobMaster {
nodeEngine.getSerializationService(),
classLoader,
jobImmutableInformation.getLogicalDag());
- if (!restart
- && !logicalDag.isStartWithSavePoint()
- &&
ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions())
- .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
- .equals(SaveModeExecuteLocation.CLUSTER)) {
- try {
- Thread.currentThread().setContextClassLoader(classLoader);
+ try {
+ Thread.currentThread().setContextClassLoader(classLoader);
+ if (!restart
+ && !logicalDag.isStartWithSavePoint()
+ &&
ReadonlyConfig.fromMap(logicalDag.getJobConfig().getEnvOptions())
+ .get(EnvCommonOptions.SAVEMODE_EXECUTE_LOCATION)
+ .equals(SaveModeExecuteLocation.CLUSTER)) {
logicalDag.getLogicalVertexMap().values().stream()
.map(LogicalVertex::getAction)
.filter(action -> action instanceof SinkAction)
.map(sink -> ((SinkAction<?, ?, ?, ?>) sink).getSink())
.forEach(JobMaster::handleSaveMode);
- } finally {
- Thread.currentThread().setContextClassLoader(appClassLoader);
}
- }
- final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple =
- PlanUtils.fromLogicalDAG(
- logicalDag,
- nodeEngine,
- jobImmutableInformation,
- initializationTimestamp,
- executorService,
- flakeIdGenerator,
- runningJobStateIMap,
- runningJobStateTimestampsIMap,
- engineConfig.getQueueType(),
- engineConfig);
- seaTunnelServer
- .getClassLoaderService()
- .releaseClassLoader(
- jobImmutableInformation.getJobId(),
- jobImmutableInformation.getPluginJarsUrls());
- // revert to app class loader, it may be changed by
PlanUtils.fromLogicalDAG
- Thread.currentThread().setContextClassLoader(appClassLoader);
- this.physicalPlan = planTuple.f0();
- this.physicalPlan.setJobMaster(this);
- this.checkpointPlanMap = planTuple.f1();
+ final Tuple2<PhysicalPlan, Map<Integer, CheckpointPlan>> planTuple
=
+ PlanUtils.fromLogicalDAG(
+ logicalDag,
+ nodeEngine,
+ jobImmutableInformation,
+ initializationTimestamp,
+ executorService,
+ flakeIdGenerator,
+ runningJobStateIMap,
+ runningJobStateTimestampsIMap,
+ engineConfig.getQueueType(),
+ engineConfig);
+ this.physicalPlan = planTuple.f0();
+ this.physicalPlan.setJobMaster(this);
+ this.checkpointPlanMap = planTuple.f1();
+ } finally {
+ // revert to app class loader, it may be changed by
PlanUtils.fromLogicalDAG
+ Thread.currentThread().setContextClassLoader(appClassLoader);
+ seaTunnelServer
+ .getClassLoaderService()
+ .releaseClassLoader(
+ jobImmutableInformation.getJobId(),
+ jobImmutableInformation.getPluginJarsUrls());
+ }
Exception initException = null;
try {
this.initCheckPointManager(restart);