This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cc021bd2c [Zeta] [Fix] Fix some operation didn't use classloader which 
belong task (#4611)
cc021bd2c is described below

commit cc021bd2c6de55f39e1ce4f3e3db50ad6d438439
Author: Jia Fan <[email protected]>
AuthorDate: Tue Apr 18 21:49:26 2023 +0800

    [Zeta] [Fix] Fix some operation didn't use classloader which belong task 
(#4611)
---
 .../engine/server/task/operation/source/RequestSplitOperation.java | 7 +++++++
 .../server/task/operation/source/SourceNoMoreElementOperation.java | 7 +++++++
 .../server/task/operation/source/SourceRegisterOperation.java      | 7 +++++++
 3 files changed, 21 insertions(+)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index 45da5b64f..5750046d0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -51,9 +51,16 @@ public class RequestSplitOperation extends Operation 
implements IdentifiedDataSe
 
         RetryUtils.retryWithException(
                 () -> {
+                    ClassLoader classLoader =
+                            server.getTaskExecutionService()
+                                    
.getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
+                                    .getClassLoader();
+                    ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
+                    Thread.currentThread().setContextClassLoader(classLoader);
                     SourceSplitEnumeratorTask<?> task =
                             
server.getTaskExecutionService().getTask(enumeratorTaskID);
                     task.requestSplit(taskID.getTaskIndex());
+                    
Thread.currentThread().setContextClassLoader(oldClassLoader);
                     return null;
                 },
                 new RetryUtils.RetryMaterial(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index 184d36abd..fec6afcb9 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -49,9 +49,16 @@ public class SourceNoMoreElementOperation extends Operation 
implements Identifie
         SeaTunnelServer server = getService();
         RetryUtils.retryWithException(
                 () -> {
+                    ClassLoader classLoader =
+                            server.getTaskExecutionService()
+                                    
.getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
+                                    .getClassLoader();
+                    ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
+                    Thread.currentThread().setContextClassLoader(classLoader);
                     SourceSplitEnumeratorTask<?> task =
                             
server.getTaskExecutionService().getTask(enumeratorTaskID);
                     task.readerFinished(currentTaskID.getTaskID());
+                    
Thread.currentThread().setContextClassLoader(oldClassLoader);
                     return null;
                 },
                 new RetryUtils.RetryMaterial(
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index 8ac8a795f..7e0d5ce3a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -55,9 +55,16 @@ public class SourceRegisterOperation extends Operation 
implements IdentifiedData
         Address readerAddress = getCallerAddress();
         RetryUtils.retryWithException(
                 () -> {
+                    ClassLoader classLoader =
+                            server.getTaskExecutionService()
+                                    
.getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
+                                    .getClassLoader();
+                    ClassLoader oldClassLoader = 
Thread.currentThread().getContextClassLoader();
+                    Thread.currentThread().setContextClassLoader(classLoader);
                     SourceSplitEnumeratorTask<?> task =
                             
server.getTaskExecutionService().getTask(enumeratorTaskID);
                     task.receivedReader(readerTaskID, readerAddress);
+                    
Thread.currentThread().setContextClassLoader(oldClassLoader);
                     return null;
                 },
                 new RetryUtils.RetryMaterial(

Reply via email to