Hi Mickael,

I'm not Chris but I hope I can still respond to your questions :)

1a. This system behaves best when connectors and tasks are known in
advance, but I don't think that is a requirement. I clarified that the
worker configurations are permissive of non-existent jobs, which
allows for bootstrapping an empty cluster and concurrent worker & job
provisioning.
1b. I think the wildcard suggestion is similar to the rejected
alternative "implement a more complex worker selector...". While we
could add that sort of functionality, I think it is more difficult to
express and reason about. For example, with the current configuration,
users can ensure that at most one job is assigned to a JVM. If a user
wanted to use wildcards with the same constraint, those wildcards
would need to express "at most one of <matching job>".
1c. The javadoc doesn't say what happens to the additional tasks, but
I think we could start enforcing tasks.max at runtime if it makes the
fixed-static-assignment use-case more reliable. I have only ever seen
additional tasks emitted due to rather serious bugs in connector
implementations.
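
To make 1a concrete, here is a sketch of a worker configuration along
the lines I have in mind. The property keys are the ones from the KIP,
but the job names are purely illustrative:

```properties
# Hypothetical worker properties (job names are illustrative).
# Neither the connector nor its tasks need to exist when the worker
# starts, which is what permits bootstrapping an empty cluster and
# provisioning workers and jobs concurrently.
static.connectors=mirror-source
static.tasks=mirror-source-0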

2. Yes, I've added that information to the proposal. I used the `GET
/connectors/{connector}` API as it doesn't list the task configs, but
it performs the same function.
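
For example, a management system's discovery step could look roughly
like this sketch in Python. The endpoint paths and response shapes
follow the Connect REST API, but the responses here are canned and
abbreviated, and `fetch_json` is a stand-in for an HTTP GET:

```python
# Sketch of how a management system could discover jobs via the REST
# API. fetch_json stands in for an HTTP GET against a Connect worker;
# here it serves canned, abbreviated responses for illustration.
CANNED = {
    "/connectors": ["mirror-source"],
    "/connectors/mirror-source": {
        "name": "mirror-source",
        "config": {"tasks.max": "2"},
        "tasks": [
            {"connector": "mirror-source", "task": 0},
            {"connector": "mirror-source", "task": 1},
        ],
    },
}

def fetch_json(path):
    return CANNED[path]

def discover_jobs():
    """Return (connectors, task_ids) currently known to the cluster."""
    connectors = fetch_json("/connectors")
    task_ids = []
    for name in connectors:
        # GET /connectors/{connector} lists the task IDs but not the
        # task configs, which is all the management layer needs here.
        info = fetch_json("/connectors/" + name)
        task_ids += ["%s-%d" % (t["connector"], t["task"])
                     for t in info["tasks"]]
    return connectors, task_ids

print(discover_jobs())
```

The resulting names are what a management system would feed into the
static.connectors and static.tasks properties of the workers it
provisions.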

3. Do you mean the resource limits and requests? This is the rejected
alternative "Model per-job resource constraints and measure resource
utilization within a single JVM", let me know if the reasoning there
is not convincing.

4. I have not explored rack awareness for Connect in detail. I expect
that rack awareness would require some compromise on the "least
disruption" property of incremental cooperative rebalancing for shared
JVMs. We could discuss that in a future KIP.
As for this proposal, because the management layer is responsible for
placing workers and placing tasks on those workers, it would also be
responsible for implementing rack awareness in isolated JVMs.

Thanks a lot!
Greg

On Tue, Oct 10, 2023 at 10:08 AM Greg Harris <greg.har...@aiven.io> wrote:
>
> Hey Tom,
>
> Thanks for your comments! I appreciate your insight here and on the
> Strimzi proposal.
>
> 1. Fixed, thanks.
>
> 2. We haven't removed a protocol yet, so workers still announce that
> they can use the old protocol versions, and the group coordinator will
> choose the highest mutually supported protocol version. This would be
> true even if static was not a superset of sessioned.
> For the static feature, having workers in the cluster that do not
> support static assignments will cause the common version to not
> support static assignments, and will mean that workers which announce
> static.connectors and static.tasks may be assigned jobs outside of
> their static assignments, which is called out in the KIP.
> If the version gap is wider, then the other protocol-controlled
> features will also change behavior (i.e. no longer using
> incremental-cooperative-assignments, or not including REST
> authentication headers). There would be no difference to the behavior
> of this feature.
>
> 3. Arbitrary here is specifically meant to say that one of the workers
> will be chosen, but outside implementations should not rely on the
> method of choosing. This allows us to change/improve the assignment
> algorithm in the future without a KIP. The reason I've specified it
> this way is because I'd like to choose the "least loaded" worker, but
> I don't know if that is well-defined or tractable to compute. I expect
> that in practice we will use some heuristics to tie-break, but won't
> be able to ensure that the algorithm will be deterministic and/or
> optimal. If we find an improvement to the heuristics, I think that we
> should be able to implement them without a KIP.
>
> 4. I added a section for "Choosing static assignments" that details
> both the "dynamic" and "fixed" use-cases. With regards to tasks.max:
> The javadoc for the `taskConfigs` method does state that it
> "produc[es] at most {@code maxTasks} configurations," so static
> inference of the task IDs is valid. I think that statically inferring
> the number of tasks will always come with the trade-off that you can
> over-provision task workers, it seems fundamental. I think it's a
> trade-off that users of this feature will make for themselves, and
> both methods will be supported.
>
> 5. No I don't think that it is a safe assumption, as there are
> certainly connectors out there which violate it. For example, sink
> connectors with a heterogeneous set of topics, and source connectors
> which distribute a dynamic heterogeneous workload internally. I
> believe there are a few connectors with a distinct "task-0" which
> consumes more resources than the other tasks which would have some
> index-stability. Index is chosen because it is the only distinguishing
> feature we have without inspecting task configurations, and the only
> one we have control over via coordination and assignments.
>
> Thanks again!
> Greg
>
>
> On Tue, Oct 10, 2023 at 8:05 AM Mickael Maison <mickael.mai...@gmail.com> wrote:
> >
> > Hi Chris,
> >
> > Thanks for the KIP!
> >
> > 1) With this mechanism it seems Connect administrators have to know in
> > advance the names of connectors that will be running. As far as I can
> > tell, static.connectors and static.tasks will require restarting the
> > worker to change. Also you don't specify if these configurations can
> > accept wildcards or not. This could be useful for tasks as it's hard
> > to know in advance how many tasks will exist, especially as connectors
> > can in theory ignore tasks.max (I wonder if we should make the runtime
> > enforce that config).
> >
> > 2) The KIP does not explicitly mention a mechanism for management
> > systems to figure out what to do. As far as I can tell (and looking at
> > the proposal in Strimzi), these systems will have to poll the REST API
> > to do so. If I understand correctly, management systems would first
> > have to poll the GET /connectors and then GET /connectors/{name}/tasks
> > for each connector, is that right?
> >
> > 3) In the Strimzi proposal you mention adding custom fields to the
> > Connector resource to instruct it what to do. I wonder if these should
> > be added directly in the connector configuration, rather than just
> > exist in the management system. That way all the data necessary to
> > make decisions would be available in the same place, the REST API.
> >
> > 4) This is not something you mention in the KIP and I don't think it's
> > a blocker for this KIP but I wonder if you thought about rack
> > awareness too. I'm bringing it as I think it's a similar concern when
> > it comes to deciding where to assign tasks.
> >
> > Thanks,
> > Mickael
> >
> >
> > On Tue, Oct 10, 2023 at 8:39 AM Tom Bentley <tbent...@redhat.com> wrote:
> > >
> > > Hi Greg,
> > >
> > > Many thanks for the KIP. Here are a few initial questions
> > >
> > > 1. Incomplete sentence: "But Connect is not situated to be able to manage
> > > resources directly, as workers are given a fixed "
> > > 2. You explain how sessioned is now a subset of static, but what happens 
> > > in
> > > a cluster where some workers are using static and some are using either
> > > eager or compatible?
> > > 3. "Assign each unassigned static job to a static worker which specifies
> > > that job, choosing arbitrarily if there are multiple valid workers." I
> > > think there might be less ambiguous words than "arbitrary" to specify this
> > > behaviour. Hashing the task name would _appear_ pretty arbitrary to the
> > > user, but would be deterministic. Picking at random would be
> > > non-deterministic. Even better if you have a rationale.
> > > 4. You don't describe how a user, or automated system, starting with a set
> > > of connectors, should find out the tasks that they want to run. This
> > > relates to the contract of
> > > org.apache.kafka.connect.connector.Connector#taskConfigs(int maxTasks).
> > > AFAIK (please correct me if I'm wrong, because it's a long time since I
> > > looked at this code) there's nothing that validates that the returned list
> > > has at most the `maxTasks` and connectors can, of course, return fewer 
> > > than
> > > that many tasks. So without deeper knowledge of a particular connector 
> > > it's
> > > not clear to the user/operator how to configure their static workers and
> > > static assignments.
> > > 5. Is there a lurking assumption that task indices are stable? E.g. that
> > > the task with index 3 will always be the resource-intensive one. I can see
> > > that that would often be a reliable assumption, but it's not clear to me
> > > that it is always the case.
> > >
> > > Thanks again,
> > >
> > > Tom
> > >
> > > On Fri, 6 Oct 2023 at 12:36, Greg Harris <greg.har...@aiven.io.invalid>
> > > wrote:
> > >
> > > > Hey everyone!
> > > >
> > > > I'd like to propose an improvement to Kafka Connect's scheduling
> > > > algorithm, with the goal of improving the operability of connect
> > > > clusters through resource isolation.
> > > >
> > > > Find the KIP here:
> > > >
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-987%3A+Connect+Static+Assignments
> > > >
> > > > This feature is primarily intended to be consumed by cluster
> > > > management systems, so I've opened a sister proposal in the Strimzi
> > > > project that uses this feature to provide user-facing resource
> > > > isolation. The base feature is generic, so it is usable manually and
> > > > with cluster management systems other than Strimzi.
> > > >
> > > > Find the proposal here: https://github.com/strimzi/proposals/pull/96
> > > >
> > > > Thanks!
> > > > Greg Harris
> > > >
> > > >