paramjeet01 commented on issue #39267:
URL: https://github.com/apache/airflow/issues/39267#issuecomment-2113375888

   I can confirm that the issue is solved with the code below, which we have
added as a custom extract_xcom. The same approach is also mentioned here:
https://github.com/kubernetes-client/python-base/issues/190#issuecomment-805073981
 
   ```
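       # Module-level imports this method relies on. The PodDefaults import
       # path may differ between provider versions.
       #   import json
       #   from kubernetes.client import V1Pod
       #   from kubernetes.stream import stream as kubernetes_stream
       #   from airflow.exceptions import AirflowException
       #   from airflow.providers.cncf.kubernetes.utils.pod_manager import PodDefaults
       def extract_xcom_json(self, pod: V1Pod):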
           try:
               self.log.info(f'Running command... cat {PodDefaults.XCOM_MOUNT_PATH}/return.json')
               client = kubernetes_stream(
                   self._client.connect_get_namespaced_pod_exec,
                   pod.metadata.name,
                   pod.metadata.namespace,
                   container=PodDefaults.SIDECAR_CONTAINER_NAME,
                   command=[
                       '/bin/sh',
                       '-c',
                       f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json',
                   ],
                   stderr=True,
                   stdin=False,
                   stdout=True,
                   tty=False,
                   _preload_content=False,
                   _request_timeout=10,
               )
               client.run_forever(timeout=10)
               result = client.read_all()
               self.log.info("Received {} ({}) ({} ... {})".format(type(result), len(result), result[:64], result[-64:]))
   
               # validate it's valid json
               _ = json.loads(result)
   
               # Terminate the sidecar
               _ = kubernetes_stream(
                   self._client.connect_get_namespaced_pod_exec,
                   pod.metadata.name,
                   pod.metadata.namespace,
                   container=PodDefaults.SIDECAR_CONTAINER_NAME,
                   command=[
                       '/bin/sh',
                       '-c',
                       'kill -s SIGINT 1',
                   ],
                   stderr=True,
                   stdin=False,
                   stdout=True,
                   tty=False,
                   _preload_content=True,
                   _request_timeout=10,
               )
   
               return result
   
           except json.JSONDecodeError as err:
               message = f'Failed to decode json document from pod: {pod.metadata.name}'
               self.log.exception(message)
               # Chain the original error so the traceback keeps the JSON failure
               raise AirflowException(message) from err
   
           except Exception as e:
               message = f'Failed to extract xcom from pod: {pod.metadata.name}'
               self.log.exception(message)
               # Chain the original error so the root cause is not lost
               raise AirflowException(message) from e
   ```
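
The key point in the snippet above is the ordering: the sidecar is only told to stop after the payload has been read and has parsed as JSON, so a truncated or empty read fails loudly instead of silently losing the XCom. Here is a minimal, dependency-free sketch of just that control flow. `read_result`, `stop_sidecar` and `XComExtractionError` are hypothetical stand-ins (for the two `kubernetes_stream` calls and for `AirflowException`), not part of Airflow or the Kubernetes client.

```python
import json
from typing import Callable


class XComExtractionError(RuntimeError):
    """Hypothetical stand-in for AirflowException (keeps the sketch Airflow-free)."""


def extract_xcom_json(
    read_result: Callable[[], str],
    stop_sidecar: Callable[[], None],
    pod_name: str,
) -> str:
    """Read the XCom payload, validate it, and only then stop the sidecar."""
    try:
        # Stand-in for the exec call that runs `cat .../return.json`
        result = read_result()

        # Raises json.JSONDecodeError on empty or truncated content
        json.loads(result)

        # Stand-in for the exec call that signals PID 1 in the sidecar;
        # reached only when the payload is valid JSON
        stop_sidecar()
        return result
    except json.JSONDecodeError as err:
        raise XComExtractionError(
            f"Failed to decode json document from pod: {pod_name}"
        ) from err
    except Exception as err:
        raise XComExtractionError(
            f"Failed to extract xcom from pod: {pod_name}"
        ) from err
```

With fake callables, a valid payload is returned unchanged and the stop callback runs once; an invalid payload raises before the stop callback is ever invoked, which leaves the sidecar alive for a retry.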

