ahidalgob commented on code in PR #32365:
URL: https://github.com/apache/airflow/pull/32365#discussion_r1269369494


##########
airflow/providers/google/cloud/hooks/compute_ssh.py:
##########
@@ -225,40 +227,58 @@ def get_conn(self) -> paramiko.SSHClient:
             hostname = self.hostname
 
         privkey, pubkey = self._generate_ssh_key(self.user)
-        if self.use_oslogin:
-            user = self._authorize_os_login(pubkey)
-        else:
-            user = self.user
-            self._authorize_compute_engine_instance_metadata(pubkey)
-
-        proxy_command = None
-        if self.use_iap_tunnel:
-            proxy_command_args = [
-                "gcloud",
-                "compute",
-                "start-iap-tunnel",
-                str(self.instance_name),
-                "22",
-                "--listen-on-stdin",
-                f"--project={self.project_id}",
-                f"--zone={self.zone}",
-                "--verbosity=warning",
-            ]
-            proxy_command = " ".join(shlex.quote(arg) for arg in 
proxy_command_args)
-
-        sshclient = self._connect_to_instance(user, hostname, privkey, 
proxy_command)
+
+        max_retries = 10
+        max_delay = 10
+        sshclient = None
+        for retry in range(max_retries + 1):
+            try:
+                if self.use_oslogin:
+                    user = self._authorize_os_login(pubkey)
+                else:
+                    user = self.user
+                    self._authorize_compute_engine_instance_metadata(pubkey)
+                proxy_command = None
+                if self.use_iap_tunnel:
+                    proxy_command_args = [
+                        "gcloud",
+                        "compute",
+                        "start-iap-tunnel",
+                        str(self.instance_name),
+                        "22",
+                        "--listen-on-stdin",
+                        f"--project={self.project_id}",
+                        f"--zone={self.zone}",
+                        "--verbosity=warning",
+                    ]
+                    proxy_command = " ".join(shlex.quote(arg) for arg in 
proxy_command_args)
+                sshclient = self._connect_to_instance(user, hostname, privkey, 
proxy_command)
+                break
+            except (HttpError, AirflowException, SSHException) as exc:
+                if (isinstance(exc, HttpError) and exc.resp.status == 412) or (
+                    isinstance(exc, AirflowException) and "412 PRECONDITION 
FAILED" in str(exc)
+                ):
+                    self.log.info("Error occurred when trying to update 
instance metadata: %s", exc)
+                elif isinstance(exc, SSHException):
+                    self.log.info("Error occurred when establishing SSH 
connection using Paramiko: %s", exc)
+                if retry == max_retries:
+                    raise AirflowException("Maximum retries exceeded. Aborting 
operation.")
+                delay = random.randint(0, max_delay)
+                self.log.info(f"Failed establish SSH connection, waiting 
{delay} seconds to retry...")
+                time.sleep(delay)
+        if not sshclient:
+            raise AirflowException("Unable to establish SSH connection.")

Review Comment:
   Nit: This case never happens, does it? The loop either `break`s right after
assigning `sshclient` or raises once the last retry fails.



##########
airflow/providers/google/cloud/hooks/compute_ssh.py:
##########
@@ -225,40 +227,58 @@ def get_conn(self) -> paramiko.SSHClient:
             hostname = self.hostname
 
         privkey, pubkey = self._generate_ssh_key(self.user)
-        if self.use_oslogin:
-            user = self._authorize_os_login(pubkey)
-        else:
-            user = self.user
-            self._authorize_compute_engine_instance_metadata(pubkey)
-
-        proxy_command = None
-        if self.use_iap_tunnel:
-            proxy_command_args = [
-                "gcloud",
-                "compute",
-                "start-iap-tunnel",
-                str(self.instance_name),
-                "22",
-                "--listen-on-stdin",
-                f"--project={self.project_id}",
-                f"--zone={self.zone}",
-                "--verbosity=warning",
-            ]
-            proxy_command = " ".join(shlex.quote(arg) for arg in 
proxy_command_args)
-
-        sshclient = self._connect_to_instance(user, hostname, privkey, 
proxy_command)
+
+        max_retries = 10

Review Comment:
   Should we make this configurable? If a user opens many SSH connections at
the same time, 10 retries might not be enough.



##########
airflow/providers/google/cloud/hooks/compute_ssh.py:
##########
@@ -225,40 +227,58 @@ def get_conn(self) -> paramiko.SSHClient:
             hostname = self.hostname
 
         privkey, pubkey = self._generate_ssh_key(self.user)
-        if self.use_oslogin:
-            user = self._authorize_os_login(pubkey)
-        else:
-            user = self.user
-            self._authorize_compute_engine_instance_metadata(pubkey)
-
-        proxy_command = None
-        if self.use_iap_tunnel:
-            proxy_command_args = [
-                "gcloud",
-                "compute",
-                "start-iap-tunnel",
-                str(self.instance_name),
-                "22",
-                "--listen-on-stdin",
-                f"--project={self.project_id}",
-                f"--zone={self.zone}",
-                "--verbosity=warning",
-            ]
-            proxy_command = " ".join(shlex.quote(arg) for arg in 
proxy_command_args)
-
-        sshclient = self._connect_to_instance(user, hostname, privkey, 
proxy_command)
+
+        max_retries = 10
+        max_delay = 10
+        sshclient = None
+        for retry in range(max_retries + 1):
+            try:
+                if self.use_oslogin:
+                    user = self._authorize_os_login(pubkey)
+                else:
+                    user = self.user
+                    self._authorize_compute_engine_instance_metadata(pubkey)
+                proxy_command = None
+                if self.use_iap_tunnel:
+                    proxy_command_args = [
+                        "gcloud",
+                        "compute",
+                        "start-iap-tunnel",
+                        str(self.instance_name),
+                        "22",
+                        "--listen-on-stdin",
+                        f"--project={self.project_id}",
+                        f"--zone={self.zone}",
+                        "--verbosity=warning",
+                    ]
+                    proxy_command = " ".join(shlex.quote(arg) for arg in 
proxy_command_args)
+                sshclient = self._connect_to_instance(user, hostname, privkey, 
proxy_command)
+                break
+            except (HttpError, AirflowException, SSHException) as exc:
+                if (isinstance(exc, HttpError) and exc.resp.status == 412) or (
+                    isinstance(exc, AirflowException) and "412 PRECONDITION 
FAILED" in str(exc)
+                ):
+                    self.log.info("Error occurred when trying to update 
instance metadata: %s", exc)
+                elif isinstance(exc, SSHException):
+                    self.log.info("Error occurred when establishing SSH 
connection using Paramiko: %s", exc)

Review Comment:
   We could add an `else` branch that simply re-raises the exception when it is
an `HttpError` or `AirflowException` that doesn't match the specific errors we
want to retry.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to