yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1093606834


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -711,9 +768,35 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
+     */
+    private void sendPrivileged(String key, byte[] value, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, value)), timer);
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that {@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     * @param timer Timer bounding how long this method can block. The timer is updated before the method returns.
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues, Timer timer) throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                    keyValue -> producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            for (Future<RecordMetadata> future : producerFutures) {

Review Comment:
   Done. Out of curiosity, though: when might the producer's `send` call itself 
block? As I understand the docs, it should essentially just add the record to 
the record buffer and return.
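
For context on the question above: the `KafkaProducer` documentation describes `send()` as asynchronous, but the `max.block.ms` producer config bounds how long `send()` may block while waiting for a metadata fetch or for buffer space. Separately, waiting on the returned futures can block until the broker acknowledges each record. The following is a minimal sketch, not the PR's actual code, of waiting on several futures under one shared deadline. The class `BoundedSendSketch` and the method `awaitAll` are hypothetical names introduced for illustration, and a nanosecond deadline stands in for Kafka's `Timer`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; it does not depend on or reproduce Kafka code.
public class BoundedSendSketch {

    // Waits for every future in order, sharing one overall deadline across all of them.
    // Throws TimeoutException as soon as a future is not done within the remaining time.
    public static <T> List<T> awaitAll(List<Future<T>> futures, long timeoutMs)
            throws ExecutionException, InterruptedException, TimeoutException {
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        List<T> results = new ArrayList<>();
        for (Future<T> future : futures) {
            // Recompute the remaining budget before each wait so earlier waits count against it.
            long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            results.add(future.get(remainingNanos, TimeUnit.NANOSECONDS));
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        // Already-completed futures return immediately.
        List<Future<String>> done = new ArrayList<>();
        done.add(CompletableFuture.completedFuture("a"));
        done.add(CompletableFuture.completedFuture("b"));
        System.out.println(awaitAll(done, 1000));

        // A future that never completes exhausts the shared budget.
        List<Future<String>> stuck = new ArrayList<>();
        stuck.add(new CompletableFuture<>());
        try {
            awaitAll(stuck, 50);
        } catch (TimeoutException e) {
            System.out.println("timed out waiting for the remaining futures");
        }
    }
}
```

Issuing all sends first and only then waiting on the futures lets the records be batched, while the shared deadline keeps the total blocking time bounded regardless of how many records are sent.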



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
