Carl-Zhou-CN commented on code in PR #5617:
URL: https://github.com/apache/seatunnel/pull/5617#discussion_r1377016700
##########
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java:
##########
@@ -143,7 +153,9 @@ public void open() throws Exception {
.forEach(
entry -> {
try {
+ // Initialize reader
entry.getValue().open();
+ // Allocates split whose status is pending
Review Comment:
Would that be a better way to describe it? "The corresponding state is assigned to the reader."
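Below is a minimal sketch of the ordering discussed here. It assumes a toy model in which each reader is opened first, and only then are the splits pending for its subtask id handed to it. `ToyReader`, `ToyEnumerator`, `assignPendingSplits` and `OpenSequenceSketch` are hypothetical names used for illustration. They are not SeaTunnel's `SourceReader` or `SourceSplitEnumerator` interfaces, and splits are modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a source reader (NOT SeaTunnel's SourceReader).
class ToyReader {
    final List<String> assignedSplits = new ArrayList<>();
    boolean opened;

    void open() {
        // Initialize the reader before it receives any split.
        opened = true;
    }
}

// Hypothetical stand-in for a split enumerator (NOT SeaTunnel's SourceSplitEnumerator).
class ToyEnumerator {
    // Splits not yet handed out, keyed by the subtask id that should read them.
    final Map<Integer, List<String>> pendingSplits = new HashMap<>();
    private final Map<Integer, ToyReader> readers;

    ToyEnumerator(Map<Integer, ToyReader> readers) {
        this.readers = readers;
    }

    // Hands every pending split of this subtask to its reader and drops it from the pending set.
    void assignPendingSplits(int subtaskId) {
        List<String> splits = pendingSplits.remove(subtaskId);
        if (splits != null) {
            readers.get(subtaskId).assignedSplits.addAll(splits);
        }
    }
}

class OpenSequenceSketch {
    static void open(Map<Integer, ToyReader> readers, ToyEnumerator enumerator) {
        for (Map.Entry<Integer, ToyReader> entry : readers.entrySet()) {
            // Initialize reader
            entry.getValue().open();
            // The corresponding state (pending splits) is assigned to the reader
            enumerator.assignPendingSplits(entry.getKey());
        }
    }
}
```

In this model, a reader whose subtask has no pending splits is still opened but receives nothing.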
##########
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java:
##########
@@ -231,6 +244,7 @@ public Map<Integer, List<byte[]>> snapshotState(long checkpointId) throws Except
StateT enumeratorState = splitEnumerator.snapshotState(checkpointId);
if (enumeratorState != null) {
byte[] enumeratorStateBytes =
enumeratorStateSerializer.serialize(enumeratorState);
+ // -1 identification that the last status information is read
Review Comment:
Would that be a better way to describe it? "-1 represents the state of the pending splits in SourceSplitEnumerator."
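Here is a minimal sketch of the checkpoint layout under discussion. As the `-1` comment suggests, it assumes that reader splits are stored under their non-negative subtask ids and that the serialized enumerator state is stored under the reserved key `-1`. UTF-8 string encoding stands in for the real `Serializer` implementations, and `SnapshotLayoutSketch` and `ENUMERATOR_STATE_KEY` are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SnapshotLayoutSketch {
    // Hypothetical reserved key: subtask ids are assumed non-negative, so -1 cannot clash.
    static final int ENUMERATOR_STATE_KEY = -1;

    static Map<Integer, List<byte[]>> snapshot(
            Map<Integer, List<String>> readerSplits, List<String> enumeratorPendingSplits) {
        Map<Integer, List<byte[]>> allStates = new HashMap<>();
        // One entry per reader: subtask id -> serialized splits owned by that reader.
        readerSplits.forEach(
                (subtaskId, splits) -> {
                    List<byte[]> bytes = new ArrayList<>(splits.size());
                    for (String split : splits) {
                        bytes.add(split.getBytes(StandardCharsets.UTF_8));
                    }
                    allStates.put(subtaskId, bytes);
                });
        // The enumerator state (splits still pending in the enumerator) goes under -1.
        if (enumeratorPendingSplits != null) {
            byte[] enumeratorStateBytes =
                    String.join(",", enumeratorPendingSplits).getBytes(StandardCharsets.UTF_8);
            allStates.put(ENUMERATOR_STATE_KEY, Collections.singletonList(enumeratorStateBytes));
        }
        return allStates;
    }
}
```

As long as subtask ids are non-negative, the `-1` key cannot collide with a reader's entry. That is what lets a single `Map<Integer, List<byte[]>>` carry both kinds of state.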
##########
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java:
##########
@@ -109,9 +117,11 @@ private void createSplitEnumerator() throws Exception {
throw new RuntimeException(e);
}
}
+ // The information read in the status is added to the Map
Review Comment:
Would it be better to describe this process?
restoredState.forEach(
        (subtaskId, splitBytes) -> {
            if (subtaskId == -1) {
                return;
            }
            List<SplitT> restoredSplitState = new ArrayList<>(splitBytes.size());
            for (byte[] splitByte : splitBytes) {
                try {
                    restoredSplitState.add(splitSerializer.deserialize(splitByte));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
            // The information read in the status is added to the Map
            restoredSplitStateMap.put(subtaskId, restoredSplitState);
        });
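The suggested loop can be exercised in isolation with the sketch below. It assumes a hypothetical one-method `SplitDeserializer` in place of SeaTunnel's `Serializer<SplitT>`. It also wraps the checked `IOException` in `UncheckedIOException`, a `RuntimeException` subclass, instead of a bare `RuntimeException`. The `-1` entry is skipped because it carries the enumerator state rather than a reader's splits.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal deserializer; SeaTunnel's own Serializer interface has more methods.
interface SplitDeserializer<S> {
    S deserialize(byte[] bytes) throws IOException;
}

class RestoreSketch {
    static <S> Map<Integer, List<S>> restoreSplits(
            Map<Integer, List<byte[]>> restoredState, SplitDeserializer<S> deserializer) {
        Map<Integer, List<S>> restoredSplitStateMap = new HashMap<>();
        restoredState.forEach(
                (subtaskId, splitBytes) -> {
                    // -1 holds the enumerator state, not a reader's splits: skip it here.
                    if (subtaskId == -1) {
                        return;
                    }
                    List<S> restoredSplitState = new ArrayList<>(splitBytes.size());
                    for (byte[] splitByte : splitBytes) {
                        try {
                            restoredSplitState.add(deserializer.deserialize(splitByte));
                        } catch (IOException e) {
                            throw new UncheckedIOException(e);
                        }
                    }
                    // Record the deserialized splits under the subtask that owned them.
                    restoredSplitStateMap.put(subtaskId, restoredSplitState);
                });
        return restoredSplitStateMap;
    }
}
```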
##########
seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/source/CoordinatedSource.java:
##########
@@ -47,18 +47,24 @@ public class CoordinatedSource<T, SplitT extends SourceSplit, StateT extends Ser
implements BaseSourceFunction<T> {
protected static final long SLEEP_TIME_INTERVAL = 5L;
protected final SeaTunnelSource<T, SplitT, StateT> source;
+ // The subTask in the restored state corresponds to the corresponding List<State>
protected final Map<Integer, List<byte[]>> restoredState;
protected final Integer parallelism;
protected final Serializer<SplitT> splitSerializer;
protected final Serializer<StateT> enumeratorStateSerializer;
+ /* The creation environment for the distribution read coordinator is used to control split
+ distribution */
protected final CoordinatedEnumeratorContext<SplitT> coordinatedEnumeratorContext;
protected final Map<Integer, CoordinatedReaderContext> readerContextMap;
+ // The task to be recovered and the corresponding split
protected final Map<Integer, List<SplitT>> restoredSplitStateMap = new HashMap<>();
protected transient volatile SourceSplitEnumerator<SplitT, StateT> splitEnumerator;
+ // task and Reader Map Info
Review Comment:
The mapping is from subtask id to the reader info.
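Assuming subtask ids run from 0 to `parallelism - 1`, the subtask-id-to-reader mapping could be built as in the minimal sketch below. `ToyReaderContext` is a hypothetical stand-in for `CoordinatedReaderContext`. Attaching each subtask's restored splits to its context is an illustrative choice and not necessarily how `CoordinatedSource` wires them.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a per-subtask reader context (NOT SeaTunnel's CoordinatedReaderContext).
class ToyReaderContext {
    final int subtaskId;
    final List<String> restoredSplits;

    ToyReaderContext(int subtaskId, List<String> restoredSplits) {
        this.subtaskId = subtaskId;
        this.restoredSplits = restoredSplits;
    }
}

class ReaderContextMapSketch {
    // Builds the subtask id -> reader info mapping, one entry per parallel subtask.
    static Map<Integer, ToyReaderContext> build(
            int parallelism, Map<Integer, List<String>> restoredSplitStateMap) {
        Map<Integer, ToyReaderContext> readerContextMap = new HashMap<>();
        for (int subtaskId = 0; subtaskId < parallelism; subtaskId++) {
            // Subtasks with nothing to recover start with an empty split list.
            List<String> splits =
                    restoredSplitStateMap.getOrDefault(subtaskId, Collections.emptyList());
            readerContextMap.put(subtaskId, new ToyReaderContext(subtaskId, splits));
        }
        return readerContextMap;
    }
}
```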