VladaZakharova commented on code in PR #32365:
URL: https://github.com/apache/airflow/pull/32365#discussion_r1270572979


##########
airflow/providers/google/cloud/hooks/compute_ssh.py:
##########
@@ -225,40 +227,58 @@ def get_conn(self) -> paramiko.SSHClient:
             hostname = self.hostname
 
         privkey, pubkey = self._generate_ssh_key(self.user)
-        if self.use_oslogin:
-            user = self._authorize_os_login(pubkey)
-        else:
-            user = self.user
-            self._authorize_compute_engine_instance_metadata(pubkey)
-
-        proxy_command = None
-        if self.use_iap_tunnel:
-            proxy_command_args = [
-                "gcloud",
-                "compute",
-                "start-iap-tunnel",
-                str(self.instance_name),
-                "22",
-                "--listen-on-stdin",
-                f"--project={self.project_id}",
-                f"--zone={self.zone}",
-                "--verbosity=warning",
-            ]
-            proxy_command = " ".join(shlex.quote(arg) for arg in 
proxy_command_args)
-
-        sshclient = self._connect_to_instance(user, hostname, privkey, 
proxy_command)
+
+        max_retries = 10
+        max_delay = 10
+        sshclient = None
+        for retry in range(max_retries + 1):
+            try:
+                if self.use_oslogin:
+                    user = self._authorize_os_login(pubkey)
+                else:
+                    user = self.user
+                    self._authorize_compute_engine_instance_metadata(pubkey)
+                proxy_command = None
+                if self.use_iap_tunnel:
+                    proxy_command_args = [
+                        "gcloud",
+                        "compute",
+                        "start-iap-tunnel",
+                        str(self.instance_name),
+                        "22",
+                        "--listen-on-stdin",
+                        f"--project={self.project_id}",
+                        f"--zone={self.zone}",
+                        "--verbosity=warning",
+                    ]
+                    proxy_command = " ".join(shlex.quote(arg) for arg in 
proxy_command_args)
+                sshclient = self._connect_to_instance(user, hostname, privkey, 
proxy_command)
+                break
+            except (HttpError, AirflowException, SSHException) as exc:
+                if (isinstance(exc, HttpError) and exc.resp.status == 412) or (
+                    isinstance(exc, AirflowException) and "412 PRECONDITION 
FAILED" in str(exc)
+                ):
+                    self.log.info("Error occurred when trying to update 
instance metadata: %s", exc)
+                elif isinstance(exc, SSHException):
+                    self.log.info("Error occurred when establishing SSH 
connection using Paramiko: %s", exc)
+                if retry == max_retries:
+                    raise AirflowException("Maximum retries exceeded. Aborting 
operation.")
+                delay = random.randint(0, max_delay)
+                self.log.info(f"Failed establish SSH connection, waiting 
{delay} seconds to retry...")
+                time.sleep(delay)
+        if not sshclient:
+            raise AirflowException("Unable to establish SSH connection.")

Review Comment:
   This check needs to be here for the linter (I get static check errors 
without it :( )



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to