Hi Mickael, I'm not Chris but I hope I can still respond to your questions :)
1a. This system behaves best when connectors and tasks are known in advance, but I don't think that is a requirement. I clarified that the worker configurations are permissive of non-existent jobs, which allows for bootstrapping an empty cluster and provisioning workers and jobs concurrently.

1b. I think the wildcard suggestion is similar to the rejected alternative "implement a more complex worker selector...". While we could add that sort of functionality, I think it is more difficult to express and reason about. For example, with the current configuration, users can ensure that at most one job is assigned to a JVM. If a user wanted to use wildcards with the same constraint, those wildcards would need to express "at most one of <matching job>".

1c. The javadoc doesn't say what happens to the additional tasks, but I think we could start enforcing it if that makes the fixed-static-assignment use-case more reliable. I have only ever seen additional tasks emitted due to rather serious bugs in connector implementations.

2. Yes, I've added that information to the proposal. I referenced the `GET /connectors/{connector}` API, as it performs the same function without listing the task configs.

3. Do you mean the resource limits and requests? This is the rejected alternative "Model per-job resource constraints and measure resource utilization within a single JVM"; let me know if the reasoning there is not convincing.

4. I have not explored rack awareness for Connect in detail. I expect that rack awareness would require some compromise on the "least disruption" property of incremental cooperative rebalancing for shared JVMs. We could discuss that in a future KIP. As for this proposal, because the management layer is responsible for placing workers and placing tasks on those workers, it would also be responsible for implementing rack awareness in isolated JVMs.

Thanks a lot!
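To make the fixed use-case a bit more concrete, here is a rough sketch (not part of the KIP, and the helper name is made up) of how a management layer could derive the `static.connectors` and `static.tasks` values for an isolated worker. It assumes the usual "<connector>-<index>" task naming and a connector that honors tasks.max; if the connector returns fewer tasks, the extra task workers are simply over-provisioned, which is the trade-off discussed below.

```python
# Hypothetical sketch: derive static assignment config values for a
# worker, given a connector name and its tasks.max. Assumes task ids
# follow the "<connector>-<index>" convention and that the connector
# emits at most tasks.max task configs.

def static_assignments(connector: str, tasks_max: int) -> dict:
    """Build worker config values that pin this connector's jobs."""
    task_ids = [f"{connector}-{i}" for i in range(tasks_max)]
    return {
        "static.connectors": connector,
        "static.tasks": ",".join(task_ids),
    }

print(static_assignments("my-sink", 3))
```

A management system could write these values into the worker properties before provisioning the JVM, and poll the REST API afterwards to confirm the jobs exist.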
Greg

On Tue, Oct 10, 2023 at 10:08 AM Greg Harris <greg.har...@aiven.io> wrote:
>
> Hey Tom,
>
> Thanks for your comments! I appreciate your insight here and on the
> Strimzi proposal.
>
> 1. Fixed, thanks.
>
> 2. We haven't removed a protocol yet, so workers still announce that
> they can use the old protocol versions, and the group coordinator will
> choose the highest mutually supported protocol version. This would be
> true even if static was not a superset of sessioned.
> For the static feature, having workers in the cluster that do not
> support static assignments will cause the common version to not
> support static assignments, and will mean that workers which announce
> static.connectors and static.tasks may be assigned jobs outside of
> their static assignments, which is called out in the KIP.
> If the version gap is wider, then the other protocol-controlled
> features will also change behavior (i.e. no longer using
> incremental-cooperative-assignments, or not including REST
> authentication headers). There would be no difference to the behavior
> of this feature.
>
> 3. Arbitrary here is specifically meant to say that one of the workers
> will be chosen, but outside implementations should not rely on the
> method of choosing. This allows us to change/improve the assignment
> algorithm in the future without a KIP. The reason I've specified it
> this way is because I'd like to choose the "least loaded" worker, but
> I don't know if that is well-defined or tractable to compute. I expect
> that in practice we will use some heuristics to tie-break, but won't
> be able to ensure that the algorithm will be deterministic and/or
> optimal. If we find an improvement to the heuristics, I think that we
> should be able to implement them without a KIP.
>
> 4. I added a section for "Choosing static assignments" that details
> both the "dynamic" and "fixed" use-cases.
> With regards to tasks.max:
> The javadoc for the `taskConfigs` method does state that it
> "produc[es] at most {@code maxTasks} configurations," so static
> inference of the task IDs is valid. I think that statically inferring
> the number of tasks will always come with the trade-off that you can
> over-provision task workers, it seems fundamental. I think it's a
> trade-off that users of this feature will make for themselves, and
> both methods will be supported.
>
> 5. No I don't think that it is a safe assumption, as there are
> certainly connectors out there which violate it. For example, sink
> connectors with a heterogeneous set of topics, and source connectors
> which distribute a dynamic heterogeneous workload internally. I
> believe there are a few connectors with a distinct "task-0" which
> consumes more resources than the other tasks which would have some
> index-stability. Index is chosen because it is the only distinguishing
> feature we have without inspecting task configurations, and have
> control over via coordination and assignments.
>
> Thanks again!
> Greg
>
>
> On Tue, Oct 10, 2023 at 8:05 AM Mickael Maison <mickael.mai...@gmail.com>
> wrote:
> >
> > Hi Chris,
> >
> > Thanks for the KIP!
> >
> > 1) With this mechanism it seems Connect administrators have to know in
> > advance the names of connectors that will be running. As far as I can
> > tell, static.connectors and static.tasks will require restarting the
> > worker to change. Also you don't specify if these configurations can
> > accept wildcards or not. This could be useful for tasks as it's hard
> > to know in advance how many tasks will exist, especially as connectors
> > can in theory ignore tasks.max (I wonder if we should make the runtime
> > enforce that config).
> >
> > 2) The KIP does not explicitly mention a mechanism for management
> > systems to figure out what to do.
> > As far as I can tell (and looking at
> > the proposal in Strimzi), these systems will have to poll the REST API
> > to do so. If I understand correctly, management systems would first
> > have to poll the GET /connectors and then GET /connectors/{name}/tasks
> > for each connector, is that right?
> >
> > 3) In the Strimzi proposal you mention adding custom fields to the
> > Connector resource to instruct it what to do. I wonder if these should
> > be added directly in the connector configuration, rather than just
> > exist in the management system. That way all the data necessary to
> > make decisions would be available in the same place, the REST API.
> >
> > 4) This is not something you mention in the KIP and I don't think it's
> > a blocker for this KIP but I wonder if you thought about rack
> > awareness too. I'm bringing it as I think it's a similar concern when
> > it comes to deciding where to assign tasks.
> >
> > Thanks,
> > Mickael
> >
> >
> > On Tue, Oct 10, 2023 at 8:39 AM Tom Bentley <tbent...@redhat.com> wrote:
> > >
> > > Hi Greg,
> > >
> > > Many thanks for the KIP. Here are a few initial questions
> > >
> > > 1. Incomplete sentence: "But Connect is not situated to be able to manage
> > > resources directly, as workers are given a fixed "
> > > 2. You explain how sessioned is now a subset of static, but what happens
> > > in
> > > a cluster where some workers are using static and some are using either
> > > eager or compatible?
> > > 3. "Assign each unassigned static job to a static worker which specifies
> > > that job, choosing arbitrarily if there are multiple valid workers." I
> > > think there might be less ambiguous words than "arbitrary" to specify this
> > > behaviour. Hashing the task name would _appear_ pretty arbitrary to the
> > > user, but would be deterministic. Picking at random would be
> > > non-deterministic. Even better if you have a rationale.
> > > 4.
> > > You don't describe how a user, or automated system, starting with a set
> > > of connectors, should find out the tasks that they want to run. This
> > > relates to the contract of
> > > org.apache.kafka.connect.connector.Connector#taskConfigs(int maxTasks).
> > > AFAIK (please correct me if I'm wrong, because it's a long time since I
> > > looked at this code) there's nothing that validates that the returned list
> > > has at most the `maxTasks` and connectors can, of course, return fewer
> > > than
> > > that many tasks. So without deeper knowledge of a particular connector
> > > it's
> > > not clear to the user/operator how to configure their static workers and
> > > static assignments.
> > > 5. Is there a lurking assumption that task indices are stable? E.g. that
> > > the task with index 3 will always be the resource-intensive one. I can see
> > > that that would often be a reliable assumption, but it's not clear to me
> > > that it is always the case.
> > >
> > > Thanks again,
> > >
> > > Tom
> > >
> > > On Fri, 6 Oct 2023 at 12:36, Greg Harris <greg.har...@aiven.io.invalid>
> > > wrote:
> > >
> > > > Hey everyone!
> > > >
> > > > I'd like to propose an improvement to Kafka Connect's scheduling
> > > > algorithm, with the goal of improving the operability of connect
> > > > clusters through resource isolation.
> > > >
> > > > Find the KIP here:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-987%3A+Connect+Static+Assignments
> > > >
> > > > This feature is primarily intended to be consumed by cluster
> > > > management systems, so I've opened a sister proposal in the Strimzi
> > > > project that uses this feature to provide user-facing resource
> > > > isolation. The base feature is generic, so it is usable manually and
> > > > with cluster management systems other than Strimzi.
> > > >
> > > > Find the proposal here: https://github.com/strimzi/proposals/pull/96
> > > >
> > > > Thanks!
> > > > Greg Harris
> > > >
> > > >