swapz-z commented on code in PR #28282:
URL: https://github.com/apache/airflow/pull/28282#discussion_r1050772669


##########
tests/providers/amazon/aws/hooks/test_emr.py:
##########
@@ -190,3 +190,100 @@ def test_get_cluster_id_by_name(self):
         no_match = hook.get_cluster_id_by_name("foo", ["RUNNING", "WAITING", "BOOTSTRAPPING"])
 
         assert no_match is None
+
+    @mock_emr
+    def test_send_cancel_steps_first_invocation(self):
+        """
+        Test that send_cancel_steps returns None when no steps have been submitted to the cluster.
+        """
+        hook = EmrHook(aws_conn_id="aws_default", emr_conn_id="emr_default")
+
+        job_flow = hook.create_job_flow(
+            {"Name": "test_cluster", "Instances": 
{"KeepJobFlowAliveWhenNoSteps": True}}
+        )
+
+        job_flow_id = job_flow["JobFlowId"]
+
+        step = [
+            {
+                "ActionOnFailure": "test_step",
+                "HadoopJarStep": {
+                    "Args": ["test args"],
+                    "Jar": "test.jar",
+                },
+                "Name": "step_1",
+            }
+        ]
+
+        did_not_execute_response = hook.send_cancel_steps(
+            steps_states=["PENDING", "RUNNING"],
+            emr_cluster_id=job_flow_id,
+            cancellation_option="SEND_INTERRUPT",
+            steps=step,
+        )
+
+        assert did_not_execute_response is None
+
+    @mock_emr
+    @pytest.mark.parametrize("num_steps", [1, 2, 3, 4])
+    def test_send_cancel_steps_on_pre_existing_step_name(self, num_steps):
+        """
+        Test that only steps whose names already exist on the cluster are selected for cancellation.
+        """
+        hook = EmrHook(aws_conn_id="aws_default", emr_conn_id="emr_default")
+
+        job_flow = hook.create_job_flow(
+            {"Name": "test_cluster", "Instances": 
{"KeepJobFlowAliveWhenNoSteps": True}}
+        )
+
+        job_flow_id = job_flow["JobFlowId"]
+
+        steps = [
+            {
+                "ActionOnFailure": "test_step",
+                "HadoopJarStep": {
+                    "Args": ["test args"],
+                    "Jar": "test.jar",
+                },
+                "Name": f"step_{i}",
+            }
+            for i in range(num_steps)
+        ]
+
+        retry_step = [
+            {
+                "ActionOnFailure": "test_step",
+                "HadoopJarStep": {
+                    "Args": ["test args"],
+                    "Jar": "test.jar",
+                },
+                "Name": "retry_step_1",
+            }
+        ]
+
+        triggered = hook.add_job_flow_steps(job_flow_id=job_flow_id, steps=steps)
+
+        triggered_steps = hook._get_list_of_steps_already_triggered(job_flow_id, ["PENDING", "RUNNING"])
+        assert len(triggered_steps) == num_steps == len(triggered)
+
+        cancel_steps = hook._cancel_list_of_steps_already_triggered(
+            steps + retry_step, job_flow_id, ["PENDING", "RUNNING"]
+        )
+
+        assert len(cancel_steps) == num_steps
+
+        with pytest.raises(NotImplementedError):

Review Comment:
   Actually, the moto library, which mocks the AWS APIs, does not currently implement `cancel_steps`, which is what would be needed to verify this end to end.
   However, in [this](https://github.com/apache/airflow/pull/28282/files#diff-12403a99d050969934d307ac584b2a414382e29c3a49d535be55bdc59e411213R262) test case I have checked that the hook methods generate the expected values before they are passed to the boto3 `cancel_steps` call.
   
   I am not very familiar with moto, but I am curious whether we can achieve this in a different way. Let me know!
   @Taragolis @o-nikolas 
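
As a possible alternative (an untested sketch, not part of this PR): since moto does not implement `cancel_steps`, `botocore.stub.Stubber` can stub that single call and assert on the exact parameters the caller sends, without needing moto for this operation at all. The placeholder ids below, and the idea of wiring such a client into `EmrHook` (for example by swapping out the hook's connection), are assumptions and have not been verified against the hook internals.

```python
import boto3
from botocore.stub import Stubber

# Plain EMR client with dummy credentials; Stubber intercepts the call before
# anything is sent to AWS, so no real account is needed.
client = boto3.client(
    "emr",
    region_name="us-east-1",
    aws_access_key_id="testing",
    aws_secret_access_key="testing",
)

stubber = Stubber(client)
# Stub cancel_steps and assert on the exact request parameters.
# The cluster/step ids are placeholders, not real resources.
stubber.add_response(
    "cancel_steps",
    {"CancelStepsInfoList": [{"StepId": "s-PLACEHOLDER", "Status": "SUBMITTED"}]},
    expected_params={
        "ClusterId": "j-PLACEHOLDER",
        "StepIds": ["s-PLACEHOLDER"],
        "StepCancellationOption": "SEND_INTERRUPT",
    },
)

with stubber:
    response = client.cancel_steps(
        ClusterId="j-PLACEHOLDER",
        StepIds=["s-PLACEHOLDER"],
        StepCancellationOption="SEND_INTERRUPT",
    )

assert response["CancelStepsInfoList"][0]["Status"] == "SUBMITTED"
stubber.assert_no_pending_responses()
```

This keeps the assertion on the request payload itself (`ClusterId` / `StepIds` / `StepCancellationOption`) rather than on the helper methods' return values; whether that is preferable to the current approach is a judgement call.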


