ephraimbuddy opened a new issue #12418:
URL: https://github.com/apache/airflow/issues/12418


   AttributeError is raised when a dictionary is passed to executor_config 
using kubernetes executor.
   
   
   
   **Apache Airflow version**: 2.0
   
   
   **Kubernetes version (if you are using kubernetes)** (use `kubectl version`):
   Client Version: version.Info{Major:"1", Minor:"19", GitVersion:"v1.19.2", 
GitCommit:"f5743093fd1c663cb0cbc89748f730662345d44d", GitTreeState:"clean", 
BuildDate:"2020-09-16T13:41:02Z", GoVersion:"go1.15", Compiler:"gc", 
Platform:"linux/amd64"}
   Server Version: version.Info{Major:"1", Minor:"19", GitVersion:"v1.19.1", 
GitCommit:"206bcadf021e76c27513500ca24182692aabd17e", GitTreeState:"clean", 
BuildDate:"2020-09-14T07:30:52Z", GoVersion:"go1.15", Compiler:"gc", 
Platform:"linux/amd64"}
   
   **Environment**:
   
   - **OS** (e.g. from /etc/os-release): ubuntu
   
   **What happened**:
   
   While running the example dag 
https://github.com/apache/airflow/blob/master/airflow/example_dags/example_kubernetes_executor.py
 using the kubernetes executor, the scheduler raises an exception when trying 
to run `three_task`. The exception is as below:
   
   ```
   [2020-11-17 18:12:08,510] {base_executor.py:79} INFO - Adding to queue: 
['airflow', 'tasks', 'run', 'example_kubernetes_executor', 'three_task', 
'2020-11-17T18:11:10.769947+00:00', '--local', '--pool', 'default_pool', 
'--subdir', '/usr/local/airflow/dags/example_kubernetes_executor.py']
   [2020-11-17 18:12:08,523] {kubernetes_executor.py:543} INFO - Add task 
TaskInstanceKey(dag_id='example_kubernetes_executor', task_id='three_task', 
execution_date=datetime.datetime(2020, 11, 17, 18, 11, 10, 769947, 
tzinfo=Timezone('UTC')), try_number=1) with command ['airflow', 'tasks', 'run', 
'example_kubernetes_executor', 'three_task', 
'2020-11-17T18:11:10.769947+00:00', '--local', '--pool', 'default_pool', 
'--subdir', '/usr/local/airflow/dags/example_kubernetes_executor.py'] with 
executor_config {'KubernetesExecutor': {'request_memory': '128Mi', 
'limit_memory': '128Mi', 'tolerations': [{'key': 'dedicated', 'operator': 
'Equal', 'value': 'airflow'}], 'affinity': {'podAntiAffinity': 
{'requiredDuringSchedulingIgnoredDuringExecution': [{'topologyKey': 
'kubernetes.io/hostname', 'labelSelector': {'matchExpressions': [{'key': 'app', 
'operator': 'In', 'values': ['airflow']}]}}]}}}}
   
/usr/local/lib/python3.7/site-packages/airflow/kubernetes/pod_generator.py:197 
   DeprecationWarning: Using a dictionary for the executor_config is deprecated 
and
   will soon be removed.please use a `kubernetes.client.models.V1Pod` class 
with a 
   "pod_override" key instead. 
   [2020-11-17 18:12:08,573] {kubernetes_executor.py:280} INFO - Kubernetes job 
is (TaskInstanceKey(dag_id='example_kubernetes_executor', task_id='three_task', 
execution_date=datetime.datetime(2020, 11, 17, 18, 11, 10, 769947, 
tzinfo=Timezone('UTC')), try_number=1), ['airflow', 'tasks', 'run', 
'example_kubernetes_executor', 'three_task', 
'2020-11-17T18:11:10.769947+00:00', '--local', '--pool', 'default_pool', 
'--subdir', '/usr/local/airflow/dags/example_kubernetes_executor.py'], 
{'api_version': 'v1',
    'kind': 'Pod',
    'metadata': {'annotations': None,
                 'cluster_name': None,
                 'creation_timestamp': None,
                 'deletion_grace_period_seconds': None,
                 'deletion_timestamp': None,
                 'finalizers': None,
                 'generate_name': None,
                 'generation': None,
                 'initializers': None,
                 'labels': None,
                 'managed_fields': None,
                 'name': None,
                 'namespace': None,
                 'owner_references': None,
                 'resource_version': None,
                 'self_link': None,
                 'uid': None},
    'spec': {'active_deadline_seconds': None,
             'affinity': {'podAntiAffinity': 
{'requiredDuringSchedulingIgnoredDuringExecution': [{'labelSelector': 
{'matchExpressions': [{'key': 'app',
                                                                                
                                                          'operator': 'In',
                                                                                
                                                          'values': 
['airflow']}]},
                                                                                
                  'topologyKey': 'kubernetes.io/hostname'}]}},
             'automount_service_account_token': None,
             'containers': [{'args': [],
                             'command': [],
                             'env': [],
                             'env_from': [],
                             'image': None,
                             'image_pull_policy': None,
                             'lifecycle': None,
                             'liveness_probe': None,
                             'name': 'base',
                             'ports': [],
                             'readiness_probe': None,
                             'resources': {'limits': {'memory': '128Mi'},
                                           'requests': {'memory': '128Mi'}},
                             'security_context': None,
                             'stdin': None,
                             'stdin_once': None,
                             'termination_message_path': None,
                             'termination_message_policy': None,
                             'tty': None,
                             'volume_devices': None,
                             'volume_mounts': [],
                             'working_dir': None}],
             'dns_config': None,
             'dns_policy': None,
             'enable_service_links': None,
             'host_aliases': None,
             'host_ipc': None,
             'host_network': False,
             'host_pid': None,
             'hostname': None,
             'image_pull_secrets': [],
             'init_containers': None,
             'node_name': None,
             'node_selector': None,
             'preemption_policy': None,
             'priority': None,
             'priority_class_name': None,
             'readiness_gates': None,
             'restart_policy': None,
             'runtime_class_name': None,
             'scheduler_name': None,
             'security_context': None,
             'service_account': None,
             'service_account_name': None,
             'share_process_namespace': None,
             'subdomain': None,
             'termination_grace_period_seconds': None,
             'tolerations': [{'key': 'dedicated',
                              'operator': 'Equal',
                              'value': 'airflow'}],
             'volumes': []},
    'status': None}, None)
   [2020-11-17 18:12:08,594] {scheduler_job.py:1301} ERROR - Exception when 
executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 
1283, in _execute
       self._run_scheduler_loop()
     File 
"/usr/local/lib/python3.7/site-packages/astronomer/airflow/version_check/plugin.py",
 line 29, in run_before
       fn(*args, **kwargs)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 
1387, in _run_scheduler_loop
       self.executor.heartbeat()
     File 
"/usr/local/lib/python3.7/site-packages/airflow/executors/base_executor.py", 
line 159, in heartbeat
       self.sync()
     File 
"/usr/local/lib/python3.7/site-packages/airflow/executors/kubernetes_executor.py",
 line 603, in sync
       self.kube_scheduler.run_next(task)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/executors/kubernetes_executor.py",
 line 313, in run_next
       self.launcher.run_pod_async(pod, 
**self.kube_config.kube_client_request_args)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/kubernetes/pod_launcher.py", 
line 90, in run_pod_async
       pod_mutation_hook(pod)
     File "/usr/local/airflow/config/airflow_local_settings.py", line 26, in 
pod_mutation_hook
       pod.spec.affinity = pod.spec.affinity.to_dict().update({})
   AttributeError: 'dict' object has no attribute 'to_dict'
   ```
   When the Affinity is changed to k8s object instead of the dictionary in the 
example dag the error is 
   
   ```
   [2020-11-17 19:39:55,850] {kubernetes_executor.py:543} INFO - Add task 
TaskInstanceKey(dag_id='example_kubernetes_executor', task_id='three_task', 
execution_date=datetime.datetime(2020, 11, 17, 19, 23, 35, 400362, 
tzinfo=Timezone('UTC')), try_number=1) with command ['airflow', 'tasks', 'run', 
'example_kubernetes_executor', 'three_task', 
'2020-11-17T19:23:35.400362+00:00', '--local', '--pool', 'default_pool', 
'--subdir', '/usr/local/airflow/dags/example_kubernetes_executor.py'] with 
executor_config {'KubernetesExecutor': {'request_memory': '128Mi', 
'limit_memory': '128Mi', 'tolerations': [{'key': 'dedicated', 'operator': 
'Equal', 'value': 'airflow'}], 'affinity': 
"{'preferred_during_scheduling_ignored_during_execution': None,\n 
'required_during_scheduling_ignored_during_execution': [{'label_selector': 
{'match_expressions': [{'key': 'app',\n                                         
                                                           'operator': 'in',\n  
                       
                                                                            
'values': ['airflow']}],\n                                                      
                       'match_labels': None},\n                                 
                         'namespaces': None,\n                                  
                        'topology_key': 'kubernetes.io/hostname'}]}"}}
   
/usr/local/lib/python3.7/site-packages/airflow/kubernetes/pod_generator.py:197 
   DeprecationWarning: Using a dictionary for the executor_config is deprecated 
and
   will soon be removed.please use a `kubernetes.client.models.V1Pod` class 
with a 
   "pod_override" key instead. 
   [2020-11-17 19:39:55,921] {kubernetes_executor.py:543} INFO - Add task 
TaskInstanceKey(dag_id='example_kubernetes_executor', task_id='four_task', 
execution_date=datetime.datetime(2020, 11, 17, 19, 23, 35, 400362, 
tzinfo=Timezone('UTC')), try_number=1) with command ['airflow', 'tasks', 'run', 
'example_kubernetes_executor', 'four_task', '2020-11-17T19:23:35.400362+00:00', 
'--local', '--pool', 'default_pool', '--subdir', 
'/usr/local/airflow/dags/example_kubernetes_executor.py'] with executor_config 
{'KubernetesExecutor': {'labels': {'foo': 'bar'}}}
   [2020-11-17 19:39:55,923] {kubernetes_executor.py:543} INFO - Add task 
TaskInstanceKey(dag_id='example_kubernetes_executor', task_id='two_task', 
execution_date=datetime.datetime(2020, 11, 17, 19, 23, 35, 400362, 
tzinfo=Timezone('UTC')), try_number=1) with command ['airflow', 'tasks', 'run', 
'example_kubernetes_executor', 'two_task', '2020-11-17T19:23:35.400362+00:00', 
'--local', '--pool', 'default_pool', '--subdir', 
'/usr/local/airflow/dags/example_kubernetes_executor.py'] with executor_config 
{'KubernetesExecutor': {'image': 'newproj:0.2'}}
   [2020-11-17 19:39:55,928] {kubernetes_executor.py:280} INFO - Kubernetes job 
is (TaskInstanceKey(dag_id='example_kubernetes_executor', task_id='three_task', 
execution_date=datetime.datetime(2020, 11, 17, 19, 23, 35, 400362, 
tzinfo=Timezone('UTC')), try_number=1), ['airflow', 'tasks', 'run', 
'example_kubernetes_executor', 'three_task', 
'2020-11-17T19:23:35.400362+00:00', '--local', '--pool', 'default_pool', 
'--subdir', '/usr/local/airflow/dags/example_kubernetes_executor.py'], 
{'api_version': 'v1',
    'kind': 'Pod',
    'metadata': {'annotations': None,
                 'cluster_name': None,
                 'creation_timestamp': None,
                 'deletion_grace_period_seconds': None,
                 'deletion_timestamp': None,
                 'finalizers': None,
                 'generate_name': None,
                 'generation': None,
                 'initializers': None,
                 'labels': None,
                 'managed_fields': None,
                 'name': None,
                 'namespace': None,
                 'owner_references': None,
                 'resource_version': None,
                 'self_link': None,
                 'uid': None},
    'spec': {'active_deadline_seconds': None,
             'affinity': 
"{'preferred_during_scheduling_ignored_during_execution': "
                         'None,\n'
                         " 
'required_during_scheduling_ignored_during_execution': "
                         "[{'label_selector': {'match_expressions': [{'key': "
                         "'app',\n"
                         '                                                      
                                              '
                         "'operator': 'in',\n"
                         '                                                      
                                              '
                         "'values': ['airflow']}],\n"
                         '                                                      
                       '
                         "'match_labels': None},\n"
                         '                                                      
    '
                         "'namespaces': None,\n"
                         '                                                      
    '
                         "'topology_key': 'kubernetes.io/hostname'}]}",
             'automount_service_account_token': None,
             'containers': [{'args': [],
                             'command': [],
                             'env': [],
                             'env_from': [],
                             'image': None,
                             'image_pull_policy': None,
                             'lifecycle': None,
                             'liveness_probe': None,
                             'name': 'base',
                             'ports': [],
                             'readiness_probe': None,
                             'resources': {'limits': {'memory': '128Mi'},
                                           'requests': {'memory': '128Mi'}},
                             'security_context': None,
                             'stdin': None,
                             'stdin_once': None,
                             'termination_message_path': None,
                             'termination_message_policy': None,
                             'tty': None,
                             'volume_devices': None,
                             'volume_mounts': [],
                             'working_dir': None}],
             'dns_config': None,
             'dns_policy': None,
             'enable_service_links': None,
             'host_aliases': None,
             'host_ipc': None,
             'host_network': False,
             'host_pid': None,
             'hostname': None,
             'image_pull_secrets': [],
             'init_containers': None,
             'node_name': None,
             'node_selector': None,
             'preemption_policy': None,
             'priority': None,
             'priority_class_name': None,
             'readiness_gates': None,
             'restart_policy': None,
             'runtime_class_name': None,
             'scheduler_name': None,
             'security_context': None,
             'service_account': None,
             'service_account_name': None,
             'share_process_namespace': None,
             'subdomain': None,
             'termination_grace_period_seconds': None,
             'tolerations': [{'key': 'dedicated',
                              'operator': 'Equal',
                              'value': 'airflow'}],
             'volumes': []},
    'status': None}, None)
   [2020-11-17 19:39:55,951] {scheduler_job.py:1301} ERROR - Exception when 
executing SchedulerJob._run_scheduler_loop
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 
1283, in _execute
       self._run_scheduler_loop()
     File 
"/usr/local/lib/python3.7/site-packages/astronomer/airflow/version_check/plugin.py",
 line 29, in run_before
       fn(*args, **kwargs)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 
1387, in _run_scheduler_loop
       self.executor.heartbeat()
     File 
"/usr/local/lib/python3.7/site-packages/airflow/executors/base_executor.py", 
line 159, in heartbeat
       self.sync()
     File 
"/usr/local/lib/python3.7/site-packages/airflow/executors/kubernetes_executor.py",
 line 603, in sync
       self.kube_scheduler.run_next(task)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/executors/kubernetes_executor.py",
 line 313, in run_next
       self.launcher.run_pod_async(pod, 
**self.kube_config.kube_client_request_args)
     File 
"/usr/local/lib/python3.7/site-packages/airflow/kubernetes/pod_launcher.py", 
line 90, in run_pod_async
       pod_mutation_hook(pod)
     File "/usr/local/airflow/config/airflow_local_settings.py", line 26, in 
pod_mutation_hook
       pod.spec.affinity = pod.spec.affinity.to_dict().update({})
   
   ```
   This prevents other tasks from creating pods
   
   **What you expected to happen**:
   
   I expected it to run without issues
   
   **How to reproduce it**:
   Run this example dag 
https://github.com/apache/airflow/blob/master/airflow/example_dags/example_kubernetes_executor.py
 in a kubernetes cluster using kubernetes executor


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to