tweise commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r677967467
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -110,69 +106,64 @@ public Boundedness getBoundedness() {
@Override
public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() {
- // List<SimpleVersionedSerializer<SourceSplit>> serializers = new ArrayList<>();
- // TODO: serializers are created on demand as underlying sources are created during switch
- // sources.forEach(t -> serializers.add(castSerializer(t.source.getSplitSerializer())));
return new HybridSourceSplitSerializer(switchedSources);
}
@Override
public SimpleVersionedSerializer<HybridSourceEnumeratorState>
getEnumeratorCheckpointSerializer() {
- List<SimpleVersionedSerializer<Object>> serializers = new ArrayList<>();
- sources.forEach(
- t -> serializers.add(castSerializer(t.source.getEnumeratorCheckpointSerializer())));
- return new HybridSourceEnumeratorStateSerializer(serializers);
- }
-
- private static <T> SimpleVersionedSerializer<T> castSerializer(
- SimpleVersionedSerializer<? extends T> s) {
- @SuppressWarnings("rawtypes")
- SimpleVersionedSerializer s1 = s;
- return s1;
+ return new HybridSourceEnumeratorStateSerializer(switchedSources);
}
/**
- * Callback for switch time customization of the underlying source, typically to dynamically set
- * a start position from previous enumerator end state.
+ * Factory for underlying sources of {@link HybridSource}.
*
- * <p>Requires the ability to augment the existing source (or clone and modify). Provides the
- * flexibility to set start position in any way a source allows, in a source specific way.
- * Future convenience could be built on top of it, for example an implementation recognizes
- * optional interfaces.
+ * <p>This factory permits building of a source at graph construction time or deferred at switch
+ * time. Provides the ability to set a start position in any way a specific source allows.
+ * Future convenience could be built on top of it, for example a default implementation that
+ * recognizes optional interfaces to transfer position in a common format.
*
* <p>Called when the current enumerator has finished and before the next enumerator is created.
- * The enumerator end state can thus be used to set the next source's start start position.
+ * The enumerator end state can thus be used to set the next source's start position. Only
+ * required for dynamic position transfer at time of switching.
*
- * <p>Only required for dynamic position transfer at time of switching, otherwise source can be
- * preconfigured with a start position during job submission.
+ * <p>If start position is known at job submission, the source can be constructed in the entry
+ * point and simply wrapped into the factory, providing the benefit of validation during
+ * submission.
*/
- public interface SourceConfigurer<SourceT extends Source, FromEnumT extends SplitEnumerator>
+ public interface SourceFactory<T, SourceT extends Source, FromEnumT extends SplitEnumerator>
extends Serializable {
- SourceT configure(SourceT source, FromEnumT enumerator);
+ SourceT create(FromEnumT enumerator);
Review comment:
@becketqin, your description of the switching process is correct, but it
only covers the scenario where the start position of the subsequent source is
determined at switch time. There is also the much simpler scenario with a fixed
start position, which does not require the user to implement `SourceFactory`:
```
// Bounded source: read the existing files first.
FileSource<String> fileSource =
        FileSource.forRecordStreamFormat(new TextLineFormat(), Path.fromLocalFile(testDir))
                .build();
// Unbounded source whose start position is fixed at job submission.
KafkaSource<String> kafkaSource =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setGroupId("MyGroup")
                .setTopics(Arrays.asList("quickstart-events"))
                .setDeserializer(
                        KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
                .setStartingOffsets(OffsetsInitializer.earliest())
                .build();
// No SourceFactory implementation needed: both sources are configured up front.
HybridSource<String> hybridSource =
        HybridSource.builder(fileSource)
                .addSource(kafkaSource)
                .build();
```
The intention of this PR is to cover that simple scenario and, at the same
time, provide the mechanism to build something more advanced with a different
bounded source. This does not conflict with the later addition of a standard
interface for extracting the end position, and we can discuss whether or not
that should be part of FLIP-150. However, I hope to find a way to move this PR
forward independently, as it has been under review for a long time now.
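
For contrast between the two scenarios, here is a rough, self-contained sketch of
how a factory with the `create(FromEnumT)` shape from the diff could serve both
the fixed and the switch-time case. Everything in it is a hypothetical stand-in
and not Flink code: `FileEnumerator`, `StreamSource`, `getEndTimestamp()` and
`passthrough(...)` are invented for illustration, and the interface is simplified
to two type parameters instead of the three in the PR.
```java
import java.io.Serializable;

// Illustration only: all types below are hypothetical stand-ins, NOT Flink classes.
final class SwitchTimeFactorySketch {

    /** Stand-in for a finished enumerator that can report where it stopped. */
    static final class FileEnumerator {
        private final long endTimestamp;

        FileEnumerator(long endTimestamp) {
            this.endTimestamp = endTimestamp;
        }

        long getEndTimestamp() {
            return endTimestamp;
        }
    }

    /** Stand-in for the subsequent source, configured with a start position. */
    static final class StreamSource implements Serializable {
        private final long startTimestamp;

        StreamSource(long startTimestamp) {
            this.startTimestamp = startTimestamp;
        }

        long getStartTimestamp() {
            return startTimestamp;
        }
    }

    /** Simplified version of the factory shape in the diff: create(FromEnumT). */
    interface SourceFactory<SourceT, FromEnumT> extends Serializable {
        SourceT create(FromEnumT enumerator);
    }

    /** Fixed start position: the source is built up front and merely wrapped. */
    static <SourceT, FromEnumT> SourceFactory<SourceT, FromEnumT> passthrough(SourceT source) {
        return enumerator -> source;
    }

    public static void main(String[] args) {
        // Simple scenario: start position known at job submission.
        SourceFactory<StreamSource, FileEnumerator> fixed = passthrough(new StreamSource(0L));

        // Advanced scenario: start position derived from the previous enumerator at switch time.
        SourceFactory<StreamSource, FileEnumerator> dynamic =
                enumerator -> new StreamSource(enumerator.getEndTimestamp());

        FileEnumerator finished = new FileEnumerator(1_000L);
        System.out.println("fixed start:   " + fixed.create(finished).getStartTimestamp());
        System.out.println("dynamic start: " + dynamic.create(finished).getStartTimestamp());
    }
}
```
The fixed case ignores the enumerator entirely, which is why the source can be
validated during submission; only the dynamic case needs access to the previous
enumerator's end state.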