This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f7c75a5cde [bugfix][zeta] fix NotifyTaskRestoreOperation npe (#5362)
f7c75a5cde is described below

commit f7c75a5cde6d52f261d21713a7aa6821ff1a90b2
Author: ic4y <[email protected]>
AuthorDate: Wed Aug 23 17:56:53 2023 +0800

    [bugfix][zeta] fix NotifyTaskRestoreOperation npe (#5362)
    
    * [bugfix][zeta] fix NotifyTaskRestoreOperation npe
    
    * add some comment
---
 .../engine/server/task/SourceSplitEnumeratorTask.java          | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
index 1c8b25f37a..e2fe0c335a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java
@@ -37,7 +37,6 @@ import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 import com.hazelcast.cluster.Address;
 import com.hazelcast.spi.impl.operationservice.Operation;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
-import lombok.Getter;
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
@@ -78,7 +77,7 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
     private SeaTunnelSplitEnumeratorContext<SplitT> enumeratorContext;
 
     private Serializer<Serializable> enumeratorStateSerializer;
-    @Getter private Serializer<SplitT> splitSerializer;
+    private Serializer<SplitT> splitSerializer;
 
     private int maxReaderSize;
     private Set<Long> unfinishedReaders;
@@ -197,6 +196,13 @@ public class SourceSplitEnumeratorTask<SplitT extends SourceSplit> extends Coord
         log.debug("restoreState split enumerator [{}] finished", actionStateList);
     }
 
+    public Serializer<SplitT> getSplitSerializer() throws ExecutionException, InterruptedException {
+        // Because the splitSerializer is initialized in the init method, it's necessary to wait for
+        // the Enumerator to finish initializing.
+        getEnumerator();
+        return splitSerializer;
+    }
+
     public void addSplitsBack(List<SplitT> splits, int subtaskId)
             throws ExecutionException, InterruptedException {
         getEnumerator().addSplitsBack(splits, subtaskId);

Reply via email to