[
https://issues.apache.org/jira/browse/FLINK-7195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16091450#comment-16091450
]
ASF GitHub Bot commented on FLINK-7195:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/4344#discussion_r127951191
--- Diff:
flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseTest.java
---
@@ -532,6 +533,107 @@ public void
testSnapshotStateWithCommitOnCheckpointsDisabled() throws Exception
verify(fetcher,
never()).commitInternalOffsetsToKafka(anyMap()); // not offsets should be
committed
}
+ @Test
+ public void testRestoredStateInsensitiveToMissingPartitions() throws
Exception {
+ List<KafkaTopicPartition> mockFetchedPartitionsOnStartup =
Arrays.asList(
+ new KafkaTopicPartition("test-topic", 0),
+ new KafkaTopicPartition("test-topic", 1),
+ new KafkaTopicPartition("test-topic", 2));
+
+ // missing fetched partitions on restore
+ List<KafkaTopicPartition> mockFetchedPartitionsOnRestore =
mockFetchedPartitionsOnStartup.subList(0, 2);
+
+
testRestoredStateInsensitiveToFetchedPartitions(mockFetchedPartitionsOnStartup,
mockFetchedPartitionsOnRestore);
+ }
+
+ @Test
+ public void testRestoredStateInsensitiveToNewPartitions() throws
Exception {
+ List<KafkaTopicPartition> mockFetchedPartitionsOnStartup =
Arrays.asList(
+ new KafkaTopicPartition("test-topic", 0),
+ new KafkaTopicPartition("test-topic", 1),
+ new KafkaTopicPartition("test-topic", 2));
+
+ // new partitions on restore
+ List<KafkaTopicPartition> mockFetchedPartitionsOnRestore = new
ArrayList<>(mockFetchedPartitionsOnStartup);
--- End diff --
I've addressed this and re-opened as #4357 which subsumes this PR.
> FlinkKafkaConsumer should not respect fetched partitions to filter restored
> partition states
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-7195
> URL: https://issues.apache.org/jira/browse/FLINK-7195
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.3.1
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.3.2
>
>
> This issue is a re-appearance of FLINK-6006. On restore, we should not
> respect any fetched partitions list from Kafka and perform any filtering of
> the restored partition states. There are corner cases where, due to Kafka
> broker downtime, some partitions may be missing in the fetched partition
> list. To be more precise, we actually should not require fetching partitions
> on restore.
> We've stepped on our own foot again and reintroduced this bug in
> https://github.com/apache/flink/pull/3378/commits/ed68fedbe90db03823d75a020510ad3c344fa73e.
> The previous test for this behavior was too implementation specific, and
> therefore the leak in catching this on different internal implementations.
> We should have a proper unit test for this that does not rely on the internal
> implementations and test only on public abstractions of
> {{FlinkKafkaConsumerBase}}.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)