Hello, *Ash*,
> One important thing not explicitly stated in the AIP (only implied by a "side-effect" in the diagram or airflow.sdk code references) is that the Coordinator is entirely a worker-side construct. The scheduler is entirely unaware of how a task runs, only that it does.

That change in scope really changes things a LOT, because when I looked at it a few days ago, it was not a worker-side construct. Back then (and at the dev call) it looked like a generic solution that should eventually handle selection of different languages and introduce a coordinator standard and interface that would be implemented across many different languages. It also involved part of Dag File Processing, making it a much more "shared" construct between the Worker and the Dag File Processor. It also appeared to be configured the same way in the Scheduler, with the Scheduler choosing which coordinator to select for which task.

After removing the Dag File Processor part, it can now truly be thought of only as a "worker" construct (i.e., a task execution process). That indeed changes and simplifies a lot, and I think many more decisions can now be better justified. But with this scope, to be honest, it starts to feel much less like a coordinator and more like a "Bridge to run Java Tasks". It's very specific to Java. Is this something we want to use in exactly this form - or even reuse anything of in other language implementations? Removing the "first matching" mechanism from the AIP changes its "genericness" and applicability to other languages quite dramatically. Basically, all that remains is something like a *"Python <> Java bridge to start the process and pass through the Task-SDK"*. And I am not even sure the "coordinator" concept fits it any more - I would rather call it a "*bridge*" or something like that to make it clearer :). And in my opinion, it should be an optional feature of the "task-sdk", not a separate distribution. I am not sure separate "coordinator" distributions are justified in this case.
Paraphrasing it: if I understand correctly, the current scope is to allow the Python task process to run a subprocess in a JDK-interface-specific way, send it all the starting data, pass through Task-SDK messages, and send back responses - without any ambition to become the standard approach for any other language? It's mostly a "hack" to do it this way and to use the queue to pass the "desired execution context - specific to and implemented only in Java". And that is nothing bad, to be perfectly honest - it is fine and simple, and has less coupling (for example, we can indeed immediately drop the whole discussion of which protocol we should use). But it looked like much more than that during the dev call and in the discussion :)

> This is especially true for one of the use cases I had in my presentation at
> Airflow Summit where you have a separate team define the task, and you just
> "call it like a function". You as the dag author don't need, and shouldn't
> know if it's run in jdk-8 or jdk-25.

Oh, absolutely - that makes total sense now, after removing the Dag Parsing part. I understand we simply want to simplify it now (compared to what we saw on the Dev Call as well?) for the sake of "delivering it on time" and focusing only on that part. I just want to be crystal clear about the current scope :)

> Point 2: queue_to_sdk is per-worker deployment config. The scheduler never
> sees or touches coordinator selection -- it only knows about queue in the
> same way it always has. The CeleryKubernetesExecutor comparison doesn't
> quite apply here because that was a scheduler/executor-level conflation;
> here the scheduler is entirely unaware coordinators exist. Adding
> coordinator="jdk-17" to @task.stub would require the scheduler to carry and
> propagate that to the worker, which is exactly the coupling we're trying to
> avoid. The coordinator choice is a deployment operator decision (not 100%
> only, but mostly), not a DAG authoring decision.

That was very unclear.
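To make the worker-side nature concrete, here is a sketch of how such a per-worker mapping could look. Only `queue_to_sdk` comes from the discussion above; the section name and value format are my guesses, not the AIP's:

```ini
# Worker-only airflow.cfg fragment (sketch; section name and value
# format are assumptions, not taken from the AIP). The scheduler never
# reads this -- it keeps routing purely on `queue` as it always has.
[coordinators]
# hypothetical format: map each queue to the bridge that serves it
queue_to_sdk = jdk17:java-bridge,jdk21:java-bridge
```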
This also necessitates clearly defining which configuration each component uses. For 3.2, we merely mentioned in the security model which items should be configured where. If we now introduce different configurations for different components, that should also be reflected in how we treat config. We likely need at least a documentation split there, or, most likely, "airflow config" should have options to generate, list, etc. things per component. And yes, spelling it out in the AIP would be great - even now it says "airflow.cfg" and does not mention that it is configured specifically in each worker.

So as I understand it now, we aren't introducing a reusable coordinator concept. Instead, tasks specify via a queue which "*Java bridge*" (for example) to use. The task stub should provide a way to indicate which specific Java bridge implementation to use if more than one exists. We might use the queue similarly if we add another language bridge, but that isn't certain. Is this a good statement?

*TP:*

On Mon, May 11, 2026 at 1:29 AM Tzu-ping Chung <[email protected]> wrote:

> The AIP has been updated to emphasise how the coordinator works with the
> (non-Python) SDK artefact, and that process management (if present) and
> message communication is not specified in the proposal since it is a
> contract only between the two.
>
> I also removed all references to parsing a Java dag file to avoid any
> potential conflicts with AIP-82. AIP-108 now only defines how the coordinator
> should find an executable target for a stub task, and the process it goes
> through to fulfil execution.

Note: there are still some references to parsing and jars. For example, there is "DAG File (Python or JAR)" - but if I understand correctly, we no longer have "Jar Dags". We will still have purely Python DAGs, and we also need to specify the "jar" file to use on the execution side.
I see that we are going to store the jar in a serialized DAG - so it should be specified somewhere in "task.stub()" - am I right? Currently, the "Foreign Language Process" chapter seems to still describe the case where the Dag is defined in Java, not in Python, and defers to the internal "coordinator" on how to pass it. Since AIP-108 now only concerns the "Java bridge", should we describe in detail how we specify the jar to use?

In this case I assume it's up to the worker to place the ".jar" there - do we make any assumptions about whether the ".jar" files should be distributed via shared Dag Bundles? Or some other way? I think it's important to define how "jar" files are distributed - especially in the context of GitDagBundle and versioned Dags. It seems appropriate to include JAR files as part of the "Dag Bundle/Git" and reference them using a relative path from the "Dag Bundle" root. I think this part is a little vague, especially since AIP-108 specifically concerns the JDK "bridge" implementation. Details cannot be deferred to the "implementation detail of each coordinator", because this is the only one we are discussing (no matter what we call it). So I think it should be a bit more specific.

> I disagree this is a blocker from releasing the coordinator. Since a
> language-specific coordinator is its own distribution, it does not follow
> Airflow Core and SDK's major release cycles. It can be made experimental,
> and/or bump the major version for breaking changes on its own.

Yeah, if the goal is not to define a "coordinator" concept that will be implemented in other languages as well, but specifically the Java bridge - then I absolutely agree. But then introducing the concept of a new "coordinator" distribution type is a bit far-fetched. It seems more like a "task-sdk" Python-to-Java bridge that should be an optional "task-sdk" feature.
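If jars are shipped inside the Dag Bundle and referenced relatively, the worker-side resolution could be as simple as the sketch below. The function name and parameters are hypothetical, purely to illustrate the "relative path from the Dag Bundle root" idea:

```python
from pathlib import Path

def resolve_jar(bundle_root: str, jar_ref: str) -> Path:
    """Resolve a jar reference (e.g. taken from the serialized stub
    task) against the checked-out bundle root, refusing references
    that escape the bundle directory."""
    root = Path(bundle_root).resolve()
    jar = (root / jar_ref).resolve()
    if root not in jar.parents:
        raise ValueError(f"jar reference escapes the bundle: {jar_ref}")
    return jar

# e.g. a stub task carrying jar="libs/etl-tasks.jar" would resolve it
# against the GitDagBundle checkout matching the Dag's version.
```

This would let versioned bundles carry the jar alongside the Dag file, so the jar version always matches the Dag version being executed.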
Alternatively, we could make it a separate extension, perhaps named "airflow-sdk-java-bridge", because it essentially becomes the "Task-SDK Java bridge". And a side comment: it looks like our "task-sdk" will not have the use we anticipated. This "Task-SDK bridge", along with all the "decoupling" we've done, isn't really used. Essentially, we will have to hard-pin the bridge's implementation to the task-sdk version because we are not defining any API-level compatibility mechanism. Even if we separate it from "task-sdk", we will always have to pin those two distributions together (unless we provide cadwyn-like compatibility for task-sdk). So I would rather opt for a built-in feature in "task-sdk", not a separate distribution, I think.

> I'm not sure where the "first matching" understanding is from. (Please
> point out relevant text on this; it is most likely out-of-date.) A task
> specifies a queue, and then the queue is mapped to a specific coordinator.
> There can't be multiple to choose from. You can map multiple queues to use
> the same coordinator, but only one coordinator can be assigned to a queue.

From this (now removed) diagram:

┌──────────────▼───────────────┐                    ┌──────────────────────────────┐
│ DAG File Processor           │                    │ Runtime Subprocess (Java)    │
│                              │  can_handle_dag    │                              │
│ For each file in bundle:     │  _file() == True   │ dag_parsing_cmd()            │
│ ┌ coordinator handles it? ───┼───────────────────►│                              │
│ │ Yes ──► delegate parse     │                    │ Java SDK parses JAR, builds  │
│ │ No  ──► Python path        │   SDK Serialized   │ SDK-compatible Serialized    │
│ │                            │◄─── DAG JSON ──────┤ DAG JSON (sdk, tasks, etc.)  │
│ └                            │                    │                              │
└──────────────┬───────────────┘                    └──────────────────────────────┘

Since we can have multiple coordinators (as noted even in the existing example below - sometimes multiple coordinators can handle the same file), the first coordinator that returns can_handle_dag_file() == True would win.
(That was on Saturday when I reviewed it.) And I understand why it was done that way - because, obviously, you would have to create a method to determine which coordinator should handle which files without hard-coding it. But since we removed that whole section, and we now explicitly rely on choosing the coordinator based on what the "task" defines - and can explicitly use a "coordinator name" - this whole part is gone now. So yes, since this part is removed, it is no longer a concern.

> *4. JVM process model* -> unlike in Python, we do not know if we are
> running new processes or running tasks within the JVM process. This has
> implications for memory, startup time overhead, JIT, isolation (especially
> for multi-team environments), and restart-on-failure. I think we should
> spell out the proposal clearly. Possibly (if my queue/coordinator split is
> accepted), we could use the queue to determine the task's execution mode.
>
> I think these are implementation aspects. Managing a JVM process is no
> different from Python, and all the characteristics that a Python task
> possesses apply directly to a Java one. There are really no differences that
> need specifications here. (Maybe this point was based on the understanding
> of the next point where Java tasks can reuse the same process? But we're
> not doing that.)

Hmm. I initially understood (with the coordinator concept) that we should at least know how it will look in different cases for different executors - specifically, whether a Python process runs under the supervisor or not. We already do this in Python. For example, the Go SDK (initially, at least - not sure about now) used the idea of a long-running Go process that ran tasks as goroutines (as far as I remember) - it did not start a new process for every task. Here it seems (and maybe I am wrong) that what we are proposing is that the Python "supervisor" will start a separate Java process for each task. Which is fine, as long as we are explicit about that.
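To make the "one fresh process per task" reading concrete, here is a toy sketch of that supervisor behaviour. `sys.executable` stands in for the `java` binary so the snippet is self-contained; the real bridge would launch something like `java -jar bridge.jar` and speak the Task-SDK protocol rather than plain text:

```python
import subprocess
import sys

def run_task_in_fresh_process(payload: str) -> str:
    """Sketch of the model where the supervisor spawns a brand-new
    child process per task, paying full startup cost (for a JVM:
    process start, class loading, JIT warm-up) every single time."""
    # Stand-in child; a Java bridge would be e.g. ["java", "-jar", "bridge.jar"]
    child = [sys.executable, "-c",
             "import sys; print('handled: ' + sys.stdin.read())"]
    proc = subprocess.run(child, input=payload, capture_output=True,
                          text=True, check=True)
    return proc.stdout.strip()
```

Each call forks and execs a fresh interpreter, which is exactly the per-task memory and startup overhead this model implies.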
I see three models here that might work:

a) A Python process (started by any executor - local executor process pool, forked Celery worker, or Python edge worker - the same way as today) starts a new Java process for every task. Both processes are running and taking memory, and starting the JVM is additional per-task "startup" overhead.

b) New Java process(es) are started by the workers (the Scheduler for the Local Executor, a Celery worker, or the Edge worker), and the executors communicate with this Java process without needing to create a new Python interpreter. That would require changes in each executor, but results in less per-task overhead (no need to create a Python interpreter) and less isolation (unless we can use some built-in Java isolation mechanism).

c) A long-running Java process that implements the Edge Executor interface and starts tasks using whatever provides good isolation in Java.

By simplifying it to merely the Java-specific "Task-SDK bridge", I understand we are talking about *a) only* - neither b) nor c) has been considered at all. I think it would be great to confirm and spell that out, as it has very concrete performance implications. When we discussed multiple-language support, we cited both "Native API" and "Performance/Overhead" as reasons to implement new languages. This implementation essentially only addresses "Native API", on top of the Task SDK we already have. The entire Task SDK premise was that it could be implemented natively in any language. It might still be used this way, but I understand it's not a goal of this AIP any more.

> *6. Multi-team:* I guess the coordinator definition should not be shared
> between teams (unless they want it). The current proposal puts them all in a
> single .cfg, but Multi-team has this format, which I think the coordinators
> should also follow:
> [core]
> executor = GlobalExecutor;team1=Team1Executor;team2=Team2Executor

> The executor config is formatted this way since it is a default value, and
> each team can have a different default. The coordinators config specifies
> what should exist in Airflow instead, and each team should choose which
> coordinator they want to use (by specifying the queues their tasks use). It
> is not necessary (and honestly IMO extremely confusing in practice) if each
> team can have a different set of coordinators.

With the explanation that we are switching to just a "Task-SDK Java bridge", it indeed does not need that. But I think we should be explicit that it's a worker-only configuration.

> The task (user code) simply prints to stdout and stderr, and it gets hurled
> to the parent process and recorded with standard Airflow logging. How the
> messages are actually printed is up to the user. Or you can do your own
> logging - we don't care. (This is actually the same in Python; Airflow itself
> uses structured logging, but user-generated task logs are just strings;
> Airflow does not enforce a format there.)

Hmm. I do not know the details, but as far as I know, we currently use our own more complex interface between the Python task and the supervisor. As far as I remember, it was structlog with newline-separated JSON (but I might be wrong). Some of the issues we encountered were that stdout/stderr is too brittle to rely on without structure (regarding flushing and similar). I would love others who know more to chime in here - but I do not think we use plain stdout/stderr there; it's far more sophisticated. Since the JDK has its own standard logging mechanism, it would make sense to tap into it and use the same interface, I guess. We know from the past that relying on stdout/stderr is generally a bad idea. But maybe I am wrong about that. I think this requires clarification and consultation regarding the past stdout/stderr issues.
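For reference, a toy illustration of why newline-delimited JSON framing is more robust than free-form stdout: structured records survive intact, while a stray print from user code can still be captured as a plain event instead of corrupting the stream. This is only a sketch of the general technique, not the actual supervisor protocol:

```python
import json

def parse_log_stream(raw: str) -> list[dict]:
    """Parse newline-delimited JSON log records from a child process.
    Unparseable lines are kept as plain-text events instead of being
    dropped, so a bare print() from user code still reaches the log."""
    records = []
    for line in raw.splitlines():
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            records.append({"event": line, "level": "info"})
    return records
```

A Java bridge tapping into JUL/SLF4J could emit the same one-JSON-object-per-line framing on its side of the pipe.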
> Tracing is currently not implemented, but IMO this is more of a standalone
> feature on the Java SDK. The Java SDK can simply be configured to send to
> the same OT endpoint, and we can probably build some utility functions to
> simplify things for task authors, but ultimately there's not really anything
> to specify in the AIP since Airflow doesn't read tracing information (it
> only emits).

I think that's a fine (and good) approach - let people configure OpenTelemetry (OTEL) the regular way they configure other Java processes. As long as we clearly document this and pass sufficient information when executing the task (e.g., some OTEL span ids), that should be enough for the standalone Java process to properly emit its metrics and traces. It's likely just agreement on the interface - what should be passed to Java to make it possible, and whether any additional exchange via Task-SDK (doubtful) is needed. We likely need to tell people how to handle this. When each Java task is a separate JVM, it will very likely "just work", as OTEL libraries should handle both the "soft" and the "harder" (signal-based) killing. Speaking of which, it's also a question for OpenLineage: whether we need to do something similar there. In the past, there were various issues with buffering OTEL and OpenLineage information when processes were killed prematurely, so including the OTEL/Lineage people in the discussion is likely a good idea.

Sorry for being such a pain in the neck - I think it's good that we are simplifying things and narrowing the scope as a result of these discussions - but I think there are still some important questions ^^ to solve.

J.

> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
