fvaleri commented on code in PR #13515: URL: https://github.com/apache/kafka/pull/13515#discussion_r1183959056
########## examples/src/main/java/kafka/examples/Producer.java: ########## @@ -21,133 +21,159 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.AuthorizationException; +import org.apache.kafka.common.errors.FencedInstanceIdException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; /** - * Demo producer that demonstrate two modes of KafkaProducer. - * If the user uses the Async mode: The messages will be printed to stdout upon successful completion - * If the user uses the sync mode (isAsync = false): Each send loop will block until completion. + * A simple producer thread supporting two send modes: + * - Async mode (default): records are sent without waiting for the response. + * - Sync mode: each send operation blocks waiting for the response. */ public class Producer extends Thread { - private final KafkaProducer<Integer, String> producer; + private final String bootstrapServers; private final String topic; - private final Boolean isAsync; - private int numRecords; + private final boolean isAsync; + private final String transactionalId; + private final boolean enableIdempotency; + private final int numRecords; + private final int transactionTimeoutMs; private final CountDownLatch latch; + private volatile boolean closed; - public Producer(final String topic, - final Boolean isAsync, - final String transactionalId, - final boolean enableIdempotency, - final int numRecords, - final int transactionTimeoutMs, - final CountDownLatch latch) { - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT); - props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - if (transactionTimeoutMs > 0) { - props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs); - } - if (transactionalId != null) { - props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId); - } - props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency); - producer = new KafkaProducer<>(props); - + public Producer(String threadName, + String bootstrapServers, + String topic, + boolean isAsync, + String transactionalId, + boolean enableIdempotency, + int numRecords, + int transactionTimeoutMs, + CountDownLatch latch) { + super(threadName); + this.bootstrapServers = bootstrapServers; this.topic = topic; this.isAsync = isAsync; + this.transactionalId = transactionalId; + this.enableIdempotency = enableIdempotency; this.numRecords = numRecords; + this.transactionTimeoutMs = transactionTimeoutMs; this.latch = latch; } - KafkaProducer<Integer, String> get() { - return producer; - } - @Override public void run() { - int messageKey = 0; - int recordsSent = 0; - try { - while (recordsSent < numRecords) { - final long currentTimeMs = System.currentTimeMillis(); - produceOnce(messageKey, recordsSent, currentTimeMs); - messageKey += 2; - recordsSent += 1; + int key = 0; + int sentRecords = 0; + // the producer instance is thread safe + try (KafkaProducer<Integer, String> producer = createKafkaProducer()) { + while (!closed && sentRecords < numRecords) { + if (isAsync) { + asyncSend(producer, key, "test"); + } else { + syncSend(producer, key, "test"); Review Comment: Sure. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org