yashmayya commented on code in PR #12984:
URL: https://github.com/apache/kafka/pull/12984#discussion_r1046722667
##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConfigBackingStore.java:
##########
@@ -51,8 +52,9 @@ public interface ConfigBackingStore {
* Update the configuration for a connector.
* @param connector name of the connector
* @param properties the connector configuration
+ * @param callback the callback to be invoked after the put is complete;
can be {@code null} if no callback is desired
*/
- void putConnectorConfig(String connector, Map<String, String> properties);
+ void putConnectorConfig(String connector, Map<String, String> properties,
Callback<Void> callback);
Review Comment:
Could potentially add a new overloaded method in `KafkaConfigBackingStore`
and avoid touching this interface method although that would require
refactoring `AbstractHerder` to make it generic in order to allow
`DistributedHerder` to access an instance of `KafkaConfigBackingStore` rather
than `ConfigBackingStore`. However, that would be a much more noisy change IMO.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java:
##########
@@ -1058,14 +1058,20 @@ public void putConnectorConfig(final String connName,
final Map<String, String>
}
log.trace("Submitting connector config {} {} {}",
connName, allowReplace, configState.connectors());
- writeToConfigTopicAsLeader(() ->
configBackingStore.putConnectorConfig(connName, config));
-
- // Note that we use the updated connector config
despite the fact that we don't have an updated
- // snapshot yet. The existing task info should
still be accurate.
- ConnectorInfo info = new ConnectorInfo(connName,
config, configState.tasks(connName),
- // validateConnectorConfig have checked the
existence of CONNECTOR_CLASS_CONFIG
- connectorType(config));
- callback.onCompletion(null, new Created<>(!exists,
info));
+ Callback<Void> cb = (err, result) -> {
Review Comment:
Another option could be to directly call `callback.onCompletion` in
`KafkaConfigBackingStore::putConnectorConfig` itself - i.e. construct the
`Created<ConnectorInfo>` object there using its in-memory maps directly rather
than the snapshot; this would avoid this double callback kinda mechanism but
would need some other refactoring.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]