[
https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16096204#comment-16096204
]
ASF GitHub Bot commented on FLINK-7143:
---------------------------------------
Github user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/4357#discussion_r128754182
--- Diff:
flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java
---
@@ -517,16 +519,13 @@ public void initializeState(FunctionInitializationContext context) throws Except
LOG.debug("Using the following offsets: {}", restoredState);
}
}
- if (restoredState != null && restoredState.isEmpty()) {
- restoredState = null;
- }
} else {
LOG.info("No restore state for FlinkKafkaConsumer.");
}
}
@Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ public final void snapshotState(FunctionSnapshotContext context) throws Exception {
--- End diff --
The concern here is that not making the methods final makes it easy for
contributors to accidentally override them. We don't have specific unit tests
for the 0.9 `FlinkKafkaConsumer` or the 0.10 `FlinkKafkaConsumer` and only test
the base `FlinkKafkaConsumerBase`. This is OK, as long as specific
implementations don't override important methods. If the
`FlinkKafkaConsumer090` did override the `snapshot()`/`restore()` methods, for
example, no tests would catch this.
@tzulitai I don't want to discuss these methods too much here, since we
want to get the fixes in for release 1.3.2. A way around the problem is to turn
the `FlinkKafkaConsumerBaseTest` into an abstract
`FlinkKafkaConsumerBaseTestBase` that has an abstract method
`createTestingConsumer(List<KafkaTopicPartition> mockFetchedPartitions)` that
creates a "dummy" consumer for a specific Kafka version. Then we would have
individual `FlinkKafkaConsumer09Test`, `FlinkKafkaConsumer010Test` and so on
that derive from the abstract test base and just implement the method for
creating the testing consumer.
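A minimal sketch of that structure, using plain Java stand-ins rather than
JUnit and the real Flink classes. Everything here is hypothetical and only for
illustration: `ConsumerBase`, `DummyConsumer09`, `DummyConsumer010`, the
integer partition IDs used in place of `KafkaTopicPartition`, and the single
check in `snapshotContainsAllPartitions()`. The point is only that the shared
test logic is written once against the base class and each version-specific
test supplies its own consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ConsumerTestBaseSketch {

    /** Hypothetical stand-in for FlinkKafkaConsumerBase (not the real Flink class). */
    abstract static class ConsumerBase {
        private final List<Integer> partitions;

        ConsumerBase(List<Integer> partitions) {
            this.partitions = new ArrayList<>(partitions);
        }

        /** Final, so version-specific subclasses cannot accidentally override it. */
        final List<Integer> snapshotState() {
            return new ArrayList<>(partitions);
        }
    }

    /** Hypothetical dummy consumers for two Kafka versions. */
    static class DummyConsumer09 extends ConsumerBase {
        DummyConsumer09(List<Integer> partitions) {
            super(partitions);
        }
    }

    static class DummyConsumer010 extends ConsumerBase {
        DummyConsumer010(List<Integer> partitions) {
            super(partitions);
        }
    }

    /** Shared test logic, written once against the base class. */
    abstract static class ConsumerBaseTestBase {
        /** Each version-specific test only supplies its own consumer. */
        abstract ConsumerBase createTestingConsumer(List<Integer> mockFetchedPartitions);

        /** One illustrative check: the snapshot contains every fetched partition. */
        final boolean snapshotContainsAllPartitions() {
            List<Integer> partitions = Arrays.asList(0, 1, 2);
            ConsumerBase consumer = createTestingConsumer(partitions);
            return consumer.snapshotState().equals(partitions);
        }
    }

    static class Consumer09Test extends ConsumerBaseTestBase {
        @Override
        ConsumerBase createTestingConsumer(List<Integer> mockFetchedPartitions) {
            return new DummyConsumer09(mockFetchedPartitions);
        }
    }

    static class Consumer010Test extends ConsumerBaseTestBase {
        @Override
        ConsumerBase createTestingConsumer(List<Integer> mockFetchedPartitions) {
            return new DummyConsumer010(mockFetchedPartitions);
        }
    }

    public static void main(String[] args) {
        // Run the same shared check once per version-specific test class.
        System.out.println(new Consumer09Test().snapshotContainsAllPartitions());
        System.out.println(new Consumer010Test().snapshotContainsAllPartitions());
    }
}
```

With this layout, a version-specific consumer that changed the snapshot
behavior would be exercised by the same checks as the base class.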
What do you think?
> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
> Key: FLINK-7143
> URL: https://issues.apache.org/jira/browse/FLINK-7143
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.3.1
> Reporter: Steven Zhen Wu
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.3.2
>
>
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found some
> issues with partition assignment for the Kafka consumer. Some partitions weren't
> assigned and some partitions got assigned more than once.
> Here is the bug introduced in Flink 1.3.
> {code}
> protected static void initializeSubscribedPartitionsToStartOffsets(...) {
>     ...
>     for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>         if (i % numParallelSubtasks == indexOfThisSubtask) {
>             if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                 subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
>             }
>     ...
> }
> {code}
> The bug is using the array index {{i}} to mod against {{numParallelSubtasks}}. If
> {{kafkaTopicPartitions}} has a different order among different subtasks, the
> assignment is not stable across subtasks, which creates the assignment issue
> mentioned earlier.
> The fix is also very simple: we should use the partition ID to do the mod, {{if
> (kafkaTopicPartitions.get\(i\).getPartition() % numParallelSubtasks ==
> indexOfThisSubtask)}}. That would result in a stable assignment across subtasks
> that is independent of the ordering in the array.
> Marking it as a blocker because of its impact.
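The difference between the two conditions in the issue description can be
reproduced with a small standalone sketch. It is not the Flink code: plain
integer partition IDs stand in for `KafkaTopicPartition`, and the class and
method names (`PartitionAssignmentSketch`, `assignByIndex`,
`assignByPartitionId`) are hypothetical. It assumes two subtasks that see the
same four partitions in different orders.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PartitionAssignmentSketch {

    /** Condition from the issue: assignment depends on the position in the list. */
    static List<Integer> assignByIndex(List<Integer> partitionIds, int numSubtasks, int subtaskIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < partitionIds.size(); i++) {
            if (i % numSubtasks == subtaskIndex) {
                assigned.add(partitionIds.get(i));
            }
        }
        return assigned;
    }

    /** Proposed condition: assignment depends only on the partition ID. */
    static List<Integer> assignByPartitionId(List<Integer> partitionIds, int numSubtasks, int subtaskIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < partitionIds.size(); i++) {
            if (partitionIds.get(i) % numSubtasks == subtaskIndex) {
                assigned.add(partitionIds.get(i));
            }
        }
        return assigned;
    }

    public static void main(String[] args) {
        // The same four partitions, seen in a different order by each subtask.
        List<Integer> seenBySubtask0 = Arrays.asList(0, 1, 2, 3);
        List<Integer> seenBySubtask1 = Arrays.asList(1, 0, 3, 2);

        // Index-based: both subtasks pick partitions 0 and 2; 1 and 3 are never assigned.
        System.out.println(assignByIndex(seenBySubtask0, 2, 0));
        System.out.println(assignByIndex(seenBySubtask1, 2, 1));

        // ID-based: subtask 0 picks 0 and 2, subtask 1 picks 1 and 3, whatever the order.
        System.out.println(assignByPartitionId(seenBySubtask0, 2, 0));
        System.out.println(assignByPartitionId(seenBySubtask1, 2, 1));
    }
}
```

The sketch covers a single topic only; how partitions of several topics should
be spread across subtasks is outside what the description states.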