JzhJay commented on issue #58382:
URL: https://github.com/apache/airflow/issues/58382#issuecomment-3540453645
PS: the way we creat our task instance is as follows:
````python
def create_k8s_task_op(
task_id: str,
image: str,
arguments: list,
volumes: list,
volume_mounts: list,
namespace: str,
user_id: int,
group_id: int,
resources: k8s.V1ResourceRequirements = None,
affinity: k8s.V1Affinity = None,
startup_timeout_seconds: int = 1800,
image_pull_policy: str = "Always",
cmds: list = None,
trigger_rule: str = TriggerRule.ALL_SUCCESS,
on_failure_callback: Optional[Callable] = None,
on_retry_callback: Optional[Callable] = None,
scheduler_name: Optional[str] = "default-scheduler",
pod_template_config: dict = {},
labels: dict = {},
extra_env: dict = {},
node_selector: dict = {},
):
KubernetesPodOperator.template_fields_renderers["arguments"] = "py"
return KubernetesPodOperator(
# unique id of the task within the DAG
task_id=task_id,
# the Docker image to launch
image=image,
cmds=cmds,
arguments=arguments,
# arguments=["bash", "run.sh", "pre9", "/opt/conda/bin/python"],
# launch the Pod on the same cluster as Airflow is running on
in_cluster=True,
# launch the Pod in the same namespace as Airflow is running in
namespace=namespace,
on_success_callback=None,
# name the Pod
name=task_id,
# give the Pod name a random suffix, ensure uniqueness in the
namespace
random_name_suffix=True,
# attach labels to the Pod, can be used for grouping
labels=labels,
# reattach to worker instead of creating a new Pod on worker failure
reattach_on_restart=True,
# delete Pod after the task is finished
is_delete_operator_pod=True,
# get log stdout of the container as task logs
get_logs=True,
# log events in case of Pod failure
log_events_on_failure=True,
volumes=volumes,
volume_mounts=volume_mounts,
affinity=affinity,
# pass your name as an environment var
env_vars={"NAME_TO_GREET": task_id, "TZ": TIMEZONE, **extra_env}, #
设置容器的时区为 CST
startup_timeout_seconds=startup_timeout_seconds,
container_resources=resources,
image_pull_policy=image_pull_policy,
trigger_rule=trigger_rule,
security_context={
"runAsUser": user_id, # 指定要运行的用户ID
"runAsGroup": group_id, # 指定要运行的组ID
},
on_failure_callback=on_failure_callback,
on_retry_callback=on_retry_callback,
pod_template_dict=create_pod_template(
image=image,
scheduler_name=scheduler_name,
pod_template_config=pod_template_config,
),
node_selector=node_selector,
)
````
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]