Hi Jarek,

Thanks for the review - good to have some more eyes on this.
One important thing that was not explicitly stated in the AIP (only by "side-effect" in the diagram, or the airflow.sdk code references) is that the Coordinator is entirely a worker-side construct. The scheduler is entirely unaware of how a task runs, only that it does.

Point 2: queue_to_sdk is per-worker deployment config. The scheduler never sees or touches coordinator selection -- it only knows about queue in the same way it always has. The CeleryKubernetesExecutor comparison doesn't quite apply here, because that was a scheduler/executor-level conflation; here the scheduler is entirely unaware that coordinators exist. Adding coordinator="jdk-17" to @task.stub would require the scheduler to carry and propagate that value to the worker, which is exactly the coupling we're trying to avoid. The coordinator choice is a deployment operator decision (not 100% only, but mostly), not a DAG authoring decision. This is especially true for one of the use cases from my presentation at Airflow Summit, where a separate team defines the task and you just "call it like a function". As the DAG author you don't need to know -- and shouldn't need to know -- whether it runs on jdk-8 or jdk-25.

Point 3: Since it's per-worker config, there's no shared registry for coordinators to compete in. jdk-11 vs jdk-17 is just two differently-configured workers with different queue mappings. First-match within a single worker's config is deterministic and unambiguous.

Point 6: Falls out naturally -- different teams' workers have different configs. There is no scheduler-level shared-config concern.

-ash

> On 11 May 2026, at 00:29, Tzu-ping Chung via dev <[email protected]> wrote:
>
> The AIP has been updated to emphasise how the coordinator works with the
> (non-Python) SDK artefact, and that process management (if present) and
> message communication are not specified in the proposal, since they are a
> contract only between the two.
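(For concreteness, a rough Python sketch of the worker-side selection described in the points above. Everything in it -- queue_to_coordinator, select_coordinator, and the config shapes -- is invented for illustration and is not part of the AIP or any real Airflow API.)

```python
# Illustrative sketch only: these names and config shapes are invented for
# this email and are not defined by AIP-108 or Airflow.

# Two independently deployed workers. Each carries its own mapping from
# queue name to coordinator; the scheduler never sees any of this -- it
# only routes tasks by queue, exactly as it always has.
WORKER_A_QUEUE_TO_COORDINATOR = {
    "java-tasks": "jdk-17",
    "default": "python",
}
WORKER_B_QUEUE_TO_COORDINATOR = {
    "java-tasks": "jdk-11",  # same queue, older runtime: purely a deployment choice
    "default": "python",
}


def select_coordinator(queue: str, queue_to_coordinator: dict[str, str]) -> str:
    """Resolve which coordinator runs a task on *this* worker.

    The DAG author only ever writes something like queue="java-tasks";
    whether that means jdk-11 or jdk-17 is decided by whoever deployed
    the worker, not by the DAG or the scheduler.
    """
    try:
        return queue_to_coordinator[queue]
    except KeyError:
        raise ValueError(f"this worker has no coordinator for queue {queue!r}") from None


print(select_coordinator("java-tasks", WORKER_A_QUEUE_TO_COORDINATOR))  # jdk-17
print(select_coordinator("java-tasks", WORKER_B_QUEUE_TO_COORDINATOR))  # jdk-11
```

The same DAG code runs unchanged on either worker; swapping jdk-11 for jdk-25 is a redeployment, not a DAG edit.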
> I also removed all references to parsing a Java dag file to avoid any
> potential conflicts with AIP-82. AIP-108 now only defines how the coordinator
> should find an executable target for a stub task, and the process it goes
> through to fulfil execution.
>
> Moving to other topics mentioned previously…
>
>> 1. The important topic already discussed is the wire protocol. While I agree
>> it's not a "blocker" for this AIP, it's 100% a blocker for releasing the
>> coordinator. Until we ship the first version of the coordinator we can
>> **easily** change the wire protocol. Once we ship it, it's basically a done
>> deal and it will be difficult to take a different path. But I do not see it
>> as a blocker for this AIP. I saw early versions (not public yet—I saw
>> messages from Confluence and read some early drafts) showing that the
>> current choice was a well-thought-out decision. I have my own sentiments and
>> experience with protobufs—they're pretty bad, and I really wouldn't like to
>> use them here—but I agree this is a different discussion and shouldn't
>> prevent acceptance of this AIP. This is contingent on us agreeing that we
>> will not ship the coordinator until AIP-109 (which I saw was the proposal)
>> is discussed and agreed upon. That discussion should happen **after** we
>> complete this one and vote on this AIP; otherwise, it might get stuck in
>> endless bikeshedding. I'm glad the proposal exists, and I'm even more glad
>> it's now gated by permissions (I saw that TP, Ash, and Jason did that). Once
>> we accept this one, we will open AIP-109 and discuss and agree on it well
>> before we actually "ship" the first coordinator.
>
> I disagree that this is a blocker for releasing the coordinator. Since a
> language-specific coordinator is its own distribution, it does not follow
> Airflow Core and SDK's major release cycles. It can be made experimental,
> and/or bump its major version for breaking changes on its own.
> However, in the meantime, some changes in Airflow Core and SDK are needed for
> them to know the coordinator exists and route workloads to it. The best
> approach IMO is to release the base coordinator interface in Airflow 3.2 with
> a provisional Java SDK and coordinator implementation (maybe using a
> zero-prefixed version). This would be most helpful for us to understand
> potential issues, rather than trying to make decisions in a vacuum.
>
>
>> 3. First-match coordinator choice: Choosing the first matching coordinator
>> is, I think, a bad choice (paired with the queue_to_coordinator mapping). It
>> is simplistic and has potential problems, such as when "similar"
>> coordinators effectively "compete" for Dags/Tasks. Even the very example of
>> two "jdk" coordinators shows the problem. I can very easily see, say,
>> "jdk-11" and "jdk-17" coordinators configured in the same setup. The order
>> in which those coordinators are configured will determine which one is
>> used, which shows a bit of asymmetry. If you want to use "jdk-17", you must
>> specify it, rather than having it matched automatically. However, I can
>> easily imagine a slightly more complicated interface where the matcher
>> would return a tuple -- ("matches": bool, "priority": int) -- plus some
>> deterministic tie-break or conflict error if we have two identical matches.
>> This could be a much more flexible solution. It would handle a much more
>> versatile set of cases. For example, you could have "jdk-11/" and "jdk-17/"
>> folders in a bundle and have two coordinators pick which one handles the
>> dags based on the folder name -- without needing to specify the coordinator
>> by "queue" or "name". In my opinion, that is a much better solution than
>> the one "queue_to_coordinator" tried to provide by proposing an indirection
>> layer in the Dag definition. Performance-wise, the impact should be
>> negligible.
>> I hardly imagine we'd have to worry about more than a few coordinators, or
>> about the complexity of the "can_handle_dag" implementation -- although it
>> would involve always running "can_handle_dag" for all coordinators rather
>> than stopping once one matches.
>
> I'm not sure where the "first matching" understanding is from. (Please point
> out the relevant text on this; it is most likely out-of-date.) A task
> specifies a queue, and then the queue is mapped to a specific coordinator.
> There can't be multiple to choose from. You can map multiple queues to use
> the same coordinator, but only one coordinator can be assigned to a queue.
>
>
>> 4. JVM process model: unlike in Python, we do not know if we are running
>> new processes or running tasks within the JVM process. This has
>> implications for memory, startup time overhead, JIT, isolation (especially
>> for multi-team environments), and restart-on-failure. I think we should
>> spell out the proposal clearly. Possibly (if my queue/coordinator split is
>> accepted), we could use the queue to determine the task's execution mode.
>
> I think these are implementation aspects. Managing a JVM process is no
> different from managing a Python one, and all the characteristics that a
> Python task possesses apply directly to a Java one. There are really no
> differences that need specification here. (Maybe this point was based on the
> understanding from the next point, that Java tasks can reuse the same
> process? But we're not doing that.)
>
>
>> 6. Multi-team: I guess coordinator definitions should not be shared between
>> teams (unless they want that). The current proposal puts them all in a
>> single .cfg, but multi-team has this format, which I think the coordinators
>> should also follow:
>>
>> [core]
>> executor = GlobalExecutor;team1=Team1Executor;team2=Team2Executor
>
> The executor config is formatted this way since it is a default value, and
> each team can have a different default.
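(To make the point-3 mapping rule concrete: a rough Python sketch of the invariant that many queues may share one coordinator, but each queue maps to exactly one, so there is nothing for coordinators to "compete" over. build_queue_map and the pair-list config shape are invented for illustration and are not part of the AIP.)

```python
# Illustrative only: build_queue_map and this config shape are made up for
# this email; the AIP does not define this API.

def build_queue_map(entries: list[tuple[str, str]]) -> dict[str, str]:
    """Build a queue -> coordinator map, rejecting conflicting assignments.

    Multiple queues mapping to one coordinator is fine; one queue mapping
    to two different coordinators is a configuration error, not something
    resolved by ordering, priority, or first-match.
    """
    mapping: dict[str, str] = {}
    for queue, coordinator in entries:
        existing = mapping.get(queue)
        if existing is not None and existing != coordinator:
            raise ValueError(
                f"queue {queue!r} is already mapped to {existing!r}; "
                f"it cannot also map to {coordinator!r}"
            )
        mapping[queue] = coordinator
    return mapping


# Many queues -> one coordinator:
mapping = build_queue_map([("etl", "jdk-17"), ("reports", "jdk-17"), ("ml", "python")])
assert mapping["etl"] == mapping["reports"] == "jdk-17"

# One queue -> two coordinators is rejected outright -- no ordering asymmetry:
try:
    build_queue_map([("etl", "jdk-11"), ("etl", "jdk-17")])
except ValueError as exc:
    print(exc)
```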
> The coordinators config instead specifies what should exist in Airflow, and
> each team should choose which coordinator they want to use (by specifying
> the queues their tasks use). It is not necessary (and honestly, IMO,
> extremely confusing in practice) for each team to have a different set of
> coordinators.
>
>
>> 7. How will logging / tracing be handled? Currently there is no mention of
>> it in the AIP. I am sure it has already been considered and resolved
>> (including the wire protocol and Java logger integration), but it's not
>> specified in the design -- and I think describing it is quite important.
>> Since we are using structlog, we want to use a similar tool in Java and
>> integrate it with the existing struct-logging -- but it's not clear how.
>> Java has its own logging system and plenty of logging "wrappers" -- but
>> here I think we have similar problems as with the "wire protocol". Which
>> one are we going to use? Where do we convert what Java produces into
>> structlog's JSON? On which side does this conversion happen? Will the wire
>> protocol be the same for all languages? Similar to the wire protocol above,
>> details and choices might be discussed in a separate AIP. However, at
>> least the "Java choices" regarding logging and how we envision
>> communication for logs should be documented. Similarly with OpenTelemetry
>> (OTEL) tracing: perhaps we should skip it for now and assume no tracing
>> exists. More likely, however, we will integrate the Java side with the
>> Java OTEL implementation and pass enough information to the task so it can
>> provide sufficient tracing information via its own OTEL configuration. I
>> think we should spell out the choices we made at the very least here.
>
> The task (user code) simply prints to stdout and stderr, and that output
> gets hurled to the parent process and recorded with standard Airflow
> logging. How the messages are actually printed is up to the user. Or you
> can do your own logging -- we don't care.
> (This is actually the same in Python: Airflow itself uses structured
> logging, but user-generated task logs are just strings; Airflow does not
> enforce a format there.)
>
> The Java SDK actually has a separate channel to send internal logs so they
> are not mixed with user logs, but again this is specific to the
> coordinator-SDK combination and doesn't need to be specified. Each
> coordinator can do this in the way that best fits its SDK.
>
> Tracing is currently not implemented, but IMO this is more of a standalone
> feature of the Java SDK. The Java SDK can simply be configured to send to
> the same OTel endpoint, and we can probably build some utility functions to
> simplify things for task authors, but ultimately there's not really
> anything to specify in the AIP, since Airflow doesn't read tracing
> information (it only emits it).
>
> TP

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
