dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-898663582
So I tested the workaround with a quick-and-dirty subclass that
overrides the execute method. For each env value I call mask_secret():
```python
class KubernetesPodOperator_(KubernetesPodOperator):
def __init__(
self,
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
def _get_secret_var(self) -> List[k8s.V1EnvVar]:
try:
sec = Variable.get(
"testvar", deserialize_json=True
)
except KeyError:
self.log.warning(
"You have to add the variable in Airflow"
)
sec = {
"DBT_WAREHOUSE": "Default",
}
return [
k8s.V1EnvVar(
name="DBT_DATABASE",
value=sec["DBT_WAREHOUSE"],
),
]
def execute(self, context):
from airflow.utils.log.secrets_masker import mask_secret
self.log.info(
f"Custom KubernetesPodOperator runs K8S Task"
)
self.env_vars = self._get_secret_var()
for secret_env in self.env_vars:
mask_secret(secret_env.value)
return super().execute(context)
```
Afterwards I had a look at the logs, and it seems that the masking now works,
but only in one of the two exception messages in the logs.
Here is the detailed log message:
```python
[2021-08-13 18:27:01,897] {{taskinstance.py:1501}} ERROR - Task failed with
exception
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py",
line 368, in execute
raise AirflowException(f'Pod {self.pod.metadata.name} returned a
failure: {remote_pod}')
airflow.exceptions.AirflowException: Pod
passing-test.ea5fb34fd1274264baa318e7d66098de returned a failure:
{'api_version': 'v1',
'kind': 'Pod',
'metadata': {'annotations': None,
'cluster_name': None,
'creation_timestamp': datetime.datetime(2021, 8, 13, 18, 26,
33, tzinfo=tzlocal()),
'deletion_grace_period_seconds': None,
'deletion_timestamp': None,
'finalizers': None,
'generate_name': None,
'generation': None,
'initializers': None,
'labels': {'airflow_version': '2.1.2',
'dag_id': 'k8s_v3',
'execution_date':
'2021-08-13T151421.6018540000-bfa4f3805',
'foo': 'bar',
'kubernetes_pod_operator': 'True',
'task_id': 'passing-task',
'try_number': '6'},
'managed_fields': [{'api_version': 'v1',
'fields': None,
'manager': 'OpenAPI-Generator',
'operation': 'Update',
'time': datetime.datetime(2021, 8, 13, 18,
26, 33, tzinfo=tzlocal())},
{'api_version': 'v1',
'fields': None,
'manager': 'kubelet',
'operation': 'Update',
'time': datetime.datetime(2021, 8, 13, 18,
27, tzinfo=tzlocal())}],
'name': 'passing-test.ea5fb34fd1274264baa318e7d66098de',
'namespace': 'XXXXX',
'owner_references': None,
'resource_version': '90215538',
'self_link': None,
'uid': '00145ef8-992c-42d7-9ce3-f2a7990617f0'},
'spec': {'active_deadline_seconds': None,
'affinity': {'node_affinity': None,
'pod_affinity': None,
'pod_anti_affinity': None},
'automount_service_account_token': None,
'containers': [{'args': ['import sys; sys.exit(1)'],
'command': ['python', '-c'],
'env': [{'name': 'DBT_DATABASE',
'value': 'secret',
'value_from': None}],
'env_from': None,
'image': 'python:3.6',
'image_pull_policy': 'IfNotPresent',
'lifecycle': None,
'liveness_probe': None,
'name': 'base',
'ports': None,
'readiness_probe': None,
'resources': {'limits': {'cpu': '1500m',
'memory': '512Mi'},
'requests': {'cpu': '512m',
'memory': '256Mi'}},
'security_context': None,
'stdin': None,
'stdin_once': None,
'termination_message_path': '/dev/termination-log',
'termination_message_policy': 'File',
'tty': None,
'volume_devices': None,
'volume_mounts': [{'mount_path':
'/var/run/secrets/kubernetes.io/serviceaccount',
'mount_propagation': None,
'name': 'default-token-k59nb',
'read_only': True,
'sub_path': None,
'sub_path_expr': None}],
'working_dir': None}],
...
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
line 1157, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
line 1331, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py",
line 1361, in _execute_task
result = task_copy.execute(context=context)
File "/opt/airflow/plugins/airflow_dbt/operators/dbt_k8s_operator.py",
line 61, in execute
return super().execute(context)
File
"/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py",
line 373, in execute
raise AirflowException(f'Pod Launching failed: {ex}')
airflow.exceptions.AirflowException: Pod Launching failed: Pod
passing-test.ea5fb34fd1274264baa318e7d66098de returned a failure:
{'api_version': 'v1',
'kind': 'Pod',
'metadata': {'annotations': None,
'cluster_name': None,
'creation_timestamp': datetime.datetime(2021, 8, 13, 18, 26,
33, tzinfo=tzlocal()),
'deletion_grace_period_seconds': None,
'deletion_timestamp': None,
'finalizers': None,
'generate_name': None,
'generation': None,
'initializers': None,
'labels': {'airflow_version': '2.1.2',
'dag_id': 'k8s_v3',
'execution_date':
'2021-08-13T151421.6018540000-bfa4f3805',
'foo': 'bar',
'kubernetes_pod_operator': 'True',
'task_id': 'passing-task',
'try_number': '6'},
'managed_fields': [{'api_version': 'v1',
'fields': None,
'manager': 'OpenAPI-Generator',
'operation': 'Update',
'time': datetime.datetime(2021, 8, 13, 18,
26, 33, tzinfo=tzlocal())},
{'api_version': 'v1',
'fields': None,
'manager': 'kubelet',
'operation': 'Update',
'time': datetime.datetime(2021, 8, 13, 18,
27, tzinfo=tzlocal())}],
'name': 'passing-test.ea5fb34fd1274264baa318e7d66098de',
'namespace': 'XXXXX',
'owner_references': None,
'resource_version': '90215538',
'self_link': None,
'uid': '00145ef8-992c-42d7-9ce3-f2a7990617f0'},
'spec': {'active_deadline_seconds': None,
'affinity': {'node_affinity': None,
'pod_affinity': None,
'pod_anti_affinity': None},
'automount_service_account_token': None,
'containers': [{'args': ['import sys; sys.exit(1)'],
'command': ['python', '-c'],
'env': [{'name': 'DBT_DATABASE',
'value': '***',
'value_from': None}],
'env_from': None,
'image': 'python:3.6',
'image_pull_policy': 'IfNotPresent',
'lifecycle': None,
'liveness_probe': None,
'name': 'base',
'ports': None,
'readiness_probe': None,
'resources': {'limits': {'cpu': '1500m',
'memory': '512Mi'},
'requests': {'cpu': '512m',
'memory': '256Mi'}},
'security_context': None,
'stdin': None,
'stdin_once': None,
'termination_message_path': '/dev/termination-log',
'termination_message_policy': 'File',
'tty': None,
'volume_devices': None,
'volume_mounts': [{'mount_path':
'/var/run/***s/kubernetes.io/serviceaccount',
'mount_propagation': None,
'name': 'default-token-k59nb',
'read_only': True,
'sub_path': None,
'sub_path_expr': None}],
'working_dir': None}],
```
If a pod ends with a failure, the KubernetesPodOperator seems to check the
state first, and if the state does not equal State.SUCCESS it raises an
AirflowException. This exception is caught and re-raised a couple of
statements below. However, the masking (at least the way my very trivial
subclass implements it) only works for the second exception message.
[KubernetesPodOperator
](https://github.com/apache/airflow/blob/providers-cncf-kubernetes/2.0.0/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py)
```python
else:
self.log.info("creating pod with labels %s and launcher %s",
labels, launcher)
final_state, remote_pod, result =
self.create_new_pod_for_operator(labels, launcher)
if final_state != State.SUCCESS:
raise AirflowException(f'Pod {self.pod.metadata.name}
returned a failure: {remote_pod}')
context['task_instance'].xcom_push(key='pod_name',
value=self.pod.metadata.name)
context['task_instance'].xcom_push(key='pod_namespace',
value=self.namespace)
return result
except AirflowException as ex:
raise AirflowException(f'Pod Launching failed: {ex}')
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]