tzulitai commented on a change in pull request #12881:
URL: https://github.com/apache/flink/pull/12881#discussion_r469749157



##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java
##########
@@ -37,6 +46,41 @@
         * @param streamShardHandle the stream shard in which to consume from
         * @return the constructed {@link RecordPublisher}
         */
-       RecordPublisher create(Properties consumerConfig, MetricGroup metricGroup, StreamShardHandle streamShardHandle);
+       RecordPublisher create(
+                       SequenceNumber sequenceNumber,
+                       Properties consumerConfig,
+                       MetricGroup metricGroup,
+                       StreamShardHandle streamShardHandle) throws InterruptedException;
+
+       /**
+        * Determines the starting position in which the {@link RecordPublisher} should start consuming from.
+        *
+        * @param sequenceNumber the sequence number to start from
+        * @param consumerConfig the consumer properties
+        * @return the {@link StartingPosition} in which to start consuming from
+        */
+       default StartingPosition getStartingPosition(

Review comment:
       Was there a particular reason to put this method in 
`RecordPublisherFactory`, instead of having `create` accept a 
`StartingPosition` (rather than a `SequenceNumber`)?
   The logic that converts a sequence number into a `StartingPosition` could 
probably live in a separate static utility method, independent of any 
particular `RecordPublisherFactory` implementation.
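
   To illustrate the suggestion, here is a minimal sketch. Every name in it 
(`StartingPositionUtil`, `PublisherFactorySketch`, the sentinel strings, the 
simplified `StartingPosition`) is a hypothetical stand-in and not the 
connector's real class. It also leaves out the `consumerConfig` parameter that 
the method in the diff takes.

```java
import java.util.Objects;

// Hypothetical, simplified stand-ins; not the connector's real classes.
final class StartingPositionUtil {

    private StartingPositionUtil() {}

    /** Simplified stand-in: an iterator type plus an optional marker. */
    static final class StartingPosition {
        final String iteratorType;
        final String marker; // null when the iterator type needs no marker

        StartingPosition(String iteratorType, String marker) {
            this.iteratorType = Objects.requireNonNull(iteratorType, "iteratorType");
            this.marker = marker;
        }
    }

    // Assumed sentinel values, for illustration only.
    static final String SENTINEL_LATEST = "LATEST";
    static final String SENTINEL_EARLIEST = "EARLIEST";

    /** Converts a sequence number into a starting position, independently of any factory. */
    static StartingPosition fromSequenceNumber(String sequenceNumber) {
        Objects.requireNonNull(sequenceNumber, "sequenceNumber");
        if (SENTINEL_LATEST.equals(sequenceNumber)) {
            return new StartingPosition("LATEST", null);
        }
        if (SENTINEL_EARLIEST.equals(sequenceNumber)) {
            return new StartingPosition("TRIM_HORIZON", null);
        }
        // Any other value is treated as a concrete sequence number to resume after.
        return new StartingPosition("AFTER_SEQUENCE_NUMBER", sequenceNumber);
    }
}

/** Hypothetical factory shape: it receives an already-resolved starting position. */
interface PublisherFactorySketch<P> {
    P create(StartingPositionUtil.StartingPosition startingPosition);
}
```

   With this shape the caller resolves the position once and passes it to 
`create`, so factory implementations do not need to know about sequence numbers.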

##########
File path: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
##########
@@ -85,12 +88,8 @@
 
                Preconditions.checkArgument(expiredIteratorBackoffMillis >= 0);
                Preconditions.checkArgument(maxNumberOfRecordsPerFetch > 0);
-       }
 
-       @Override
-       public void initialize(StartingPosition startingPosition) throws InterruptedException {
-               nextStartingPosition = startingPosition;

Review comment:
       With the `initialize` method now removed, could you also remove the 
precondition check on line 101?
   `nextStartingPosition` should now never be null.
   Also, can `nextStartingPosition` now be a final field?
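
   As a rough sketch of the idea (the class name `PollingPublisherSketch`, the 
`advanceTo` method, and the use of `String` as the position type are 
hypothetical and not taken from the PR): once the starting position is 
validated in the constructor, later methods no longer need their own null 
check. Whether the field can also be `final` depends on whether anything 
reassigns it after construction. This sketch does reassign it, so it stays 
non-final here.

```java
import java.util.Objects;

// Hypothetical stand-in for a publisher that receives its starting position at construction.
final class PollingPublisherSketch {

    // Non-null from construction onwards; reassigned only through advanceTo.
    private String nextStartingPosition;

    PollingPublisherSketch(String startingPosition) {
        // Single validation point; replaces null checks in later methods.
        this.nextStartingPosition = Objects.requireNonNull(startingPosition, "startingPosition");
    }

    String nextStartingPosition() {
        return nextStartingPosition;
    }

    /** Moves the position forward, keeping the non-null invariant. */
    void advanceTo(String newPosition) {
        this.nextStartingPosition = Objects.requireNonNull(newPosition, "newPosition");
    }
}
```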




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

