Carl-Zhou-CN commented on code in PR #5617:
URL: https://github.com/apache/seatunnel/pull/5617#discussion_r1377016700


##########
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java:
##########
@@ -143,7 +153,9 @@ public void open() throws Exception {
                 .forEach(
                         entry -> {
                             try {
+                                // Initialize reader
                                 entry.getValue().open();
+                                // Allocates split whose status is pending

Review Comment:
   Would that be a better way to describe it? "the corresponding status is 
assigned to the reader"



##########
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java:
##########
@@ -231,6 +244,7 @@ public Map<Integer, List<byte[]>> snapshotState(long 
checkpointId) throws Except
         StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
         if (enumeratorState != null) {
             byte[] enumeratorStateBytes = 
enumeratorStateSerializer.serialize(enumeratorState);
+            // -1 identification that the last status information is read

Review Comment:
   Would that be a better way to describe it? "-1 represents the state of a 
pending split in SourceSplitEnumerator."



##########
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java:
##########
@@ -109,9 +117,11 @@ private void createSplitEnumerator() throws Exception {
                                 throw new RuntimeException(e);
                             }
                         }
+                        // The information read in the status is added to the 
Map

Review Comment:
     Would it be better to describe this process?       
      restoredState.forEach(
                       (subtaskId, splitBytes) -> {
                           if (subtaskId == -1) {
                               return;
                           }
                           List<SplitT> restoredSplitState = new 
ArrayList<>(splitBytes.size());
                           for (byte[] splitByte : splitBytes) {
                               try {
                                   
restoredSplitState.add(splitSerializer.deserialize(splitByte));
                               } catch (IOException e) {
                                   throw new RuntimeException(e);
                               }
                           }
                           // The information read in the status is added to 
the Map
                           restoredSplitStateMap.put(subtaskId, 
restoredSplitState);
                       });



##########
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java:
##########
@@ -143,7 +153,9 @@ public void open() throws Exception {
                 .forEach(
                         entry -> {
                             try {
+                                // Initialize reader
                                 entry.getValue().open();
+                                // Allocates split whose status is pending

Review Comment:
   Would that be a better way to describe it? "the corresponding status is 
assigned to the reader"



##########
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java:
##########
@@ -47,18 +47,24 @@ public class CoordinatedSource<T, SplitT extends 
SourceSplit, StateT extends Ser
         implements BaseSourceFunction<T> {
     protected static final long SLEEP_TIME_INTERVAL = 5L;
     protected final SeaTunnelSource<T, SplitT, StateT> source;
+    // The subTask in the restored state corresponds to the corresponding 
List<State>
     protected final Map<Integer, List<byte[]>> restoredState;
     protected final Integer parallelism;
 
     protected final Serializer<SplitT> splitSerializer;
     protected final Serializer<StateT> enumeratorStateSerializer;
 
+    /* The creation environment for the distribution read coordinator is used 
to control split
+    distribution */
     protected final CoordinatedEnumeratorContext<SplitT> 
coordinatedEnumeratorContext;
     protected final Map<Integer, CoordinatedReaderContext> readerContextMap;
+    // The task to be recovered and the corresponding split
     protected final Map<Integer, List<SplitT>> restoredSplitStateMap = new 
HashMap<>();
 
     protected transient volatile SourceSplitEnumerator<SplitT, StateT> 
splitEnumerator;
+    // task and Reader Map Info

Review Comment:
   The mapping is from subtask ID to the reader info.



##########
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java:
##########
@@ -231,6 +244,7 @@ public Map<Integer, List<byte[]>> snapshotState(long 
checkpointId) throws Except
         StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
         if (enumeratorState != null) {
             byte[] enumeratorStateBytes = 
enumeratorStateSerializer.serialize(enumeratorState);
+            // -1 identification that the last status information is read

Review Comment:
   Would that be a better way to describe it? "-1 represents the state of a 
pending split in SourceSplitEnumerator."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to