On 12 May 2026, at 02:45, Jarek Potiuk <[email protected]> wrote:
>
>> The AIP has been updated to emphasise how the coordinator works with the (non-Python) SDK artefact, and that process management (if present) and message communication are not specified in the proposal since they are a contract only between the two.
>>
>> I also removed all references to parsing a Java dag file to avoid any potential conflicts with AIP-82. AIP-108 now only defines how the coordinator should find an executable target for a stub task, and the process it goes through to fulfil execution.
>>
>
> Note. There are still some references to parsing and jars. For example there is "DAG File (Python or JAR)" - but if I understand correctly, we no longer have "Jar Dags." We will still have purely Python DAGs, and we also need to specify the "jar" file to use on the execution side. I see that we are going to store the jar in a serialized DAG, so it should be specified somewhere in "task.stub()" - am I right?
>
> Currently the "Foreign Language Process" chapter seems to still describe the case where the Dag is defined in Java, not in Python, and defers to the internal "coordinator" on how to pass it. Since AIP-108 now only concerns the "Java bridge," should we describe in detail how we specify the jar to use? And in this case I assume it's up to the worker to place the ".jar" there - do we make any assumptions about whether the ".jar" files should be distributed via shared Dag Bundles? Or some other way? I think it's important to define how "jar" files are distributed, especially in the context of GitDagBundle and versioned Dags. It seems appropriate to include JAR files as part of the "Dag Bundle/Git" and reference them using a relative path from the "Dag Bundle" root. I think this part is a little vague, especially since AIP-108 specifically concerns the JDK "Bridge" implementation.
> Details cannot be deferred to the "implementation detail of each coordinator" - because this is the only one we are discussing (no matter what we call it). So I think it should be a bit more specific.

There is still a concept of parsing in JARs since we still need a mechanism for the Java code to define which task is being implemented. This is still called Dag and Task in the Java SDK to match the terminology in Python. But Airflow does not find Airflow dags in JARs. I’ll edit out the DAG File reference you mentioned (and some other uses of “DAG parsing”, to avoid confusion since this is not the same concept as in the Airflow dag processor sense).

The JAR is not stored in a serialised dag (nor does a JAR contain serialised dags). It is configured in the worker environment. (JARs are similar to executing Python packages; the 'java' command knows where to look, or you can specify it in the coordinator with arguments.)

The above only happens at execution time, so the JARs (technically) don’t need to be in the dag bundle, only somewhere in the execution environment that the coordinator can locate. In practice maybe they should be included in the bundle since the implementation affects dag versioning, but on the other hand a JAR may be difficult to version since it is fundamentally a derived binary (technically an extension to ZIP). Personally I think we need some real-world user experience to know the best approach here, so the AIP does not provide an opinion on this topic. The best practice needs to be researched and codified later.

>
>> I disagree this is a blocker from releasing the coordinator. Since a language-specific coordinator is its own distribution, it does not follow Airflow Core and SDK’s major release cycles. It can be made experimental, and/or bump the major version for breaking changes on its own.
>>
>
> Yeah, if the goal is not to define the "coordinator" concept that will be implemented in other languages as well, but specifically the Java Bridge, then I absolutely agree. But then introducing the concept of a new "coordinator" distribution type is a bit far-fetched.
> It seems more like a "task-sdk" Python-to-Java bridge that should be an optional "task-sdk" feature. Alternatively, we could make it a separate extension, perhaps named "airflow-sdk-java-bridge" because it essentially becomes the "Task-sdk Java bridge". And a side comment: it looks like our "task-sdk" will not have the use we anticipated. This "Task-SDK bridge," along with all the "decoupling" we've done, isn't really used. Essentially, we will have to hard-pin the Bridge's implementation to the task-sdk version because we are not defining any API-level compatibility mechanism. Even if we separate it from "task-sdk", we will always have to pin those two distributions together (unless we provide cadwyn-like compatibility for task-sdk). So I would rather opt for a "built-in" feature in "task-sdk", not a separate distribution, I think.

I don’t agree with putting the package under the airflow.sdk namespace since it complicates distribution a lot down the road. We already have interest in other languages, and the airflow.sdk namespace already contains a lot of Python SDK things as-is; it is not a good idea to also open it up for potential language runtimes. Furthermore, the coordinator layer is not a part of the DAG authoring interface, which airflow.sdk is supposed to contain. You could maybe argue it can go into a nested package such as airflow.sdk.execution_time.coordinators, but that is just unnecessarily long. A separate package, i.e. airflow.coordinators, is the way to go in my opinion.

>> *4. JVM process model* -> unlike in Python, we do not know if we are running new processes or running tasks within the JVM process. This has implications for memory, startup time overhead, JIT, isolation (especially for multi-team environments), and restart-on-failure. I think we should spell out the proposal clearly. Possibly (if my queue/coordinator split is accepted), we could use the queue to determine the task's execution mode.
>>
>> I think these are implementation aspects. Managing a JVM process is no different from Python, and all the characteristics that a Python task possesses apply directly to a Java one. There are really no differences that need specifications here. (Maybe this point was based on the understanding of the next point where Java tasks can reuse the same process? But we’re not doing that.)
>>
>
> Hmm. I initially understood (with the coordinator concept) that we should at least know how it will look in different cases for different executors, specifically whether a Python process runs under the supervisor or not. We already do this in Python. For example, the GoSDK (initially, at least - not sure now) used the idea of a long-running Go process that ran tasks as subroutines (as far as I remember) - and it did not start a new process for every task. Here it seems (and maybe I am wrong) what we are proposing is that the Python "supervisor" will start a Java process separately for each task. Which is fine. As long as we are explicit about that. I see three models here that might work:
>
> a) A Python process (started by any executor: local executor process pool, forked Celery worker, or Python edge; the method is the same as today) starts a new Java process for every task. Both are running and taking memory, and starting the Java interpreter is an additional "startup" overhead.
> b) New Java process(es) start via the workers (either the Scheduler for the Local Executor, a Celery worker, or the Edge Executor), and the executors communicate with this Java process without needing to create a new Python interpreter.
> That would require changes in each executor but results in less "per task" overhead, no need to create a Python interpreter, and less isolation (unless we can use some built-in Java isolation mechanism).
> c) A long-running Java process that implements the Edge Executor interface and starts tasks using whatever provides good isolation in Java.
>
> But by simplifying it to merely the Java-specific "Task-SDK Bridge," I understand we are talking about *a) only* - neither b) nor c) has been considered at all. I think it would be great to confirm and spell it out, as it has very concrete performance implications. When we discussed multiple language support we cited both "Native API" and "Performance/Overhead" as reasons to implement new languages. This implementation essentially only addresses "Native API," in addition to the Task SDK we already have. The entire TaskSDK premise was that it could be implemented natively in any language. It might still be used this way, but I understand it's not a goal of this AIP any more.
>
>> *6. Multi-team:* I guess the coordinator definition should not be shared between teams (unless they want it). The current proposal puts them all in a single .cfg, but multi-team has this format, which I think the coordinators should also follow?
>>
>> [core]
>> executor = GlobalExecutor;team1=Team1Executor;team2=Team2Executor
>>
>> The executor config is formatted this way since it is a default value, and each team can have a different default. The coordinators config specifies what should exist in Airflow instead, and each team should choose which coordinator they want to use (by specifying the queues their tasks use). It is not necessary (and honestly IMO extremely confusing in practice) for each team to have a different set of coordinators.
>>
>
> With the explanation that we are switching only to the "Task-SDK Java Bridge", it does not need that indeed.
> But I think we should be explicit that it's a worker-only configuration.
>
>> The task (user code) simply prints to stdout and stderr, and the output gets hurled to the parent process and recorded with standard Airflow logging. How the messages are actually printed is up to the user. Or you can do your own logging; we don’t care. (This is actually the same in Python; Airflow itself uses structured logging, but user-generated task logs are just strings; Airflow does not enforce a format there.)
>>
>
> Hmm. I do not know the details but as far as I know, we currently use our own more complex interface between the Python task and the supervisor that we implemented. As far as I remember it was StructLog with newline-separated JSON (but I might be wrong). Some of the issues we encountered were that stderr/stdout is too brittle to rely on without structure (regarding flushing and similar). I would love others who know more to chime in here - but I do not think we use stdout/stderr there; it's far more sophisticated. Since the JDK has its own "standard" logging mechanism, it would make sense to tap into it and use the same interface, I guess. We know from the past that relying on stdout/stderr is generally a bad idea. But maybe I am wrong about that. I think this requires clarification and consultation regarding the past stdout/stderr errors.

The supervisor process does use structured logging, but logs emitted from USER CODE are basically plain strings. Technically each one is wrapped in JSON to include some extra information (file location, stdout or stderr, etc.) but ultimately the user can only emit plain strings to Airflow. Java has its own logging architecture, but Airflow can only care about what Airflow understands. We can discuss what the best practice should be when you write Java tasks, but this AIP shouldn’t go beyond stdout and stderr streams.
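
To make the stdout/stderr point concrete, here is a minimal sketch of a parent process that starts one child per task and wraps every plain-string line the child prints in a small JSON record. This is an illustration only, not Airflow's supervisor code: the `run_task` helper and the record fields (`stream`, `message`) are hypothetical, and a Python child stands in for the `java` command so the sketch runs anywhere.

```python
import json
import subprocess
import sys
import threading


def run_task(cmd):
    """Run one task in a child process; return (exit code, JSON log records).

    Hypothetical helper: the record shape is an assumption for illustration.
    """
    records = []
    lock = threading.Lock()
    proc = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True
    )

    def pump(stream, name):
        # The child only ever emits plain strings; the structure is added here.
        for line in stream:
            record = json.dumps({"stream": name, "message": line.rstrip("\n")})
            with lock:
                records.append(record)

    # Drain both pipes concurrently so neither can fill up and block the child.
    threads = [
        threading.Thread(target=pump, args=(proc.stdout, "stdout")),
        threading.Thread(target=pump, args=(proc.stderr, "stderr")),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return proc.wait(), records


# Stand-in for something like ["java", "-cp", "<jar>", "<main class>"].
child = [
    sys.executable,
    "-c",
    "import sys; print('hello'); print('oops', file=sys.stderr)",
]
exit_code, records = run_task(child)
for record in records:
    print(record)
```

The relative order of stdout and stderr records is not guaranteed, which is one of the brittleness concerns raised above; a real implementation would also need to think about flushing and partial lines.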
>> Tracing is currently not implemented, but IMO this is more of a standalone feature on the Java SDK. The Java SDK can simply be configured to send to the same OTel endpoint, and we can probably build some utility functions to simplify things for task authors, but ultimately there’s not really anything to specify in the AIP since Airflow doesn’t read tracing information (it only emits).
>>
>
> I think that's a fine (and good) approach to let people configure OpenTelemetry (OTEL) in the regular way they configure other Java processes. As long as we clearly document this and pass sufficient information when executing the task (e.g., some OTEL span-ids), I believe that should be enough for the standalone Java process to properly emit its metrics and traces. It's likely just agreement on the interface - what should be passed to Java to make it possible, and whether any additional exchange via Task-SDK (doubtful) is needed. We likely need to tell people how to handle this. When each Java task is a separate Java interpreter, it will very likely "just work" as OTEL libraries should handle the "soft" and "harder" (signal-based) killing. Speaking of which, it's also a question for OpenLineage: whether we need to do something similar there. In the past, there were various issues with buffering OTEL and OpenLineage information and prematurely killing processes, so including the OTEL/Lineage people in the discussion is likely a good idea.

Even in the pure Python scenario, the supervisor layer does not receive OTel messages; they are sent directly from the worker process. The question would be entirely about the interface, which we can always add later. OL is mostly not an issue since Airflow’s integration is mostly at the operator and hook level, but the Java SDK does not have those concepts. We can introduce interfaces to help users emit their own lineage info from Java code, but again that should be added later.
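
As an illustration of what "passing span-ids" to the child process could look like, here is a minimal sketch that formats a W3C Trace Context `traceparent` value and hands it to a child through its environment. Nothing here is part of the AIP: the `TRACEPARENT` variable name and both helper functions are assumptions made for the example, and a Python child again stands in for the Java process.

```python
import os
import subprocess
import sys


def format_traceparent(trace_id: int, span_id: int, sampled: bool = True) -> str:
    """Format IDs as a W3C traceparent: version-traceid-parentid-flags."""
    flags = "01" if sampled else "00"
    return f"00-{trace_id:032x}-{span_id:016x}-{flags}"


def child_env(trace_id: int, span_id: int) -> dict:
    """Copy the current environment and add the trace context.

    TRACEPARENT is an assumed variable name, not something Airflow defines.
    """
    env = dict(os.environ)
    env["TRACEPARENT"] = format_traceparent(trace_id, span_id)
    return env


# Example IDs taken from the W3C Trace Context specification.
env = child_env(0x4BF92F3577B34DA6A3CE929D0E0E4736, 0x00F067AA0BA902B7)

# Stand-in for the Java task process: it just echoes what it received.
out = subprocess.run(
    [sys.executable, "-c", "import os; print(os.environ['TRACEPARENT'])"],
    env=env,
    capture_output=True,
    text=True,
    check=True,
).stdout.strip()
print(out)
```

The child would then use that value as the parent context for its own spans and export them directly to the collector, consistent with the supervisor not receiving OTel messages.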

>
> Sorry for being such a pain in the neck - I think it's good we are simplifying things as a result of these discussions and narrowing the scope - but I think there are still some important questions ^^ to solve.
>
> J.
>
>>
>> ---------------------------------------------------------------------
>> To unsubscribe, e-mail: [email protected]
>> For additional commands, e-mail: [email protected]
>>
