[ 
https://issues.apache.org/jira/browse/KAFKA-19788?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhuming resolved KAFKA-19788.
-----------------------------
    Resolution: Fixed

> Kafka server ClusterAuthorization doesn't support acks=-1 for kafka-client 
> version > 3.1.0
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-19788
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19788
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 3.1.1
>            Reporter: zhuming
>            Priority: Major
>
> *  Kafka server version is 2.5.1
>  *  kafka-client version is newer than 3.1.1
>  
> {code:java}
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.Producer;
> import org.apache.kafka.clients.producer.ProducerRecord;
> import java.util.Properties;
> public class KafkaSendTest {
>   public static void main(String[] args) {
>     Properties props = new Properties();
>     props.put("bootstrap.servers", "xxx:9092,xxxx:9092");
>     props.put("key.serializer",
>         "org.apache.kafka.common.serialization.ByteArraySerializer");
>     props.put("value.serializer",
>         "org.apache.kafka.common.serialization.ByteArraySerializer");
>     props.put("transaction.timeout.ms", "300000");
>     props.put("acks", "-1"); // If acks=1 or acks=0 it will send successfully
>     props.put("compression.type", "lz4");
>     props.put("security.protocol", "SASL_PLAINTEXT");
>     props.put("sasl.mechanism", "SCRAM-SHA-256");
>     props.put("sasl.jaas.config",
>         "org.apache.kafka.common.security.scram.ScramLoginModule required "
>             + "username=\"xxx\" password=\"xxxx\";");
>     Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
>     try {
>       String topic = "topic1";
>       byte[] value = new byte[]{1,2}; // example
>       ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, value);
>       producer.send(record, (metadata, exception) -> {
>         if (exception == null) {
>           System.out.printf("Sent record(key=%s value=%s) meta(partition=%d, offset=%d)\n",
>                   record.key(), new String(record.value()),
>                   metadata.partition(), metadata.offset());
>         } else {
>           exception.printStackTrace();
>         }
>       });
>       producer.close();
>     } catch (Exception e) {
>       e.printStackTrace();
>     }
>   }
> } {code}
> pom.xml config
> {code:java}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>3.4.0</version>
> </dependency> {code}
> When the Kafka producer uses acks=-1, it throws an exception:
>  
> {code:java}
> org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
>     at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1010)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:328)
>     at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1061)
>     at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962)
>     at com.mvad.realtime.show.converter.DataEntryToRTLogConverterTest.main(DataEntryToRTLogConverterTest.java:34)
> Caused by: org.apache.kafka.common.errors.ClusterAuthorizationException: Cluster authorization failed. {code}
> With acks=1 or acks=0, the record is sent successfully:
> {code:java}
> Sent record(key=null ) meta(partition=6, offset=321496) {code}
> acks=-1 is just a parameter. How does it affect cluster authorization for 
> the Kafka producer?
> Is this a bug, or is it the intended behavior?
>  
> After changing the kafka-client version to 3.1.0, the producer sends 
> successfully with acks=-1:
> {code:java}
> <dependency>
>   <groupId>org.apache.kafka</groupId>
>   <artifactId>kafka-clients</artifactId>
>   <version>3.1.0</version>
> </dependency> {code}
>  
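
A possible explanation, offered as an assumption and not as a finding of this ticket: kafka-clients 3.x enables producer idempotence by default when acks is all (-1), and the Kafka 3.2.0 upgrade notes state that a bug (KAFKA-13598) kept that default from taking effect in 3.0.0 and 3.1.0. The same notes say that brokers older than 2.8.0 require the IDEMPOTENT_WRITE permission for an idempotent producer, which would be consistent with the ClusterAuthorizationException surfacing through TransactionManager in the stack trace above. The sketch below shows one workaround under that assumption: setting enable.idempotence=false explicitly. The class and method names are hypothetical, the SASL settings from the report are omitted for brevity, and only java.util.Properties is used so the snippet runs without a broker.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the original report.
class IdempotenceWorkaround {

    // Builds producer properties like those in the report, but with
    // idempotence turned off explicitly.
    static Properties buildProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("acks", "-1");
        props.put("compression.type", "lz4");
        // Assumption: the authorization failure comes from the idempotent
        // producer's initialization, so disabling idempotence avoids the
        // cluster-level permission check on older brokers.
        props.put("enable.idempotence", "false");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder hosts, as in the report.
        Properties props = buildProps("xxx:9092,xxxx:9092");
        props.forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting Properties object can be passed to new KafkaProducer<>(props) after adding the security.protocol, sasl.mechanism, and sasl.jaas.config entries from the report. If idempotence is wanted, the alternative would be to grant the producer's principal the IdempotentWrite operation on the cluster resource.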



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
