yashmayya commented on code in PR #12984: URL: https://github.com/apache/kafka/pull/12984#discussion_r1093606834
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -711,9 +768,35 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
+     */
+    private void sendPrivileged(String key, byte[] value, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)), timer);
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                    keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            for (Future<RecordMetadata> future : producerFutures) {

Review Comment:
Done, but just curious about when the producer send might block?
The doc says that it should just essentially add the record to the record buffer and return?
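
For context on the question: as far as I know, `KafkaProducer.send()` usually returns right after buffering the record, but it can block for up to `max.block.ms` while it waits for topic metadata or for space in the record buffer. The hunk stops before the body of the `for` loop, so the following is only a minimal sketch of one plausible way to bound the total wait across several send futures with a single shared deadline. It uses only `java.util.concurrent`; the class name `BoundedFutureWait` and the helper `awaitAll` are hypothetical and are not taken from the PR, which uses Kafka's `Timer` instead.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedFutureWait {

    // Hypothetical helper (not from the PR): waits for every future in order,
    // charging all of the waits against one overall deadline.
    static <T> List<T> awaitAll(List<Future<T>> futures, long timeoutMs)
            throws ExecutionException, InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            // Only the time left until the shared deadline is granted to this future.
            long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            results.add(future.get(remainingNanos, TimeUnit.NANOSECONDS));
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        // Already-completed futures return immediately.
        List<Future<String>> done = new ArrayList<>();
        done.add(CompletableFuture.completedFuture("a"));
        done.add(CompletableFuture.completedFuture("b"));
        System.out.println(awaitAll(done, 100)); // prints [a, b]

        // A future that never completes exhausts the deadline.
        List<Future<String>> stuck = new ArrayList<>();
        stuck.add(new CompletableFuture<>());
        try {
            awaitAll(stuck, 50);
        } catch (TimeoutException e) {
            System.out.println("timed out");
        }
    }
}
```

Sending everything first and waiting afterwards lets the producer batch the records, while the shared deadline keeps the overall call bounded regardless of how many records are sent.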