jedcunningham commented on a change in pull request #19572:
URL: https://github.com/apache/airflow/pull/19572#discussion_r773383817



##########
File path: airflow/providers/cncf/kubernetes/utils/pod_launcher.py
##########
@@ -117,79 +129,99 @@ def delete_pod(self, pod: V1Pod) -> None:
         reraise=True,
         retry=tenacity.retry_if_exception(should_retry_start_pod),
     )
-    def start_pod(self, pod: V1Pod, startup_timeout: int = 120) -> None:
+    def create_pod(self, pod: V1Pod) -> V1Pod:
+        """Launches the pod asynchronously."""
+        return self.run_pod_async(pod)
+
+    def await_pod_start(self, pod: V1Pod, startup_timeout: int = 120) -> None:
         """
-        Launches the pod synchronously and waits for completion.
+        Waits for the pod to reach phase other than ``Pending``
 
         :param pod:
         :param startup_timeout: Timeout (in seconds) for startup of the pod
             (if pod is pending for too long, fails task)
         :return:
         """
-        resp = self.run_pod_async(pod)
         curr_time = dt.now()
-        if resp.status.start_time is None:
-            while self.pod_not_started(pod):
-                self.log.warning("Pod not yet started: %s", pod.metadata.name)
-                delta = dt.now() - curr_time
-                if delta.total_seconds() >= startup_timeout:
-                    msg = (
-                        f"Pod took longer than {startup_timeout} seconds to start. "
-                        "Check the pod events in kubernetes to determine why."
-                    )
-                    raise AirflowException(msg)
-                time.sleep(1)
-
-    def monitor_pod(self, pod: V1Pod, get_logs: bool) -> Tuple[State, V1Pod, Optional[str]]:
+        while True:
+            remote_pod = self.read_pod(pod)
+            if remote_pod.status.phase != PodStatus.PENDING:
+                break
+            self.log.warning("Pod not yet started: %s", pod.metadata.name)
+            delta = dt.now() - curr_time
+            if delta.total_seconds() >= startup_timeout:
+                msg = (
+                    f"Pod took longer than {startup_timeout} seconds to start. "
+                    "Check the pod events in kubernetes to determine why."
+                )
+                raise PodLaunchFailedException(msg)
+            time.sleep(1)
+
+    def follow_container_logs(self, pod: V1Pod, container_name: str):
+        """
+        Follows the logs of container and streams to airflow logging.
+        Returns when container exits.
+        """
+        container_stopped = False
+        read_logs_since_sec = None
+        last_log_time = None
+
+        # `read_pod_logs` follows the logs so we shouldn't necessarily _need_ to loop
+        # but in a long-running process we might lose connectivity and this way we
+        # can resume following the logs
+        while True:
+            try:
+                logs = self.read_pod_logs(
+                    pod=pod,
+                    container_name=container_name,
+                    timestamps=True,
+                    since_seconds=read_logs_since_sec,
+                )
+                for line in logs:  # type: bytes
+                    timestamp, message = self.parse_log_line(line.decode('utf-8'))
+                    self.log.info(message)
+                    if timestamp:
+                        last_log_time = timestamp
+            except BaseHTTPError:  # Catches errors like ProtocolError(TimeoutError).
+                self.log.warning(
+                    'Failed to read logs for pod %s',
+                    pod.metadata.name,
+                    exc_info=True,
+                )
+
+            if container_stopped is True:
+                break
+
+            if last_log_time:
+                delta = pendulum.now() - last_log_time
+                read_logs_since_sec = math.ceil(delta.total_seconds())
+
+            time.sleep(1)
+
+            if self.container_is_running(pod, container_name=container_name):
+                self.log.info('Container %s is running', pod.metadata.name)
+                self.log.warning('Pod %s log read interrupted', pod.metadata.name)

Review comment:
       We had a chat and we are going to leave this unlikely race condition 
alone for now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to