[ 
https://issues.apache.org/jira/browse/KAFKA-6267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16264561#comment-16264561
 ] 

Mauro Giacometti commented on KAFKA-6267:
-----------------------------------------

Hi Ismal,
i'm facing the same issue on Linux. Let me describe you the whole scenario. 
Silvio is my colleague and he's working with me in my project.

I'm trying to chain a JPATransaction with a KafkaTransaction by using a 
ChainedTransactionManager provided by spring. This choise deals with the need 
to implement a One Phase Commit. We need to open a "unique transaction" 
(managed by the ChainedTransactionManager) which open first a transaction to a 
Kafka Broker, then a transaction to an Oracle DB, a write operation is 
performed on a db. A final commit on the db is performed immediatly followed by 
a commit of the Kafka Transaction. Every rollback in the JPA transaction will 
lead to a Kafka transaction rollback.
Moreover, we're using spring-integration-kafka and Apache Avro for 
manipulating/serializing/deserializing the messages we're publishing on the 
Kafka Broker.

What we have now is:
0. Client rest service invocation for the creation of a product.
1. Kafka transaction is opened -> Message is delivered.
2. Data are persisted on the database
3. Data are correctly committed on database
4. Kafka transaction is not committed but is on a IN_TRANSITION state and i 
don't know why
5. A second call to the rest service immedialy fails as a new kafka transaction 
is required but there's the old one up. So the abortTransaction method is 
invoked.

What we need to have is:
0. Client rest service invocation for the creation of a product.
1. Kafka transaction is opened -> New Product request Message is delivered.
2. Data are persisted on the database
3. Data are correctly committed on database
4. Kafka transaction committed and available for the consumers.


Here is my configurations for the factory, template, etc. :

  public PlatformTransactionManager transactionManager(
      EntityManagerFactory emf,
      DefaultKafkaProducerFactory producerFactory) {
    
    final JpaTransactionManager jpaTransactionManager = new 
JpaTransactionManager();
    jpaTransactionManager.setEntityManagerFactory(emf);
    final KafkaTransactionManager kafkaTransactionManager = new 
KafkaTransactionManager<>(producerFactory);
    kafkaTransactionManager.setFailEarlyOnGlobalRollbackOnly(true);
    kafkaTransactionManager.setNestedTransactionAllowed(true);
    kafkaTransactionManager.setValidateExistingTransaction(true);
    kafkaTransactionManager.setRollbackOnCommitFailure(true);
    
kafkaTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ALWAYS);
   return new ChainedTransactionManager(new PlatformTransactionManager[] { 
jpaTransactionManager, kafkaTransactionManager });
    
  }

@Bean
  public DefaultKafkaProducerFactory producerFactory() {
    
    final Map<String, Object> configMap = new HashMap<>();
    configMap.put("bootstrap.servers", bootstrapServers);
    configMap.put("key.serializer", keySerializer);
    configMap.put("value.serializer", valueSerializer);
    configMap.put("schema.registry.url", schemaRegistryEndpoint);
    configMap.put("enable.idempotence", enableIdempotenceConfig );
    configMap.put("transactional.id", transactionalIdConfig);
    configMap.put("transaction.timeout.ms", transactionTimeoutMs);
    
    final DefaultKafkaProducerFactory producerFactory = new 
DefaultKafkaProducerFactory(configMap);
    
    producerFactory.setTransactionIdPrefix(transactionalIdConfig);
    return producerFactory;
  }

 public KafkaTemplate templateCreationCustomer(DefaultKafkaProducerFactory 
producerFactory, RecordMessageConverter messageConverter) {
    
    KafkaTemplate template = new KafkaTemplate<>(producerFactory);
    template.setMessageConverter(messageConverter);
    return template;
  }

Versions used :
spring-integration-kafka is 2.3.0
kafka-client is 0.11.0.1
kavka-avro-serializer is 3.3.0

> Kafka Producer - initTransaction forever waiting
> ------------------------------------------------
>
>                 Key: KAFKA-6267
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6267
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.11.0.1, 0.11.0.2
>            Reporter: Silvio Papa
>            Priority: Blocker
>         Attachments: producer.JPG
>
>
> In code of attached image, the producer remains forever awaiting in 
> initTransaction with default configuration of broker



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to