VladaZakharova commented on code in PR #32365:
URL: https://github.com/apache/airflow/pull/32365#discussion_r1270572979
##########
airflow/providers/google/cloud/hooks/compute_ssh.py:
##########
@@ -225,40 +227,58 @@ def get_conn(self) -> paramiko.SSHClient:
hostname = self.hostname
privkey, pubkey = self._generate_ssh_key(self.user)
- if self.use_oslogin:
- user = self._authorize_os_login(pubkey)
- else:
- user = self.user
- self._authorize_compute_engine_instance_metadata(pubkey)
-
- proxy_command = None
- if self.use_iap_tunnel:
- proxy_command_args = [
- "gcloud",
- "compute",
- "start-iap-tunnel",
- str(self.instance_name),
- "22",
- "--listen-on-stdin",
- f"--project={self.project_id}",
- f"--zone={self.zone}",
- "--verbosity=warning",
- ]
- proxy_command = " ".join(shlex.quote(arg) for arg in
proxy_command_args)
-
- sshclient = self._connect_to_instance(user, hostname, privkey,
proxy_command)
+
+ max_retries = 10
+ max_delay = 10
+ sshclient = None
+ for retry in range(max_retries + 1):
+ try:
+ if self.use_oslogin:
+ user = self._authorize_os_login(pubkey)
+ else:
+ user = self.user
+ self._authorize_compute_engine_instance_metadata(pubkey)
+ proxy_command = None
+ if self.use_iap_tunnel:
+ proxy_command_args = [
+ "gcloud",
+ "compute",
+ "start-iap-tunnel",
+ str(self.instance_name),
+ "22",
+ "--listen-on-stdin",
+ f"--project={self.project_id}",
+ f"--zone={self.zone}",
+ "--verbosity=warning",
+ ]
+ proxy_command = " ".join(shlex.quote(arg) for arg in
proxy_command_args)
+ sshclient = self._connect_to_instance(user, hostname, privkey,
proxy_command)
+ break
+ except (HttpError, AirflowException, SSHException) as exc:
+ if (isinstance(exc, HttpError) and exc.resp.status == 412) or (
+ isinstance(exc, AirflowException) and "412 PRECONDITION
FAILED" in str(exc)
+ ):
+ self.log.info("Error occurred when trying to update
instance metadata: %s", exc)
+ elif isinstance(exc, SSHException):
+ self.log.info("Error occurred when establishing SSH
connection using Paramiko: %s", exc)
+ if retry == max_retries:
+ raise AirflowException("Maximum retries exceeded. Aborting
operation.")
+ delay = random.randint(0, max_delay)
+ self.log.info(f"Failed establish SSH connection, waiting
{delay} seconds to retry...")
+ time.sleep(delay)
+ if not sshclient:
+ raise AirflowException("Unable to establish SSH connection.")
Review Comment:
This check needs to stay here for the linter (I get static check errors
without it :( )
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]