Danil Shkodin created KAFKA-18673:
-------------------------------------

             Summary: Provide means to gracefully update Producer transactional id mapping state in case of lasting inactivity
                 Key: KAFKA-18673
                 URL: https://issues.apache.org/jira/browse/KAFKA-18673
             Project: Kafka
          Issue Type: Improvement
            Reporter: Danil Shkodin


Please consider adding a way to keep a transactional _Producer_ instance from expiring.

For those running 24/7 services that write transactional messages to Kafka only sparsely, there are several options for keeping the program highly available.

The first is what Spring [does|https://docs.spring.io/spring-kafka/reference/kafka/transactions.html#overview]: rotating transactional producers at intervals shorter than the expiration timeout.

 
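{code:java}
void fixTransactionalIdExpiration() {
  // Close the current producer before its transactional.id mapping expires.
  try {
    producer.close(timeout); // timeout: java.time.Duration
  } catch (Exception error) {
    logger.warn("...", error);
  }
  producer = null;
  // Create a new producer with the same settings, including transactional.id.
  try {
    producer = new KafkaProducer<>(settings);
  } catch (Exception error) {
    logger.warn("...", error);
    // handle failure
    return;
  }
  // Re-register the transactional.id with the transaction coordinator.
  try {
    producer.initTransactions();
  } catch (Exception error) {
    logger.warn("...", error);
    // Do not keep a half-initialized producer around.
    producer.close(timeout);
    producer = null;
    // handle failure
    return;
  }
}{code}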
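The rotation above has to be triggered on a schedule that stays below the broker's expiration timeout. A minimal sketch, assuming the broker default of transactional.id.expiration.ms (604800000 ms, 7 days) and a safety factor chosen by the application; ProducerRotationSchedule and its methods are hypothetical helpers, not part of kafka-clients or Spring:

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: schedules producer rotation below the transactional.id expiration.
class ProducerRotationSchedule {
    // Broker-side default of transactional.id.expiration.ms: 604800000 ms (7 days).
    static final Duration DEFAULT_TXN_ID_EXPIRATION = Duration.ofMillis(604_800_000L);

    // Pick a rotation period comfortably below the expiration timeout.
    // Dividing by a safety factor is an assumption of this sketch, not a Kafka setting.
    static Duration rotationPeriod(Duration expiration, long safetyFactor) {
        if (safetyFactor < 2) {
            throw new IllegalArgumentException("safetyFactor must be at least 2");
        }
        return expiration.dividedBy(safetyFactor);
    }

    // Run the rotation task (e.g. fixTransactionalIdExpiration()) at a fixed rate.
    static ScheduledFuture<?> schedule(ScheduledExecutorService scheduler,
                                       Runnable rotate, Duration period) {
        long millis = period.toMillis();
        return scheduler.scheduleAtFixedRate(rotate, millis, millis, TimeUnit.MILLISECONDS);
    }
}
```

With a safety factor of 2 and the default expiration, this rotates every 84 hours. If the broker overrides transactional.id.expiration.ms, pass that value instead. Because a KafkaProducer can only have one open transaction at a time, the rotation task should be serialized with the code that sends messages, for example by running both on the same single-threaded executor.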
 

Another, similar option is to also act periodically, but instead of reconnecting, to transactionally write an empty record to a dummy topic.

 
{code:java}
void fixTransactionalIdExpiration() {
  try {
    producer.beginTransaction();
    var topic = "project_prefix.__dummy_topic";
    var message = new ProducerRecord<>(topic, (String) null, (String) null);
    producer.send(message);
    producer.abortTransaction();
    // or producer.commitTransaction(); does not matter
  } catch (Exception error) {
    logger.warn("...", error);
    // handle failure
  }
}{code}
 

Personally, I do not like having to introduce a service topic; for me, this inelegance outweighs the trouble of reconnecting.
Surprisingly, producing an empty transaction does not prevent expiration. [Probably|https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java#L840], there is a guard in the transaction manager that skips any actual update to the transactional producer id mapping when there is nothing to write.
{code:java}
void fixTransactionalIdExpiration() {
  // does not work
  // producer will still get fenced upon transactional.id.expiration.ms
  try {
    producer.beginTransaction();
    producer.commitTransaction();
  } catch (Exception error) {}
} {code}
 

It is worth noting that client code may run one of these periodic fixes conditionally, only after a period of inactivity, i.e. when there has been no successful _send()_ or _sendOffsetsToTransaction()_ call for, say, 6 to 24 hours.
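A sketch of that conditional check, assuming the application records the time of its last successful transactional write. ProducerActivityTracker is a hypothetical helper, and the injectable clock exists only to make it testable:

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical helper: remembers when the producer last completed a successful
// send() or sendOffsetsToTransaction(), so a periodic task can decide whether a
// keep-alive (producer rotation or dummy transaction) is needed at all.
class ProducerActivityTracker {
    private final LongSupplier nowMillis; // e.g. System::currentTimeMillis
    private final AtomicLong lastActivityMillis;

    ProducerActivityTracker(LongSupplier nowMillis) {
        this.nowMillis = nowMillis;
        this.lastActivityMillis = new AtomicLong(nowMillis.getAsLong());
    }

    // Call after a transaction containing send()/sendOffsetsToTransaction() commits.
    void recordActivity() {
        lastActivityMillis.set(nowMillis.getAsLong());
    }

    // True if no activity has been recorded for at least idleThreshold.
    boolean needsKeepAlive(Duration idleThreshold) {
        long idle = nowMillis.getAsLong() - lastActivityMillis.get();
        return idle >= idleThreshold.toMillis();
    }
}
```

A periodic task could check `tracker.needsKeepAlive(Duration.ofHours(6))` and only then run one of the keep-alive variants, calling `recordActivity()` once it succeeds, so busy producers are never rotated or sent dummy records.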

The last, and most obvious, option is to let it fail and react to the error.

 
{code:java}
void sendMessage(Message message) {
  try {
    producer.beginTransaction();
    producer.send(message.to());
    producer.commitTransaction();
  } catch (InvalidPidMappingException error) {
    // reconnect, retry
  } catch (Exception error) {
    // handle failure
  }
}{code}
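The reconnect-and-retry branch can be factored out. A minimal sketch under these assumptions: the transaction body and the producer re-creation are passed in as callbacks, and the check for an expired mapping is a predicate (in real code it would likely be `e -> e instanceof InvalidPidMappingException`, which is an unchecked KafkaException subclass). RetryOnExpiredMapping is a hypothetical name, and it retries exactly once:

```java
import java.util.function.Predicate;

// Hypothetical helper for the "let it fail" option: run one transactional attempt,
// and if the failure means the producer id mapping expired, rebuild the producer
// once and retry. Kafka types are left out so the sketch stays self-contained.
final class RetryOnExpiredMapping {
    private RetryOnExpiredMapping() {}

    static void run(Runnable transaction, Runnable recreateProducer,
                    Predicate<RuntimeException> isExpiredMapping) {
        try {
            transaction.run(); // beginTransaction(), send(...), commitTransaction()
        } catch (RuntimeException error) {
            if (!isExpiredMapping.test(error)) {
                throw error; // unrelated failure: let the caller handle it
            }
            recreateProducer.run(); // close old producer, new KafkaProducer, initTransactions()
            transaction.run();      // any error from the retry propagates to the caller
        }
    }
}
```

Retrying only once keeps a persistent failure from turning into an endless reconnect loop.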


A dedicated method that explicitly expresses the intent to refresh the producer's _transactional.id_ would parallel the _Consumer_ polling mechanism and make it clear to new kafka-clients users that prolonged transactional _Producer_ inactivity has to be addressed.


Keywords to help find this issue:
InvalidPidMappingException: The producer attempted to use a producer id which 
is not currently assigned to its transactional id
transactional.id.expiration.ms



 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
