This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a9e055d79f [Hotfix][Zeta] Fix CDC task restore throw NPE (#5507)
a9e055d79f is described below
commit a9e055d79f2bb31c05a5810e2549166f2524e491
Author: hailin0 <[email protected]>
AuthorDate: Sat Sep 16 18:22:21 2023 +0800
[Hotfix][Zeta] Fix CDC task restore throw NPE (#5507)
---
release-note.md | 1 +
.../task/operation/source/RestoredSplitOperation.java | 17 +----------------
2 files changed, 2 insertions(+), 16 deletions(-)
diff --git a/release-note.md b/release-note.md
index 61664d773f..27f57f4ea7 100644
--- a/release-note.md
+++ b/release-note.md
@@ -76,6 +76,7 @@
- [Zeta] Fix cpu load problem (#4828)
- [zeta] Fix the deadlock issue with JDBC driver loading (#4878)
- [zeta] dynamically replace the value of the variable at runtime (#4950)
+- [zeta] Fix CDC task restore throw NPE (#5507)
### E2E
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
index 05fbf6537e..e3f6c90815 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.TaskExecutionService;
-import
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation;
import
org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -101,24 +100,10 @@ public class RestoredSplitOperation extends TaskOperation
{
for (byte[] split : splits) {
deserializeSplits.add(task.getSplitSerializer().deserialize(split));
}
+ task.addSplitsBack(deserializeSplits, subtaskIndex);
} finally {
Thread.currentThread().setContextClassLoader(mainClassLoader);
}
-
- task.getExecutionContext()
- .getTaskExecutionService()
- .asyncExecuteFunction(
- taskLocation.getTaskGroupLocation(),
- () -> {
- try {
-
task.addSplitsBack(deserializeSplits, subtaskIndex);
- } catch (Exception e) {
- task.getExecutionContext()
- .sendToMaster(
- new
CheckpointErrorReportOperation(
-
taskLocation, e));
- }
- });
return null;
},
new RetryUtils.RetryMaterial(