tzulitai commented on a change in pull request #12881:
URL: https://github.com/apache/flink/pull/12881#discussion_r469749157
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/RecordPublisherFactory.java
##########
@@ -37,6 +46,41 @@
* @param streamShardHandle the stream shard in which to consume from
* @return the constructed {@link RecordPublisher}
*/
- RecordPublisher create(Properties consumerConfig, MetricGroup metricGroup, StreamShardHandle streamShardHandle);
+ RecordPublisher create(
+ SequenceNumber sequenceNumber,
+ Properties consumerConfig,
+ MetricGroup metricGroup,
+ StreamShardHandle streamShardHandle) throws InterruptedException;
+
+ /**
+ * Determines the starting position in which the {@link RecordPublisher} should start consuming from.
+ *
+ * @param sequenceNumber the sequence number to start from
+ * @param consumerConfig the consumer properties
+ * @return the {@link StartingPosition} in which to start consuming from
+ */
+ default StartingPosition getStartingPosition(
Review comment:
Was there a particular reason to put this method in `RecordPublisherFactory`,
rather than having `create` accept a `StartingPosition` (instead of a
`SequenceNumber`)?
The logic that converts a sequence number to a `StartingPosition` could
probably live in a separate static utility method, independent of any
concrete `RecordPublisherFactory` implementation.
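A minimal sketch of the suggested split, to make the proposal concrete. Every type here is a simplified, hypothetical stand-in rather than the connector's real class: the `StartingPositions` utility, the sentinel strings, and the `Kind` enum are assumptions made for illustration, and the sketch ignores the `consumerConfig` parameter that the method in the diff also takes.

```java
import java.util.Objects;

// Hypothetical stand-in for the connector's sequence number type.
final class SequenceNumber {
    private final String value;

    SequenceNumber(String value) {
        this.value = Objects.requireNonNull(value, "value");
    }

    String get() {
        return value;
    }
}

// Hypothetical stand-in for StartingPosition; the real class may differ.
final class StartingPosition {
    enum Kind { LATEST, TRIM_HORIZON, AFTER_SEQUENCE_NUMBER }

    private final Kind kind;
    private final String sequenceNumber; // null unless kind is AFTER_SEQUENCE_NUMBER

    StartingPosition(Kind kind, String sequenceNumber) {
        this.kind = Objects.requireNonNull(kind, "kind");
        this.sequenceNumber = sequenceNumber;
    }

    Kind kind() {
        return kind;
    }

    String sequenceNumber() {
        return sequenceNumber;
    }
}

// Static utility that owns the conversion, independent of any factory implementation.
final class StartingPositions {
    // Assumed sentinel values, for illustration only.
    static final String LATEST_SENTINEL = "LATEST";
    static final String EARLIEST_SENTINEL = "EARLIEST";

    private StartingPositions() {}

    static StartingPosition fromSequenceNumber(SequenceNumber sequenceNumber) {
        String value = sequenceNumber.get();
        if (LATEST_SENTINEL.equals(value)) {
            return new StartingPosition(StartingPosition.Kind.LATEST, null);
        }
        if (EARLIEST_SENTINEL.equals(value)) {
            return new StartingPosition(StartingPosition.Kind.TRIM_HORIZON, null);
        }
        // Any other value is treated as a concrete sequence number to continue after.
        return new StartingPosition(StartingPosition.Kind.AFTER_SEQUENCE_NUMBER, value);
    }
}

interface RecordPublisher {}

// The factory then receives an already-resolved position instead of a sequence number.
interface RecordPublisherFactory {
    RecordPublisher create(StartingPosition startingPosition);
}
```

With this shape the caller resolves the position once, for example `factory.create(StartingPositions.fromSequenceNumber(seq))`, and factory implementations no longer need to know how sequence numbers map to positions.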
##########
File path:
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/polling/PollingRecordPublisher.java
##########
@@ -85,12 +88,8 @@
Preconditions.checkArgument(expiredIteratorBackoffMillis >= 0);
Preconditions.checkArgument(maxNumberOfRecordsPerFetch > 0);
- }
- @Override
- public void initialize(StartingPosition startingPosition) throws InterruptedException {
- nextStartingPosition = startingPosition;
Review comment:
With the `initialize` method now removed, could you also remove the
preconditions check on line 101?
`nextStartingPosition` should never be null now.
Also, can `nextStartingPosition` now be a final field?
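A rough sketch of the invariant behind this request, using hypothetical names (`PollingPublisherSketch`, `Position`, `advanceTo`) that are not taken from the PR. Once the position is supplied and validated in the constructor, later null checks become redundant. Whether the field can also be `final` depends on whether the publisher reassigns it while consuming; this sketch assumes it does, so the field stays non-final but is still never null.

```java
import java.util.Objects;

// Hypothetical simplification of a polling publisher; not the connector's actual class.
final class PollingPublisherSketch {

    // Hypothetical immutable position marker.
    static final class Position {
        private final String sequenceNumber;

        Position(String sequenceNumber) {
            this.sequenceNumber = Objects.requireNonNull(sequenceNumber, "sequenceNumber");
        }

        String sequenceNumber() {
            return sequenceNumber;
        }
    }

    // Never null: set in the constructor and only replaced by non-null values.
    private Position nextStartingPosition;

    PollingPublisherSketch(Position startingPosition) {
        // The single place where the argument is validated.
        this.nextStartingPosition = Objects.requireNonNull(startingPosition, "startingPosition");
    }

    Position nextStartingPosition() {
        // No null check needed here; the constructor and advanceTo guarantee the invariant.
        return nextStartingPosition;
    }

    // Assumed to be called after a batch is consumed; keeps the non-null invariant.
    void advanceTo(Position next) {
        this.nextStartingPosition = Objects.requireNonNull(next, "next");
    }
}
```

If the position were never reassigned after construction, dropping `advanceTo` and marking the field `final` would let the compiler enforce the same guarantee.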
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]