paramjeet01 commented on issue #39267: URL: https://github.com/apache/airflow/issues/39267#issuecomment-2113375888
I can confirm that the issue is solved with the code below, which we have added as a custom `extract_xcom`. The same approach is also mentioned here: https://github.com/kubernetes-client/python-base/issues/190#issuecomment-805073981

```
import json

# Import paths assumed from the Airflow cncf.kubernetes provider and the kubernetes client.
from airflow.exceptions import AirflowException
from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults
from kubernetes.client.models import V1Pod
from kubernetes.stream import stream as kubernetes_stream


# Method of our custom pod manager class; it relies on self._client and self.log.
def extract_xcom_json(self, pod: V1Pod):
    try:
        self.log.info(f'Running command... cat {PodDefaults.XCOM_MOUNT_PATH}/return.json')
        # Open the exec stream without preloading so the whole output can be read explicitly.
        client = kubernetes_stream(
            self._client.connect_get_namespaced_pod_exec,
            pod.metadata.name,
            pod.metadata.namespace,
            container=PodDefaults.SIDECAR_CONTAINER_NAME,
            command=[
                '/bin/sh',
                '-c',
                f'cat {PodDefaults.XCOM_MOUNT_PATH}/return.json',
            ],
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _preload_content=False,
            _request_timeout=10,
        )
        client.run_forever(timeout=10)
        result = client.read_all()
        self.log.info("Received {} ({}) ({} ... {})".format(type(result), len(result), result[:64], result[-64:]))
        # validate it's valid json
        _ = json.loads(result)
        # Terminate the sidecar
        _ = kubernetes_stream(
            self._client.connect_get_namespaced_pod_exec,
            pod.metadata.name,
            pod.metadata.namespace,
            container=PodDefaults.SIDECAR_CONTAINER_NAME,
            command=[
                '/bin/sh',
                '-c',
                'kill -s SIGINT 1',
            ],
            stderr=True,
            stdin=False,
            stdout=True,
            tty=False,
            _preload_content=True,
            _request_timeout=10,
        )
        return result
    except json.JSONDecodeError as err:
        message = f'Failed to decode json document from pod: {pod.metadata.name}'
        self.log.exception(message)
        raise AirflowException(message) from err
    except Exception as err:
        message = f'Failed to extract xcom from pod: {pod.metadata.name}'
        self.log.exception(message)
        raise AirflowException(message) from err
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
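
The validation step in the snippet above (parse the payload before terminating the sidecar) can be tried in isolation, without a cluster. This is a minimal sketch only: the helper name `validate_xcom_payload` and its `preview` parameter are hypothetical and are not part of Airflow or the Kubernetes client, and the truncated string merely simulates a read that stopped early.

```python
import json


def validate_xcom_payload(raw: str, preview: int = 64) -> str:
    """Return `raw` unchanged if it parses as JSON, otherwise raise ValueError.

    Hypothetical helper for illustration; not an Airflow or kubernetes API.
    """
    try:
        json.loads(raw)
    except json.JSONDecodeError as err:
        # Report only a short prefix so a large payload does not flood the logs.
        raise ValueError(f"xcom payload is not valid JSON: {raw[:preview]!r}") from err
    return raw


complete = '{"rows": [1, 2, 3]}'
truncated = complete[:10]  # simulates a read that stopped early

# A complete document passes through untouched.
assert validate_xcom_payload(complete) == complete

# A truncated document is rejected instead of being returned as the xcom value.
try:
    validate_xcom_payload(truncated)
except ValueError as err:
    print(f"rejected: {err}")
```

Checking the payload before the sidecar is killed means a partial read surfaces as an explicit error rather than as a malformed xcom value downstream.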
