lindong28 commented on code in PR #248:
URL: https://github.com/apache/flink-ml/pull/248#discussion_r1297881781
##########
flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/operator/HeadOperator.java:
##########
@@ -422,16 +430,44 @@ public void close() throws Exception {
}
}
- private void registerFeedbackConsumer(Executor mailboxExecutor) {
+ private void registerFeedbackConsumer(Executor mailboxExecutor)
+ throws MemoryAllocationException {
+ StreamTask<?, ?> task = getContainingTask();
+
int indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
int attemptNum = getRuntimeContext().getAttemptNumber();
FeedbackKey<StreamRecord<IterationRecord<?>>> feedbackKey =
OperatorUtils.createFeedbackKey(iterationId, feedbackIndex);
SubtaskFeedbackKey<StreamRecord<IterationRecord<?>>> key =
feedbackKey.withSubTaskIndex(indexOfThisSubtask, attemptNum);
- FeedbackChannelBroker broker = FeedbackChannelBroker.get();
- FeedbackChannel<StreamRecord<IterationRecord<?>>> channel = broker.getChannel(key);
- OperatorUtils.registerFeedbackConsumer(channel, this, mailboxExecutor);
+ SpillableFeedbackChannelBroker broker = SpillableFeedbackChannelBroker.get();
+ this.feedbackChannel = broker.getChannel(key);
Review Comment:
With the current implementation, both HeadOperator and TailOperator get the
channel via `broker.getChannel(key)`. HeadOperator is responsible for
initializing this channel by calling its `initialize()` API, and TailOperator
will encounter an exception if the channel has not been initialized yet.
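The ordering dependency described above can be modeled with a small sketch. It assumes string keys and a simplified channel; `GetOrCreateBroker`, `Channel`, and `put` are hypothetical names used only for illustration and are not the Flink ML API.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical model of the current contract: getChannel() creates a channel on demand,
 * but the channel is only usable after some caller (the head) has called initialize().
 */
class GetOrCreateBroker {

    /** Hypothetical stand-in for a spillable feedback channel. */
    static final class Channel {
        private volatile boolean initialized;

        void initialize() {
            initialized = true;
        }

        void put(String element) {
            if (!initialized) {
                // This is the failure the tail hits if it runs before the head.
                throw new IllegalStateException("Channel has not been initialized");
            }
            // A real channel would buffer or spill the element here.
        }
    }

    private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();

    /** Effectively "getOrCreateChannel": the returned channel may not be initialized yet. */
    Channel getChannel(String key) {
        return channels.computeIfAbsent(key, k -> new Channel());
    }
}
```

If the tail calls `broker.getChannel("k").put(...)` before the head has called `initialize()` on the same channel, the call throws; correctness depends on the order in which the two operators run.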
This approach relies on the callers (i.e. HeadOperator and TailOperator) to
coordinate how they access this shared object. It would be simpler if the
caller code did not depend on this coordination to work correctly. This code
is also less readable, because `SpillableFeedbackChannelBroker#getChannel`
effectively means "getOrCreateChannel": it creates a channel if one does not
already exist for the given key. However, the channel can only be used after
it is initialized, which `getChannel` does not guarantee. If the
initialization logic is passed as an extra parameter to `getChannel(..)`, the
method will always return a usable channel. This makes the code more readable
and self-contained, and it also addresses the issue mentioned above.
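A minimal sketch of the suggested shape, assuming string keys and a `java.util.function.Consumer` as the initializer; `InitializingBroker`, `Channel`, `isInitialized`, and `put` are hypothetical names, not the Flink ML API. It relies on `ConcurrentHashMap#computeIfAbsent`, which applies the mapping function at most once per absent key and makes concurrent callers for the same key wait, so every caller receives a channel that has already been initialized.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

/** Hypothetical broker whose getChannel() only ever returns initialized channels. */
class InitializingBroker {

    /** Hypothetical stand-in for a spillable feedback channel. */
    static final class Channel {
        private volatile boolean initialized;

        void initialize() {
            initialized = true;
        }

        boolean isInitialized() {
            return initialized;
        }

        void put(String element) {
            if (!initialized) {
                throw new IllegalStateException("Channel has not been initialized");
            }
            // A real channel would buffer or spill the element here.
        }
    }

    private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();

    /**
     * Returns the channel for the key. On first access the channel is created and the
     * initializer is run inside computeIfAbsent, so no caller can observe it uninitialized.
     */
    Channel getChannel(String key, Consumer<Channel> initializer) {
        return channels.computeIfAbsent(
                key,
                k -> {
                    Channel channel = new Channel();
                    initializer.accept(channel);
                    return channel;
                });
    }
}
```

In this sketch, whichever operator reaches the broker first runs the initializer, so both callers would need to pass equivalent initialization logic. The diff above declares `throws MemoryAllocationException`, a checked exception that a `Consumer` cannot throw, so a real version would likely need a custom functional interface or exception wrapping.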
--
This is an automated message from the Apache Git Service.