[ https://issues.apache.org/jira/browse/AIRFLOW-3645?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Work on AIRFLOW-3645 started by Kyle Hamlin.
--------------------------------------------

> Use a base_executor_config and merge operator level executor_config
> -------------------------------------------------------------------
>
>                 Key: AIRFLOW-3645
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3645
>             Project: Apache Airflow
>          Issue Type: Improvement
>            Reporter: Kyle Hamlin
>            Assignee: Kyle Hamlin
>            Priority: Major
>             Fix For: 1.10.2
>
>
> It would be very useful to have a `base_executor_config` and merge the base
> config with any operator level `executor_config`.
> I imagine referencing a Python dict similar to how we reference a custom
> logging_config.
> *Example config*
> {code}
> [core]
> base_executor_config = config.base_executor_config.BASE_EXECUTOR_CONFIG
> {code}
> *Example base_executor_config*
> {code:python}
> BASE_EXECUTOR_CONFIG = {
>     "KubernetesExecutor": {
>         "image_pull_policy": "Always",
>         "annotations": {
>             "iam.amazonaws.com/role": "arn:aws:iam::<some arn>"
>         },
>         "volumes": [
>             {
>                 "name": "airflow-lib",
>                 "persistentVolumeClaim": {
>                     "claimName": "airflow-lib"
>                 }
>             }
>         ],
>         "volume_mounts": [
>             {
>                 "name": "airflow-lib",
>                 "mountPath": "/usr/local/airflow/lib"
>             }
>         ]
>     }
> }
> {code}
> *Example operator*
> {code:python}
> run_this = PythonOperator(
>     task_id='print_the_context',
>     provide_context=True,
>     python_callable=print_context,
>     executor_config={
>         "KubernetesExecutor": {
>             "request_memory": "256Mi",
>             "request_cpu": "100m",
>             "limit_memory": "256Mi",
>             "limit_cpu": "100m"
>         }
>     },
>     dag=dag)
> {code}
> Then we'll want a dict deep merge function that returns the merged
> executor_config.
> *Merge functionality*
> {code:python}
> import collections.abc
>
> from airflow import conf
> from airflow.utils.module_loading import import_string
>
>
> def dict_merge(dct, merge_dct):
>     """Recursive dict merge.
>
>     Inspired by :meth:`dict.update`, but instead of updating only
>     top-level keys, dict_merge recurses down into dicts nested to an
>     arbitrary depth, updating keys. The ``merge_dct`` is merged into
>     ``dct``.
>
>     :param dct: dict onto which the merge is executed
>     :param merge_dct: dict merged into dct
>     :return: dct
>     """
>     for k, v in merge_dct.items():
>         if (k in dct and isinstance(dct[k], dict)
>                 and isinstance(v, collections.abc.Mapping)):
>             dict_merge(dct[k], v)
>         else:
>             dct[k] = v
>     return dct
>
>
> def get_executor_config(executor_config):
>     """Try to import base_executor_config and merge it with the provided
>     executor_config.
>
>     :param executor_config: operator level executor config
>     :return: dict
>     """
>     try:
>         base_executor_config = import_string(
>             conf.get('core', 'base_executor_config'))
>         return dict_merge(base_executor_config, executor_config)
>     except Exception:
>         return executor_config
> {code}
> Finally, we'll want to call the get_executor_config function in the
> `BaseOperator`, possibly here:
> https://github.com/apache/airflow/blob/master/airflow/models/__init__.py#L2348

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
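To make the intended deep-merge semantics concrete, here is a minimal standalone sketch (no Airflow imports; the `BASE_EXECUTOR_CONFIG` values and the `operator_config` keys are hypothetical, loosely following the examples above). It shows operator-level keys overriding base keys while unrelated base keys survive:

```python
import collections.abc
import copy


def dict_merge(dct, merge_dct):
    """Recursively merge ``merge_dct`` into ``dct`` and return ``dct``."""
    for k, v in merge_dct.items():
        if (k in dct and isinstance(dct[k], dict)
                and isinstance(v, collections.abc.Mapping)):
            dict_merge(dct[k], v)
        else:
            dct[k] = v
    return dct


# Hypothetical base config, as it might live in config/base_executor_config.py.
BASE_EXECUTOR_CONFIG = {
    "KubernetesExecutor": {
        "image_pull_policy": "Always",
        "annotations": {"iam.amazonaws.com/role": "arn:aws:iam::<some arn>"},
    }
}

# Hypothetical operator-level config, as passed via executor_config=...
operator_config = {
    "KubernetesExecutor": {
        "request_memory": "256Mi",
        "image_pull_policy": "IfNotPresent",  # overrides the base value
    }
}

# Deep-copy the base so the shared dict is not mutated between tasks.
merged = dict_merge(copy.deepcopy(BASE_EXECUTOR_CONFIG), operator_config)
print(merged["KubernetesExecutor"]["image_pull_policy"])  # IfNotPresent
print(merged["KubernetesExecutor"]["annotations"])        # kept from the base
print(merged["KubernetesExecutor"]["request_memory"])     # 256Mi
```

One design point worth noting: since `dict_merge` mutates its first argument in place, merging straight into the imported base dict would leak one task's overrides into every later task in the same process; deep-copying the base first (as above) avoids that.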