yassun7010 commented on issue #37217:
URL: https://github.com/apache/airflow/issues/37217#issuecomment-2520237004
My solution is as follows:
## common code
```python
from typing import TYPE_CHECKING, cast
from airflow.decorators import task
from airflow import XComArg
if TYPE_CHECKING:
import google.cloud.batch_v1 as batch_v1
@task
def submit_cloud_batch_job(
job_id: str,
commands: list[str] | XComArg,
/,
*,
envs: dict[str, str] | None = None,
) -> str:
client = batch_v1.BatchServiceClient()
create_request = batch_v1.CreateJobRequest()
runnable = batch_v1.Runnable()
...
runnable.container.commands = cast(list[str], commands)
...
client.create_job(request=create_request)
return create_request.job_id
```
## DAG code
```python
import datetime
from pathlib import Path
from airflow.decorators import dag, task
from airflow.models import DagRun
from common_code import submit_cloud_batch_job
@dag(
dag_id=Path(__file__).stem,
schedule_interval="0 0 * * *",
start_date=datetime.datetime(2021, 1, 1),
)
def execute():
@task()
def make_commands(dag_run: DagRun | None = None):
conf = dag_run.conf if dag_run else {}
commands = [
"python",
"-m",
"my-task",
]
# You can pass any command argument you like from conf or Python
code to Cloud Batch.
if value := conf.get("my-option"):
commands.extend(["--my-option", str(value)])
return commands
submit_cloud_batch_job(
Path(__file__).stem,
make_commands(),
)
execute()
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]