[
https://issues.apache.org/jira/browse/FLINK-7143?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16092533#comment-16092533
]
ASF GitHub Bot commented on FLINK-7143:
---------------------------------------
Github user tzulitai commented on a diff in the pull request:
https://github.com/apache/flink/pull/4357#discussion_r128147490
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -517,16 +519,13 @@ public void initializeState(FunctionInitializationContext context) throws Except
LOG.debug("Using the following offsets: {}", restoredState);
}
}
- if (restoredState != null && restoredState.isEmpty()) {
- restoredState = null;
- }
} else {
LOG.info("No restore state for FlinkKafkaConsumer.");
}
}
@Override
- public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ public final void snapshotState(FunctionSnapshotContext context) throws Exception {
--- End diff --
On the other hand, this part of your description seems off:
>we can't set Flink checkpoint to false, because otherwise Kafka consumer auto.commit will be hard-coded to true.
This should not be the case (at least starting from Flink 1.3.x). The
"auto.commit" setting is independent of checkpointing. If checkpointing is not
enabled, "auto.commit" decides whether or not periodic offset committing is
used. If checkpointing is enabled, you can still disable offset committing on
checkpoints by calling `FlinkKafkaConsumerBase#setCommitOffsetsOnCheckpoints(false)`.
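The rules in the comment above can be written out as a small decision function. This is only a minimal sketch of the described behaviour, not Flink's actual code: `CommitMode`, `CommitModeSketch` and `resolve` are hypothetical names introduced for illustration.

```java
/** Hypothetical labels for how offsets end up being committed; not a Flink class. */
enum CommitMode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

final class CommitModeSketch {

    /**
     * Sketch of the behaviour described in the comment:
     * - checkpointing on:  offsets are committed on checkpoints unless that is switched off;
     *                      the auto-commit property plays no role here.
     * - checkpointing off: the auto-commit property alone decides whether offsets
     *                      are committed periodically.
     */
    static CommitMode resolve(boolean checkpointingEnabled,
                              boolean commitOnCheckpoints,
                              boolean autoCommit) {
        if (checkpointingEnabled) {
            return commitOnCheckpoints ? CommitMode.ON_CHECKPOINTS : CommitMode.DISABLED;
        }
        return autoCommit ? CommitMode.KAFKA_PERIODIC : CommitMode.DISABLED;
    }
}
```

In this model, running without checkpointing and with auto-commit turned off yields `DISABLED`, which is the case the original description assumed was impossible.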
> Partition assignment for Kafka consumer is not stable
> -----------------------------------------------------
>
> Key: FLINK-7143
> URL: https://issues.apache.org/jira/browse/FLINK-7143
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Affects Versions: 1.3.1
> Reporter: Steven Zhen Wu
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.3.2
>
>
> While deploying the Flink 1.3 release to hundreds of routing jobs, we found some
> issues with partition assignment for the Kafka consumer. Some partitions weren't
> assigned and some partitions were assigned more than once.
> Here is the bug introduced in Flink 1.3.
> {code}
> protected static void initializeSubscribedPartitionsToStartOffsets(...) {
>     ...
>     for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
>         if (i % numParallelSubtasks == indexOfThisSubtask) {
>             if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
>                 subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
>             }
>             ...
> }
> {code}
> The bug is that the array index {{i}} is used for the modulo against
> {{numParallelSubtasks}}. If {{kafkaTopicPartitions}} has a different order in
> different subtasks, the assignment is not stable across subtasks, which causes
> the assignment issue mentioned earlier.
> The fix is also very simple: we should use the partition id for the modulo, {{if
> (kafkaTopicPartitions.get(i).getPartition() % numParallelSubtasks ==
> indexOfThisSubtask)}}. That would result in a stable assignment across subtasks
> that is independent of the ordering in the array.
> Marking it as a blocker because of its impact.
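The effect described in the issue can be reproduced with a small standalone sketch. It is a simplification, not the connector code: partitions are modelled as plain `int` ids of a single topic, and `PartitionAssignmentSketch`, `assignByIndex` and `assignByPartitionId` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class PartitionAssignmentSketch {

    /** Reported bug: the position in the list decides ownership, so list order matters. */
    static List<Integer> assignByIndex(List<Integer> partitionIds, int numSubtasks, int subtaskIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int i = 0; i < partitionIds.size(); i++) {
            if (i % numSubtasks == subtaskIndex) {
                assigned.add(partitionIds.get(i));
            }
        }
        return assigned;
    }

    /** Proposed fix: the partition id decides ownership, so list order is irrelevant. */
    static List<Integer> assignByPartitionId(List<Integer> partitionIds, int numSubtasks, int subtaskIndex) {
        List<Integer> assigned = new ArrayList<>();
        for (int partitionId : partitionIds) {
            if (partitionId % numSubtasks == subtaskIndex) {
                assigned.add(partitionId);
            }
        }
        return assigned;
    }
}
```

With two subtasks, suppose subtask 0 sees the partitions in the order `[0, 1, 2, 3]` and subtask 1 sees them as `[1, 0, 3, 2]`. With `assignByIndex`, both subtasks end up with partitions 0 and 2, while 1 and 3 are never assigned. With `assignByPartitionId`, subtask 0 gets 0 and 2 and subtask 1 gets 1 and 3, regardless of ordering.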