dannycranmer commented on a change in pull request #12881:
URL: https://github.com/apache/flink/pull/12881#discussion_r469796613



##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java
##########
@@ -37,6 +46,41 @@
         * @param streamShardHandle the stream shard in which to consume from
         * @return the constructed {@link RecordPublisher}
         */
-       RecordPublisher create(Properties consumerConfig, MetricGroup metricGroup, StreamShardHandle streamShardHandle);
+       RecordPublisher create(
+                       SequenceNumber sequenceNumber,
+                       Properties consumerConfig,
+                       MetricGroup metricGroup,
+                       StreamShardHandle streamShardHandle) throws InterruptedException;
+
+       /**
+        * Determines the starting position in which the {@link RecordPublisher} should start consuming from.
+        *
+        * @param sequenceNumber the sequence number to start from
+        * @param consumerConfig the consumer properties
+        * @return the {@link StartingPosition} in which to start consuming from
+        */
+       default StartingPosition getStartingPosition(

Review comment:
       It didn't feel right to pass the `@Nullable` fields to `StartingPosition`, so in the end:
   - The `Date` is parsed from the properties in `KinesisConfigUtil`
   - The logic that decides whether to restart from a timestamp or from a sequence number is in `AWSUtil`
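
To make the split described above concrete, here is a minimal, self-contained sketch of the idea. It is not the code in this PR: the class, the property keys, the sentinel value and the `Position` type are all hypothetical stand-ins, and the timestamp format is an assumption chosen for the example. One helper parses the `Date` from the properties, a second helper decides between timestamp and sequence number, and the position object itself only ever receives non-null values.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Objects;
import java.util.Properties;

// Illustrative sketch only: none of these names are taken from the Flink code base.
class StartingPositionSketch {

    // Hypothetical property keys, default format and sentinel, invented for this example.
    static final String TIMESTAMP_KEY = "example.initpos.timestamp";
    static final String TIMESTAMP_FORMAT_KEY = "example.initpos.timestamp.format";
    static final String DEFAULT_TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ssXXX";
    static final String AT_TIMESTAMP_SENTINEL = "AT_TIMESTAMP_SENTINEL";

    // Simplified stand-in for a starting position; both fields are always non-null.
    static final class Position {
        final String iteratorType;
        final Object startingMarker;

        Position(String iteratorType, Object startingMarker) {
            this.iteratorType = Objects.requireNonNull(iteratorType);
            this.startingMarker = Objects.requireNonNull(startingMarker);
        }
    }

    // Config-side helper: parses the start timestamp from the consumer properties.
    static Date parseTimestamp(Properties config) {
        String raw = config.getProperty(TIMESTAMP_KEY);
        if (raw == null) {
            throw new IllegalArgumentException("Missing property: " + TIMESTAMP_KEY);
        }
        String format = config.getProperty(TIMESTAMP_FORMAT_KEY, DEFAULT_TIMESTAMP_FORMAT);
        try {
            return new SimpleDateFormat(format).parse(raw);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Cannot parse timestamp: " + raw, e);
        }
    }

    // Decision helper: start from the configured timestamp when the sequence number
    // is the sentinel, otherwise continue after the given sequence number.
    static Position startingPosition(String sequenceNumber, Properties config) {
        if (AT_TIMESTAMP_SENTINEL.equals(sequenceNumber)) {
            return new Position("AT_TIMESTAMP", parseTimestamp(config));
        }
        return new Position("AFTER_SEQUENCE_NUMBER", sequenceNumber);
    }
}
```

With this shape the caller never has to hand a possibly-null `Date` or sequence number to the position type; a missing or malformed timestamp fails early in the parsing helper instead.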



