Taragolis commented on code in PR #27970:
URL: https://github.com/apache/airflow/pull/27970#discussion_r1034285775
##########
tests/providers/amazon/aws/operators/test_eks.py:
##########
@@ -243,80 +233,74 @@ def
test_fargate_compute_missing_fargate_pod_execution_role_arn(self):
missing_fargate_pod_execution_role_arn.execute({})
-class TestEksCreateFargateProfileOperator(unittest.TestCase):
- def setUp(self) -> None:
- self.create_fargate_profile_params: CreateFargateProfileParams = dict(
# type: ignore
+class TestEksCreateFargateProfileOperator:
Review Comment:
Replace `TestCase.subTest` with parametrized tests.
##########
tests/providers/amazon/aws/operators/test_eks.py:
##########
@@ -92,35 +91,21 @@ class CreateNodegroupParams(TypedDict):
nodegroup_role_arn: str
-class TestEksCreateClusterOperator(unittest.TestCase):
- def setUp(self) -> None:
+class TestEksCreateClusterOperator:
Review Comment:
1. Replace `TestCase.subTest` with parametrized tests.
2. Rename method `nodegroup_setUp` to `nodegroup_setup`.
3. Rename method `fargate_profile_setUp` to `fargate_profile_setup`.
##########
tests/providers/amazon/aws/operators/test_s3_file_transform.py:
##########
@@ -34,37 +29,33 @@
from airflow.providers.amazon.aws.operators.s3 import S3FileTransformOperator
-class TestS3FileTransformOperator(unittest.TestCase):
- def setUp(self):
[email protected]
+def transform_script_loc(request, tmp_path_factory):
+ transform_script = tmp_path_factory.mktemp(request.node.name) /
"transform.py"
+ transform_script.touch()
+ yield str(transform_script)
Review Comment:
Create the empty "transform" file with the `tmp_path_factory` fixture instead of
manually creating a temporary directory and cleaning it up afterwards.
##########
tests/providers/amazon/aws/operators/test_s3_object.py:
##########
@@ -18,10 +18,10 @@
from __future__ import annotations
Review Comment:
This test is taken entirely from https://github.com/apache/airflow/pull/27858
##########
tests/providers/amazon/aws/operators/test_sagemaker_training.py:
##########
@@ -57,11 +57,12 @@
}
-class TestSageMakerTrainingOperator(unittest.TestCase):
- def setUp(self):
+class TestSageMakerTrainingOperator:
+ def setup_method(self):
+ self.create_training_params = copy.deepcopy(CREATE_TRAINING_PARAMS)
self.sagemaker = SageMakerTrainingOperator(
task_id="test_sagemaker_operator",
- config=CREATE_TRAINING_PARAMS,
+ config=self.create_training_params,
wait_for_completion=False,
check_interval=5,
)
Review Comment:
After migrating to pytest, some tests failed due to the mutability of the test
parameters, so these parameters are now recreated for each test case.
##########
tests/providers/amazon/aws/operators/test_sagemaker_transform.py:
##########
@@ -50,15 +50,16 @@
"ExecutionRoleArn": "arn:aws:iam:role/test-role",
}
-CONFIG: dict = {"Model": CREATE_MODEL_PARAMS, "Transform":
CREATE_TRANSFORM_PARAMS}
-
-class TestSageMakerTransformOperator(unittest.TestCase):
- def setUp(self):
+class TestSageMakerTransformOperator:
+ def setup_method(self):
+ self.create_transform_params = copy.deepcopy(CREATE_TRANSFORM_PARAMS)
+ self.create_model_params = copy.deepcopy(CREATE_MODEL_PARAMS)
+ self.config = {"Model": self.create_model_params, "Transform":
self.create_transform_params}
Review Comment:
After migrating to `pytest`, some tests failed due to the mutability of the test
parameters, so these parameters are now recreated for each test case.
##########
tests/providers/amazon/aws/sensors/test_emr_containers.py:
##########
@@ -41,36 +40,18 @@ def setUp(self):
# avoids an Airflow warning about connection cannot be found.
self.sensor.hook.get_connection = lambda _: None
- @mock.patch.object(EmrContainerHook, "check_query_status",
side_effect=("PENDING",))
- def test_poke_pending(self, mock_check_query_status):
- assert not self.sensor.poke(None)
-
- @mock.patch.object(EmrContainerHook, "check_query_status",
side_effect=("SUBMITTED",))
- def test_poke_submitted(self, mock_check_query_status):
- assert not self.sensor.poke(None)
-
- @mock.patch.object(EmrContainerHook, "check_query_status",
side_effect=("RUNNING",))
- def test_poke_running(self, mock_check_query_status):
- assert not self.sensor.poke(None)
-
- @mock.patch.object(EmrContainerHook, "check_query_status",
side_effect=("COMPLETED",))
- def test_poke_completed(self, mock_check_query_status):
- assert self.sensor.poke(None)
-
- @mock.patch.object(EmrContainerHook, "check_query_status",
side_effect=("FAILED",))
- def test_poke_failed(self, mock_check_query_status):
- with pytest.raises(AirflowException) as ctx:
- self.sensor.poke(None)
- assert "EMR Containers sensor failed" in str(ctx.value)
-
- @mock.patch.object(EmrContainerHook, "check_query_status",
side_effect=("CANCELLED",))
- def test_poke_cancelled(self, mock_check_query_status):
- with pytest.raises(AirflowException) as ctx:
- self.sensor.poke(None)
- assert "EMR Containers sensor failed" in str(ctx.value)
-
- @mock.patch.object(EmrContainerHook, "check_query_status",
side_effect=("CANCEL_PENDING",))
- def test_poke_cancel_pending(self, mock_check_query_status):
- with pytest.raises(AirflowException) as ctx:
- self.sensor.poke(None)
- assert "EMR Containers sensor failed" in str(ctx.value)
+ @pytest.mark.parametrize("query_status", ["COMPLETED"])
+ def test_poke_true_on_query_status(self, query_status):
+ with mock.patch.object(EmrContainerHook, "check_query_status",
side_effect=[query_status]):
+ assert self.sensor.poke({})
+
+ @pytest.mark.parametrize("query_status", ["PENDING", "SUBMITTED",
"RUNNING"])
+ def test_poke_false_on_query_status(self, query_status):
+ with mock.patch.object(EmrContainerHook, "check_query_status",
side_effect=[query_status]):
+ assert not self.sensor.poke({})
+
+ @pytest.mark.parametrize("query_status", ["FAILED", "CANCELLED",
"CANCEL_PENDING"])
+ def test_poke_raise_on_query_status(self, query_status):
+ with mock.patch.object(EmrContainerHook, "check_query_status",
side_effect=[query_status]):
+ with pytest.raises(AirflowException, match=r"EMR Containers sensor
failed"):
+ assert not self.sensor.poke({})
Review Comment:
Merge the duplicated tests into parametrized tests.
##########
tests/providers/amazon/aws/operators/test_eks.py:
##########
@@ -243,80 +233,74 @@ def
test_fargate_compute_missing_fargate_pod_execution_role_arn(self):
missing_fargate_pod_execution_role_arn.execute({})
-class TestEksCreateFargateProfileOperator(unittest.TestCase):
- def setUp(self) -> None:
- self.create_fargate_profile_params: CreateFargateProfileParams = dict(
# type: ignore
+class TestEksCreateFargateProfileOperator:
+ def setup_method(self) -> None:
+ self.create_fargate_profile_params = CreateFargateProfileParams( #
type: ignore
cluster_name=CLUSTER_NAME,
pod_execution_role_arn=POD_EXECUTION_ROLE_ARN[1],
selectors=SELECTORS[1],
fargate_profile_name=FARGATE_PROFILE_NAME,
)
- self.create_fargate_profile_operator_without_kwargs =
EksCreateFargateProfileOperator(
- task_id=TASK_ID, **self.create_fargate_profile_params
- )
-
- self.create_fargate_profile_operator_with_kwargs =
EksCreateFargateProfileOperator(
- task_id=TASK_ID,
- create_fargate_profile_kwargs=CREATE_FARGATE_PROFILE_KWARGS,
- **self.create_fargate_profile_params,
- )
-
+ @pytest.mark.parametrize(
+ "create_fargate_profile_kwargs",
+ [
+ pytest.param(None, id="without fargate profile kwargs"),
+ pytest.param(CREATE_FARGATE_PROFILE_KWARGS, id="with fargate
profile kwargs"),
+ ],
+ )
@mock.patch.object(EksHook, "create_fargate_profile")
- def test_execute_when_fargate_profile_does_not_already_exist(self,
mock_create_fargate_profile):
- operator_under_test = [
- (self.create_fargate_profile_operator_without_kwargs,
self.create_fargate_profile_params),
- (
- self.create_fargate_profile_operator_with_kwargs,
- {**self.create_fargate_profile_params,
**CREATE_FARGATE_PROFILE_KWARGS},
- ),
- ]
-
- for (operator, parameters) in operator_under_test:
- with self.subTest():
- operator.execute({})
-
-
mock_create_fargate_profile.assert_called_with(**convert_keys(parameters))
-
-
-class TestEksCreateNodegroupOperator(unittest.TestCase):
- def setUp(self) -> None:
- self.create_nodegroup_params: CreateNodegroupParams = dict( # type:
ignore
+ def test_execute_when_fargate_profile_does_not_already_exist(
+ self, mock_create_fargate_profile, create_fargate_profile_kwargs
+ ):
+ op_kwargs = {**self.create_fargate_profile_params}
+ if create_fargate_profile_kwargs:
+ op_kwargs["create_fargate_profile_kwargs"] =
create_fargate_profile_kwargs
+ parameters = {**self.create_fargate_profile_params,
**create_fargate_profile_kwargs}
+ else:
+ assert "create_fargate_profile_kwargs" not in op_kwargs
+ parameters = self.create_fargate_profile_params
+
+ operator = EksCreateFargateProfileOperator(task_id=TASK_ID,
**op_kwargs)
+ operator.execute({})
+
mock_create_fargate_profile.assert_called_with(**convert_keys(parameters))
+
+
+class TestEksCreateNodegroupOperator:
Review Comment:
Replace `TestCase.subTest` with parametrized tests.
##########
tests/providers/amazon/aws/sensors/test_athena.py:
##########
@@ -37,26 +36,18 @@ def setUp(self):
aws_conn_id="aws_default",
)
- @mock.patch.object(AthenaHook, "poll_query_status",
side_effect=("SUCCEEDED",))
- def test_poke_success(self, mock_poll_query_status):
- assert self.sensor.poke({})
-
- @mock.patch.object(AthenaHook, "poll_query_status",
side_effect=("RUNNING",))
- def test_poke_running(self, mock_poll_query_status):
- assert not self.sensor.poke({})
-
- @mock.patch.object(AthenaHook, "poll_query_status",
side_effect=("QUEUED",))
- def test_poke_queued(self, mock_poll_query_status):
- assert not self.sensor.poke({})
-
- @mock.patch.object(AthenaHook, "poll_query_status",
side_effect=("FAILED",))
- def test_poke_failed(self, mock_poll_query_status):
- with pytest.raises(AirflowException) as ctx:
- self.sensor.poke({})
- assert "Athena sensor failed" in str(ctx.value)
-
- @mock.patch.object(AthenaHook, "poll_query_status",
side_effect=("CANCELLED",))
- def test_poke_cancelled(self, mock_poll_query_status):
- with pytest.raises(AirflowException) as ctx:
- self.sensor.poke({})
- assert "Athena sensor failed" in str(ctx.value)
+ @pytest.mark.parametrize("poll_query_status", ["SUCCEEDED"])
+ def test_poke_true_on_status(self, poll_query_status):
+ with mock.patch.object(AthenaHook, "poll_query_status",
side_effect=[poll_query_status]):
+ assert self.sensor.poke({})
+
+ @pytest.mark.parametrize("poll_query_status", ["RUNNING", "QUEUED"])
+ def test_poke_false_on_status(self, poll_query_status):
+ with mock.patch.object(AthenaHook, "poll_query_status",
side_effect=[poll_query_status]):
+ assert not self.sensor.poke({})
+
+ @pytest.mark.parametrize("poll_query_status", ["FAILED", "CANCELLED"])
+ def test_poke_raise_on_status(self, poll_query_status):
+ with mock.patch.object(AthenaHook, "poll_query_status",
side_effect=[poll_query_status]):
+ with pytest.raises(AirflowException, match=r"Athena sensor
failed"):
+ self.sensor.poke({})
Review Comment:
Merge the duplicated tests into parametrized tests.
##########
tests/providers/amazon/aws/operators/test_eks.py:
##########
@@ -327,15 +311,12 @@ def setUp(self) -> None:
@mock.patch.object(EksHook, "delete_cluster")
def test_existing_cluster_not_in_use(self, mock_delete_cluster,
mock_list_nodegroups):
mock_list_nodegroups.return_value = []
-
self.delete_cluster_operator.execute({})
-
- mock_list_nodegroups.assert_called_once
Review Comment:
This line actually does nothing right now: it references the method without
calling it. If converted to `mock_list_nodegroups.assert_called_once()`, the
test fails. Removing it for now.
##########
tests/providers/amazon/aws/sensors/test_dms_task.py:
##########
@@ -26,46 +25,26 @@
from airflow.providers.amazon.aws.sensors.dms import DmsTaskCompletedSensor
-class TestDmsTaskCompletedSensor(unittest.TestCase):
- def setUp(self):
+class TestDmsTaskCompletedSensor:
+ def setup_method(self):
self.sensor = DmsTaskCompletedSensor(
task_id="test_dms_sensor",
aws_conn_id="aws_default",
replication_task_arn="task_arn",
)
- @mock.patch.object(DmsHook, "get_task_status", side_effect=("stopped",))
- def test_poke_stopped(self, mock_get_task_status):
- assert self.sensor.poke(None)
-
- @mock.patch.object(DmsHook, "get_task_status", side_effect=("running",))
- def test_poke_running(self, mock_get_task_status):
- assert not self.sensor.poke(None)
-
- @mock.patch.object(DmsHook, "get_task_status", side_effect=("starting",))
- def test_poke_starting(self, mock_get_task_status):
- assert not self.sensor.poke(None)
-
- @mock.patch.object(DmsHook, "get_task_status", side_effect=("ready",))
- def test_poke_ready(self, mock_get_task_status):
- with pytest.raises(AirflowException) as ctx:
- self.sensor.poke(None)
- assert "Unexpected status: ready" in str(ctx.value)
-
- @mock.patch.object(DmsHook, "get_task_status", side_effect=("creating",))
- def test_poke_creating(self, mock_get_task_status):
- with pytest.raises(AirflowException) as ctx:
- self.sensor.poke(None)
- assert "Unexpected status: creating" in str(ctx.value)
-
- @mock.patch.object(DmsHook, "get_task_status", side_effect=("failed",))
- def test_poke_failed(self, mock_get_task_status):
- with pytest.raises(AirflowException) as ctx:
- self.sensor.poke(None)
- assert "Unexpected status: failed" in str(ctx.value)
-
- @mock.patch.object(DmsHook, "get_task_status", side_effect=("deleting",))
- def test_poke_deleting(self, mock_get_task_status):
- with pytest.raises(AirflowException) as ctx:
- self.sensor.poke(None)
- assert "Unexpected status: deleting" in str(ctx.value)
+ @pytest.mark.parametrize("task_status", ["stopped"])
+ def test_poke_true_on_status(self, task_status):
+ with mock.patch.object(DmsHook, "get_task_status",
side_effect=[task_status]):
+ assert self.sensor.poke({})
+
+ @pytest.mark.parametrize("task_status", ["running", "starting"])
+ def test_poke_false_on_status(self, task_status):
+ with mock.patch.object(DmsHook, "get_task_status",
side_effect=[task_status]):
+ assert not self.sensor.poke({})
+
+ @pytest.mark.parametrize("task_status", ["ready", "creating", "failed",
"deleting"])
+ def test_poke_raise_unexpected_status_on_status(self, task_status):
+ with mock.patch.object(DmsHook, "get_task_status",
side_effect=[task_status]):
+ with pytest.raises(AirflowException, match=rf"Unexpected status:
{task_status}"):
+ self.sensor.poke({})
Review Comment:
Merge the duplicated tests into parametrized tests.
##########
tests/providers/amazon/aws/operators/test_emr_add_steps.py:
##########
@@ -79,6 +78,22 @@ def test_init(self):
assert self.operator.job_flow_id == "j-8989898989"
assert self.operator.aws_conn_id == "aws_default"
+ @pytest.mark.parametrize(
+ "job_flow_id, job_flow_name",
+ [
+ pytest.param("j-8989898989", "test_cluster", id="both-specified"),
+ pytest.param(None, None, id="both-none"),
+ ],
+ )
+ def test_validate_mutually_exclusive_args(self, job_flow_id,
job_flow_name):
+ error_message = r"Exactly one of job_flow_id or job_flow_name must be
specified\."
+ with pytest.raises(AirflowException, match=error_message):
+ EmrAddStepsOperator(
+ task_id="test_validate_mutually_exclusive_args",
+ job_flow_id=job_flow_id,
+ job_flow_name=job_flow_name,
+ )
+
Review Comment:
Add this test from https://github.com/apache/airflow/pull/27858
##########
tests/providers/amazon/aws/operators/test_sagemaker_processing.py:
##########
@@ -185,7 +184,7 @@ def test_execute_with_failure(self, mock_processing,
mock_client):
with pytest.raises(AirflowException):
sagemaker.execute(None)
- @unittest.skip("Currently, the auto-increment jobname functionality is not
missing.")
+ @pytest.mark.skip(reason="Currently, the auto-increment jobname
functionality is not missing.")
Review Comment:
Need to fix/remove this test instead of just skip
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]