Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/4928#discussion_r148226195
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher.java
---
@@ -242,10 +243,25 @@ public void
addDiscoveredPartitions(List<KafkaTopicPartition> newPartitions) thr
* @param commitCallback The callback that the user should trigger when
a commit request completes or fails.
* @throws Exception This method forwards exceptions.
*/
- public abstract void commitInternalOffsetsToKafka(
+ public final void commitInternalOffsetsToKafka(
+ Map<KafkaTopicPartition, Long> offsets,
+ @Nonnull KafkaCommitCallback commitCallback) throws
Exception {
+ // Ignore sentinels. They might appear here if snapshot has
started before actual offsets values
+ // replaced sentinels
+ doCommitInternalOffsetsToKafka(filerOutSentinels(offsets),
commitCallback);
+ }
+
+ protected abstract void doCommitInternalOffsetsToKafka(
Map<KafkaTopicPartition, Long> offsets,
@Nonnull KafkaCommitCallback commitCallback) throws
Exception;
+ private Map<KafkaTopicPartition, Long>
filerOutSentinels(Map<KafkaTopicPartition, Long> offsets) {
--- End diff --
typo: `filterOutSentinels`, missing `t`.
---