[
https://issues.apache.org/jira/browse/FLINK-34554?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17827657#comment-17827657
]
Hilmi Al Fatih commented on FLINK-34554:
----------------------------------------
Hi [~martijnvisser],
I just reproduced the behavior with one of the recent versions of Kafka (3.6.1).
Here is the Java code. It basically does: 1) initTransactions 2) send one record
3) commit.
{code:java}
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSinkMimic {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSinkMimic.class);
    public static String BOOTSTRAP_SERVERS = "localhost:9092";
    public static final String TOPIC = "test-kafkasink-topic";

    public static void main(String[] args) throws InterruptedException {
        if (args.length < 2) {
            System.out.println("Usage: java -cp <jarname> KafkaSinkMimic "
                    + "<num_subtasks> <checkpoint_interval_ms> [<bootstrap_server>]");
            return; // do not continue without the required arguments
        }
        int numSubtasks = Integer.parseInt(args[0]);
        int checkpointIntervalMs = Integer.parseInt(args[1]);
        if (args.length == 3) {
            KafkaSinkMimic.BOOTSTRAP_SERVERS = args[2];
        }
        System.out.println("Running with args " + String.join(" ", args));
        KafkaSinkMimic mimic = new KafkaSinkMimic(checkpointIntervalMs, numSubtasks);
        mimic.run();
    }

    private final int checkpointInterval;
    private final int numSubtasks;
    private final long currentTs;
    private final Executor executor;

    public KafkaSinkMimic(int checkpointInterval, int numSubtasks) {
        this.checkpointInterval = checkpointInterval;
        this.numSubtasks = numSubtasks;
        this.currentTs = Instant.now().getEpochSecond();
        this.executor = Executors.newFixedThreadPool(numSubtasks);
    }

    public void run() throws InterruptedException {
        int checkpointId = 0;
        // The prefix includes the start timestamp, so every run uses a fresh set of IDs.
        String transactionalIdPrefix = System.getenv("HOSTNAME") + "-" + currentTs + "-";
        while (true) {
            // One simulated checkpoint: each subtask runs one transaction under a new transactional.id.
            for (int i = 0; i < numSubtasks; i++) {
                int subtaskId = i;
                int finalCheckpointId = checkpointId;
                executor.execute(
                        () -> this.runSubtask(transactionalIdPrefix, subtaskId, finalCheckpointId));
            }
            Thread.sleep(checkpointInterval);
            checkpointId++;
        }
    }

    public void runSubtask(String transactionalIdPrefix, int subtaskId, int checkpointId) {
        Properties props = new Properties();
        props.put("acks", "all");
        props.put("transaction.timeout.ms", "900000");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // New transactional.id for every (subtask, checkpoint) pair.
        String transactionalId = transactionalIdPrefix + subtaskId + "-" + checkpointId;
        props.put("transactional.id", transactionalId);

        // try-with-resources closes the producer exactly once on every path.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            try {
                logger.info("Starting transaction with id {}", transactionalId);
                producer.beginTransaction();
                producer.send(new ProducerRecord<>(
                        TOPIC, Integer.toString(subtaskId), Instant.now().toString()));
                producer.commitTransaction();
            } catch (ProducerFencedException | OutOfOrderSequenceException
                    | AuthorizationException e) {
                // Unrecoverable: the producer cannot abort, it is simply closed on exit.
                logger.error("Fatal error in transaction {}", transactionalId, e);
            } catch (KafkaException e) {
                logger.warn("Aborting transaction {}", transactionalId, e);
                producer.abortTransaction();
            }
        }
    }
}
{code}
As you can see in the figure below, it consumes a significant amount of broker heap.
!image-2024-03-16-17-17-16-152.png|width=750,height=284!
Compared to older Kafka broker versions, the expiration setting for
ProducerStateEntry has changed from
[transactional.id.expiration.ms|https://kafka.apache.org/36/documentation.html#brokerconfigs_transactional.id.expiration.ms]
(default 7 days) to
[producer.id.expiration.ms|https://kafka.apache.org/36/documentation.html#brokerconfigs_producer.id.expiration.ms]
(default 1 day)
([KIP-854|https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry]).
Still, for our use case, the large number of entries may cause OOM.
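To put rough numbers on this, here is a minimal sketch of the estimate. It assumes that every subtask creates exactly one new transactional ID per checkpoint and that each ID stays on the broker until it expires. The class and method names are made up for illustration; the 350 subtasks and the 5-second interval are the figures from the ticket description.

```java
import java.time.Duration;

/** Back-of-the-envelope estimate; hypothetical helper, not a Flink or Kafka API. */
final class ProducerIdEstimate {

    /**
     * Number of transactional IDs created within the window, assuming every
     * subtask opens exactly one new transactional ID per checkpoint.
     */
    static long idsCreated(int subtasks, long checkpointIntervalMs, Duration window) {
        long checkpoints = window.toMillis() / checkpointIntervalMs;
        return subtasks * checkpoints;
    }

    public static void main(String[] args) {
        int subtasks = 350;        // sink subtasks, from the ticket
        long intervalMs = 5_000L;  // checkpoint interval, from the ticket
        System.out.println("10 hours: " + idsCreated(subtasks, intervalMs, Duration.ofHours(10)));
        System.out.println("1 day:    " + idsCreated(subtasks, intervalMs, Duration.ofDays(1)));
        System.out.println("7 days:   " + idsCreated(subtasks, intervalMs, Duration.ofDays(7)));
    }
}
```

Under these assumptions it gives 2,520,000 IDs after 10 hours, 6,048,000 after 1 day and 42,336,000 after 7 days, so even the 1-day default keeps several million IDs alive for a job of this size.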
In any case, producer ID entries are also an important concern for the Kafka
team. There are several KIPs that try to mitigate the issue:
* [KIP-847: Add ProducerIdCount
metrics|https://cwiki.apache.org/confluence/display/KAFKA/KIP-847%3A+Add+ProducerIdCount+metrics]
* [KIP-854 Separate configuration for producer ID
expiry|https://cwiki.apache.org/confluence/display/KAFKA/KIP-854+Separate+configuration+for+producer+ID+expiry]
* [KIP-936: Throttle number of active
PIDs|https://cwiki.apache.org/confluence/display/KAFKA/KIP-936%3A+Throttle+number+of+active+PIDs]
Well, as I said in the ticket, there are many possible tunings to ease the
situation (increase the broker heap, increase the checkpoint interval, use fewer
subtasks, reduce producer.id.expiration.ms, etc.), but they harm our flexibility
a lot compared to using pooling as implemented by FlinkKafkaProducer.
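To illustrate why pooling bounds the number of IDs, here is a minimal sketch that compares the two naming schemes. It is only an illustration: the helper names, the modulo-based pool and the pool size of 5 are my assumptions, not the actual FlinkKafkaProducer or KafkaSink code. A real pool must also make sure an ID is not reused while its previous transaction is still pending.

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative only: hypothetical helpers, not the Flink connector's code. */
final class TransactionalIdSchemes {

    /** One fresh ID per subtask and checkpoint, as in the reproduction program above. */
    static String perCheckpointId(String prefix, int subtaskId, long checkpointId) {
        return prefix + subtaskId + "-" + checkpointId;
    }

    /** IDs cycle through a fixed pool per subtask, so the distinct count stays bounded. */
    static String pooledId(String prefix, int subtaskId, long checkpointId, int poolSize) {
        return prefix + subtaskId + "-" + (checkpointId % poolSize);
    }

    public static void main(String[] args) {
        int subtasks = 350;      // from the ticket
        int poolSize = 5;        // assumed pool size per subtask
        long checkpoints = 1_000;
        Set<String> fresh = new HashSet<>();
        Set<String> pooled = new HashSet<>();
        for (long cp = 0; cp < checkpoints; cp++) {
            for (int s = 0; s < subtasks; s++) {
                fresh.add(perCheckpointId("job-", s, cp));
                pooled.add(pooledId("job-", s, cp, poolSize));
            }
        }
        System.out.println("per-checkpoint IDs: " + fresh.size());
        System.out.println("pooled IDs:         " + pooled.size());
    }
}
```

With the per-checkpoint scheme the number of distinct IDs grows with every checkpoint (subtasks * checkpoints), while with the pooled scheme it never exceeds subtasks * poolSize, no matter how long the job runs.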
Speaking of Kafka's new feature (2PC participant), I believe this issue will
not be solved even after the new feature is introduced if we still use this
generate-new-transactionalId-per-checkpoint approach.
Please let me know your thoughts on this.
> Using EXACTLY_ONCE with KafkaSink cause broker's OOM due to newly created
> transactionalId per checkpoint
> --------------------------------------------------------------------------------------------------------
>
> Key: FLINK-34554
> URL: https://issues.apache.org/jira/browse/FLINK-34554
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / Kafka
> Affects Versions: 1.16.3, 1.17.2, 1.18.1
> Reporter: Hilmi Al Fatih
> Priority: Major
> Attachments: image (4).png, image (5).png,
> image-2024-03-16-17-17-16-152.png
>
>
> Flink version: 1.17.1
> Kafka Broker version: 2.7.1 (4 GB heap memory for each broker)
> Hi, we recently had an outage in our production system after we performed a
> Flink kafka-connector API upgrade. To give brief context, our application
> is a simple kafka-to-kafka pipeline with minimal processing. We run in
> EXACTLY_ONCE mode, thus Kafka transactions are involved.
> Our application runs with around 350 sink subtasks in total. The checkpoint
> period was set to 5 seconds to avoid blocking {{read_committed}} consumers too long.
> We recently performed an upgrade with the following details:
> Previous state:
> * Flink version: 1.14.4
> * Broker version: 2.7.1
> * kafka connector API: FlinkKafkaProducer
> Update to:
> * Flink version: 1.17.1
> * Broker version: 2.7.1
> * kafka connector API: KafkaSink
> Around 10 hours after the deployment, our Kafka broker started failing
> with an OOM error. Heap dump entries are dominated by the ProducerStateEntry
> records.
> Our investigation led us to the implementation change between
> FlinkKafkaProducer and KafkaSink.
> * KafkaSink generates a different transactionalId for each checkpoint,
> * FlinkKafkaProducer uses a constant pool of transactionalIds.
> With this behavior, KafkaSink seemed to exhaust our broker heap very fast and
> the ProducerStateEntry will only expire after
> [transactional.id.expiration.ms|http://transactional.id.expiration.ms/] ,
> which by default is set to 7 days.
> ([ref1|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/Log.scala#L677],
>
> [ref2|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L268],
>
> [ref3|https://github.com/apache/kafka/blob/61dbce85d0d41457d81a4096ecaea049f3a4b3ae/core/src/main/scala/kafka/log/LogManager.scala#L1207])
> For our job, it means it creates roughly:
> * After 10 hours of running: 350 ids * 12 times/minute * 60 min/hour * 10 hours ~
> 2,520,000 entries
> * After 7 days: ~ 42 million entries.
> Below is the number of ProducerStateEntry entries in the heap dump taken at
> the time of the OOM:
> * 505,000 (6.5%), in total it would be roughly ~ 7,000,000 entries.
> There are several things that come to mind to mitigate the drawbacks,
> such as:
> * reduce the number of subtasks, so it reduces the number of transactionalId
> * Enlarge the checkpoint period to reduce the newly generated
> transactionalId rate.
> * Shorten
> [transactional.id.expiration.ms|http://transactional.id.expiration.ms/] to
> expire the unused transactionalId soon.
> * Increase the broker heap
> However, the above mitigations might be too cumbersome and need careful tuning,
> which harms our flexibility. In addition, due to the lack of maintaining
> lingering transaction state, TransactionAborter seems to abort old
> transactions naively. We might accidentally (or purposefully) reuse the
> same transactionalIdPrefix and start the counter from 0. In that case, if the
> old transactionalId happens to have epoch >0, it will keep looping, aborting
> the nonexistent transactions up to the latest checkpoint counter (which may
> be too big), and make the job stuck.
> Btw, I am aware that in Flink 2.0, you guys are putting a lot of effort into
> creating better integration with Kafka transactions
> ([FLIP-319|https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=255071710]).
> In FLIP-319, it mentions something about TID pooling. However, it seems
> that there is no relevant page yet for it, so I wonder whether there are any
> concrete plans already that I can follow, or if there is something I can
> contribute to, I will be really happy to help.
>
>