[ 
https://issues.apache.org/jira/browse/AIRFLOW-5413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17346091#comment-17346091
 ] 

ASF GitHub Bot commented on AIRFLOW-5413:
-----------------------------------------

simonvanderveldt commented on a change in pull request #7114:
URL: https://github.com/apache/airflow/pull/7114#discussion_r633446213



##########
File path: airflow/kubernetes/pod_generator.py
##########
@@ -295,51 +343,178 @@ def reconcile_pods(base_pod: k8s.V1Pod, client_pod: 
k8s.V1Pod) -> k8s.V1Pod:
         :type client_pod: k8s.V1Pod
         :return: the merged pods
 
-        This can't be done recursively as certain fields are preserved,
-        some overwritten, and some concatenated, e.g. The command
-        should be preserved from base, the volumes appended to and
-        the other fields overwritten.
+        This can't be done recursively as certain fields some overwritten, and 
some concatenated.
         """
+        if client_pod is None:
+            return base_pod
 
         client_pod_cp = copy.deepcopy(client_pod)
+        client_pod_cp.spec = PodGenerator.reconcile_specs(base_pod.spec, 
client_pod_cp.spec)
 
-        def merge_objects(base_obj, client_obj):
-            for base_key in base_obj.to_dict().keys():
-                base_val = getattr(base_obj, base_key, None)
-                if not getattr(client_obj, base_key, None) and base_val:
-                    setattr(client_obj, base_key, base_val)
-
-        def extend_object_field(base_obj, client_obj, field_name):
-            base_obj_field = getattr(base_obj, field_name, None)
-            client_obj_field = getattr(client_obj, field_name, None)
-            if not base_obj_field:
-                return
-            if not client_obj_field:
-                setattr(client_obj, field_name, base_obj_field)
-                return
-            appended_fields = base_obj_field + client_obj_field
-            setattr(client_obj, field_name, appended_fields)
-
-        # Values at the pod and metadata should be overwritten where they 
exist,
-        # but certain values at the spec and container level must be conserved.
-        base_container = base_pod.spec.containers[0]
-        client_container = client_pod_cp.spec.containers[0]
-
-        extend_object_field(base_container, client_container, 'volume_mounts')
-        extend_object_field(base_container, client_container, 'env')
-        extend_object_field(base_container, client_container, 'env_from')
-        extend_object_field(base_container, client_container, 'ports')
-        extend_object_field(base_container, client_container, 'volume_devices')
-        client_container.command = base_container.command
-        client_container.args = base_container.args
-        merge_objects(base_pod.spec.containers[0], 
client_pod_cp.spec.containers[0])
-        # Just append any additional containers from the base pod
-        client_pod_cp.spec.containers.extend(base_pod.spec.containers[1:])
-
-        merge_objects(base_pod.metadata, client_pod_cp.metadata)
-
-        extend_object_field(base_pod.spec, client_pod_cp.spec, 'volumes')
-        merge_objects(base_pod.spec, client_pod_cp.spec)
-        merge_objects(base_pod, client_pod_cp)
+        client_pod_cp.metadata = merge_objects(base_pod.metadata, 
client_pod_cp.metadata)
+        client_pod_cp = merge_objects(base_pod, client_pod_cp)
 
         return client_pod_cp
+
+    @staticmethod
+    def reconcile_specs(base_spec: Optional[k8s.V1PodSpec],
+                        client_spec: Optional[k8s.V1PodSpec]) -> 
Optional[k8s.V1PodSpec]:
+        """
+        :param base_spec: has the base attributes which are overwritten if 
they exist
+            in the client_spec and remain if they do not exist in the 
client_spec
+        :type base_spec: k8s.V1PodSpec
+        :param client_spec: the spec that the client wants to create.
+        :type client_spec: k8s.V1PodSpec
+        :return: the merged specs
+        """
+        if base_spec and not client_spec:
+            return base_spec
+        if not base_spec and client_spec:
+            return client_spec
+        elif client_spec and base_spec:
+            client_spec.containers = PodGenerator.reconcile_containers(
+                base_spec.containers, client_spec.containers
+            )
+            merged_spec = extend_object_field(base_spec, client_spec, 
'volumes')
+            return merge_objects(base_spec, merged_spec)
+
+        return None
+
+    @staticmethod
+    def reconcile_containers(base_containers: List[k8s.V1Container],
+                             client_containers: List[k8s.V1Container]) -> 
List[k8s.V1Container]:
+        """
+        :param base_containers: has the base attributes which are overwritten 
if they exist
+            in the client_containers and remain if they do not exist in the 
client_containers
+        :type base_containers: List[k8s.V1Container]
+        :param client_containers: the containers that the client wants to 
create.
+        :type client_containers: List[k8s.V1Container]
+        :return: the merged containers
+
+        The runs recursively over the list of containers.
+        """
+        if not base_containers:
+            return client_containers
+        if not client_containers:
+            return base_containers
+
+        client_container = client_containers[0]
+        base_container = base_containers[0]
+        client_container = extend_object_field(base_container, 
client_container, 'volume_mounts')
+        client_container = extend_object_field(base_container, 
client_container, 'env')
+        client_container = extend_object_field(base_container, 
client_container, 'env_from')
+        client_container = extend_object_field(base_container, 
client_container, 'ports')
+        client_container = extend_object_field(base_container, 
client_container, 'volume_devices')
+        client_container = merge_objects(base_container, client_container)
+
+        return [client_container] + PodGenerator.reconcile_containers(
+            base_containers[1:], client_containers[1:]
+        )
+
+    @staticmethod
+    def construct_pod(
+        dag_id: str,
+        task_id: str,
+        pod_id: str,
+        try_number: int,
+        date: str,
+        command: List[str],
+        kube_executor_config: Optional[k8s.V1Pod],
+        worker_config: k8s.V1Pod,
+        namespace: str,
+        worker_uuid: str
+    ) -> k8s.V1Pod:
+        """
+        Construct a pod by gathering and consolidating the configuration from 
3 places:
+            - airflow.cfg
+            - executor_config
+            - dynamic arguments
+        """
+        dynamic_pod = PodGenerator(
+            namespace=namespace,
+            labels={
+                'airflow-worker': worker_uuid,
+                'dag_id': dag_id,
+                'task_id': task_id,
+                'execution_date': date,
+                'try_number': str(try_number),
+                'airflow_version': airflow_version.replace('+', '-'),
+                'kubernetes_executor': 'True',
+            },
+            cmds=command,
+            name=pod_id
+        ).gen_pod()
+
+        # Reconcile the pod generated by the Operator and the Pod
+        # generated by the .cfg file
+        pod_with_executor_config = PodGenerator.reconcile_pods(worker_config,
+                                                               
kube_executor_config)
+        # Reconcile that pod with the dynamic fields.
+        return PodGenerator.reconcile_pods(pod_with_executor_config, 
dynamic_pod)
+
+    @staticmethod
+    def make_unique_pod_id(dag_id):
+        """
+        Kubernetes pod names must be <= 253 chars and must pass the following 
regex for
+        validation
+        ``^[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*$``
+
+        :param dag_id: a dag_id with only alphanumeric characters
+        :return: ``str`` valid Pod name of appropriate length
+        """
+        if not dag_id:
+            return None
+
+        safe_uuid = uuid.uuid4().hex

Review comment:
       Why was this changed from the 8-character string it was before (see 
old line 133) to this new 32-character string?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


> Get pod configuration from JSON or Yaml
> ---------------------------------------
>
>                 Key: AIRFLOW-5413
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-5413
>             Project: Apache Airflow
>          Issue Type: New Feature
>          Components: executor-kubernetes
>    Affects Versions: 1.10.5
>            Reporter: David Lum
>            Assignee: David Lum
>            Priority: Minor
>              Labels: kubernetes
>             Fix For: 2.0.0
>
>
> Currently the Kubernetes components of Airflow try to recreate the Kubernetes 
> API, but don't let users pass in their own JSON or YAML. This feature would 
> allow users to do this.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to