This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 2268589  [FLINK-23866][connectors/kafka] Statically assign partitions when reading transaction log topic
2268589 is described below

commit 226858967eb341c37b44b2f62646798d42c383ff
Author: Fabian Paul <fabianp...@ververica.com>
AuthorDate: Thu Aug 19 13:25:47 2021 +0200

    [FLINK-23866][connectors/kafka] Statically assign partitions when reading transaction log topic

    Before this PR we simply used KafkaConsumer#subscribe(topic) to read
    from the transaction log topic. Unfortunately, this has the downside
    that not all partitions are immediately assigned to the consumer. This
    could cause an IllegalStateException when trying to access the latest
    offset of a partition that has not yet been assigned.

    Now the partitions are fetched once and the consumer is assigned to
    all of them immediately.
---
 .../java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java
index 4e5926e..61370c3 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java
@@ -106,7 +106,7 @@ class KafkaTransactionLog implements AutoCloseable {
                         subtaskIdCheckpointOffsetMapping, main.getSubtaskId());
         this.consumer = new KafkaConsumer<>(consumerConfig);
-        this.consumer.subscribe(ImmutableList.of(TRANSACTION_STATE_TOPIC_NAME));
+        this.consumer.assign(getAllPartitions());
     }
     /**
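
The body of getAllPartitions() is not part of this diff. As a rough illustration of the idea in the commit message (fetch the partition list once, then assign every partition statically), here is a minimal sketch. It is not the Flink implementation: TopicPartition is a stand-in for org.apache.kafka.common.TopicPartition, LogConsumer and FakeConsumer are hypothetical types so the snippet runs without a broker or the kafka-clients dependency, and the topic name "__transaction_state" is assumed to be the value of TRANSACTION_STATE_TOPIC_NAME. With the real client, the corresponding calls would be KafkaConsumer#partitionsFor and KafkaConsumer#assign.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class StaticAssignmentSketch {

    /** Minimal stand-in for org.apache.kafka.common.TopicPartition. */
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /** Hypothetical slice of a consumer: metadata lookup plus static assignment. */
    interface LogConsumer {
        List<Integer> partitionsFor(String topic);

        void assign(Collection<TopicPartition> partitions);

        Collection<TopicPartition> assignment();
    }

    /** In-memory fake so the sketch runs without a broker. */
    static final class FakeConsumer implements LogConsumer {
        private final List<Integer> partitionIds;
        private List<TopicPartition> assigned = new ArrayList<>();

        FakeConsumer(Integer... partitionIds) {
            this.partitionIds = Arrays.asList(partitionIds);
        }

        @Override
        public List<Integer> partitionsFor(String topic) {
            return partitionIds;
        }

        @Override
        public void assign(Collection<TopicPartition> partitions) {
            // Static assignment takes effect immediately; no group rebalance is involved.
            assigned = new ArrayList<>(partitions);
        }

        @Override
        public Collection<TopicPartition> assignment() {
            return assigned;
        }
    }

    /** Looks up the partition ids of the topic once and wraps each one for assign(). */
    static List<TopicPartition> getAllPartitions(LogConsumer consumer, String topic) {
        List<TopicPartition> all = new ArrayList<>();
        for (int id : consumer.partitionsFor(topic)) {
            all.add(new TopicPartition(topic, id));
        }
        return all;
    }

    public static void main(String[] args) {
        LogConsumer consumer = new FakeConsumer(0, 1, 2);
        consumer.assign(getAllPartitions(consumer, "__transaction_state"));
        // Every partition is assigned before the first offset lookup.
        System.out.println(consumer.assignment().size());
    }
}
```

The point of the sketch is the ordering: the full partition list is known and assigned before any offset is queried, whereas with subscribe() the assignment only arrives after the group coordinator has completed a rebalance.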