gharris1727 commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1048754327


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1058,14 +1058,20 @@ public void putConnectorConfig(final String connName, final Map<String, String>
                             }
 
                             log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors());
-                            writeToConfigTopicAsLeader(() -> configBackingStore.putConnectorConfig(connName, config));
-
-                            // Note that we use the updated connector config despite the fact that we don't have an updated
-                            // snapshot yet. The existing task info should still be accurate.
-                            ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName),
-                                // validateConnectorConfig have checked the existence of CONNECTOR_CLASS_CONFIG
-                                connectorType(config));
-                            callback.onCompletion(null, new Created<>(!exists, info));
+                            Callback<Void> cb = (err, result) -> {
+                                if (err != null) {
+                                    // producer send error
+                                    callback.onCompletion(err, null);
+                                } else {
+                                    // Note that we use the updated connector config despite the fact that we don't have an updated
+                                    // snapshot yet. The existing task info should still be accurate.
+                                    callback.onCompletion(null, new Created<>(!exists, new ConnectorInfo(connName, config, configState.tasks(connName),
+                                            // validateConnectorConfig have checked the existence of CONNECTOR_CLASS_CONFIG

Review Comment:
   This comment is stale and can be removed.
   (I forgot to remove it, sorry.)
   ```suggestion
   ```



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) {
 
         try {
             fencableProducer.beginTransaction();
-            fencableProducer.send(new ProducerRecord<>(topic, key, value));
+            fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {

Review Comment:
   I believe here we're intentionally relying on this behavior of commitTransaction to propagate errors:
   > Further, if any of the {@link #send(ProducerRecord)} calls which were part of the transaction hit irrecoverable
   > errors, this method will throw the last received exception immediately and the transaction will not be committed.
   > So all {@link #send(ProducerRecord)} calls in a transaction must succeed in order for this method to succeed.
   
   I'm not sure whether we're in danger of double-completing the callback, or whether, semantically, the callback should instead be completed once commitTransaction either returns or throws an exception.
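
   To make the concern concrete, here is a minimal sketch of completing the callback exactly once, and only after the commit step has returned or thrown. All names are hypothetical stand-ins and not code from this PR: `SketchCallback` replaces Connect's `Callback`, and the `commit` runnable stands in for the begin/send/commit sequence on the fencable producer.

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   // Hypothetical stand-in for Connect's Callback<V>; not the real interface.
   interface SketchCallback<V> {
       void onCompletion(Throwable error, V result);
   }

   // Wraps a callback so that only the first completion is delivered.
   final class OnceCallback<V> implements SketchCallback<V> {
       private final SketchCallback<V> delegate;
       private final AtomicBoolean completed = new AtomicBoolean(false);

       OnceCallback(SketchCallback<V> delegate) {
           this.delegate = delegate;
       }

       @Override
       public void onCompletion(Throwable error, V result) {
           // Later completions are dropped silently.
           if (completed.compareAndSet(false, true)) {
               delegate.onCompletion(error, result);
           }
       }
   }

   final class TransactionalSendSketch {
       // Assumption: `commit` throws a RuntimeException if any send in the
       // transaction failed, mirroring the quoted commitTransaction Javadoc.
       static void sendAndCommit(Runnable commit, SketchCallback<Void> callback) {
           SketchCallback<Void> once = new OnceCallback<>(callback);
           try {
               commit.run();
               once.onCompletion(null, null); // success only after the commit returns
           } catch (RuntimeException e) {
               once.onCompletion(e, null);    // failures surface from the commit step
           }
       }
   }
   ```

   With this shape, a per-record send callback would not need to complete the caller's callback at all. The `OnceCallback` guard is only a safety net in case both paths end up firing.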



##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }
 
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback<Void> callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);

Review Comment:
   The only other call site for send(String, byte[]) is now putTargetState. Does a similar fix apply there?
   Perhaps this blind send call is not safe and should be eliminated. Perhaps we can also enforce non-null callbacks. WDYT?
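
   As a rough illustration of that suggestion, and not a proposal for the exact API, the sketch below has no callback-free overload and rejects null callbacks up front. `SendCallback` and `CheckedWriterSketch` are hypothetical names, and the in-memory list stands in for the real write.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Objects;

   // Hypothetical stand-in for a completion callback; not Connect's Callback<V>.
   interface SendCallback {
       void onCompletion(Throwable error);
   }

   // Hypothetical writer that offers no "blind" send overload.
   final class CheckedWriterSketch {
       final List<String> written = new ArrayList<>();

       void send(String key, byte[] value, SendCallback callback) {
           // Fail fast rather than silently dropping send errors.
           Objects.requireNonNull(callback, "callback must not be null");
           written.add(key);            // stands in for the real write
           callback.onCompletion(null); // report the outcome to the caller
       }
   }
   ```

   Every caller, including one like putTargetState, would then have to decide explicitly what to do with a send failure.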



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
