dondaum commented on issue #17604:
URL: https://github.com/apache/airflow/issues/17604#issuecomment-898580071
We are running Airflow v2.1.2.
So I had another look at the "Secrets Masker" feature, as I had already configured it without any positive effect on the masking of environment variables. Most likely our setup and configuration are wrong, so let me give some context:
We have a custom logging config that uses Azure Blob Storage for remote logging, where we have added the new filter to the remote task handler:
```python
DEFAULT_LOGGING_CONFIG: Dict[str, Any] = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "airflow": {"format": LOG_FORMAT},
        "airflow_coloured": {
            "format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
            "class": COLORED_FORMATTER_CLASS if COLORED_LOG else "logging.Formatter",
        },
    },
    "filters": {
        "mask_secrets": {
            "()": "airflow.utils.log.secrets_masker.SecretsMasker",
        },
    },
    "handlers": {
        "console": {
            "class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
            "formatter": "airflow_coloured",
            "stream": "sys.stdout",
            "filters": ["mask_secrets"],
        },
        "task": {
            "class": "airflow.utils.log.file_task_handler.FileTaskHandler",
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
            "filename_template": FILENAME_TEMPLATE,
            "filters": ["mask_secrets"],
        },
        "processor": {
            "class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",  # noqa: E501
            "formatter": "airflow",
            "base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
            "filename_template": PROCESSOR_FILENAME_TEMPLATE,
            "filters": ["mask_secrets"],
        },
    },
    "loggers": {
        "airflow.processor": {
            "handlers": ["processor"],
            "level": LOG_LEVEL,
            "propagate": False,
        },
        "airflow.task": {
            "handlers": ["task"],
            "level": LOG_LEVEL,
            "propagate": False,
            "filters": ["mask_secrets"],
        },
        "flask_appbuilder": {
            "handlers": ["console"],
            "level": FAB_LOG_LEVEL,
            "propagate": True,
        },
    },
    "root": {
        "handlers": ["console"],
        "level": LOG_LEVEL,
        "filters": ["mask_secrets"],
    },
}

EXTRA_LOGGER_NAMES: str = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
if EXTRA_LOGGER_NAMES:
    new_loggers = {
        logger_name.strip(): {
            "handlers": ["console"],
            "level": LOG_LEVEL,
            "propagate": True,
        }
        for logger_name in EXTRA_LOGGER_NAMES.split(",")
    }
    DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)

DEFAULT_DAG_PARSING_LOGGING_CONFIG: Dict[str, Dict[str, Dict[str, Any]]] = {
    "handlers": {
        "processor_manager": {
            "class": "logging.handlers.RotatingFileHandler",
            "formatter": "airflow",
            "filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            "mode": "a",
            "maxBytes": 104857600,  # 100MB
            "backupCount": 5,
        }
    },
    "loggers": {
        "airflow.processor_manager": {
            "handlers": ["processor_manager"],
            "level": LOG_LEVEL,
            "propagate": False,
        }
    },
}

...

elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
    REMOTE_BASE_LOG_FOLDER = TARGET_HOSTNAME
    WASB_REMOTE_HANDLERS: Dict[str, Dict[str, Union[str, bool]]] = {
        "task": {
            "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",  # noqa: E501
            "formatter": "airflow",
            "base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
            "wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
            "wasb_container": "airflow-logs",
            "filename_template": FILENAME_TEMPLATE,
            "delete_local_copy": False,
            "filters": ["mask_secrets"],
        },
    }
```
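For reference, my understanding is that `SecretsMasker` is both the collector of masked values and the logging filter that redacts them, so it only hides values that were previously registered with it. A minimal standalone sketch (my illustration, not part of our production config, assuming Airflow 2.1's `airflow.utils.log.secrets_masker` module):
```python
# Minimal sketch (illustration only): SecretsMasker redacts a value only
# after it has been registered via add_mask().
import logging

from airflow.utils.log.secrets_masker import SecretsMasker

masker = SecretsMasker()
masker.add_mask("super-secret-value")  # register the secret to be redacted

logger = logging.getLogger("masking_demo")
logger.addHandler(logging.StreamHandler())
logger.addFilter(masker)

logger.warning("connecting with password super-secret-value")
# expected output: connecting with password ***
```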
In our Airflow configuration we have (we are using Helm charts):
`AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES: "DBT_PWD,DBT_WAREHOUSE"`
I also had a look at the Airflow docs to verify that I am not missing any other important config. I found `AIRFLOW__CORE__HIDE_SENSITIVE_VAR_CONN_FIELDS`, but it is true by default and we do not change it.
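In case it is useful, here is how I would sanity-check that the sensitive names are picked up (a quick sketch, assuming the `should_hide_value_for_key` helper in `airflow.utils.log.secrets_masker`; I have not wired this into our deployment):
```python
# Quick sanity check (sketch): are these names treated as sensitive once
# AIRFLOW__CORE__SENSITIVE_VAR_CONN_NAMES is set?
from airflow.utils.log.secrets_masker import should_hide_value_for_key

for name in ("DBT_PWD", "DBT_WAREHOUSE", "SOME_OTHER_NAME"):
    print(name, should_hide_value_for_key(name))
# expected: True for DBT_PWD and DBT_WAREHOUSE, False for SOME_OTHER_NAME
```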
In an example DAG I read the variable and pass one of its values as an environment variable:
```python
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

sec = Variable.get("testvar", deserialize_json=True)

with dag:
    k = KubernetesPodOperator(
        namespace=aks_kube_config.aks_namespace,
        in_cluster=True,
        image="python:3.6",
        cmds=["python", "-c"],
        arguments=["import sys; sys.exit(1)"],
        env_vars=[
            k8s.V1EnvVar(
                name="DBT_WAREHOUSE",
                value=sec["DBT_WAREHOUSE"],
            )
        ],
        labels={"foo": "bar"},
        name="passing-test",
        is_delete_operator_pod=True,
        task_id="passing-task",
        get_logs=True,
        resources=aks_kube_config.get_dbt_config(),
        node_selector=aks_kube_config.get_dbt_node_selector(),
    )
```
However, I still see the value of the env variable in plain text in the task logs:
```
[2021-08-13 15:21:38,366] {{taskinstance.py:1501}} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/operators/kubernetes_pod.py", line 368, in execute
    raise AirflowException(f'Pod {self.pod.metadata.name} returned a failure: {remote_pod}')
airflow.exceptions.AirflowException: Pod passing-test.786a2b1e2b3e4b60accdf901645c6af0 returned a failure: {'api_version': 'v1',
 'kind': 'Pod',
 'metadata': {'annotations': None,
              'cluster_name': None,
              'creation_timestamp': datetime.datetime(2021, 8, 13, 15, 21, 6, tzinfo=tzlocal()),
              'deletion_grace_period_seconds': None,
              'deletion_timestamp': None,
              'finalizers': None,
              'generate_name': None,
              'generation': None,
              'initializers': None,
              'labels': {'airflow_version': '2.1.2',
                         'dag_id': 'k8s_v3',
                         'execution_date': '2021-08-13T151421.6018540000-bfa4f3805',
                         'foo': 'bar',
                         'kubernetes_pod_operator': 'True',
                         'task_id': 'passing-task',
                         'try_number': '2'},
              'managed_fields': [{'api_version': 'v1',
                                  'fields': None,
                                  'manager': 'OpenAPI-Generator',
                                  'operation': 'Update',
                                  'time': datetime.datetime(2021, 8, 13, 15, 21, 6, tzinfo=tzlocal())},
                                 {'api_version': 'v1',
                                  'fields': None,
                                  'manager': 'kubelet',
                                  'operation': 'Update',
                                  'time': datetime.datetime(2021, 8, 13, 15, 21, 36, tzinfo=tzlocal())}],
              'name': 'passing-test.786a2b1e2b3e4b60accdf901645c6af0',
              'namespace': 'XXXX',
              'owner_references': None,
              'resource_version': '90185518',
              'self_link': None,
              'uid': '6bb3ef39-0d53-4b65-8e4b-b0a142304ef4'},
 'spec': {'active_deadline_seconds': None,
          'affinity': {'node_affinity': None,
                       'pod_affinity': None,
                       'pod_anti_affinity': None},
          'automount_service_account_token': None,
          'containers': [{'args': ['import sys; sys.exit(1)'],
                          'command': ['python', '-c'],
                          'env': [{'name': 'DBT_WAREHOUSE',
                                   'value': 'secret',
                                   'value_from': None}],
                          'env_from': None,
                          'image': 'python:3.6',
                          'image_pull_policy': 'IfNotPresent',
                          'lifecycle': None,
                          'liveness_probe': None,
                          'name': 'base',
                          'ports': None,
                          'readiness_probe': None,
                          'resources': {'limits': {'cpu': '1500m',
                                                   'memory': '512Mi'},
                                        'requests': {'cpu': '512m',
                                                     'memory': '256Mi'}},
                          'security_context': None,
                          'stdin': None,
                          'stdin_once': None,
                          'termination_message_path': '/dev/termination-log',
                          'termination_message_policy': 'File',
                          'tty': None,
                          'volume_devices': None,
                          'volume_mounts': [{'mount_path': '/var/run/secrets/kubernetes.io/serviceaccount',
                                             'mount_propagation': None,
                                             'name': 'default-token-k59nb',
                                             'read_only': True,
                                             'sub_path': None,
                                             'sub_path_expr': None}],
                          'working_dir': None}],
```
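What I would try next (an untested sketch, using the public `mask_secret` helper from `airflow.utils.log.secrets_masker`) is to register the value with the masker explicitly in the DAG file:
```python
# Untested sketch: explicitly register the variable value with the secrets
# masker so the log filter can redact it, independent of name matching.
from airflow.models import Variable
from airflow.utils.log.secrets_masker import mask_secret

sec = Variable.get("testvar", deserialize_json=True)
mask_secret(sec["DBT_WAREHOUSE"])  # value should now appear as *** in task logs
```
As far as I can tell, `mask_secret` locates the masker via the filters attached to the `airflow.task` logger, which is why we also added the `mask_secrets` filter there in the config above.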