jason810496 commented on code in PR #65956:
URL: https://github.com/apache/airflow/pull/65956#discussion_r3322293920
##########
airflow-e2e-tests/tests/airflow_e2e_tests/conftest.py:
##########
@@ -234,6 +240,99 @@ def _setup_xcom_object_storage_integration(dot_env_file, tmp_dir):
os.environ["ENV_FILE_PATH"] = str(dot_env_file)
+def _setup_java_sdk_integration(dot_env_file, tmp_dir):
+ """Set up the java_sdk E2E test mode.
+
+ Builds the Java example bundle via the Gradle wrapper, copies the JARs and
+ the example Dag file into the temp directory, builds a Java-capable Airflow
+ worker image, and writes the coordinator configuration.
+ """
+ # Build the example bundle inside an ephemeral JDK container so the host
+ # does not need Java installed.
+ #
+ # --user keeps build outputs owned by the current user (not root).
+ # GRADLE_USER_HOME persists the Gradle distribution and dependency cache in
+ # java-sdk/.gradle/ (already gitignored) so the first run downloads once
+ # and subsequent runs skip straight to compilation.
+ # --no-daemon avoids a background JVM that would outlive the container.
+ console.print("[yellow]Building Java SDK example bundle (eclipse-temurin:17-jdk)...")
+ subprocess.run(
+ [
+ "docker",
+ "run",
+ "--rm",
+ "--user",
+ f"{os.getuid()}:{os.getgid()}",
+ "-e",
+ "GRADLE_USER_HOME=/repo/java-sdk/.gradle",
+ # Mount the repository root at /repo; java-sdk/ (the Gradle root project) is then /repo/java-sdk.
+ "-v",
+ f"{AIRFLOW_ROOT_PATH}:/repo",
+ "-w",
+ "/repo/java-sdk",
+ "eclipse-temurin:17-jdk",
+ "./gradlew",
+ ":example:installDist",
+ "--no-daemon",
+ ],
+ check=True,
+ )
+
+ # Copy compose override and Dockerfile into the temp directory.
+ copyfile(JAVA_COMPOSE_PATH, tmp_dir / "java.yml")
+ copyfile(JAVA_DOCKERFILE_PATH, tmp_dir / "Dockerfile.java")
+
+ # Copy all JARs from installDist output so the compose bind-mount ./jars
+ # gives the worker everything JavaCoordinator needs to build a classpath.
+ copytree(JAVA_SDK_EXAMPLE_LIBS_PATH, tmp_dir / "jars")
+
+ # Copy the Java SDK example Dag file so Airflow can discover it.
+ copyfile(JAVA_SDK_DAGS_PATH / "java_examples.py", tmp_dir / "dags" / "java_examples.py")
+
+ # Build a local Docker image that extends DOCKER_IMAGE with a JRE.
+ # We do this explicitly so testcontainers' DockerCompose.start() does not
+ # need to handle the build itself (which avoids --no-build vs --build flag
+ # uncertainty across testcontainers versions).
+ console.print(f"[yellow]Building airflow-java-worker image on top of {DOCKER_IMAGE}...")
+ subprocess.run(
+ [
+ "docker",
+ "build",
+ "--build-arg",
+ f"DOCKER_IMAGE={DOCKER_IMAGE}",
+ "-t",
+ "airflow-java-worker",
+ "-f",
+ str(tmp_dir / "Dockerfile.java"),
+ str(tmp_dir),
+ ],
+ check=True,
+ )
+
+ # Coordinator registry: maps the logical name "java-jdk" to JavaCoordinator.
+ # Queue mapping: routes tasks on the "java" Celery queue to "java-jdk".
+ coordinator_config = json.dumps(
+ {
+ "java-jdk": {
+ "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
+ "kwargs": {"jars_root": ["/opt/airflow/jars"]},
+ }
+ }
+ )
+ queue_to_coordinator = json.dumps({"java": "java-jdk"})
Review Comment:
Not necessary in this PR, but maybe as a follow-up.
I just came up with an idea: how about setting up different JDK versions (if
possible) and then testing with `{"legacy-jdk": "java-jdk-11",
"modern-jdk": "java-jdk-17"}`, to demonstrate that the coordinator interface is
able to route the workload to different JDK runtimes?
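A minimal sketch of the two-level routing such a follow-up would exercise (queue -> coordinator name -> coordinator config), assuming both settings are plain JSON objects as in the conftest above. The coordinator names, the `jars_root` paths and the `resolve_coordinator` helper are hypothetical. How each entry would pin a particular JDK is not modelled, since this PR only shows a `jars_root` kwarg.

```python
import json

# Hypothetical registry for the follow-up: two JavaCoordinator entries that
# differ only by name and JAR directory. Selecting a specific JDK per entry is
# not modelled here.
coordinators = {
    "java-jdk-11": {
        "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
        "kwargs": {"jars_root": ["/opt/airflow/jars-11"]},
    },
    "java-jdk-17": {
        "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
        "kwargs": {"jars_root": ["/opt/airflow/jars-17"]},
    },
}
# Queue -> coordinator name, as proposed in the comment.
queue_to_coordinator = {"legacy-jdk": "java-jdk-11", "modern-jdk": "java-jdk-17"}


def resolve_coordinator(queue, coordinators, queue_to_coordinator):
    """Return (name, config) for the coordinator `queue` routes to, or None.

    Hypothetical helper: raises KeyError when a queue names a coordinator
    that is not in the registry.
    """
    name = queue_to_coordinator.get(queue)
    if name is None:
        return None  # queue is not routed to any coordinator
    if name not in coordinators:
        raise KeyError(f"queue {queue!r} maps to unregistered coordinator {name!r}")
    return name, coordinators[name]


# Serialised with json.dumps as in the conftest, under the README's env var names.
env = {
    "AIRFLOW__SDK__COORDINATORS": json.dumps(coordinators),
    "AIRFLOW__SDK__QUEUE_TO_COORDINATOR": json.dumps(queue_to_coordinator),
}

for queue in ("legacy-jdk", "modern-jdk", "default"):
    print(queue, "->", resolve_coordinator(queue, coordinators, queue_to_coordinator))
```

An e2e test along these lines would submit one task to each queue and assert which runtime executed it.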
##########
java-sdk/README.md:
##########
@@ -0,0 +1,101 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+ -->
+
Review Comment:
Should we mention that users must use Airflow `3.3.0` or above?
Just in case they think any Airflow 3.0+ version will work with the Java-SDK.
##########
java-sdk/README.md:
##########
@@ -0,0 +1,101 @@
+# Airflow Java SDK
+
+A **JVM** SDK for Apache Airflow. You can use any JVM-compatible language to write
+workflow bundles and have Airflow consume the result.
+
+The SDK and execution-time logic are implemented in Kotlin.
+An example is bundled showing how the SDK can be used from Java.
+
+## Building the SDK
+
+```bash
+./gradlew build
+```
+
+## Running the example
+
+* Put the [DAG with stub tasks](./dags) somewhere Airflow can find it.
+
+* Ensure the `java` command is available in the same environment as the Airflow
+ task worker.
+
+* Package the example and its dependencies into JARs in
+ `./example/build/install/example/lib`:
+
+ ```bash
+ ./gradlew :example:installDist
+ ```
+
+* Configure Airflow to route tasks in the *java* queue to be run with Java:
+
+ ```bash
+ export AIRFLOW__SDK__COORDINATORS='{
+ "java": {
+ "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
+ "kwargs": {"jars_root": ["/opt/airflow/java-sdk/example/build/install/example/lib"]}
+ }
+ }'
+ export AIRFLOW__SDK__QUEUE_TO_COORDINATOR='{"java": "java"}'
Review Comment:
How about giving the coordinator an identifier that differs from the queue
name? Otherwise users may think the two names must be identical.
```suggestion
"java-jdk": {
"classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
 "kwargs": {"jars_root": ["/opt/airflow/java-sdk/example/build/install/example/lib"]}
}
}'
export AIRFLOW__SDK__QUEUE_TO_COORDINATOR='{"java": "java-jdk"}'
```
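Hand-writing JSON inside single-quoted `export` lines is easy to get subtly wrong, so the two settings could also be generated. This is a minimal sketch that assumes both settings are read as JSON from the env vars shown in the README. `render_exports` is a hypothetical helper and not part of the SDK. Following the suggestion, the sketch keeps the coordinator id (`java-jdk`) distinct from the queue name (`java`). It also fails if a queue points at an undefined id.

```python
import json
import shlex

JARS_ROOT = "/opt/airflow/java-sdk/example/build/install/example/lib"

# The coordinator id ("java-jdk") deliberately differs from the queue ("java").
coordinators = {
    "java-jdk": {
        "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
        "kwargs": {"jars_root": [JARS_ROOT]},
    }
}
queue_to_coordinator = {"java": "java-jdk"}


def render_exports(coordinators, queue_to_coordinator):
    """Return shell `export` lines for both settings (hypothetical helper)."""
    # The names are independent, but every mapped id must still be defined.
    missing = set(queue_to_coordinator.values()) - set(coordinators)
    if missing:
        raise ValueError(f"undefined coordinator ids: {sorted(missing)}")
    settings = {
        "AIRFLOW__SDK__COORDINATORS": coordinators,
        "AIRFLOW__SDK__QUEUE_TO_COORDINATOR": queue_to_coordinator,
    }
    # json.dumps produces valid JSON; shlex.quote makes it a single shell word.
    return "\n".join(
        f"export {name}={shlex.quote(json.dumps(value))}"
        for name, value in settings.items()
    )


print(render_exports(coordinators, queue_to_coordinator))
```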
##########
java-sdk/README.md:
##########
@@ -0,0 +1,101 @@
+
+* Ensure the Connection and Variable needed by the example DAG are available:
+
+ ```bash
+ export AIRFLOW_CONN_TEST_HTTP='{
+ "conn_type": "http",
+ "login": "user",
+ "password": "pass",
+ "host": "example.com",
+ "port": 1234,
+ "extra": {"param1": "val1", "param2": "val2"}
+ }'
+ export AIRFLOW_VAR_MY_VARIABLE=123
+ ```
+
+## Technical Details
+
+Users use the SDK to write a Java application that implements task methods,
+along with metadata specifying which DAG and task each method is used for.
+
+When the Airflow Supervisor determines that a task should run with Java, it
+launches the Java application as a subprocess. The Java application accepts
+flags `--comm` and `--logs` from the command line to identify TCP sockets it
+should connect to, and communicates with the Supervisor through these channels
+during execution.
+
+1. On connection, the Supervisor immediately sends a StartupDetails message
+ through the comm socket.
+2. The Java application finds and executes the relevant method.
+3. During execution, the Java application uses the comm socket to retrieve
+ information (e.g. Variables) from Airflow and send data (e.g. XComs) to it.
+4. The Java application sends the task's terminal state to the Supervisor
+ through the comm socket.
+5. The Java application exits.
+
+During the Java application's lifetime, it also sends log messages generated by
+the SDK (not user code) through the logs socket, so the Supervisor can append
+them to Airflow logs.
+
+Communication uses the same formats as the Python-based processes.
+
Review Comment:
How about adding another section that mentions the current limitations?
There are two points I came up with; please feel free to add more if you have any.
1. The current Java-SDK doesn't support defining an entire Dag in Java; users are
still required to define a corresponding Python stub Dag. Defining an entire Dag in
Java is still WIP.
2. We only allow sequential access to Airflow XComs, Connections, and Variables
in a Java task. In other words, users can't spawn coroutines that call
`client.getXXX` concurrently.
I implemented I/O multiplexing in the Go-SDK (since goroutines spawned with `go func`
might be one of the big advantages of Go-native tasks), but concurrent
access is not allowed in the Java-SDK at the moment.
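Point 2 follows from the request/response shape of the comm socket described under Technical Details. Below is a minimal, runnable sketch of steps 1 to 5 over a local socket pair. It assumes newline-delimited JSON framing. The real channel reuses the Python processes' format, which this sketch does not reproduce. Apart from `StartupDetails`, the message names and fields are placeholders, and the logs socket is omitted. The task blocks on each response before it sends anything else, so at most one request is ever outstanding. Allowing concurrent `client.getXXX` calls would require matching responses to requests (for example by an ID), and this sketch does not do that.

```python
import json
import socket
import threading


def open_streams(sock):
    # Separate reader and writer objects: interleaving reads and writes on a
    # single text wrapper can discard data that was already read ahead.
    return sock.makefile("r", encoding="utf-8"), sock.makefile("w", encoding="utf-8")


def send(wfile, msg):
    # Illustrative framing only: one JSON object per line.
    wfile.write(json.dumps(msg) + "\n")
    wfile.flush()


def recv(rfile):
    line = rfile.readline()
    return json.loads(line) if line else None  # None on EOF


def supervisor(sock, variables, received):
    """Stand-in for the Airflow Supervisor's end of the comm socket."""
    rfile, wfile = open_streams(sock)
    with rfile, wfile:
        # Step 1: send StartupDetails as soon as the task is connected.
        send(wfile, {"type": "StartupDetails", "dag_id": "java_example", "task_id": "hello"})
        while (msg := recv(rfile)) is not None:
            received.append(msg)
            if msg["type"] == "GetVariable":
                send(wfile, {"type": "VariableResult", "value": variables.get(msg["key"])})
            elif msg["type"] == "TaskState":
                break  # Step 4: terminal state received.


def task_process(sock):
    """Stand-in for the Java application."""
    rfile, wfile = open_streams(sock)
    with sock, rfile, wfile:
        startup = recv(rfile)  # Step 1: receive StartupDetails.
        assert startup["type"] == "StartupDetails"
        # Step 2: dispatch on startup["dag_id"] / startup["task_id"] (elided).
        # Step 3: each read blocks until its response arrives, so only one
        # request is ever in flight on this channel.
        send(wfile, {"type": "GetVariable", "key": "my_variable"})
        value = recv(rfile)["value"]
        send(wfile, {"type": "SetXCom", "key": "return_value", "value": int(value) * 2})
        send(wfile, {"type": "TaskState", "state": "success"})  # Step 4
    # Step 5: leaving the block closes the socket, standing in for process exit.


sup_sock, task_sock = socket.socketpair()
received = []
thread = threading.Thread(target=supervisor, args=(sup_sock, {"my_variable": "123"}, received))
thread.start()
task_process(task_sock)
thread.join()
sup_sock.close()
print([m["type"] for m in received])
```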
##########
java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Logger.kt:
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.airflow.sdk.execution
+
+import io.ktor.utils.io.ByteWriteChannel
+import io.ktor.utils.io.writeString
+import kotlinx.coroutines.runBlocking
+import kotlinx.datetime.LocalDateTime
+import kotlinx.datetime.TimeZone
+import kotlinx.datetime.toLocalDateTime
+import kotlinx.serialization.json.JsonElement
+import kotlinx.serialization.json.JsonNull
+import kotlinx.serialization.json.JsonPrimitive
+import kotlinx.serialization.json.buildJsonArray
+import kotlinx.serialization.json.buildJsonObject
+import kotlin.reflect.KClass
+import kotlin.time.Clock
+
+enum class Level { ERROR, DEBUG, }
+
+internal data class LogMessage(
+ val event: String,
+ val arguments: Map<String, Any>,
+ val logger: Logger,
+ val level: Level,
+ val timestamp: LocalDateTime = Clock.System.now().toLocalDateTime(TimeZone.currentSystemDefault()),
+)
+
+internal class Logger(
+ cls: KClass<*>,
+) {
+ val name: String? = cls.java.typeName
+
+ // TODO: Actually implement level filtering.
+ @Suppress("UNUSED_PARAMETER")
+ fun isEnabledForLevel(level: Level): Boolean = true
+
+ fun debug(
+ message: String,
+ arguments: Map<String, Any> = emptyMap(),
+ ) {
+ log(Level.DEBUG, message, arguments)
+ }
+
+ fun error(
+ message: String,
+ arguments: Map<String, Any> = emptyMap(),
+ ) {
+ log(Level.ERROR, message, arguments)
+ }
+
+ private fun log(
+ level: Level,
+ event: String,
+ arguments: Map<String, Any>,
+ ) {
+ if (!isEnabledForLevel(level)) return
+ LogSender.send(LogMessage(event, arguments, this, level))
+ }
+}
+
+internal object LogSender {
+ private var writer: ByteWriteChannel? = null
+ val messages: MutableList<LogMessage> = mutableListOf()
Review Comment:
nit caught by Claude:
+0 for me on fixing this; it _should_ be fine to have interleaved logs IMO (the API
server will interleave the logs by sorting on timestamp anyway).
---
LogSender is not thread-safe. `messages: MutableList<LogMessage>` and
`writer: var` are touched from several threads (the startProcessing thread, the
configure thread, and the user task thread via `Logger.log`, which calls
`LogSender.send`). Calling `removeAt(0)` on the `mutableListOf()` list while another
thread calls `messages.add(...)` will race, and visibility of `writer` across threads
is not guaranteed. Use a ConcurrentLinkedQueue (or guard with a lock)
and make `writer` `@Volatile`.
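The "guard with a lock" option can be illustrated with a minimal Python sketch of the buffer-until-configured pattern. It is not a translation of `LogSender`, because the `configure` and `startProcessing` bodies are not shown in this hunk. `BufferedLogSender` and its methods are hypothetical. A single lock covers both the pending buffer and the writer reference. In Kotlin, that corresponds to the lock alternative rather than to ConcurrentLinkedQueue plus `@Volatile`.

```python
import threading
from collections import deque


class BufferedLogSender:
    """Hypothetical analogue of LogSender: buffers until a writer is attached.

    One lock guards both the pending buffer and the writer reference, so no
    thread can observe a half-configured sender.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._pending = deque()
        self._writer = None  # callable taking one message; None until configured

    def send(self, message):
        with self._lock:
            if self._writer is None:
                self._pending.append(message)  # not configured yet: buffer it
            else:
                self._writer(message)

    def configure(self, writer):
        with self._lock:
            # Drain the buffer in arrival order before publishing the writer,
            # so a new message cannot overtake an older buffered one.
            while self._pending:
                writer(self._pending.popleft())
            self._writer = writer


sink = []
sender = BufferedLogSender()


def worker(worker_id):
    for n in range(100):
        sender.send((worker_id, n))


threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads[:2]:
    t.start()  # these may log before the writer exists
sender.configure(sink.append)
for t in threads[2:]:
    t.start()
for t in threads:
    t.join()
```

Calling the writer while holding the lock keeps per-thread ordering simple, but it serialises all log writes. As noted above, strict cross-thread ordering may not matter much if logs are sorted by timestamp later.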
--
This is an automated message from the Apache Git Service.