nailo2c commented on code in PR #65991:
URL: https://github.com/apache/airflow/pull/65991#discussion_r3300019395


##########
providers/apache/spark/tests/unit/apache/spark/hooks/test_spark_submit.py:
##########
@@ -1336,3 +1371,316 @@ def test_post_submit_commands_none_gives_empty_list(self):
         """Test that None post_submit_commands results in an empty list."""
         hook = SparkSubmitHook(conn_id="")
         assert hook._post_submit_commands == []
+
+    # ---------------------------------------------------------------
+    # yarn_track_via_rm_api (issue #24171)
+    # ---------------------------------------------------------------
+    # Tests for the YARN ResourceManager REST API polling path that lets
+    # SparkSubmitHook release the spark-submit JVM after YARN accepts the
+    # application, instead of holding the JVM open just to read stdout.
+
+    _YARN_LOG_LINES = [
+        "INFO Client: Requesting a new application from cluster with 1 
NodeManagers",
+        "INFO Client: Uploading resource file:/tmp/lib.zip -> "
+        
"hdfs://namenode:8020/user/root/.sparkStaging/application_1700000000000_0001/lib.zip",
+        "INFO Client: Submitting application application_1700000000000_0001 to 
ResourceManager",
+        "INFO YarnClientImpl: Submitted application 
application_1700000000000_0001",
+        "INFO Client: Application report for application_1700000000000_0001 
(state: ACCEPTED)",
+        "INFO Client: Application report for application_1700000000000_0001 
(state: RUNNING)",
+        "INFO Client: Application report for application_1700000000000_0001 
(state: FINISHED)",
+        "INFO Client: final status: SUCCEEDED",
+    ]
+
+    _RM_BASE_URL = "http://rm.test:8088"
+    _RM_APP_ID = "application_1700000000000_0001"
+
+    @classmethod
+    def _rm_status_url(cls, app_id: str | None = None) -> str:
+        return f"{cls._RM_BASE_URL}/ws/v1/cluster/apps/{app_id or 
cls._RM_APP_ID}"
+
+    @classmethod
+    def _rm_kill_url(cls, app_id: str | None = None) -> str:
+        return f"{cls._RM_BASE_URL}/ws/v1/cluster/apps/{app_id or 
cls._RM_APP_ID}/state"
+
+    @classmethod
+    def _rm_status_resp(cls, final_status: str, state: str = "FINISHED") -> 
MagicMock:
+        resp = MagicMock(spec=requests.Response)
+        resp.status_code = 200
+        resp.json.return_value = {"app": {"id": cls._RM_APP_ID, "state": 
state, "finalStatus": final_status}}
+        return resp
+
+    @staticmethod
+    def _rm_failure_resp(status_code: int = 500, text: str = "Internal Server 
Error") -> MagicMock:
+        resp = MagicMock(spec=requests.Response)
+        resp.status_code = status_code
+        resp.text = text
+        return resp
+
+    @patch("airflow.providers.apache.spark.hooks.spark_submit.requests.put")
+    @patch("airflow.providers.apache.spark.hooks.spark_submit.requests.get")
+    @patch("airflow.providers.apache.spark.hooks.spark_submit.subprocess.Popen")
+    def test_default_keeps_existing_behavior_in_yarn_cluster(self, mock_popen, 
mock_get, mock_put):
+        """Flag default False -> no HTTP calls; behavior identical to today."""
+        proc = MagicMock(spec=["stdout", "terminate", "wait"])
+        proc.stdout = iter(self._YARN_LOG_LINES)
+        proc.wait.return_value = 0
+        mock_popen.return_value = proc
+
+        hook = SparkSubmitHook(conn_id="spark_yarn_cluster")
+        hook.submit()
+
+        proc.terminate.assert_not_called()
+        mock_get.assert_not_called()
+        mock_put.assert_not_called()
+        assert hook._yarn_application_id == "application_1700000000000_0001"
+
+    @patch("airflow.providers.apache.spark.hooks.spark_submit.time.sleep")
+    @patch("airflow.providers.apache.spark.hooks.spark_submit.requests.get")
+    @patch("airflow.providers.apache.spark.hooks.spark_submit.subprocess.Popen")
+    def test_yarn_status_tracking_succeeds(self, mock_popen, mock_get, 
mock_sleep):
+        """RM returns UNDEFINED then SUCCEEDED -> hook returns normally."""
+        proc = MagicMock()

Review Comment:
   Ah, my bad. I forgot to add `spec`. I'll be more mindful next time. Thanks 
for the reminder.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to