seyoon-lim commented on code in PR #40757:
URL: https://github.com/apache/airflow/pull/40757#discussion_r1676984823


##########
airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -268,6 +291,12 @@ def _resolve_connection(self) -> dict[str, Any]:
                 )
             conn_data["spark_binary"] = self.spark_binary
             conn_data["namespace"] = extra.get("namespace")
+            conn_data["principal"] = self._principal or extra.get("principal")
+            base64_keytab = extra.get("keytab")
+            if self._keytab is not None:
+                conn_data["keytab"] = self._keytab
+            elif base64_keytab is not None:
+                conn_data["keytab"] = 
self._get_keytab_from_base64(base64_keytab, conn_data["principal"])

Review Comment:
   Values passed as constructor parameters take priority over the values from the connection extras.



##########
airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -281,10 +310,53 @@ def _resolve_connection(self) -> dict[str, Any]:
     def get_conn(self) -> Any:
         pass
 
+    def _get_keytab_from_base64(self, base64_keytab: str, principal: str | 
None) -> str:
+        _uuid = uuid.uuid4()
+        temp_dir_path = Path(tempfile.gettempdir()).resolve()
+        temp_file_name = f"airflow_keytab-{principal or _uuid}"
+
+        keytab_path = temp_dir_path / temp_file_name
+        staging_path = temp_dir_path / f".{temp_file_name}.{_uuid}"
+
+        keytab = base64.b64decode(base64_keytab)
+
+        # validate exists keytab file
+        if keytab_path.exists():
+            with open(keytab_path, "rb") as f:
+                existing_keytab = f.read()
+            if existing_keytab == keytab:

Review Comment:
   I was considering comparing checksums here instead of the full file
contents, but I'd appreciate your opinion on it.



##########
airflow/providers/apache/spark/hooks/spark_submit.py:
##########
@@ -281,10 +310,53 @@ def _resolve_connection(self) -> dict[str, Any]:
     def get_conn(self) -> Any:
         pass
 
+    def _get_keytab_from_base64(self, base64_keytab: str, principal: str | 
None) -> str:
+        _uuid = uuid.uuid4()
+        temp_dir_path = Path(tempfile.gettempdir()).resolve()
+        temp_file_name = f"airflow_keytab-{principal or _uuid}"
+
+        keytab_path = temp_dir_path / temp_file_name
+        staging_path = temp_dir_path / f".{temp_file_name}.{_uuid}"
+
+        keytab = base64.b64decode(base64_keytab)
+
+        # validate exists keytab file
+        if keytab_path.exists():
+            with open(keytab_path, "rb") as f:
+                existing_keytab = f.read()
+            if existing_keytab == keytab:
+                self.log.info("Keytab file already exists and is the same as 
the provided keytab")
+                return str(keytab_path)
+
+        # write new keytab file
+        try:
+            with open(staging_path, "wb") as f:
+                self.log.info("Saving keytab to %s", staging_path)
+                f.write(keytab)
+
+            self.log.info("Moving keytab from %s to %s", staging_path, 
keytab_path)
+            shutil.move(str(staging_path), str(keytab_path))
+            return str(keytab_path)
+        except Exception as err:
+            self.log.error("Failed to save keytab: %s", err)
+            raise AirflowException("Failed to save keytab") from err
+        finally:
+            if staging_path.exists():
+                self.log.info("Removing staging keytab file: %s", staging_path)
+                staging_path.unlink()
+
     def _get_spark_binary_path(self) -> list[str]:
         # Assume that spark-submit is present in the path to the executing user
         return [self._connection["spark_binary"]]
 
+    def _get_principal(self) -> str | None:
+        # for the case where the principal is not set in the broken connection
+        return self._connection["principal"] or self._principal
+
+    def _get_keytab(self) -> str | None:
+        # for the case where the keytab is not set in the broken connection
+        return self._connection["keytab"] or self._keytab

Review Comment:
   If an exception occurs in the `_resolve_connection` method, the values 
passed as parameters will be used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to