becketqin commented on a change in pull request #15924:
URL: https://github.com/apache/flink/pull/15924#discussion_r665110409



##########
File path: flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/hybrid/HybridSource.java
##########
@@ -110,69 +106,64 @@ public Boundedness getBoundedness() {
 
     @Override
     public SimpleVersionedSerializer<HybridSourceSplit> getSplitSerializer() {
-        // List<SimpleVersionedSerializer<SourceSplit>> serializers = new ArrayList<>();
-        // TODO: serializers are created on demand as underlying sources are created during switch
-        // sources.forEach(t -> serializers.add(castSerializer(t.source.getSplitSerializer())));
         return new HybridSourceSplitSerializer(switchedSources);
     }
 
     @Override
     public SimpleVersionedSerializer<HybridSourceEnumeratorState>
             getEnumeratorCheckpointSerializer() {
-        List<SimpleVersionedSerializer<Object>> serializers = new ArrayList<>();
-        sources.forEach(
-                t -> serializers.add(castSerializer(t.source.getEnumeratorCheckpointSerializer())));
-        return new HybridSourceEnumeratorStateSerializer(serializers);
-    }
-
-    private static <T> SimpleVersionedSerializer<T> castSerializer(
-            SimpleVersionedSerializer<? extends T> s) {
-        @SuppressWarnings("rawtypes")
-        SimpleVersionedSerializer s1 = s;
-        return s1;
+        return new HybridSourceEnumeratorStateSerializer(switchedSources);
     }
 
     /**
-     * Callback for switch time customization of the underlying source, typically to dynamically set
-     * a start position from previous enumerator end state.
+     * Factory for underlying sources of {@link HybridSource}.
      *
-     * <p>Requires the ability to augment the existing source (or clone and modify). Provides the
-     * flexibility to set start position in any way a source allows, in a source specific way.
-     * Future convenience could be built on top of it, for example an implementation recognizes
-     * optional interfaces.
+     * <p>This factory permits building of a source at graph construction time or deferred at switch
+     * time. Provides the ability to set a start position in any way a specific source allows.
+     * Future convenience could be built on top of it, for example a default implementation that
+     * recognizes optional interfaces to transfer position in a common format.
      *
      * <p>Called when the current enumerator has finished and before the next enumerator is created.
-     * The enumerator end state can thus be used to set the next source's start start position.
+     * The enumerator end state can thus be used to set the next source's start start position. Only
+     * required for dynamic position transfer at time of switching.
      *
-     * <p>Only required for dynamic position transfer at time of switching, otherwise source can be
-     * preconfigured with a start position during job submission.
+     * <p>If start position is known at jib submission, the source can be constructed in the entry
+     * point and simply wrapped into the factory, providing the benefit of validation during
+     * submission.
      */
-    public interface SourceConfigurer<SourceT extends Source, FromEnumT extends SplitEnumerator>
+    public interface SourceFactory<T, SourceT extends Source, FromEnumT extends SplitEnumerator>
             extends Serializable {
-        SourceT configure(SourceT source, FromEnumT enumerator);
+        SourceT create(FromEnumT enumerator);

Review comment:
       At a high level, in terms of how to perform the switchover, I think we 
need to answer the following questions:
   1. **How to extract the _end_position_ information from the previous 
source?** I agree it is intuitive to get the _end_position_ from the 
`SplitEnumerator` of the previous source. However, a protocol seems to be 
missing here. We probably need to define a method such as `getEndState()` in 
the `SplitEnumerator` interface so users know how to get the end state, no 
matter what the end state is.
   2. **How to translate the _end_position_ of the previous source to the 
_start_position_ of the next source?** This could be done in the 
`SourceFactory`. I have no concern about this.
   3. **How to construct the next source from the _start_position_?** This is 
also done in the `SourceFactory`. I agree having the `SourceFactory` here makes 
sense as it allows users to build the next Source dynamically.
   
   My concern here is just for the first question.
   I am wondering how the `SourceFactory` for the next source would get the 
information (timestamp, etc.) from the enumerator of the previous source. The 
`SplitEnumerator` interface does not have a method to get such information. 
Does that mean users need to use the concrete enumerator classes (e.g. 
`KafkaPartitionSplitEnumerator`), and that these concrete classes are also 
required to expose the information for switching via some additional method?
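   
   To make the first question concrete, here is a minimal sketch of what such 
a protocol could look like. Everything in it is hypothetical: 
`EndStateProvider`, `BoundedEnumerator`, and `NextSource` are stand-ins rather 
than Flink classes, and `SourceFactory` is simplified to two type parameters. 
It only illustrates that a common accessor would let the factory read the end 
state without knowing the concrete enumerator class.

```java
import java.io.Serializable;

public class EndStateSketch {

    /** Hypothetical protocol: an enumerator that can report where it stopped. */
    public interface EndStateProvider<EndStateT> {
        EndStateT getEndState();
    }

    /** Hypothetical stand-in for the source that is built at switch time. */
    public static final class NextSource {
        public final long startTimestampMs;

        public NextSource(long startTimestampMs) {
            this.startTimestampMs = startTimestampMs;
        }
    }

    /** Simplified factory shape (assumption): builds the next source from the previous enumerator. */
    public interface SourceFactory<SourceT, FromEnumT> extends Serializable {
        SourceT create(FromEnumT enumerator);
    }

    /** Stand-in enumerator for a bounded source whose end state is a timestamp. */
    public static final class BoundedEnumerator implements EndStateProvider<Long> {
        private final long lastTimestampMs;

        public BoundedEnumerator(long lastTimestampMs) {
            this.lastTimestampMs = lastTimestampMs;
        }

        @Override
        public Long getEndState() {
            return lastTimestampMs;
        }
    }

    /** Runs the switch: the factory depends only on the protocol, not on BoundedEnumerator. */
    public static long demo() {
        SourceFactory<NextSource, EndStateProvider<Long>> factory =
                previous -> new NextSource(previous.getEndState());
        return factory.create(new BoundedEnumerator(1_000L)).startTimestampMs;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

   With an accessor of this kind, the factory in `demo()` would work for any 
enumerator that implements the protocol, which is the property the current 
`SplitEnumerator` interface does not offer.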
   
   For the example in the Javadoc, what would be an expected implementation of 
`createKafkaSource`? It seems a little difficult to derive the start position 
of the Kafka source from a `StaticFileSourceEnumerator`, unless it is modified 
by the users for this specific use case.
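
   One way to picture the difficulty is the sketch below. It is not the PR's 
code: `Enumerator`, `FileEnumerator`, `TimestampedFileEnumerator`, and 
`startTimestampFor` are hypothetical stand-ins, and the helper returns only a 
start timestamp instead of building a real Kafka source. Under those 
assumptions, a `createKafkaSource`-style helper can only obtain a start 
position by downcasting to an enumerator that the user has modified to expose 
one.

```java
public class DowncastSketch {

    /** Stand-in for a generic enumerator interface that has no end-position accessor. */
    public interface Enumerator {}

    /** Stand-in for a stock file enumerator: it tracks files, not timestamps. */
    public static class FileEnumerator implements Enumerator {
        public FileEnumerator() {}
    }

    /** Hypothetical user-modified variant that exposes a timestamp for switching. */
    public static class TimestampedFileEnumerator extends FileEnumerator {
        private final long endTimestampMs;

        public TimestampedFileEnumerator(long endTimestampMs) {
            this.endTimestampMs = endTimestampMs;
        }

        public long getEndTimestampMs() {
            return endTimestampMs;
        }
    }

    /** Hypothetical helper: derives the next source's start timestamp from the previous enumerator. */
    public static long startTimestampFor(Enumerator previous) {
        // Without a common accessor, the helper must know the concrete class.
        if (previous instanceof TimestampedFileEnumerator) {
            return ((TimestampedFileEnumerator) previous).getEndTimestampMs();
        }
        throw new IllegalStateException(
                "No end position available from " + previous.getClass().getSimpleName());
    }

    public static void main(String[] args) {
        System.out.println(startTimestampFor(new TimestampedFileEnumerator(42L)));
    }
}
```

   In this sketch the stock `FileEnumerator` cannot be used for switching at 
all, which couples the factory to one specific, user-modified enumerator 
class.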




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
