C0urante commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1084555816


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -711,9 +742,9 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            configLog.send(key, value).get();

Review Comment:
   Unbounded waiting for the record to send doesn't seem very safe here. It 
looks like we were already trying to achieve synchronous writes by immediately 
performing a read-to-end after the call to `configLog::send` with a timeout of 
`READ_TO_END_TIMEOUT_MS`; could we use that same timeout while waiting for 
producer send futures to be completed (possibly after renaming to reflect the 
new purpose of the timeout)?
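
A minimal sketch of the bounded wait suggested above, assuming the send returns a standard `java.util.concurrent.Future`. The class `BoundedSendWait`, the method `awaitSend`, and the constant `SEND_TIMEOUT_MS` are hypothetical names used only for illustration; they are not part of `KafkaConfigBackingStore`, and the timeout value is an assumption.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper for illustration; not part of the Connect runtime.
final class BoundedSendWait {
    // Assumed value; in the PR this would be the existing (possibly renamed)
    // READ_TO_END_TIMEOUT_MS constant.
    static final long SEND_TIMEOUT_MS = 30_000L;

    // Waits for the send future, but never longer than timeoutMs.
    // A TimeoutException is thrown if the future has not completed in time.
    static <T> T awaitSend(Future<T> sendFuture, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        return sendFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```

With a helper like this, the call in the diff could become something along the lines of `BoundedSendWait.awaitSend(configLog.send(key, value), BoundedSendWait.SEND_TIMEOUT_MS)`, at the cost of `sendPrivileged` also having to handle `TimeoutException`.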



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -711,9 +742,9 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
         return createKafkaBasedLog(topic, producerProps, consumerProps, new ConsumeCallback(), topicDescription, adminSupplier);
     }
 
-    private void sendPrivileged(String key, byte[] value) {
+    private void sendPrivileged(String key, byte[] value) throws ExecutionException, InterruptedException {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
+            configLog.send(key, value).get();

Review Comment:
   Also, making this synchronously await the producer future's completion has 
the potential to slow things down for operations that involve writing multiple 
records to the config topic (such as [deleting a 
connector](https://github.com/apache/kafka/blob/b2cb546fba03bbdc4054c7d120b0b2654c7cf34e/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L493-L494)).
   
   This is already the case with exactly-once support enabled, since we perform 
each write in its own transaction. Still, while we're in the neighborhood, it'd 
be nice to group producer sends together before awaiting the completion of any 
of their futures, if there's an easy way to do so. Not going to block on this, 
though.
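
One possible shape for the grouping described above, as a sketch only: issue every send first, then wait on all of the returned futures against a single shared deadline. `BatchedSend`, `sendAllThenAwait`, and the `sender` parameter are hypothetical names; `sender` stands in for something like `configLog::send` and is assumed to return a `Future`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;

// Hypothetical helper for illustration; not part of the Connect runtime.
final class BatchedSend {
    // Sends every record before waiting on any future, then awaits them all
    // within one overall timeout instead of one timeout per record.
    static <K, V> void sendAllThenAwait(BiFunction<K, V, Future<?>> sender,
                                        Map<K, V> records,
                                        long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        List<Future<?>> pending = new ArrayList<>();
        // Records are sent in the map's iteration order.
        for (Map.Entry<K, V> record : records.entrySet()) {
            pending.add(sender.apply(record.getKey(), record.getValue()));
        }
        long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        for (Future<?> future : pending) {
            // Each wait only gets whatever is left of the shared budget.
            long remainingNanos = Math.max(0L, deadlineNanos - System.nanoTime());
            future.get(remainingNanos, TimeUnit.NANOSECONDS);
        }
    }
}
```

Passing a `LinkedHashMap` keeps the send order deterministic, which matters if the records must reach the topic in a particular sequence. This sketch does not cover the exactly-once path, where each write runs in its own transaction.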



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }
 
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback<Void> callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   Do we still see the cause of the failure to send in the REST response with 
the current changes? It looks like we're only going to include the generic 
"Failed to... to/from Kafka" message, or the privileged write failure message.
   
   I think it'd be nice to include more detail on the cause of the failure, but 
if that's too invasive or difficult to get quite right, it can be left as a 
follow-up item.
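
To illustrate the kind of detail meant here, the following sketch appends the underlying cause of a failed send to a generic message. `SendFailureMessages`, `describe`, and the message text in the usage note are hypothetical; the real messages and exception types used by the PR may differ.

```java
import java.util.concurrent.ExecutionException;

// Hypothetical helper for illustration; not part of the Connect runtime.
final class SendFailureMessages {
    // Unwraps ExecutionException layers (added by Future.get) to reach the
    // underlying failure, then appends its detail to the generic message.
    static String describe(String genericMessage, Throwable failure) {
        Throwable cause = failure;
        while (cause instanceof ExecutionException && cause.getCause() != null) {
            cause = cause.getCause();
        }
        // Fall back to the exception class name when there is no message.
        String detail = cause.getMessage() != null
                ? cause.getMessage()
                : cause.getClass().getSimpleName();
        return genericMessage + ": " + detail;
    }
}
```

For example, calling `describe("Failed to write to config topic", e)` with an `ExecutionException` wrapping an `IllegalStateException("broker unavailable")` yields `Failed to write to config topic: broker unavailable`.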


