gharris1727 commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1048754327
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1058,14 +1058,20 @@ public void putConnectorConfig(final String connName, final Map<String, String>
             }
             log.trace("Submitting connector config {} {} {}", connName, allowReplace, configState.connectors());
-            writeToConfigTopicAsLeader(() -> configBackingStore.putConnectorConfig(connName, config));
-
-            // Note that we use the updated connector config despite the fact that we don't have an updated
-            // snapshot yet. The existing task info should still be accurate.
-            ConnectorInfo info = new ConnectorInfo(connName, config, configState.tasks(connName),
-                    // validateConnectorConfig have checked the existence of CONNECTOR_CLASS_CONFIG
-                    connectorType(config));
-            callback.onCompletion(null, new Created<>(!exists, info));
+            Callback<Void> cb = (err, result) -> {
+                if (err != null) {
+                    // producer send error
+                    callback.onCompletion(err, null);
+                } else {
+                    // Note that we use the updated connector config despite the fact that we don't have an updated
+                    // snapshot yet. The existing task info should still be accurate.
+                    callback.onCompletion(null, new Created<>(!exists, new ConnectorInfo(connName, config, configState.tasks(connName),
+                            // validateConnectorConfig have checked the existence of CONNECTOR_CLASS_CONFIG
Review Comment:
This comment is stale and can be removed (I forgot to remove it, sorry).
```suggestion
```
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -723,7 +752,11 @@ private void sendPrivileged(String key, byte[] value) {
         try {
             fencableProducer.beginTransaction();
-            fencableProducer.send(new ProducerRecord<>(topic, key, value));
+            fencableProducer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {
Review Comment:
I believe here we're intentionally relying on this behavior of commitTransaction to propagate errors:

> Further, if any of the {@link #send(ProducerRecord)} calls which were part of the transaction hit irrecoverable
> errors, this method will throw the last received exception immediately and the transaction will not be committed.
> So all {@link #send(ProducerRecord)} calls in a transaction must succeed in order for this method to succeed.

I'm not sure whether we're in danger of double-completing the callback here, or whether, semantically, the callback should only be completed after commitTransaction either completes or throws.
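
For illustration, here's a rough sketch of the "complete the callback only after commitTransaction returns or throws" semantics I'm describing. This is not code from this PR; the helper name and the fact that the producer and callback are passed in as parameters are only there to keep the snippet self-contained:

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.connect.util.Callback;

public class FencedSendSketch {
    // Hypothetical helper, not the PR's sendPrivileged; assumes a non-null callback.
    static void sendInTransaction(Producer<String, byte[]> fencableProducer,
                                  String topic, String key, byte[] value,
                                  Callback<Void> callback) {
        try {
            fencableProducer.beginTransaction();
            // No per-record callback here: commitTransaction() rethrows the last irrecoverable
            // send error, so a failed send cannot be committed silently.
            fencableProducer.send(new ProducerRecord<>(topic, key, value));
            fencableProducer.commitTransaction();
            callback.onCompletion(null, null);   // completed exactly once, after the commit succeeds
        } catch (Exception e) {
            callback.onCompletion(e, null);      // completed exactly once, if any send or the commit fails
        }
    }
}
```

That shape would sidestep the double-completion question entirely, since the per-record producer callback never touches the Connect callback.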
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -712,8 +733,16 @@ KafkaBasedLog<String, byte[]> setupAndCreateKafkaBasedLog(String topic, final Wo
     }
     private void sendPrivileged(String key, byte[] value) {
+        sendPrivileged(key, value, null);
+    }
+
+    private void sendPrivileged(String key, byte[] value, Callback<Void> callback) {
         if (!usesFencableWriter) {
-            configLog.send(key, value);
Review Comment:
The only other call site for send(String, byte[]) is now putTargetState; does a similar fix apply there?

Perhaps this blind send call is not safe and should be eliminated. Perhaps we can also enforce non-null callbacks. WDYT?
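
To make the second point concrete, here is a hypothetical shape, not code from this PR, where the nullable-callback overload goes away and the non-transactional path forwards the producer result instead of sending blindly. The method name and passing configLog as a parameter are only there to keep the snippet self-contained:

```java
import java.util.Objects;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.KafkaBasedLog;

public class RequiredCallbackSketch {
    // Hypothetical: every caller (including putTargetState) would have to pass a real callback.
    static void sendWithRequiredCallback(KafkaBasedLog<String, byte[]> configLog,
                                         String key, byte[] value,
                                         Callback<Void> callback) {
        Objects.requireNonNull(callback, "callback must not be null");
        // Forward the producer ack/failure to the Connect callback rather than sending blindly.
        configLog.send(key, value, (metadata, exception) -> callback.onCompletion(exception, null));
    }
}
```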