This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a9e055d79f [Hotfix][Zeta] Fix CDC task restore throw NPE (#5507)
a9e055d79f is described below

commit a9e055d79f2bb31c05a5810e2549166f2524e491
Author: hailin0 <[email protected]>
AuthorDate: Sat Sep 16 18:22:21 2023 +0800

    [Hotfix][Zeta] Fix CDC task restore throw NPE (#5507)
---
 release-note.md                                         |  1 +
 .../task/operation/source/RestoredSplitOperation.java   | 17 +----------------
 2 files changed, 2 insertions(+), 16 deletions(-)

diff --git a/release-note.md b/release-note.md
index 61664d773f..27f57f4ea7 100644
--- a/release-note.md
+++ b/release-note.md
@@ -76,6 +76,7 @@
 - [Zeta] Fix cpu load problem (#4828)
 - [zeta] Fix the deadlock issue with JDBC driver loading (#4878)
 - [zeta] dynamically replace the value of the variable at runtime (#4950)
+- [zeta] Fix CDC task restore throw NPE (#5507)
 
 ### E2E
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
index 05fbf6537e..e3f6c90815 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RestoredSplitOperation.java
@@ -22,7 +22,6 @@ import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.TaskExecutionService;
-import 
org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointErrorReportOperation;
 import 
org.apache.seatunnel.engine.server.exception.TaskGroupContextNotFoundException;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
@@ -101,24 +100,10 @@ public class RestoredSplitOperation extends TaskOperation 
{
                         for (byte[] split : splits) {
                             
deserializeSplits.add(task.getSplitSerializer().deserialize(split));
                         }
+                        task.addSplitsBack(deserializeSplits, subtaskIndex);
                     } finally {
                         
Thread.currentThread().setContextClassLoader(mainClassLoader);
                     }
-
-                    task.getExecutionContext()
-                            .getTaskExecutionService()
-                            .asyncExecuteFunction(
-                                    taskLocation.getTaskGroupLocation(),
-                                    () -> {
-                                        try {
-                                            
task.addSplitsBack(deserializeSplits, subtaskIndex);
-                                        } catch (Exception e) {
-                                            task.getExecutionContext()
-                                                    .sendToMaster(
-                                                            new 
CheckpointErrorReportOperation(
-                                                                    
taskLocation, e));
-                                        }
-                                    });
                     return null;
                 },
                 new RetryUtils.RetryMaterial(

Reply via email to