Danil Shkodin created KAFKA-18673:
-------------------------------------

             Summary: Provide means to gracefully update Producer transactional id mapping state in case of lasting inactivity
                 Key: KAFKA-18673
                 URL: https://issues.apache.org/jira/browse/KAFKA-18673
             Project: Kafka
          Issue Type: Improvement
            Reporter: Danil Shkodin

Please consider adding a way to prevent a transactional _Producer_ instance from expiring.

For those who run 24/7 services that write transactional messages to Kafka only sparsely, there are several options for keeping the program highly available.

The first is what Spring [does|https://docs.spring.io/spring-kafka/reference/kafka/transactions.html#overview]: rotate transactional producers at intervals shorter than the expiration timeout.
{code:java}
void fixTransactionalIdExpiration() {
  try {
    producer.close(timeout);
  } catch (Exception error) {
    logger.warn("...", error);
  }
  producer = null;

  try {
    producer = new KafkaProducer<>(settings);
  } catch (Exception error) {
    logger.warn("...", error);
    // handle failure
    return;
  }

  try {
    producer.initTransactions();
  } catch (Exception error) {
    logger.warn("...", error);
    // close producer and clean up, handle failure
    return;
  }
}
{code}
A similar option is to also act periodically, but to write an empty record transactionally instead of reconnecting.
{code:java}
void fixTransactionalIdExpiration() {
  try {
    producer.beginTransaction();
    var topic = "project_prefix.__dummy_topic";
    var message = new ProducerRecord<>(topic, (String) null, (String) null);
    producer.send(message);
    producer.abortTransaction(); // or producer.commitTransaction(); does not matter
  } catch (Exception error) {
    logger.warn("...", error);
    // handle failure
  }
}
{code}
Personally, I dislike having to introduce a service topic. For me, this inelegance outweighs the trouble of reconnecting.

Surprisingly, producing an empty transaction does not prevent expiration. [Probably|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L840], there is a guard in the transaction manager that prevents any actual update to the transactional producer mapping when there is nothing to write.
{code:java}
void fixTransactionalIdExpiration() {
  // does not work
  // producer will still get fenced upon transactional.id.expiration.ms
  try {
    producer.beginTransaction();
    producer.commitTransaction();
  } catch (Exception error) {}
}
{code}
It is worth noting that client code may run one of these periodic fixes conditionally, only when there has been no activity, meaning no successful _send()_ or _sendOffsetsToTransaction()_ for, say, 6 to 24 hours.

The last and most obvious option is to let it fail and react to the error.
{code:java}
void sendMessage(Message message) {
  try {
    producer.beginTransaction();
    producer.send(message.to());
    producer.commitTransaction();
  } catch (InvalidPidMappingException error) {
    // reconnect, retry
  } catch (Exception error) {
    // handle failure
  }
}
{code}
A dedicated method that explicitly reflects the intent to refresh the producer's _transactional.id_ would line up with the _Consumer_ polling mechanism and would make it clear to new kafka-clients users that lasting transactional _Producer_ inactivity must be addressed.

Search keywords for this issue:
InvalidPidMappingException: The producer attempted to use a producer id which is not currently assigned to its transactional id
transactional.id.expiration.ms
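The conditional variant mentioned above (running a periodic fix only after a stretch with no successful writes) could be sketched roughly as follows. This is only an illustration: _TransactionalIdleTracker_, its methods, and the injected clock are hypothetical names, not part of kafka-clients, and the idle threshold is whatever the application picks below _transactional.id.expiration.ms_.

```java
import java.time.Duration;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper, not part of kafka-clients: remembers when the last
 * successful transactional write happened and reports when a periodic fix
 * (producer rotation or a dummy record) would be due.
 */
final class TransactionalIdleTracker {
    private final long maxIdleMillis;
    private final LongSupplier clockMillis; // injected so the logic can be tested
    private long lastActivityMillis;

    TransactionalIdleTracker(Duration maxIdle, LongSupplier clockMillis) {
        this.maxIdleMillis = maxIdle.toMillis();
        this.clockMillis = clockMillis;
        this.lastActivityMillis = clockMillis.getAsLong();
    }

    /** Call after a successful send() or sendOffsetsToTransaction(), or after a fix ran. */
    synchronized void recordActivity() {
        lastActivityMillis = clockMillis.getAsLong();
    }

    /** True once at least maxIdle has passed since the last recorded activity. */
    synchronized boolean isKeepAliveDue() {
        return clockMillis.getAsLong() - lastActivityMillis >= maxIdleMillis;
    }
}
```

A scheduled task could then check _isKeepAliveDue()_, run one of the fixes above only when it returns true, and call _recordActivity()_ afterwards. In production the clock would typically be _System::currentTimeMillis_.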