ashb commented on a change in pull request #6230: [AIRFLOW-5413] Allow K8S 
worker pod to be configured from JSON/YAML file
URL: https://github.com/apache/airflow/pull/6230#discussion_r355992984
 
 

 ##########
 File path: airflow/kubernetes/pod_generator.py
 ##########
 @@ -298,51 +361,171 @@ def reconcile_pods(base_pod: k8s.V1Pod, client_pod: 
k8s.V1Pod) -> k8s.V1Pod:
         :type client_pod: k8s.V1Pod
         :return: the merged pods
 
-        This can't be done recursively as certain fields are preserved,
-        some overwritten, and some concatenated, e.g. The command
-        should be preserved from base, the volumes appended to and
-        the other fields overwritten.
+        This can't be done recursively as certain fields some overwritten, and 
some concatenated.
         """
 
+        if client_pod is None:
+            return base_pod
+
         client_pod_cp = copy.deepcopy(client_pod)
 
         def merge_objects(base_obj, client_obj):
+            if not base_obj:
+                return client_obj
+            if not client_obj:
+                return base_obj
+
+            client_obj_cp = copy.deepcopy(client_obj)
+
             for base_key in base_obj.to_dict().keys():
                 base_val = getattr(base_obj, base_key, None)
                 if not getattr(client_obj, base_key, None) and base_val:
-                    setattr(client_obj, base_key, base_val)
+                    setattr(client_obj_cp, base_key, base_val)
+            return client_obj_cp
 
         def extend_object_field(base_obj, client_obj, field_name):
+            client_obj_cp = copy.deepcopy(client_obj)
             base_obj_field = getattr(base_obj, field_name, None)
             client_obj_field = getattr(client_obj, field_name, None)
             if not base_obj_field:
-                return
+                return client_obj_cp
             if not client_obj_field:
-                setattr(client_obj, field_name, base_obj_field)
-                return
+                setattr(client_obj_cp, field_name, base_obj_field)
+                return client_obj_cp
             appended_fields = base_obj_field + client_obj_field
-            setattr(client_obj, field_name, appended_fields)
-
-        # Values at the pod and metadata should be overwritten where they 
exist,
-        # but certain values at the spec and container level must be conserved.
-        base_container = base_pod.spec.containers[0]
-        client_container = client_pod_cp.spec.containers[0]
-
-        extend_object_field(base_container, client_container, 'volume_mounts')
-        extend_object_field(base_container, client_container, 'env')
-        extend_object_field(base_container, client_container, 'env_from')
-        extend_object_field(base_container, client_container, 'ports')
-        extend_object_field(base_container, client_container, 'volume_devices')
-        client_container.command = base_container.command
-        client_container.args = base_container.args
-        merge_objects(base_pod.spec.containers[0], 
client_pod_cp.spec.containers[0])
-        # Just append any additional containers from the base pod
-        client_pod_cp.spec.containers.extend(base_pod.spec.containers[1:])
-
-        merge_objects(base_pod.metadata, client_pod_cp.metadata)
-
-        extend_object_field(base_pod.spec, client_pod_cp.spec, 'volumes')
-        merge_objects(base_pod.spec, client_pod_cp.spec)
-        merge_objects(base_pod, client_pod_cp)
+            setattr(client_obj_cp, field_name, appended_fields)
+            return client_obj_cp
+
+        if base_pod.spec and not client_pod.spec:
+            client_pod_cp.spec = base_pod.spec
+        elif client_pod_cp.spec and base_pod.spec:
+            client_container = client_pod_cp.spec.containers[0]
+            base_container = base_pod.spec.containers[0]
+            cc1 = extend_object_field(base_container, client_container, 
'volume_mounts')
+            cc2 = extend_object_field(base_container, cc1, 'env')
+            cc3 = extend_object_field(base_container, cc2, 'env_from')
+            cc4 = extend_object_field(base_container, cc3, 'ports')
+            cc5 = extend_object_field(base_container, cc4, 'volume_devices')
+
+            cc6 = merge_objects(base_container, cc5)
+            client_pod_cp.spec.containers[0] = cc6
+            # Just append any additional containers from the base pod
+            client_pod_cp.spec.containers.extend(base_pod.spec.containers[1:])
+            merged_spec = extend_object_field(base_pod.spec, 
client_pod_cp.spec, 'volumes')
+            client_pod_cp.spec = merge_objects(base_pod.spec, merged_spec)
+
+        client_pod_cp.metadata = merge_objects(base_pod.metadata, 
client_pod_cp.metadata)
+        client_pod_cp = merge_objects(base_pod, client_pod_cp)
 
         return client_pod_cp
+
+    @staticmethod
+    def construct_pod(
+        dag_id: str,
+        task_id: str,
+        pod_id: str,
+        try_number: int,
+        date: str,
+        command: List[str],
+        kube_executor_config: Optional[k8s.V1Pod],
+        worker_config: k8s.V1Pod,
+        namespace: str,
+        worker_uuid: str
+    ) -> k8s.V1Pod:
+        """
+        Construct a pod by gathering and consolidating the configuration from 
3 places:
+            - airflow.cfg
+            - executor_config
+            - dynamic arguments
+        """
+
+        dynamic_pod = PodGenerator(
+            namespace=namespace,
+            labels={
+                'airflow-worker': worker_uuid,
+                'dag_id': dag_id,
+                'task_id': task_id,
+                'execution_date': date,
+                'try_number': str(try_number),
+                'airflow_version': airflow_version.replace('+', '-'),
+                'kubernetes_executor': 'True',
+            },
+            cmds=command,
+            name=pod_id
+        ).gen_pod()
+
+        # Reconcile the pod generated by the Operator and the Pod
+        # generated by the .cfg file
+        pod_with_executor_config = PodGenerator.reconcile_pods(worker_config,
+                                                               
kube_executor_config)
+        # Reconcile that pod with the dynamic fields.
+        return PodGenerator.reconcile_pods(pod_with_executor_config, 
dynamic_pod)
+
+    @staticmethod
+    def deserialize_model_file(api_client: ApiClient, path: str) -> k8s.V1Pod:
+        """
+        :param api_client: K8S client object
+        :param path: Path to the file
+        :return: a kubernetes.client.models.V1Pod
+        """
+        pod = None
+        with open(path) as stream:
+            if '.json' in path:
+                pod = json.load(stream)
+            elif '.yaml' in path:
+                pod = yaml.safe_load(stream)
+            elif not pod:
+                raise AirflowConfigException("Path was neither .json nor 
.yaml")
+
+            # pylint: disable=protected-access
+            return api_client._ApiClient__deserialize_model(pod, k8s.V1Pod)
+
+    @staticmethod
+    def deserialize_model_string(api_client: ApiClient, string: str) -> 
k8s.V1Pod:
+        """
+        :param api_client: K8S client object
+        :param string: a string of the deployment
+        :return: a kubernetes.client.models.V1Pod
+        """
+        try:
+            pod = json.loads(string)
+        except json.decoder.JSONDecodeError:
+            try:
+                pod = yaml.safe_load(string)
+            except ScannerError:
+                raise AirflowConfigException(
+                    "Could not parse {} as yaml or json".format(string)
+                )
+
+        # pylint: disable=protected-access
+        return api_client._ApiClient__deserialize_model(pod, k8s.V1Pod)
+
+    @staticmethod
+    def deserialize_model(path_or_string) -> k8s.V1Pod:
+        """
+        :param path_or_string: path to a pod JSON/YAML or a string
+        :return: a kubernetes.client.models.V1Pod
+        """
+        api_client = ApiClient()
+        if os.path.exists(path_or_string):
+            return PodGenerator.deserialize_model_file(api_client, 
path_or_string)
 
 Review comment:
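   Make this a `@classmethod` (taking `cls` as its first argument) and then
   call the helper through `cls` instead of hard-coding `PodGenerator`:
   ```suggestion
               return cls.deserialize_model_file(api_client, path_or_string)
   ```
   
   Do the same for any other `PodGenerator.` calls in this method.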

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to