C0urante commented on code in PR #11779:
URL: https://github.com/apache/kafka/pull/11779#discussion_r891379125
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java:
##########
@@ -138,6 +138,18 @@ public interface Herder {
*/
void putTaskConfigs(String connName, List<Map<String, String>> configs, Callback<Void> callback, InternalRequestSignature requestSignature);
+ /**
+ * Fence out any older task generations for a source connector, and then write a record to the config topic
+ * indicating that it is safe to bring up a new generation of tasks. If that record is already present, do nothing
+ * and invoke the callback successfully.
+ * @param connName the name of the connector to fence out, which must refer to a source connector; if the
+ * connector does not exist or is not a source connector, the callback will be invoked with an error
+ * @param callback callback to invoke upon completion
+ * @param requestSignature the signature of the request made for this connector;
+ * may be null if no signature was provided
+ */
+ void fenceZombies(String connName, Callback<Void> callback, InternalRequestSignature requestSignature);
Review Comment:
Fine by me 👍
##########
gradle/spotbugs-exclude.xml:
##########
@@ -311,6 +311,16 @@ For a detailed description of spotbugs bug categories, see
https://spotbugs.read
<Bug pattern="SF_SWITCH_FALLTHROUGH"/>
</Match>
+ <Match>
+ <!--
+ Temporarily suppress warnings about unused private methods (will be used in a subsequent pull request)
+ TODO: Remove this before merging to trunk
Review Comment:
No, but I will update https://github.com/apache/kafka/pull/11782 to remove
it as soon as this is merged.
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java:
##########
@@ -320,6 +320,18 @@ public void putTaskConfigs(final @PathParam("connector") String connector,
completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", headers, taskConfigs, forward);
}
+ @PUT
+ @Path("/{connector}/fence")
+ public Response fenceZombies(final @PathParam("connector") String connector,
+ final @Context HttpHeaders headers,
+ final @QueryParam("forward") Boolean forward,
+ final byte[] requestBody) throws Throwable {
+ FutureCallback<Void> cb = new FutureCallback<>();
+ herder.fenceZombies(connector, cb, InternalRequestSignature.fromHeaders(requestBody, headers));
+ completeOrForwardRequest(cb, "/connectors/" + connector + "/fence", "PUT", headers, requestBody, forward);
+ return Response.ok().build();
Review Comment:
This forces a 200 OK response instead of the 204 No Content response that
would be returned otherwise. I'd just use a 204, except the KIP specifies
that this endpoint should "serve an empty-bodied 200 response" and I wanted to
stick to that.
Given that this endpoint is internal and it's a tiny detail, I'd be fine
with switching to a 204 response if it's alright with you.
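To make the difference between the two options concrete, here is a minimal sketch of what a client would observe in each case. It does not use the Connect REST layer or JAX-RS at all; it stands up the JDK's built-in `com.sun.net.httpserver.HttpServer` with two hypothetical routes (`/fence-200` and `/fence-204`, both invented for this illustration) and issues a bodiless PUT against each. The class and method names are assumptions, not anything from the PR.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative only: not part of the PR and not the Connect REST implementation.
public class FenceStatusSketch {

    // Starts a throwaway server on an ephemeral port with two hypothetical routes.
    static HttpServer start() throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        // Empty-bodied 200, analogous to returning Response.ok().build().
        server.createContext("/fence-200", exchange -> {
            exchange.sendResponseHeaders(200, -1); // -1 means no response body is sent
            exchange.close();
        });
        // 204 No Content, analogous to a resource method that returns void.
        server.createContext("/fence-204", exchange -> {
            exchange.sendResponseHeaders(204, -1);
            exchange.close();
        });
        server.start();
        return server;
    }

    // Issues a bodiless PUT and returns the status code the server answered with.
    static int putStatus(HttpServer server, String path) throws IOException, InterruptedException {
        URI uri = URI.create("http://127.0.0.1:" + server.getAddress().getPort() + path);
        HttpRequest request = HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }

    public static void main(String[] args) throws Exception {
        HttpServer server = start();
        try {
            System.out.println("empty-bodied OK: " + putStatus(server, "/fence-200"));
            System.out.println("no content:      " + putStatus(server, "/fence-204"));
        } finally {
            server.stop(0);
        }
    }
}
```

Either way the body is empty, so a caller that only checks for a 2xx status would treat both responses the same; the choice mainly matters for matching the wording in the KIP.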