This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2268589  [FLINK-23866][connectors/kafka] Statically assign partitions when reading transaction log topic
2268589 is described below

commit 226858967eb341c37b44b2f62646798d42c383ff
Author: Fabian Paul <fabianp...@ververica.com>
AuthorDate: Thu Aug 19 13:25:47 2021 +0200

    [FLINK-23866][connectors/kafka] Statically assign partitions when reading transaction log topic
    
    Before this PR we simply used KafkaConsumer#subscribe(topic) to read from
    the transaction log topic. Unfortunately, this has the downside that not
    all partitions are immediately assigned to the consumer. This could
    cause an IllegalStateException when trying to access the latest offset
    of a not-yet-assigned partition.
    Now the partitions are fetched once and all of them are assigned to the
    consumer immediately.
---
 .../java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java
index 4e5926e..61370c3 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLog.java
@@ -106,7 +106,7 @@ class KafkaTransactionLog implements AutoCloseable {
                         subtaskIdCheckpointOffsetMapping,
                         main.getSubtaskId());
         this.consumer = new KafkaConsumer<>(consumerConfig);
-        this.consumer.subscribe(ImmutableList.of(TRANSACTION_STATE_TOPIC_NAME));
+        this.consumer.assign(getAllPartitions());
     }
 
     /**
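
The failure mode described in the commit message can be illustrated with a small toy model. This is only a sketch under stated assumptions: ToyConsumer and all of its methods are hypothetical stand-ins written for this illustration and are not the Kafka client API, the offsets are made up, and the body of getAllPartitions() is not part of this diff, so nothing here claims to reproduce it. The model captures just one point: after a subscribe-style call no partition is owned yet, so asking for a latest offset fails with an IllegalStateException, whereas after an assign-style call over all known partitions the same lookup succeeds right away.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model only: this is NOT the Kafka client API.
class AssignVsSubscribeSketch {

    /** Hypothetical stand-in for a consumer of one topic with a fixed partition count. */
    static final class ToyConsumer {
        private final int partitionCount;
        private final Set<Integer> assigned = new HashSet<>();

        ToyConsumer(int partitionCount) {
            this.partitionCount = partitionCount;
        }

        /** Models subscribe(): ownership is decided later, so nothing is assigned yet. */
        void subscribe() {
            assigned.clear();
        }

        /** Models assign(): the caller names the partitions and owns them at once. */
        void assign(List<Integer> partitions) {
            assigned.clear();
            assigned.addAll(partitions);
        }

        /** Models a one-time lookup of every partition of the topic. */
        List<Integer> allPartitions() {
            List<Integer> partitions = new ArrayList<>();
            for (int p = 0; p < partitionCount; p++) {
                partitions.add(p);
            }
            return partitions;
        }

        /** Returns a made-up latest offset (partition * 10); fails for unassigned partitions. */
        long latestOffset(int partition) {
            if (!assigned.contains(partition)) {
                throw new IllegalStateException(
                        "No current assignment for partition " + partition);
            }
            return partition * 10L;
        }
    }

    /** True if the latest offset of every partition can be read without an exception. */
    static boolean canReadAllLatestOffsets(ToyConsumer consumer) {
        try {
            for (int partition : consumer.allPartitions()) {
                consumer.latestOffset(partition);
            }
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        ToyConsumer subscribed = new ToyConsumer(3);
        subscribed.subscribe();
        System.out.println("subscribe: " + canReadAllLatestOffsets(subscribed));

        ToyConsumer assignedAll = new ToyConsumer(3);
        assignedAll.assign(assignedAll.allPartitions());
        System.out.println("assign:    " + canReadAllLatestOffsets(assignedAll));
    }
}
```

The trade-off of static assignment, in general terms, is that the consumer no longer takes part in group-managed rebalancing. For a reader that must see every partition of the transaction log topic, that appears to be the intended behavior.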
