This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d29c8b1b3 [hotfix][ST-Engine] fix ST-Engine task event serialization 
error (#3693)
d29c8b1b3 is described below

commit d29c8b1b33ded34a5314548283766801d1cf2187
Author: ic4y <[email protected]>
AuthorDate: Sun Dec 11 17:04:22 2022 +0800

    [hotfix][ST-Engine] fix ST-Engine task event serialization error (#3693)
---
 .../engine/server/task/operation/source/SourceEventOperation.java   | 5 +++--
 .../server/task/operation/source/SourceReaderEventOperation.java    | 6 +++++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceEventOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceEventOperation.java
index e62865993..a74ef7201 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceEventOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceEventOperation.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.task.operation.source;
 
 import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
 import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
@@ -30,7 +31,7 @@ import java.io.IOException;
 public abstract class SourceEventOperation extends TaskOperation {
     protected TaskLocation currentTaskLocation;
 
-    protected SourceEvent sourceEvent;
+    protected byte[] sourceEvent;
 
     public SourceEventOperation() {
     }
@@ -40,7 +41,7 @@ public abstract class SourceEventOperation extends 
TaskOperation {
                                 SourceEvent event) {
         super(targetTaskLocation);
         this.currentTaskLocation = currentTaskLocation;
-        this.sourceEvent = event;
+        this.sourceEvent = SerializationUtils.serialize(event);
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
index 97e873220..efb13aeb7 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.engine.server.task.operation.source;
 
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -50,7 +51,10 @@ public class SourceReaderEventOperation extends 
SourceEventOperation {
         RetryUtils.retryWithException(() -> {
             SourceSplitEnumeratorTask<?> task =
                 server.getTaskExecutionService().getTask(taskLocation);
-            task.handleSourceEvent(currentTaskLocation.getTaskIndex(), 
sourceEvent);
+            ClassLoader classLoader =
+                
server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation())
+                    .getClassLoader();
+            task.handleSourceEvent(currentTaskLocation.getTaskIndex(), 
SerializationUtils.deserialize(sourceEvent, classLoader));
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
             exception -> exception instanceof NullPointerException &&

Reply via email to