o-nikolas commented on code in PR #39592:
URL: https://github.com/apache/airflow/pull/39592#discussion_r1599187322
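For orientation before the file-by-file comments below, here is a minimal usage sketch of the operator as it is written in this PR: `mode` is an explicit constructor argument (forwarded to the API as `Mode`), and a `RedactionConfig` — which the docstring says is required when `mode="ONLY_REDACTION"` — would be supplied through `start_pii_entities_kwargs`, since `execute()` unpacks that dict into the `start_pii_entities_detection_job` call. The S3 URIs and role ARN below are placeholders, not values from the PR.

```python
# Minimal usage sketch (placeholder S3 URIs and role ARN, not values from the PR).
# Intended inside a DAG or task group, as in the system test.
from airflow.providers.amazon.aws.operators.comprehend import (
    ComprehendStartPiiEntitiesDetectionJobOperator,
)

start_pii_entities_detection_job = ComprehendStartPiiEntitiesDetectionJobOperator(
    task_id="start_pii_entities_detection_job",
    input_data_config={
        "S3Uri": "s3://my-input-bucket/sample_data.txt",  # placeholder bucket
        "InputFormat": "ONE_DOC_PER_LINE",
    },
    output_data_config={"S3Uri": "s3://my-output-bucket/redacted_output/"},  # placeholder bucket
    mode="ONLY_REDACTION",  # explicit operator argument, forwarded to the API as Mode
    data_access_role_arn="arn:aws:iam::123456789012:role/ComprehendDataAccessRole",  # placeholder
    language_code="en",
    # Anything else the boto3 call accepts goes through start_pii_entities_kwargs,
    # including the RedactionConfig that ONLY_REDACTION requires.
    start_pii_entities_kwargs={
        "RedactionConfig": {
            "PiiEntityTypes": ["ALL"],
            "MaskMode": "MASK",
            "MaskCharacter": "*",
        },
    },
)
```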
########## airflow/providers/amazon/aws/operators/comprehend.py: ########## @@ -0,0 +1,195 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from functools import cached_property +from typing import TYPE_CHECKING, Any, Sequence + +from airflow.configuration import conf +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.comprehend import ComprehendHook +from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator +from airflow.providers.amazon.aws.triggers.comprehend import ComprehendPiiEntitiesDetectionJobCompletedTrigger +from airflow.providers.amazon.aws.utils import validate_execute_complete_event +from airflow.providers.amazon.aws.utils.mixins import aws_template_fields +from airflow.utils.timezone import utcnow + +if TYPE_CHECKING: + import boto3 + + from airflow.utils.context import Context + + +class ComprehendBaseOperator(AwsBaseOperator[ComprehendHook]): + """ + This is the base operator for Comprehend Service operators (not supposed to be used directly in DAGs). + + :param input_data_config: The input properties for a PII entities detection job. (templated) + :param output_data_config: Provides `configuration` parameters for the output of PII entity detection + jobs. (templated) + :param mode: Specifies whether the output provides the locations (offsets) of PII entities or a file in + which PII entities are redacted. If you set the mode parameter to ONLY_REDACTION. In that case you + must provide a RedactionConfig. + :param data_access_role_arn: The Amazon Resource Name (ARN) of the IAM role that grants Amazon Comprehend + read access to your input data. (templated) + :param language_code: The language of the input documents. 
(templated) + """ + + aws_hook_class = ComprehendHook + + template_fields: Sequence[str] = aws_template_fields( + "input_data_config", "output_data_config", "data_access_role_arn", "language_code" + ) + + template_fields_renderers: dict = {"input_data_config": "json", "output_data_config": "json"} + + def __init__( + self, + input_data_config: dict, + output_data_config: dict, + data_access_role_arn: str, + language_code: str, + **kwargs, + ): + super().__init__(**kwargs) + self.input_data_config = input_data_config + self.output_data_config = output_data_config + self.data_access_role_arn = data_access_role_arn + self.language_code = language_code + + @cached_property + def client(self) -> boto3.client: + """Create and return the Comprehend client.""" + return self.hook.conn + + def execute(self, context: Context): + """Must overwrite in child classes.""" + raise NotImplementedError("Please implement execute() in subclass") + + +class ComprehendStartPiiEntitiesDetectionJobOperator(ComprehendBaseOperator): + """ + Create a comprehend pii entities detection job for a collection of documents. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:ComprehendStartPiiEntitiesDetectionJobOperator` + + :param input_data_config: The input properties for a PII entities detection job. (templated) + :param output_data_config: Provides `configuration` parameters for the output of PII entity detection + jobs. (templated) + :param mode: Specifies whether the output provides the locations (offsets) of PII entities or a file in + which PII entities are redacted. If you set the mode parameter to ONLY_REDACTION. In that case you + must provide a RedactionConfig. Review Comment: Provided in kwargs? ########## airflow/providers/amazon/aws/operators/comprehend.py: ########## @@ -0,0 +1,195 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from functools import cached_property +from typing import TYPE_CHECKING, Any, Sequence + +from airflow.configuration import conf +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.comprehend import ComprehendHook +from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator +from airflow.providers.amazon.aws.triggers.comprehend import ComprehendPiiEntitiesDetectionJobCompletedTrigger +from airflow.providers.amazon.aws.utils import validate_execute_complete_event +from airflow.providers.amazon.aws.utils.mixins import aws_template_fields +from airflow.utils.timezone import utcnow + +if TYPE_CHECKING: + import boto3 + + from airflow.utils.context import Context + + +class ComprehendBaseOperator(AwsBaseOperator[ComprehendHook]): + """ + This is the base operator for Comprehend Service operators (not supposed to be used directly in DAGs). + + :param input_data_config: The input properties for a PII entities detection job. (templated) + :param output_data_config: Provides `configuration` parameters for the output of PII entity detection + jobs. (templated) + :param mode: Specifies whether the output provides the locations (offsets) of PII entities or a file in Review Comment: `__init__` doesn't appear to take this param? Also you say that a `RedactionConfig` might be needed, how does the user pass that in? Do you expect these only in kwargs? ########## tests/system/providers/amazon/aws/example_comprehend.py: ########## @@ -0,0 +1,137 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import json +from datetime import datetime + +from airflow import DAG +from airflow.decorators import task_group +from airflow.models.baseoperator import chain +from airflow.providers.amazon.aws.operators.comprehend import ComprehendStartPiiEntitiesDetectionJobOperator +from airflow.providers.amazon.aws.operators.s3 import ( + S3CreateBucketOperator, + S3CreateObjectOperator, + S3DeleteBucketOperator, +) +from airflow.providers.amazon.aws.sensors.comprehend import ( + ComprehendStartPiiEntitiesDetectionJobCompletedSensor, +) +from airflow.utils.trigger_rule import TriggerRule +from tests.system.providers.amazon.aws.utils import SystemTestContextBuilder + +ROLE_ARN_KEY = "ROLE_ARN" +sys_test_context_task = SystemTestContextBuilder().add_variable(ROLE_ARN_KEY).build() + +DAG_ID = "example_comprehend" +INPUT_S3_KEY_START_PII_ENTITIES_DETECTION_JOB = "start-pii-entities-detection-job/sample_data.txt" + +SAMPLE_DATA = { + "username": "bob1234", + "name": "Bob", + "sex": "M", + "address": "1773 Raymond Ville Suite 682", + "mail": "[email protected]", +} + + +@task_group +def pii_entities_detection_job_workflow(): + # [START howto_operator_start_pii_entities_detection_job] + start_pii_entities_detection_job = ComprehendStartPiiEntitiesDetectionJobOperator( + task_id="start_pii_entities_detection_job", + input_data_config=input_data_configurations, + output_data_config=output_data_configurations, + mode="ONLY_REDACTION", + data_access_role_arn=test_context[ROLE_ARN_KEY], + language_code="en", + start_pii_entities_kwargs=pii_entities_kwargs, + ) + # [END howto_operator_start_pii_entities_detection_job] + start_pii_entities_detection_job.wait_for_completion = False + + # [START howto_sensor_start_pii_entities_detection_job] + await_start_pii_entities_detection_job = ComprehendStartPiiEntitiesDetectionJobCompletedSensor( + task_id="await_start_pii_entities_detection_job", job_id=start_pii_entities_detection_job.output + ) + # [END howto_sensor_start_pii_entities_detection_job] + + chain(start_pii_entities_detection_job, await_start_pii_entities_detection_job) + + +with DAG( Review Comment: Have you run this DAG against a real AWS account, as is, and ensured it's passing? ########## tests/providers/amazon/aws/operators/test_comprehend.py: ########## @@ -0,0 +1,163 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +from typing import TYPE_CHECKING, Generator +from unittest import mock + +import pytest +from moto import mock_aws + +from airflow.providers.amazon.aws.hooks.comprehend import ComprehendHook +from airflow.providers.amazon.aws.operators.comprehend import ( + ComprehendBaseOperator, + ComprehendStartPiiEntitiesDetectionJobOperator, +) +from airflow.utils.types import NOTSET + +if TYPE_CHECKING: + from airflow.providers.amazon.aws.hooks.base_aws import BaseAwsConnection + +INPUT_DATA_CONFIG = { + "S3Uri": "s3://input-data-comprehend/sample_data.txt", + "InputFormat": "ONE_DOC_PER_LINE", +} +OUTPUT_DATA_CONFIG = {"S3Uri": "s3://output-data-comprehend/redacted_output/"} +LANGUAGE_CODE = "en" +ROLE_ARN = "role_arn" + + +class TestComprehendBaseOperator: + @pytest.mark.parametrize("aws_conn_id", [None, NOTSET, "aws_test_conn"]) + @pytest.mark.parametrize("region_name", [None, NOTSET, "ca-central-1"]) + def test_initialize_comprehend_base_operator(self, aws_conn_id, region_name): + op_kw = {"aws_conn_id": aws_conn_id, "region_name": region_name} + op_kw = {k: v for k, v in op_kw.items() if v is not NOTSET} + + comprehend_base_op = ComprehendBaseOperator( + task_id="comprehend_base_operator", + input_data_config=INPUT_DATA_CONFIG, + output_data_config=OUTPUT_DATA_CONFIG, + language_code=LANGUAGE_CODE, + data_access_role_arn=ROLE_ARN, + **op_kw, + ) + + assert comprehend_base_op.aws_conn_id == (aws_conn_id if aws_conn_id is not NOTSET else "aws_default") + assert comprehend_base_op.region_name == (region_name if region_name is not NOTSET else None) + + @mock.patch.object(ComprehendBaseOperator, "hook", new_callable=mock.PropertyMock) + def test_initialize_comprehend_base_operator_hook(self, comprehend_base_operator_mock_hook): + comprehend_base_op = ComprehendBaseOperator( + task_id="comprehend_base_operator", + input_data_config=INPUT_DATA_CONFIG, + output_data_config=OUTPUT_DATA_CONFIG, + language_code=LANGUAGE_CODE, + data_access_role_arn=ROLE_ARN, + ) + mocked_hook = mock.MagicMock(name="MockHook") + mocked_client = mock.MagicMock(name="MockClient") + mocked_hook.conn = mocked_client + comprehend_base_operator_mock_hook.return_value = mocked_hook + assert comprehend_base_op.client == mocked_client + comprehend_base_operator_mock_hook.assert_called_once() + + +class TestComprehendStartPiiEntitiesDetectionJobOperator: + JOB_ID = "random-job-id-1234567" + MODE = "ONE_DOC_PER_LINE" + JOB_NAME = "TEST_START_PII_ENTITIES_DETECTION_JOB-1" + DEFAULT_JOB_NAME_STARTS_WITH = "start_pii_entities_detection_job" + + @pytest.fixture + def mock_conn(self) -> Generator[BaseAwsConnection, None, None]: + with mock.patch.object(ComprehendHook, "conn") as _conn: + _conn.start_pii_entities_detection_job.return_value = {"JobId": self.JOB_ID} + yield _conn + + @pytest.fixture + def comprehend_hook(self) -> Generator[ComprehendHook, None, None]: + with mock_aws(): + hook = ComprehendHook(aws_conn_id="aws_default") + yield hook + + def setup_method(self): + self.operator = ComprehendStartPiiEntitiesDetectionJobOperator( + task_id="start_pii_entities_detection_job", + input_data_config=INPUT_DATA_CONFIG, + output_data_config=OUTPUT_DATA_CONFIG, + data_access_role_arn=ROLE_ARN, + mode=self.MODE, + language_code=LANGUAGE_CODE, + start_pii_entities_kwargs={"JobName": self.JOB_NAME}, + ) + self.operator.defer = mock.MagicMock() + + def test_init(self): + assert self.operator.input_data_config == INPUT_DATA_CONFIG + assert self.operator.output_data_config == OUTPUT_DATA_CONFIG + assert 
self.operator.data_access_role_arn == ROLE_ARN + assert self.operator.mode == self.MODE + assert self.operator.language_code == LANGUAGE_CODE + assert self.operator.start_pii_entities_kwargs.get("JobName") == self.JOB_NAME + + @pytest.mark.parametrize( + "start_pii_entities_kwargs", + [ + pytest.param( Review Comment: Did you plan to add more parameters and not get around to it? If there is just one test case then you don't need the parametrization right? ########## airflow/providers/amazon/aws/operators/comprehend.py: ########## @@ -0,0 +1,195 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from functools import cached_property +from typing import TYPE_CHECKING, Any, Sequence + +from airflow.configuration import conf +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.comprehend import ComprehendHook +from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator +from airflow.providers.amazon.aws.triggers.comprehend import ComprehendPiiEntitiesDetectionJobCompletedTrigger +from airflow.providers.amazon.aws.utils import validate_execute_complete_event +from airflow.providers.amazon.aws.utils.mixins import aws_template_fields +from airflow.utils.timezone import utcnow + +if TYPE_CHECKING: + import boto3 + + from airflow.utils.context import Context + + +class ComprehendBaseOperator(AwsBaseOperator[ComprehendHook]): + """ + This is the base operator for Comprehend Service operators (not supposed to be used directly in DAGs). + + :param input_data_config: The input properties for a PII entities detection job. (templated) + :param output_data_config: Provides `configuration` parameters for the output of PII entity detection + jobs. (templated) + :param mode: Specifies whether the output provides the locations (offsets) of PII entities or a file in + which PII entities are redacted. If you set the mode parameter to ONLY_REDACTION. In that case you + must provide a RedactionConfig. + :param data_access_role_arn: The Amazon Resource Name (ARN) of the IAM role that grants Amazon Comprehend + read access to your input data. (templated) + :param language_code: The language of the input documents. 
(templated) + """ + + aws_hook_class = ComprehendHook + + template_fields: Sequence[str] = aws_template_fields( + "input_data_config", "output_data_config", "data_access_role_arn", "language_code" + ) + + template_fields_renderers: dict = {"input_data_config": "json", "output_data_config": "json"} + + def __init__( + self, + input_data_config: dict, + output_data_config: dict, + data_access_role_arn: str, + language_code: str, + **kwargs, + ): + super().__init__(**kwargs) + self.input_data_config = input_data_config + self.output_data_config = output_data_config + self.data_access_role_arn = data_access_role_arn + self.language_code = language_code + + @cached_property + def client(self) -> boto3.client: + """Create and return the Comprehend client.""" + return self.hook.conn + + def execute(self, context: Context): + """Must overwrite in child classes.""" + raise NotImplementedError("Please implement execute() in subclass") + + +class ComprehendStartPiiEntitiesDetectionJobOperator(ComprehendBaseOperator): + """ + Create a comprehend pii entities detection job for a collection of documents. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:ComprehendStartPiiEntitiesDetectionJobOperator` + + :param input_data_config: The input properties for a PII entities detection job. (templated) + :param output_data_config: Provides `configuration` parameters for the output of PII entity detection + jobs. (templated) + :param mode: Specifies whether the output provides the locations (offsets) of PII entities or a file in + which PII entities are redacted. If you set the mode parameter to ONLY_REDACTION. In that case you + must provide a RedactionConfig. + :param data_access_role_arn: The Amazon Resource Name (ARN) of the IAM role that grants Amazon Comprehend + read access to your input data. (templated) + :param language_code: The language of the input documents. (templated) + :param start_pii_entities_kwargs: Any optional parameters to pass to the job. If JobName is not provided + in start_pii_entities_kwargs, operator will create. + + :param wait_for_completion: Whether to wait for job to stop. (default: True) + :param waiter_delay: Time in seconds to wait between status checks. (default: 60) + :param waiter_max_attempts: Maximum number of attempts to check for job completion. (default: 20) + :param deferrable: If True, the operator will wait asynchronously for the job to stop. + This implies waiting for completion. This mode requires aiobotocore module to be installed. + (default: False) + :param aws_conn_id: The Airflow connection used for AWS credentials. + If this is ``None`` or empty then the default boto3 behaviour is used. If + running Airflow in a distributed manner and aws_conn_id is None or + empty, then default boto3 configuration would be used (and must be + maintained on each worker node). + :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used. + :param verify: Whether to verify SSL certificates. See: + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html + :param botocore_config: Configuration dictionary (key-values) for botocore client. 
See: + https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html + """ + + def __init__( + self, + input_data_config: dict, + output_data_config: dict, + mode: str, + data_access_role_arn: str, + language_code: str, + start_pii_entities_kwargs: dict[str, Any] | None = None, + wait_for_completion: bool = True, + waiter_delay: int = 60, + waiter_max_attempts: int = 20, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), + **kwargs, + ): + super().__init__( + input_data_config=input_data_config, + output_data_config=output_data_config, + data_access_role_arn=data_access_role_arn, + language_code=language_code, + **kwargs, + ) + self.mode = mode + self.start_pii_entities_kwargs = start_pii_entities_kwargs or {} + self.wait_for_completion = wait_for_completion + self.waiter_delay = waiter_delay + self.waiter_max_attempts = waiter_max_attempts + self.deferrable = deferrable + + def execute(self, context: Context) -> str: + if self.start_pii_entities_kwargs.get("JobName", None) is None: + self.start_pii_entities_kwargs["JobName"] = ( + f"start_pii_entities_detection_job-{int(utcnow().timestamp())}" + ) + + self.log.info( + "Submitting start pii entities detection job '%s'.", self.start_pii_entities_kwargs["JobName"] + ) + job_id = self.client.start_pii_entities_detection_job( + InputDataConfig=self.input_data_config, + OutputDataConfig=self.output_data_config, + Mode=self.mode, + DataAccessRoleArn=self.data_access_role_arn, + LanguageCode=self.language_code, + **self.start_pii_entities_kwargs, + )["JobId"] + + message_description = f"start pii entities detection job {job_id} to complete." + if self.deferrable: + self.log.info("Deferring for %s.", message_description) + self.defer( + trigger=ComprehendPiiEntitiesDetectionJobCompletedTrigger( + job_id=job_id, + waiter_delay=self.waiter_delay, + waiter_max_attempts=self.waiter_max_attempts, + aws_conn_id=self.aws_conn_id, + ), + method_name="execute_complete", + ) + elif self.wait_for_completion: + self.log.info("Waiting to %s.", message_description) Review Comment: ```suggestion self.log.info("Waiting for %s", message_description) ``` ########## docs/apache-airflow-providers-amazon/operators/comprehend.rst: ########## @@ -0,0 +1,74 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +================= +Amazon Comprehend +================= + +`Amazon Comprehend <https://aws.amazon.com/comprehend/>`__ uses natural language processing (NLP) to +extract insights about the content of documents. It develops insights by recognizing the entities, key phrases, +language, sentiments, and other common elements in a document. + +Prerequisite Tasks +------------------ + +.. include:: ../_partials/prerequisite_tasks.rst + +Generic Parameters +------------------ + +.. 
include:: ../_partials/generic_parameters.rst + +Operators +--------- + +.. _howto/operator:ComprehendStartPiiEntitiesDetectionJobOperator: + +Create an Amazon Comprehend Start Pii Entities Detection Job +============================================================ + +To create an Amazon Comprehend Start Pii Entities Detection Job, you can use Review Comment: Acronyms should be all caps ```suggestion Create an Amazon Comprehend Start PII Entities Detection Job ============================================================ To create an Amazon Comprehend Start PII Entities Detection Job, you can use ``` ########## docs/apache-airflow-providers-amazon/operators/comprehend.rst: ########## @@ -0,0 +1,74 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +================= +Amazon Comprehend +================= + +`Amazon Comprehend <https://aws.amazon.com/comprehend/>`__ uses natural language processing (NLP) to +extract insights about the content of documents. It develops insights by recognizing the entities, key phrases, +language, sentiments, and other common elements in a document. + +Prerequisite Tasks +------------------ + +.. include:: ../_partials/prerequisite_tasks.rst + +Generic Parameters +------------------ + +.. include:: ../_partials/generic_parameters.rst + +Operators +--------- + +.. _howto/operator:ComprehendStartPiiEntitiesDetectionJobOperator: + +Create an Amazon Comprehend Start Pii Entities Detection Job +============================================================ + +To create an Amazon Comprehend Start Pii Entities Detection Job, you can use +:class:`~airflow.providers.amazon.aws.operators.comprehend.ComprehendStartPiiEntitiesDetectionJobOperator`. + +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_comprehend.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_start_pii_entities_detection_job] + :end-before: [END howto_operator_start_pii_entities_detection_job] + +Sensors +------- + +.. _howto/sensor:ComprehendStartPiiEntitiesDetectionJobCompletedSensor: + +Wait for an Amazon Comprehend Start Pii Entities Detection Job +============================================================== + +To wait on the state of an Amazon Comprehend Start Pii Entities Detection Job until it reaches a terminal Review Comment: ```suggestion Wait for an Amazon Comprehend Start Pii Entities Detection Job ============================================================== To wait on the state of an Amazon Comprehend Start Pii Entities Detection Job until it reaches a terminal ``` ########## airflow/providers/amazon/aws/operators/comprehend.py: ########## @@ -0,0 +1,195 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from functools import cached_property +from typing import TYPE_CHECKING, Any, Sequence + +from airflow.configuration import conf +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.comprehend import ComprehendHook +from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator +from airflow.providers.amazon.aws.triggers.comprehend import ComprehendPiiEntitiesDetectionJobCompletedTrigger +from airflow.providers.amazon.aws.utils import validate_execute_complete_event +from airflow.providers.amazon.aws.utils.mixins import aws_template_fields +from airflow.utils.timezone import utcnow + +if TYPE_CHECKING: + import boto3 + + from airflow.utils.context import Context + + +class ComprehendBaseOperator(AwsBaseOperator[ComprehendHook]): Review Comment: Is the benefit of this base class mostly just templating? ########## airflow/providers/amazon/aws/operators/comprehend.py: ########## @@ -0,0 +1,195 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from functools import cached_property +from typing import TYPE_CHECKING, Any, Sequence + +from airflow.configuration import conf +from airflow.exceptions import AirflowException +from airflow.providers.amazon.aws.hooks.comprehend import ComprehendHook +from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator +from airflow.providers.amazon.aws.triggers.comprehend import ComprehendPiiEntitiesDetectionJobCompletedTrigger +from airflow.providers.amazon.aws.utils import validate_execute_complete_event +from airflow.providers.amazon.aws.utils.mixins import aws_template_fields +from airflow.utils.timezone import utcnow + +if TYPE_CHECKING: + import boto3 + + from airflow.utils.context import Context + + +class ComprehendBaseOperator(AwsBaseOperator[ComprehendHook]): + """ + This is the base operator for Comprehend Service operators (not supposed to be used directly in DAGs). + + :param input_data_config: The input properties for a PII entities detection job. 
(templated) + :param output_data_config: Provides `configuration` parameters for the output of PII entity detection + jobs. (templated) + :param mode: Specifies whether the output provides the locations (offsets) of PII entities or a file in + which PII entities are redacted. If you set the mode parameter to ONLY_REDACTION. In that case you + must provide a RedactionConfig. + :param data_access_role_arn: The Amazon Resource Name (ARN) of the IAM role that grants Amazon Comprehend + read access to your input data. (templated) + :param language_code: The language of the input documents. (templated) + """ + + aws_hook_class = ComprehendHook + + template_fields: Sequence[str] = aws_template_fields( + "input_data_config", "output_data_config", "data_access_role_arn", "language_code" + ) + + template_fields_renderers: dict = {"input_data_config": "json", "output_data_config": "json"} + + def __init__( + self, + input_data_config: dict, + output_data_config: dict, + data_access_role_arn: str, + language_code: str, + **kwargs, + ): + super().__init__(**kwargs) + self.input_data_config = input_data_config + self.output_data_config = output_data_config + self.data_access_role_arn = data_access_role_arn + self.language_code = language_code + + @cached_property + def client(self) -> boto3.client: + """Create and return the Comprehend client.""" + return self.hook.conn + + def execute(self, context: Context): + """Must overwrite in child classes.""" + raise NotImplementedError("Please implement execute() in subclass") + + +class ComprehendStartPiiEntitiesDetectionJobOperator(ComprehendBaseOperator): + """ + Create a comprehend pii entities detection job for a collection of documents. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:ComprehendStartPiiEntitiesDetectionJobOperator` + + :param input_data_config: The input properties for a PII entities detection job. (templated) + :param output_data_config: Provides `configuration` parameters for the output of PII entity detection + jobs. (templated) + :param mode: Specifies whether the output provides the locations (offsets) of PII entities or a file in + which PII entities are redacted. If you set the mode parameter to ONLY_REDACTION. In that case you + must provide a RedactionConfig. + :param data_access_role_arn: The Amazon Resource Name (ARN) of the IAM role that grants Amazon Comprehend + read access to your input data. (templated) + :param language_code: The language of the input documents. (templated) + :param start_pii_entities_kwargs: Any optional parameters to pass to the job. If JobName is not provided + in start_pii_entities_kwargs, operator will create. + + :param wait_for_completion: Whether to wait for job to stop. (default: True) + :param waiter_delay: Time in seconds to wait between status checks. (default: 60) + :param waiter_max_attempts: Maximum number of attempts to check for job completion. (default: 20) + :param deferrable: If True, the operator will wait asynchronously for the job to stop. + This implies waiting for completion. This mode requires aiobotocore module to be installed. + (default: False) + :param aws_conn_id: The Airflow connection used for AWS credentials. + If this is ``None`` or empty then the default boto3 behaviour is used. If + running Airflow in a distributed manner and aws_conn_id is None or + empty, then default boto3 configuration would be used (and must be + maintained on each worker node). + :param region_name: AWS region_name. 
If not specified then the default boto3 behaviour is used. + :param verify: Whether to verify SSL certificates. See: + https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html + :param botocore_config: Configuration dictionary (key-values) for botocore client. See: + https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html + """ + + def __init__( + self, + input_data_config: dict, + output_data_config: dict, + mode: str, + data_access_role_arn: str, + language_code: str, + start_pii_entities_kwargs: dict[str, Any] | None = None, + wait_for_completion: bool = True, + waiter_delay: int = 60, + waiter_max_attempts: int = 20, + deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), + **kwargs, + ): + super().__init__( + input_data_config=input_data_config, + output_data_config=output_data_config, + data_access_role_arn=data_access_role_arn, + language_code=language_code, + **kwargs, + ) + self.mode = mode + self.start_pii_entities_kwargs = start_pii_entities_kwargs or {} + self.wait_for_completion = wait_for_completion + self.waiter_delay = waiter_delay + self.waiter_max_attempts = waiter_max_attempts + self.deferrable = deferrable + + def execute(self, context: Context) -> str: + if self.start_pii_entities_kwargs.get("JobName", None) is None: + self.start_pii_entities_kwargs["JobName"] = ( + f"start_pii_entities_detection_job-{int(utcnow().timestamp())}" + ) + + self.log.info( + "Submitting start pii entities detection job '%s'.", self.start_pii_entities_kwargs["JobName"] + ) + job_id = self.client.start_pii_entities_detection_job( + InputDataConfig=self.input_data_config, + OutputDataConfig=self.output_data_config, + Mode=self.mode, + DataAccessRoleArn=self.data_access_role_arn, + LanguageCode=self.language_code, + **self.start_pii_entities_kwargs, + )["JobId"] + + message_description = f"start pii entities detection job {job_id} to complete." + if self.deferrable: + self.log.info("Deferring for %s.", message_description) Review Comment: ```suggestion self.log.info("Deferring %s", message_description) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
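On the single-case parametrization question above, a sketch of what the test could look like with the parametrization dropped. It reuses the PR's own approach of patching `ComprehendHook.conn`, and assumes `execute()` returns the job id (the system test wires the sensor to `.output`); the body is illustrative, not the PR's actual parametrized case.

```python
from unittest import mock

from airflow.providers.amazon.aws.hooks.comprehend import ComprehendHook
from airflow.providers.amazon.aws.operators.comprehend import (
    ComprehendStartPiiEntitiesDetectionJobOperator,
)


def test_execute_generates_default_job_name():
    # Only one case, so no @pytest.mark.parametrize: build the operator without a JobName
    # and check that execute() generates one and returns the job id.
    operator = ComprehendStartPiiEntitiesDetectionJobOperator(
        task_id="start_pii_entities_detection_job",
        input_data_config={
            "S3Uri": "s3://input-data-comprehend/sample_data.txt",
            "InputFormat": "ONE_DOC_PER_LINE",
        },
        output_data_config={"S3Uri": "s3://output-data-comprehend/redacted_output/"},
        data_access_role_arn="role_arn",
        mode="ONLY_REDACTION",
        language_code="en",
        start_pii_entities_kwargs={},  # no JobName supplied
        wait_for_completion=False,
        deferrable=False,
    )

    with mock.patch.object(ComprehendHook, "conn") as conn:
        conn.start_pii_entities_detection_job.return_value = {"JobId": "job-1234"}
        job_id = operator.execute({})

    assert job_id == "job-1234"
    assert operator.start_pii_entities_kwargs["JobName"].startswith(
        "start_pii_entities_detection_job"
    )
```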
