This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d29c8b1b3 [hotfix][ST-Engine] fix ST-Engine task event serialization
error (#3693)
d29c8b1b3 is described below
commit d29c8b1b33ded34a5314548283766801d1cf2187
Author: ic4y <[email protected]>
AuthorDate: Sun Dec 11 17:04:22 2022 +0800
[hotfix][ST-Engine] fix ST-Engine task event serialization error (#3693)
---
.../engine/server/task/operation/source/SourceEventOperation.java | 5 +++--
.../server/task/operation/source/SourceReaderEventOperation.java | 6 +++++-
2 files changed, 8 insertions(+), 3 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceEventOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceEventOperation.java
index e62865993..a74ef7201 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceEventOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceEventOperation.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.task.operation.source;
import org.apache.seatunnel.api.source.SourceEvent;
+import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
@@ -30,7 +31,7 @@ import java.io.IOException;
public abstract class SourceEventOperation extends TaskOperation {
protected TaskLocation currentTaskLocation;
- protected SourceEvent sourceEvent;
+ protected byte[] sourceEvent;
public SourceEventOperation() {
}
@@ -40,7 +41,7 @@ public abstract class SourceEventOperation extends
TaskOperation {
SourceEvent event) {
super(targetTaskLocation);
this.currentTaskLocation = currentTaskLocation;
- this.sourceEvent = event;
+ this.sourceEvent = SerializationUtils.serialize(event);
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
index 97e873220..efb13aeb7 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceReaderEventOperation.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.engine.server.task.operation.source;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -50,7 +51,10 @@ public class SourceReaderEventOperation extends
SourceEventOperation {
RetryUtils.retryWithException(() -> {
SourceSplitEnumeratorTask<?> task =
server.getTaskExecutionService().getTask(taskLocation);
- task.handleSourceEvent(currentTaskLocation.getTaskIndex(),
sourceEvent);
+ ClassLoader classLoader =
+
server.getTaskExecutionService().getExecutionContext(taskLocation.getTaskGroupLocation())
+ .getClassLoader();
+ task.handleSourceEvent(currentTaskLocation.getTaskIndex(),
SerializationUtils.deserialize(sourceEvent, classLoader));
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
exception -> exception instanceof NullPointerException &&