tweise commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r649682476
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##########
@@ -60,11 +62,20 @@ public HybridSourceReader(
@Override
public void start() {
- setCurrentReader(0);
+ // underlying reader starts on demand with split assignment
}
@Override
public InputStatus pollNext(ReaderOutput output) throws Exception {
+ if (currentReader == null) {
+ if (pendingSplits.isEmpty()) {
+ // no underlying reader before split assignment
+ return InputStatus.NOTHING_AVAILABLE;
Review comment:
The `END_OF_INPUT` from the last reader is still passed up; that happens
further down. This branch only handles the case where we are initially
waiting for splits to arrive.
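
To illustrate the distinction, here is a minimal sketch that uses simplified
stand-in types rather than the actual Flink classes. `Status`, `SimpleReader`
and `HybridPollSketch` are hypothetical names, and the in-place reader switch
is a simplification of how the real reader coordinates with the enumerator.

```java
import java.util.List;

/** Stand-in for Flink's InputStatus; only the values needed by this sketch. */
enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

/** Hypothetical minimal reader interface, not Flink's SourceReader. */
interface SimpleReader {
    Status pollNext();
}

/** Sketch of the status handling only; not the actual HybridSourceReader. */
class HybridPollSketch {
    private final List<SimpleReader> readers;
    private int index = -1; // -1: no underlying reader yet, waiting for splits

    HybridPollSketch(List<SimpleReader> readers) {
        this.readers = readers;
    }

    /** Models split assignment: the first underlying reader starts on demand. */
    void onSplitsAssigned() {
        if (index < 0) {
            index = 0;
        }
    }

    Status pollNext() {
        if (index < 0) {
            // Initial wait for splits: nothing to read yet, but not end of input.
            return Status.NOTHING_AVAILABLE;
        }
        Status status = readers.get(index).pollNext();
        if (status == Status.END_OF_INPUT && index < readers.size() - 1) {
            // An intermediate reader finished: switch and ask to be polled again.
            index++;
            return Status.MORE_AVAILABLE;
        }
        // Includes END_OF_INPUT from the last reader, which is passed up.
        return status;
    }
}
```

With two readers that finish immediately, the sketch reports
`NOTHING_AVAILABLE` before split assignment and `END_OF_INPUT` only once the
last reader is done.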
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##########
@@ -60,11 +62,20 @@ public HybridSourceReader(
@Override
public void start() {
- setCurrentReader(0);
+ // underlying reader starts on demand with split assignment
}
@Override
public InputStatus pollNext(ReaderOutput output) throws Exception {
+ if (currentReader == null) {
+ if (pendingSplits.isEmpty()) {
+ // no underlying reader before split assignment
+ return InputStatus.NOTHING_AVAILABLE;
Review comment:
Will be covered by unit test.
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -107,48 +127,74 @@ public Boundedness getBoundedness() {
}
/**
- * Converts checkpoint between sources to transfer end position to next
source's start position.
- * Only required for dynamic position transfer at time of switching,
otherwise source can be
+ * Callback for switch time customization of the underlying source from
previous enumerator end
+ * state.
+ *
+ * <p>Called when the current enumerator has finished and before the next
enumerator is created.
+ * The enumerator end state can thus be used to set the next source's
start start position.
+ *
+ * <p>Only required for dynamic position transfer at time of switching,
otherwise source can be
* preconfigured with a start position during job submission.
*/
- public interface CheckpointConverter<InCheckpointT, OutCheckpointT>
- extends Function<InCheckpointT, OutCheckpointT>, Serializable {}
+ public interface SourceConfigurer<SourceT extends Source, FromEnumT
extends SplitEnumerator>
Review comment:
We would either need to allow setting the start position or splits on the
source, or clone the existing source and modify that. For now, this hook
provides the flexibility to set the start position in whatever
source-specific way a source allows. It is backward compatible (we need this to
work with 1.12), and a customization can resort to reflection if need be. Future
extensions can be built on top of it, like a function that recognizes optional
interfaces. Preferably we don't widen the scope of this PR.
Will update the javadoc to convey the intent.
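
A rough sketch of how such a hook could be used, with stand-in types only.
`ConfigurerSketch`, its `configure` method, `FileEnumeratorStub` and
`StreamSourceStub` are hypothetical and do not reflect the signatures in this
PR; the point is just that the end state of the finished enumerator is applied
to the next source in a source-specific way.

```java
import java.io.Serializable;

/** Hypothetical finished enumerator that knows where reading stopped. */
class FileEnumeratorStub {
    private final long endTimestamp;

    FileEnumeratorStub(long endTimestamp) {
        this.endTimestamp = endTimestamp;
    }

    long getEndTimestamp() {
        return endTimestamp;
    }
}

/** Hypothetical mutable next source with a settable start position. */
class StreamSourceStub {
    private long startTimestamp = -1L; // -1: not configured yet

    void setStartTimestamp(long startTimestamp) {
        this.startTimestamp = startTimestamp;
    }

    long getStartTimestamp() {
        return startTimestamp;
    }
}

/** Stand-in for the hook; method name and signature are assumptions. */
interface ConfigurerSketch<SourceT, FromEnumT> extends Serializable {
    void configure(SourceT nextSource, FromEnumT finishedEnumerator);
}

class SwitchSketch {
    /** Transfers the previous end position to the next source's start position. */
    static final ConfigurerSketch<StreamSourceStub, FileEnumeratorStub> TRANSFER_END_POSITION =
            (next, finished) -> next.setStartTimestamp(finished.getEndTimestamp());
}
```

The callback would run after the current enumerator has finished and before the
next enumerator is created, so the next source sees the start position when it
is instantiated.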
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -107,48 +127,74 @@ public Boundedness getBoundedness() {
}
/**
- * Converts checkpoint between sources to transfer end position to next
source's start position.
- * Only required for dynamic position transfer at time of switching,
otherwise source can be
+ * Callback for switch time customization of the underlying source from
previous enumerator end
+ * state.
+ *
+ * <p>Called when the current enumerator has finished and before the next
enumerator is created.
+ * The enumerator end state can thus be used to set the next source's
start start position.
+ *
+ * <p>Only required for dynamic position transfer at time of switching,
otherwise source can be
* preconfigured with a start position during job submission.
*/
- public interface CheckpointConverter<InCheckpointT, OutCheckpointT>
- extends Function<InCheckpointT, OutCheckpointT>, Serializable {}
+ public interface SourceConfigurer<SourceT extends Source, FromEnumT
extends SplitEnumerator>
Review comment:
If source mutability is the only concern, then alternatively the
function can return a cloned source with the start position augmented. I'm
still leaning slightly towards the former checkpoint-based solution, but there
are pros and cons for each of the options that have been discussed so far. I
took this back to the mailing list thread; let's continue here and try to
settle on an approach soon!
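
For illustration, the cloning variant could look roughly like the sketch below.
`ImmutableSourceStub`, `withStartOffset` and `CloneOnSwitchSketch` are
hypothetical stand-ins, not part of this PR or of any Flink API; the function
leaves the preconfigured source untouched and returns a copy that carries the
start position.

```java
import java.util.function.BiFunction;

/** Hypothetical immutable source; configuration changes produce a copy. */
final class ImmutableSourceStub {
    private final String topic;
    private final long startOffset;

    ImmutableSourceStub(String topic, long startOffset) {
        this.topic = topic;
        this.startOffset = startOffset;
    }

    /** Returns a clone with the start position replaced; this instance is unchanged. */
    ImmutableSourceStub withStartOffset(long offset) {
        return new ImmutableSourceStub(topic, offset);
    }

    String getTopic() {
        return topic;
    }

    long getStartOffset() {
        return startOffset;
    }
}

class CloneOnSwitchSketch {
    /** Maps (preconfigured source, previous end offset) to a configured clone. */
    static final BiFunction<ImmutableSourceStub, Long, ImmutableSourceStub> WITH_START =
            (template, endOffset) -> template.withStartOffset(endOffset);
}
```

The trade-off in this sketch is that every source needs some way to produce a
modified copy of itself.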