1996fanrui commented on code in PR #27373:
URL: https://github.com/apache/flink/pull/27373#discussion_r2679360247
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java:
##########
@@ -408,4 +416,22 @@ public CheckpointStorageAccess
getCheckpointStorageAccess() {
public ChannelStateWriteRequestExecutorFactory
getChannelStateExecutorFactory() {
return channelStateExecutorFactory;
}
+
+ public void setChannelStateWriter(ChannelStateWriter channelStateWriter) {
+ checkState(
+ this.channelStateWriter == null, "Can not set channelStateWriter
twice!");
+ this.channelStateWriter = channelStateWriter;
+ }
+
+ @Override
+ public ChannelStateWriter getChannelStateWriter() {
+ return checkNotNull(
+ channelStateWriter, "channelStateWriter has not been
initialized yet!");
+ }
+
+ @Override
+ public StateBackend getStateBackend() {
+ return checkNotNull(
+ stateBackend, "stateBackend has not been initialized yet!");
+ }
Review Comment:
From the code, stateBackend never be null, it is initialized in constructor
instead of setter.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java:
##########
@@ -730,7 +746,21 @@ private void doRun() {
this,
externalResourceInfoProvider,
channelStateExecutorFactory,
- taskManagerActions);
+ taskManagerActions,
+ stateBackend);
+
+ final CheckpointStorage checkpointStorage =
createCheckpointStorage(stateBackend);
+ CheckpointStorageAccess checkpointStorageAccess =
+ checkpointStorage.createCheckpointStorage(this.jobId);
+ checkpointStorageAccess =
+ tryApplyFileMergingCheckpoint(
+ checkpointStorageAccess,
+
this.taskStateManager.getFileMergingSnapshotManager(),
+ postFailureCleanUpRegistry,
+ env);
+ this.channelStateWriter =
createChannelStateWriter(checkpointStorage, checkpointStorageAccess);
+ env.setCheckpointStorageAccess(checkpointStorageAccess);
+ env.setChannelStateWriter(channelStateWriter);
Review Comment:
>> As we discuss before, StreamTask sets writer to env and Task gets and
closes writer, right?
> Yes
From the PR, the writer is set in Task. IIUC, we do not need to change lot
of code if it is still set in StreamTask, right?
If so, the `getStateBackend()` may be not needed in `Environment` interface
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]