AHeise commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r649084908
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSourceReader.java
##########
@@ -60,11 +62,20 @@ public HybridSourceReader(
@Override
public void start() {
- setCurrentReader(0);
+ // underlying reader starts on demand with split assignment
}
@Override
public InputStatus pollNext(ReaderOutput output) throws Exception {
Review comment:
Do we want to support a bounded hybrid? Currently, there is no
`END_OF_INPUT` return value...
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -107,48 +127,74 @@ public Boundedness getBoundedness() {
}
/**
- * Converts checkpoint between sources to transfer end position to next
source's start position.
- * Only required for dynamic position transfer at time of switching,
otherwise source can be
+ * Callback for switch time customization of the underlying source from
previous enumerator end
+ * state.
+ *
+ * <p>Called when the current enumerator has finished and before the next
enumerator is created.
+ * The enumerator end state can thus be used to set the next source's
start start position.
+ *
+ * <p>Only required for dynamic position transfer at time of switching,
otherwise source can be
* preconfigured with a start position during job submission.
*/
- public interface CheckpointConverter<InCheckpointT, OutCheckpointT>
- extends Function<InCheckpointT, OutCheckpointT>, Serializable {}
+ public interface SourceConfigurer<SourceT extends Source, FromEnumT
extends SplitEnumerator>
Review comment:
Hm, this means that every source needs to provide a setter to be usable
in `HybridSource`. We probably can't even force that with an API change as
start offsets are defined very differently.
I'm still thinking that the builder route would go more with the intended
design of `Source` but we can also keep as is for now and see how it affects
`FileSource` and `KafkaSource` first.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]