C0urante commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1093433981


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -711,9 +768,35 @@ KafkaBasedLog<String, byte[]> 
setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new 
ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that 
{@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is 
configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     * @param timer Timer bounding how long this method can block. The timer 
is updated before the method returns.
+     */
+    private void sendPrivileged(String key, byte[] value, Timer timer) throws 
ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, 
value)), timer);
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that 
{@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is 
configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     * @param timer Timer bounding how long this method can block. The timer 
is updated before the method returns.

Review Comment:
   Thank you for taking care to call out that the timer is updated before 
returning; it's an important detail.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -711,9 +768,35 @@ KafkaBasedLog<String, byte[]> 
setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new 
ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that 
{@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is 
configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     * @param timer Timer bounding how long this method can block. The timer 
is updated before the method returns.
+     */
+    private void sendPrivileged(String key, byte[] value, Timer timer) throws 
ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, 
value)), timer);
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that 
{@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is 
configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     * @param timer Timer bounding how long this method can block. The timer 
is updated before the method returns.
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues, Timer timer) 
throws ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                    keyValue -> 
producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            for (Future<RecordMetadata> future : producerFutures) {

Review Comment:
   We should update the timer before waiting on the first future, since in some 
rare cases `configLog::send` may actually block for a bit:
   
   ```suggestion
               timer.update();
               for (Future<RecordMetadata> future : producerFutures) {
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -723,12 +806,26 @@ private void sendPrivileged(String key, byte[] value) {
 
         try {
             fencableProducer.beginTransaction();
-            fencableProducer.send(new ProducerRecord<>(topic, key, value));
+            keyValues.forEach(
+                    keyValue -> fencableProducer.send(new 
ProducerRecord<>(topic, keyValue.key, keyValue.value))
+            );
             fencableProducer.commitTransaction();
         } catch (Exception e) {
             log.warn("Failed to perform fencable send to config topic", e);
             relinquishWritePrivileges();
             throw new PrivilegedWriteException("Failed to perform fencable 
send to config topic", e);
+        } finally {
+            timer.update();
+        }

Review Comment:
   I'm not sure this belongs in a `finally` block, especially since that 
introduces inconsistencies in how this method updates the timer depending on 
whether EOS source support is enabled. IMO it's fine to add inside the `try` 
block after the call to `fencableProducer::commitTransaction`



##########
clients/src/main/java/org/apache/kafka/common/utils/Timer.java:
##########
@@ -161,7 +161,7 @@ public long remainingMs() {
     /**
      * Get the current time in milliseconds. This will return the same cached 
value until the timer
      * has been updated using one of the {@link #update()} methods or {@link 
#sleep(long)} is used.
-     *
+     * <p>

Review Comment:
   In the future, please save nonessential improvements like this for a 
dedicated PR; it adds noise to the diff and makes things harder to review.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> 
setupAndCreateKafkaBasedLog(String topic, final Wo
     }
 
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback<Void> 
callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   (Resolving this convo as it's taking up way too much real estate in the 
GitHub UI)



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -711,9 +752,32 @@ KafkaBasedLog<String, byte[]> 
setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new 
ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    /**
+     * Send a single record to the config topic synchronously. Note that 
{@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is 
configured to use a fencable writer.
+     * @param key the record key
+     * @param value the record value
+     */
+    private void sendPrivileged(String key, byte[] value) throws 
ExecutionException, InterruptedException, TimeoutException {
+        sendPrivileged(Collections.singletonList(new ProducerKeyValue(key, 
value)));
+    }
+
+    /**
+     * Send one or more records to the config topic synchronously. Note that 
{@link #claimWritePrivileges()} must be
+     * successfully invoked before calling this method if this store is 
configured to use a fencable writer.
+     * @param keyValues the list of producer record key/value pairs
+     */
+    private void sendPrivileged(List<ProducerKeyValue> keyValues) throws 
ExecutionException, InterruptedException, TimeoutException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            List<Future<RecordMetadata>> producerFutures = new ArrayList<>();
+            keyValues.forEach(
+                    keyValue -> 
producerFutures.add(configLog.send(keyValue.key, keyValue.value))
+            );
+
+            for (Future<RecordMetadata> future : producerFutures) {
+                future.get(READ_WRITE_TIMEOUT_MS, TimeUnit.MILLISECONDS);

Review Comment:
   Yeah, that's fine. The less time we spend blocking the tick thread, the 
better.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to