sundargates commented on a change in pull request #13988:
URL: https://github.com/apache/flink/pull/13988#discussion_r520199275
##########
File path:
flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
##########
@@ -41,12 +43,14 @@
void start();
/**
- * Handles the source event from the source reader.
+ * Handles the request for a split. This method is called when the reader with the given subtask
+ * id calls the {@link SourceReaderContext#sendSplitRequest()} method.
*
* @param subtaskId the subtask id of the source reader who sent the source event.
- * @param sourceEvent the source event from the source reader.
+ * @param requesterHostname Optional, the hostname where the requesting task is running.
+ * This can be used to make split assignments locality-aware.
*/
- void handleSourceEvent(int subtaskId, SourceEvent sourceEvent);
+ void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);
Review comment:
+1
##########
File path:
flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
##########
@@ -87,4 +91,18 @@
*/
@Override
default void notifyCheckpointComplete(long checkpointId) throws Exception {}
+
+ /**
+ * Handles a custom source event from the source reader.
+ *
+ * <p>This method has a default implementation that does nothing, because it is only
+ * required to be implemented by some sources, which have a custom event protocol between
+ * reader and enumerator. The common events for reader registration and split requests
+ * are not dispatched to this method, but rather invoke the {@link #addReader(int)} and
+ * {@link #handleSplitRequest(int, String)} methods.
+ *
+ * @param subtaskId the subtask id of the source reader who sent the source event.
+ * @param sourceEvent the source event from the source reader.
+ */
+ default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}
Review comment:
It looks like enumerators will silently ignore these events if the implementer
forgets to override this method. Would a better default be to throw an
unhandled-event exception, or at minimum to log the event, so that the user is
aware of the issue? Also keep in mind that the enumerator only receives custom
events if the reader decides to send them, which implies that the user cares
about these events in the first place.
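To make the suggestion concrete, here is a minimal sketch of a fail-fast
default. It is not the PR's code: `SourceEvent` is re-declared here as a
stand-in so the snippet compiles on its own, and `StrictEnumerator`,
`ForgetfulEnumerator`, `RecordingEnumerator`, and `PingEvent` are hypothetical
names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for Flink's SourceEvent marker interface (assumption, for self-containment). */
interface SourceEvent {}

/** Hypothetical, trimmed-down enumerator interface with a fail-fast default. */
interface StrictEnumerator {
    /** Throws unless overridden, so a forgotten override surfaces immediately. */
    default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        throw new UnsupportedOperationException(
                "Unhandled source event " + sourceEvent.getClass().getName()
                        + " from subtask " + subtaskId
                        + "; override handleSourceEvent to process custom events.");
    }
}

/** An enumerator that forgot to override: it inherits the throwing default. */
final class ForgetfulEnumerator implements StrictEnumerator {}

/** An enumerator that opts in to custom events by overriding the method. */
final class RecordingEnumerator implements StrictEnumerator {
    final List<SourceEvent> received = new ArrayList<>();

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        received.add(sourceEvent);
    }
}

/** Hypothetical custom event a reader might send. */
final class PingEvent implements SourceEvent {}
```

With this default, a reader that sends a `PingEvent` to a `ForgetfulEnumerator`
triggers an exception instead of a silent drop. The softer variant would log a
warning in the default body rather than throw.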
##########
File path:
flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
##########
@@ -83,6 +83,14 @@ default void assignSplit(SplitT split, int subtask) {
assignSplits(new SplitsAssignment<>(split, subtask));
}
+ /**
+ * Signals a subtask that it will not receive any further split.
+ *
+ * @param subtask The index of the operator's parallel subtask that shall be
+ * signaled it will not receive any further split.
+ */
+ void signalNoMoreSplits(int subtask);
Review comment:
Would it also make sense to change this method signature to accept the set of
subtasks to be notified? Also, can we have another method that notifies all
registered readers?
```
void signalNoMoreSplits(Set<Integer> subtasksToBeNotified);

default void signalNoMoreSplits() {
    // registeredReaders() is the context's existing accessor for the reader map.
    signalNoMoreSplits(registeredReaders().keySet());
}
```
This is especially useful in bounded cases, where you cannot know ahead of time
whether a subtask might still be assigned splits: already-assigned splits can
always be added back and reassigned to other subtasks. Only once the system has
reached a terminal state (no pending splits and all splits completed) can you be
sure that every subtask can be marked as done. Having this method would avoid
going through the coordinator thread once per registered subtask.
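As a rough, self-contained sketch of how the batched variant could behave (an
illustration under assumptions, not Flink's actual `SplitEnumeratorContext`):
`BulkSignalContext`, `registeredReaderIds()`, and `RecordingContext` are
hypothetical names, and the recording implementation only stands in for
whatever the real context would do on the coordinator thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical, trimmed-down context; not Flink's SplitEnumeratorContext. */
interface BulkSignalContext {
    /** Ids of the currently registered readers (assumed accessor). */
    Set<Integer> registeredReaderIds();

    /** Signals all given subtasks in a single call. */
    void signalNoMoreSplits(Set<Integer> subtasks);

    /** Convenience overload for a single subtask. */
    default void signalNoMoreSplits(int subtask) {
        signalNoMoreSplits(Collections.singleton(subtask));
    }

    /** Convenience overload that signals every registered reader. */
    default void signalNoMoreSplits() {
        signalNoMoreSplits(registeredReaderIds());
    }
}

/** Test double that records each batched call instead of sending events. */
final class RecordingContext implements BulkSignalContext {
    private final Set<Integer> readers = new TreeSet<>(Arrays.asList(0, 1, 2));
    final List<Set<Integer>> signals = new ArrayList<>();

    @Override
    public Set<Integer> registeredReaderIds() {
        return Collections.unmodifiableSet(readers);
    }

    @Override
    public void signalNoMoreSplits(Set<Integer> subtasks) {
        // Copy defensively so later changes to the caller's set are not observed.
        signals.add(new TreeSet<>(subtasks));
    }
}
```

Here a call to `signalNoMoreSplits()` reaches the implementation once with all
three reader ids, rather than three times with one id each.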
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/source/event/NoMoreSplitsEvent.java
##########
@@ -16,16 +16,16 @@
* limitations under the License.
*/
-package org.apache.flink.api.connector.source.event;
+package org.apache.flink.runtime.source.event;
-import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
/**
* A source event sent from the SplitEnumerator to the SourceReader to indicate that no more
* splits will be assigned to the source reader anymore. So once the SplitReader finishes
* reading the currently assigned splits, they can exit.
*/
-public class NoMoreSplitsEvent implements SourceEvent {
+public class NoMoreSplitsEvent implements OperatorEvent {
Review comment:
nit: should this be a final class?