amoghrajesh commented on code in PR #67118:
URL: https://github.com/apache/airflow/pull/67118#discussion_r3279254799


##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -198,8 +221,63 @@ def execute(self, context: Context) -> None:
             self.conf = 
inject_transport_information_into_spark_properties(self.conf, context)
         if self._hook is None:
             self._hook = self._get_hook()
+        if self._hook._should_track_driver_status:
+            return self.execute_resumable(context)
         self._hook.submit(self.application)
 
+    def submit_job(self, context: Context) -> str:
+        driver_id = self._hook.submit(self.application)
+        if not driver_id:
+            raise RuntimeError("spark-submit did not return a driver ID")
+        self.log.info("Spark driver submitted: %s", driver_id)
+        return driver_id
+
+    def get_job_status(self, external_id: str) -> str:
+        if self._hook._is_yarn:
+            # TODO: call YARN ResourceManager REST API
+            # GET http://rm:8088/ws/v1/cluster/apps/{external_id}
+            raise NotImplementedError("YARN job status not yet implemented")
+        if self._hook._is_kubernetes:
+            # TODO: call K8s pod status API
+            raise NotImplementedError("K8s job status not yet implemented")
+        host = self._hook._connection["master"].replace("spark://", 
"").split(":")[0]
+        response = 
requests.get(f"http://{host}:6066/v1/submissions/status/{external_id}", 
timeout=30)

Review Comment:
   Addressed. Changes made:
   
   - Added retries with `@retry(stop=stop_after_attempt(3), wait=wait_fixed(1), reraise=True)`.
   - The URL scheme now comes from the connection extras, exposed as `rest_scheme`: `scheme = self._hook._connection.get("rest_scheme", "http")`.
   - The port is now taken from the master URL instead of being hard-coded, since the master URL must include a port.



##########
providers/apache/spark/src/airflow/providers/apache/spark/operators/spark_submit.py:
##########
@@ -198,8 +221,63 @@ def execute(self, context: Context) -> None:
             self.conf = 
inject_transport_information_into_spark_properties(self.conf, context)
         if self._hook is None:
             self._hook = self._get_hook()
+        if self._hook._should_track_driver_status:
+            return self.execute_resumable(context)
         self._hook.submit(self.application)
 
+    def submit_job(self, context: Context) -> str:
+        driver_id = self._hook.submit(self.application)
+        if not driver_id:
+            raise RuntimeError("spark-submit did not return a driver ID")
+        self.log.info("Spark driver submitted: %s", driver_id)
+        return driver_id
+
+    def get_job_status(self, external_id: str) -> str:
+        if self._hook._is_yarn:
+            # TODO: call YARN ResourceManager REST API
+            # GET http://rm:8088/ws/v1/cluster/apps/{external_id}
+            raise NotImplementedError("YARN job status not yet implemented")
+        if self._hook._is_kubernetes:
+            # TODO: call K8s pod status API
+            raise NotImplementedError("K8s job status not yet implemented")
+        host = self._hook._connection["master"].replace("spark://", 
"").split(":")[0]
+        response = 
requests.get(f"http://{host}:6066/v1/submissions/status/{external_id}", 
timeout=30)

Review Comment:
   Handled in 
https://github.com/apache/airflow/commit/aca5837137df585b675be36ad7e7b9a2cdaf1375



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to