ahidalgob commented on code in PR #32365:
URL: https://github.com/apache/airflow/pull/32365#discussion_r1269369494
##########
airflow/providers/google/cloud/hooks/compute_ssh.py:
##########
@@ -225,40 +227,58 @@ def get_conn(self) -> paramiko.SSHClient:
hostname = self.hostname
privkey, pubkey = self._generate_ssh_key(self.user)
- if self.use_oslogin:
- user = self._authorize_os_login(pubkey)
- else:
- user = self.user
- self._authorize_compute_engine_instance_metadata(pubkey)
-
- proxy_command = None
- if self.use_iap_tunnel:
- proxy_command_args = [
- "gcloud",
- "compute",
- "start-iap-tunnel",
- str(self.instance_name),
- "22",
- "--listen-on-stdin",
- f"--project={self.project_id}",
- f"--zone={self.zone}",
- "--verbosity=warning",
- ]
- proxy_command = " ".join(shlex.quote(arg) for arg in
proxy_command_args)
-
- sshclient = self._connect_to_instance(user, hostname, privkey,
proxy_command)
+
+ max_retries = 10
+ max_delay = 10
+ sshclient = None
+ for retry in range(max_retries + 1):
+ try:
+ if self.use_oslogin:
+ user = self._authorize_os_login(pubkey)
+ else:
+ user = self.user
+ self._authorize_compute_engine_instance_metadata(pubkey)
+ proxy_command = None
+ if self.use_iap_tunnel:
+ proxy_command_args = [
+ "gcloud",
+ "compute",
+ "start-iap-tunnel",
+ str(self.instance_name),
+ "22",
+ "--listen-on-stdin",
+ f"--project={self.project_id}",
+ f"--zone={self.zone}",
+ "--verbosity=warning",
+ ]
+ proxy_command = " ".join(shlex.quote(arg) for arg in
proxy_command_args)
+ sshclient = self._connect_to_instance(user, hostname, privkey,
proxy_command)
+ break
+ except (HttpError, AirflowException, SSHException) as exc:
+ if (isinstance(exc, HttpError) and exc.resp.status == 412) or (
+ isinstance(exc, AirflowException) and "412 PRECONDITION
FAILED" in str(exc)
+ ):
+ self.log.info("Error occurred when trying to update
instance metadata: %s", exc)
+ elif isinstance(exc, SSHException):
+ self.log.info("Error occurred when establishing SSH
connection using Paramiko: %s", exc)
+ if retry == max_retries:
+ raise AirflowException("Maximum retries exceeded. Aborting
operation.")
+ delay = random.randint(0, max_delay)
+ self.log.info(f"Failed establish SSH connection, waiting
{delay} seconds to retry...")
+ time.sleep(delay)
+ if not sshclient:
+ raise AirflowException("Unable to establish SSH connection.")
Review Comment:
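Nit: This case can never happen, can it? The loop only exits via `break` after `sshclient` has been assigned, or by raising on the final retry, so `sshclient` cannot be unset here.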
##########
airflow/providers/google/cloud/hooks/compute_ssh.py:
##########
@@ -225,40 +227,58 @@ def get_conn(self) -> paramiko.SSHClient:
hostname = self.hostname
privkey, pubkey = self._generate_ssh_key(self.user)
- if self.use_oslogin:
- user = self._authorize_os_login(pubkey)
- else:
- user = self.user
- self._authorize_compute_engine_instance_metadata(pubkey)
-
- proxy_command = None
- if self.use_iap_tunnel:
- proxy_command_args = [
- "gcloud",
- "compute",
- "start-iap-tunnel",
- str(self.instance_name),
- "22",
- "--listen-on-stdin",
- f"--project={self.project_id}",
- f"--zone={self.zone}",
- "--verbosity=warning",
- ]
- proxy_command = " ".join(shlex.quote(arg) for arg in
proxy_command_args)
-
- sshclient = self._connect_to_instance(user, hostname, privkey,
proxy_command)
+
+ max_retries = 10
Review Comment:
Should we make this configurable? If a user opens many SSH connections
at the same time, 10 retries might not be enough.
##########
airflow/providers/google/cloud/hooks/compute_ssh.py:
##########
@@ -225,40 +227,58 @@ def get_conn(self) -> paramiko.SSHClient:
hostname = self.hostname
privkey, pubkey = self._generate_ssh_key(self.user)
- if self.use_oslogin:
- user = self._authorize_os_login(pubkey)
- else:
- user = self.user
- self._authorize_compute_engine_instance_metadata(pubkey)
-
- proxy_command = None
- if self.use_iap_tunnel:
- proxy_command_args = [
- "gcloud",
- "compute",
- "start-iap-tunnel",
- str(self.instance_name),
- "22",
- "--listen-on-stdin",
- f"--project={self.project_id}",
- f"--zone={self.zone}",
- "--verbosity=warning",
- ]
- proxy_command = " ".join(shlex.quote(arg) for arg in
proxy_command_args)
-
- sshclient = self._connect_to_instance(user, hostname, privkey,
proxy_command)
+
+ max_retries = 10
+ max_delay = 10
+ sshclient = None
+ for retry in range(max_retries + 1):
+ try:
+ if self.use_oslogin:
+ user = self._authorize_os_login(pubkey)
+ else:
+ user = self.user
+ self._authorize_compute_engine_instance_metadata(pubkey)
+ proxy_command = None
+ if self.use_iap_tunnel:
+ proxy_command_args = [
+ "gcloud",
+ "compute",
+ "start-iap-tunnel",
+ str(self.instance_name),
+ "22",
+ "--listen-on-stdin",
+ f"--project={self.project_id}",
+ f"--zone={self.zone}",
+ "--verbosity=warning",
+ ]
+ proxy_command = " ".join(shlex.quote(arg) for arg in
proxy_command_args)
+ sshclient = self._connect_to_instance(user, hostname, privkey,
proxy_command)
+ break
+ except (HttpError, AirflowException, SSHException) as exc:
+ if (isinstance(exc, HttpError) and exc.resp.status == 412) or (
+ isinstance(exc, AirflowException) and "412 PRECONDITION
FAILED" in str(exc)
+ ):
+ self.log.info("Error occurred when trying to update
instance metadata: %s", exc)
+ elif isinstance(exc, SSHException):
+ self.log.info("Error occurred when establishing SSH
connection using Paramiko: %s", exc)
Review Comment:
We could add an `else` branch that simply re-raises the exception when it is
an `HttpError` or `AirflowException` that doesn't match the errors we want to
retry. As written, every such exception is retried, even unrelated failures.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]