phanikumv commented on code in PR #32029:
URL: https://github.com/apache/airflow/pull/32029#discussion_r1241795063
##########
tests/providers/amazon/aws/operators/test_emr_containers.py:
##########
@@ -144,6 +145,19 @@ def test_execute_with_polling_timeout(self,
mock_check_query_status):
assert "Final state of EMR Containers job is SUBMITTED" in
str(ctx.value)
assert "Max tries of poll status exceeded" in str(ctx.value)
+ @mock.patch.object(EmrContainerHook, "submit_job")
+ @mock.patch.object(
+ EmrContainerHook, "check_query_status",
return_value=EmrContainerHook.INTERMEDIATE_STATES[0]
+ )
+ def test_operator_defer(self, mock_submit_job, mock_check_query_status):
+ self.emr_container.deferrable = True
Review Comment:
A docstring is missing here: please add one to `test_operator_defer` describing what the test verifies.
##########
tests/providers/amazon/aws/sensors/test_emr_job_flow.py:
##########
@@ -276,3 +277,20 @@ def test_different_target_states(self):
# make sure it was called with the job_flow_id
calls = [mock.call(ClusterId="j-8989898989")]
self.mock_emr_client.describe_cluster.assert_has_calls(calls)
+
+
@mock.patch("airflow.providers.amazon.aws.sensors.emr.EmrJobFlowSensor.poke")
+ def test_sensor_defer(self, mock_poke):
+ sensor = EmrJobFlowSensor(
Review Comment:
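A docstring is missing here: please add one to `test_sensor_defer` (for `EmrJobFlowSensor`) describing what the test verifies.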
##########
tests/providers/amazon/aws/sensors/test_emr_step.py:
##########
@@ -230,3 +231,19 @@ def test_step_interrupted(self, *_):
mock_isinstance.return_value = True
with pytest.raises(AirflowException):
self.sensor.execute(None)
+
+ @mock.patch("airflow.providers.amazon.aws.sensors.emr.EmrStepSensor.poke")
+ def test_sensor_defer(self, mock_poke):
+ sensor = EmrStepSensor(
Review Comment:
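A docstring is missing here: please add one to `test_sensor_defer` (for `EmrStepSensor`) describing what the test verifies.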
##########
tests/providers/amazon/aws/triggers/test_emr.py:
##########
@@ -460,3 +463,105 @@ async def test_emr_trigger_run_attempts_failed(self,
mock_async_conn, mock_get_w
assert mock_get_waiter().wait.call_count == 3
assert response == TriggerEvent({"status": "failure", "message": f"Job
Failed: {error_failed}"})
+
+
+class TestEmrStepSensorTrigger:
+ def test_emr_step_trigger_serialize(self):
+ emr_trigger = EmrStepSensorTrigger(
Review Comment:
A docstring is missing here: please add one to `test_emr_step_trigger_serialize` describing what the test verifies.
##########
tests/providers/amazon/aws/triggers/test_emr.py:
##########
@@ -460,3 +463,105 @@ async def test_emr_trigger_run_attempts_failed(self,
mock_async_conn, mock_get_w
assert mock_get_waiter().wait.call_count == 3
assert response == TriggerEvent({"status": "failure", "message": f"Job
Failed: {error_failed}"})
+
+
+class TestEmrStepSensorTrigger:
+ def test_emr_step_trigger_serialize(self):
+ emr_trigger = EmrStepSensorTrigger(
+ job_flow_id=TEST_JOB_FLOW_ID,
+ step_id=STEP_ID,
+ aws_conn_id=AWS_CONN_ID,
+ poke_interval=POLL_INTERVAL,
+ )
+ class_path, args = emr_trigger.serialize()
+ assert class_path ==
"airflow.providers.amazon.aws.triggers.emr.EmrStepSensorTrigger"
+ assert args["job_flow_id"] == TEST_JOB_FLOW_ID
+ assert args["step_id"] == STEP_ID
+ assert args["aws_conn_id"] == AWS_CONN_ID
+ assert args["poke_interval"] == POLL_INTERVAL
+
+ @pytest.mark.asyncio
+ @mock.patch("airflow.providers.amazon.aws.hooks.emr.EmrHook.get_waiter")
+ @mock.patch("airflow.providers.amazon.aws.hooks.emr.EmrHook.async_conn")
+ async def test_emr_step_trigger_run(self, mock_async_conn,
mock_get_waiter):
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+
+ mock_get_waiter().wait = AsyncMock()
+
+ emr_trigger = EmrStepSensorTrigger(
+ job_flow_id=TEST_JOB_FLOW_ID,
+ step_id=STEP_ID,
+ aws_conn_id=AWS_CONN_ID,
+ poll_interval=POLL_INTERVAL,
+ )
+
+ generator = emr_trigger.run()
+ response = await generator.asend(None)
+
+ assert response == TriggerEvent({"status": "success"})
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep")
+ @mock.patch("airflow.providers.amazon.aws.hooks.emr.EmrHook.get_waiter")
+ @mock.patch("airflow.providers.amazon.aws.hooks.emr.EmrHook.async_conn")
+ async def test_emr_trigger_run_multiple_attempts(self, mock_async_conn,
mock_get_waiter, mock_sleep):
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+
+ error = WaiterError(
+ name="test_name",
+ reason="test_reason",
+ last_response={"Step": {"Status": {"State": "RUNNING"}}},
+ )
+ mock_get_waiter().wait.side_effect = AsyncMock(side_effect=[error,
error, True])
+ mock_sleep.return_value = True
+
+ emr_trigger = EmrStepSensorTrigger(
+ job_flow_id=TEST_JOB_FLOW_ID,
+ step_id=STEP_ID,
+ aws_conn_id=AWS_CONN_ID,
+ poll_interval=POLL_INTERVAL,
+ )
+
+ generator = emr_trigger.run()
+ response = await generator.asend(None)
+
+ assert mock_get_waiter().wait.call_count == 3
+ assert response == TriggerEvent({"status": "success"})
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep")
+ @mock.patch("airflow.providers.amazon.aws.hooks.emr.EmrHook.get_waiter")
+ @mock.patch("airflow.providers.amazon.aws.hooks.emr.EmrHook.async_conn")
+ async def test_emr_trigger_run_attempts_failed(self, mock_async_conn,
mock_get_waiter, mock_sleep):
+ a_mock = mock.MagicMock()
Review Comment:
A docstring is missing here: please add one to `test_emr_trigger_run_attempts_failed` describing what the test verifies.
##########
tests/providers/amazon/aws/triggers/test_emr.py:
##########
@@ -460,3 +463,105 @@ async def test_emr_trigger_run_attempts_failed(self,
mock_async_conn, mock_get_w
assert mock_get_waiter().wait.call_count == 3
assert response == TriggerEvent({"status": "failure", "message": f"Job
Failed: {error_failed}"})
+
+
+class TestEmrStepSensorTrigger:
+ def test_emr_step_trigger_serialize(self):
+ emr_trigger = EmrStepSensorTrigger(
+ job_flow_id=TEST_JOB_FLOW_ID,
+ step_id=STEP_ID,
+ aws_conn_id=AWS_CONN_ID,
+ poke_interval=POLL_INTERVAL,
+ )
+ class_path, args = emr_trigger.serialize()
+ assert class_path ==
"airflow.providers.amazon.aws.triggers.emr.EmrStepSensorTrigger"
+ assert args["job_flow_id"] == TEST_JOB_FLOW_ID
+ assert args["step_id"] == STEP_ID
+ assert args["aws_conn_id"] == AWS_CONN_ID
+ assert args["poke_interval"] == POLL_INTERVAL
+
+ @pytest.mark.asyncio
+ @mock.patch("airflow.providers.amazon.aws.hooks.emr.EmrHook.get_waiter")
+ @mock.patch("airflow.providers.amazon.aws.hooks.emr.EmrHook.async_conn")
+ async def test_emr_step_trigger_run(self, mock_async_conn,
mock_get_waiter):
+ a_mock = mock.MagicMock()
+ mock_async_conn.__aenter__.return_value = a_mock
+
+ mock_get_waiter().wait = AsyncMock()
+
+ emr_trigger = EmrStepSensorTrigger(
+ job_flow_id=TEST_JOB_FLOW_ID,
+ step_id=STEP_ID,
+ aws_conn_id=AWS_CONN_ID,
+ poll_interval=POLL_INTERVAL,
+ )
+
+ generator = emr_trigger.run()
+ response = await generator.asend(None)
+
+ assert response == TriggerEvent({"status": "success"})
+
+ @pytest.mark.asyncio
+ @mock.patch("asyncio.sleep")
+ @mock.patch("airflow.providers.amazon.aws.hooks.emr.EmrHook.get_waiter")
+ @mock.patch("airflow.providers.amazon.aws.hooks.emr.EmrHook.async_conn")
+ async def test_emr_trigger_run_multiple_attempts(self, mock_async_conn,
mock_get_waiter, mock_sleep):
+ a_mock = mock.MagicMock()
Review Comment:
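A docstring is missing here: please add one to `test_emr_trigger_run_multiple_attempts` describing what the test verifies.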
##########
tests/providers/amazon/aws/triggers/test_emr.py:
##########
@@ -460,3 +463,105 @@ async def test_emr_trigger_run_attempts_failed(self,
mock_async_conn, mock_get_w
assert mock_get_waiter().wait.call_count == 3
assert response == TriggerEvent({"status": "failure", "message": f"Job
Failed: {error_failed}"})
+
+
+class TestEmrStepSensorTrigger:
+ def test_emr_step_trigger_serialize(self):
+ emr_trigger = EmrStepSensorTrigger(
+ job_flow_id=TEST_JOB_FLOW_ID,
+ step_id=STEP_ID,
+ aws_conn_id=AWS_CONN_ID,
+ poke_interval=POLL_INTERVAL,
+ )
+ class_path, args = emr_trigger.serialize()
+ assert class_path ==
"airflow.providers.amazon.aws.triggers.emr.EmrStepSensorTrigger"
+ assert args["job_flow_id"] == TEST_JOB_FLOW_ID
+ assert args["step_id"] == STEP_ID
+ assert args["aws_conn_id"] == AWS_CONN_ID
+ assert args["poke_interval"] == POLL_INTERVAL
+
+ @pytest.mark.asyncio
+ @mock.patch("airflow.providers.amazon.aws.hooks.emr.EmrHook.get_waiter")
+ @mock.patch("airflow.providers.amazon.aws.hooks.emr.EmrHook.async_conn")
+ async def test_emr_step_trigger_run(self, mock_async_conn,
mock_get_waiter):
+ a_mock = mock.MagicMock()
Review Comment:
A docstring is missing here: please add one to `test_emr_step_trigger_run` describing what the test verifies.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@airflow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org