This is an automated email from the ASF dual-hosted git repository. leahecole pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/master by this push: new c281979 Documentation and example dag for CloudDLPDeidentifyContentOperator, GCSObjectExistenceSensor, GCSObjectsWithPrefixExistenceSensor (#14033) c281979 is described below commit c281979982c36f16c4c346c996a0c8d6ca7c630d Author: rachael-ds <45947385+rachael...@users.noreply.github.com> AuthorDate: Tue Feb 23 18:09:38 2021 +0000 Documentation and example dag for CloudDLPDeidentifyContentOperator, GCSObjectExistenceSensor, GCSObjectsWithPrefixExistenceSensor (#14033) * Add documentation and example dag for: CloudDLPDeidentifyContentOperator, GCSObjectExistenceSensor, GCSObjectsWithPrefixExistenceSensor * Moving gcs sensor docs and example dags to gcs operators docs/example dags * Add system tests for dlp and gcs * Adding further information on DLPDeidentifyContent operators * Pre-commit tidy-up: Renamed gcs/dlp system tests * Apply suggestions from code review Co-authored-by: Kamil Breguła <mik-...@users.noreply.github.com> * reverting some changes following code review * removed redundant @pytest.mark.system("google.cloud") * removed operators with newly added examples from missing examples list (pytest fix) * updated all references to GCSObjectsWtihPrefixExistenceSensor (typo) to newly fixed: GCSObjectsWithPrefixExistenceSensor * fixing merge issue: including deprecated operator to be excluded from test suite of operators Co-authored-by: rachael-ds <rachael...@outlook.com> Co-authored-by: Kamil Breguła <mik-...@users.noreply.github.com> --- .../google/cloud/example_dags/example_dlp.py | 31 +++++++++++ .../google/cloud/example_dags/example_gcs.py | 40 ++++++++++++++ .../operators/cloud/data_loss_prevention.rst | 17 +++++- .../operators/cloud/gcs.rst | 62 +++++++++++++++++++--- docs/conf.py | 4 +- tests/always/test_project_structure.py | 6 +-- .../google/cloud/operators/test_dlp_system.py | 4 ++ .../google/cloud/operators/test_gcs_system.py | 4 ++ 8 files changed, 152 insertions(+), 16 
deletions(-) diff --git a/airflow/providers/google/cloud/example_dags/example_dlp.py b/airflow/providers/google/cloud/example_dags/example_dlp.py index fdc6136..43a3d92 100644 --- a/airflow/providers/google/cloud/example_dags/example_dlp.py +++ b/airflow/providers/google/cloud/example_dags/example_dlp.py @@ -33,6 +33,7 @@ from airflow.providers.google.cloud.operators.dlp import ( CloudDLPCreateInspectTemplateOperator, CloudDLPCreateJobTriggerOperator, CloudDLPCreateStoredInfoTypeOperator, + CloudDLPDeidentifyContentOperator, CloudDLPDeleteInspectTemplateOperator, CloudDLPDeleteJobTriggerOperator, CloudDLPDeleteStoredInfoTypeOperator, @@ -177,3 +178,33 @@ with models.DAG( ) # [END howto_operator_dlp_delete_job_trigger] create_trigger >> update_trigger >> delete_trigger + +# [START dlp_deidentify_config_example] +DEIDENTIFY_CONFIG = { + "info_type_transformations": { + "transformations": [ + { + "primitive_transformation": { + "replace_config": {"new_value": {"string_value": "[deidentified_number]"}} + } + } + ] + } +} +# [END dlp_deidentify_config_example] + +with models.DAG( + "example_gcp_dlp_deidentify_content", + schedule_interval=None, + start_date=days_ago(1), + tags=["example", "dlp", "deidentify"], +) as dag4: + # [START _howto_operator_dlp_deidentify_content] + deidentify_content = CloudDLPDeidentifyContentOperator( + project_id=GCP_PROJECT, + item=ITEM, + deidentify_config=DEIDENTIFY_CONFIG, + inspect_config=INSPECT_CONFIG, + task_id="deidentify_content", + ) + # [END _howto_operator_dlp_deidentify_content] diff --git a/airflow/providers/google/cloud/example_dags/example_gcs.py b/airflow/providers/google/cloud/example_dags/example_gcs.py index 8719d4e..40a9c47 100644 --- a/airflow/providers/google/cloud/example_dags/example_gcs.py +++ b/airflow/providers/google/cloud/example_dags/example_gcs.py @@ -32,6 +32,10 @@ from airflow.providers.google.cloud.operators.gcs import ( GCSListObjectsOperator, GCSObjectCreateAclEntryOperator, ) +from 
airflow.providers.google.cloud.sensors.gcs import ( + GCSObjectExistenceSensor, + GCSObjectsWithPrefixExistenceSensor, +) from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator @@ -48,6 +52,7 @@ BUCKET_2 = os.environ.get("GCP_GCS_BUCKET_2", "test-gcs-example-bucket-2") PATH_TO_TRANSFORM_SCRIPT = os.environ.get('GCP_GCS_PATH_TO_TRANSFORM_SCRIPT', 'test.py') PATH_TO_UPLOAD_FILE = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE", "test-gcs-example.txt") +PATH_TO_UPLOAD_FILE_PREFIX = os.environ.get("GCP_GCS_PATH_TO_UPLOAD_FILE_PREFIX", "test-gcs-") PATH_TO_SAVED_FILE = os.environ.get("GCP_GCS_PATH_TO_SAVED_FILE", "test-gcs-example-download.txt") BUCKET_FILE_LOCATION = PATH_TO_UPLOAD_FILE.rpartition("/")[-1] @@ -151,6 +156,41 @@ with models.DAG( copy_file >> delete_bucket_2 delete_files >> delete_bucket_1 +with models.DAG( + "example_gcs_sensors", + start_date=days_ago(1), + schedule_interval=None, + tags=['example'], +) as dag2: + create_bucket = GCSCreateBucketOperator( + task_id="create_bucket", bucket_name=BUCKET_1, project_id=PROJECT_ID + ) + upload_file = LocalFilesystemToGCSOperator( + task_id="upload_file", + src=PATH_TO_UPLOAD_FILE, + dst=BUCKET_FILE_LOCATION, + bucket=BUCKET_1, + ) + # [START howto_sensor_object_exists_task] + gcs_object_exists = GCSObjectExistenceSensor( + bucket=BUCKET_1, + object=PATH_TO_UPLOAD_FILE, + mode='poke', + task_id="gcs_object_exists_task", + ) + # [END howto_sensor_object_exists_task] + # [START howto_sensor_object_with_prefix_exists_task] + gcs_object_with_prefix_exists = GCSObjectsWithPrefixExistenceSensor( + bucket=BUCKET_1, + prefix=PATH_TO_UPLOAD_FILE_PREFIX, + mode='poke', + task_id="gcs_object_with_prefix_exists_task", + ) + # [END howto_sensor_object_with_prefix_exists_task] + delete_bucket = 
GCSDeleteBucketOperator(task_id="delete_bucket", bucket_name=BUCKET_1) + + create_bucket >> upload_file >> [gcs_object_exists, gcs_object_with_prefix_exists] >> delete_bucket + if __name__ == '__main__': dag.clear(dag_run_state=State.NONE) diff --git a/docs/apache-airflow-providers-google/operators/cloud/data_loss_prevention.rst b/docs/apache-airflow-providers-google/operators/cloud/data_loss_prevention.rst index 1e5082e..af981d2 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/data_loss_prevention.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/data_loss_prevention.rst @@ -296,10 +296,25 @@ Unlike storage methods (Jobs) content method are synchronous, stateless methods. De-identify Content """"""""""""""""""" +De-identification is the process of removing identifying information from data. +Configuration information defines how you want the sensitive data de-identified. -To de-identify potentially sensitive info from a content item, you can use +This config can either be saved and persisted in de-identification templates or defined in a :class:`~google.cloud.dlp_v2.types.DeidentifyConfig` object: + +.. literalinclude:: /../../airflow/providers/google/cloud/example_dags/example_dlp.py + :language: python + :start-after: [START dlp_deidentify_config_example] + :end-before: [END dlp_deidentify_config_example] + +To de-identify potentially sensitive information from a content item, you can use :class:`~airflow.providers.google.cloud.operators.cloud.dlp.CloudDLPDeidentifyContentOperator`. +.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_dlp.py + :language: python + :dedent: 4 + :start-after: [START _howto_operator_dlp_deidentify_content] + :end-before: [END _howto_operator_dlp_deidentify_content] + .. 
_howto/operator:CloudDLPReidentifyContentOperator: Re-identify Content diff --git a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst index b251f94..0ad54a1 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/gcs.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/gcs.rst @@ -20,6 +20,10 @@ Google Cloud Storage Operators ============================== +Cloud Storage allows world-wide storage and retrieval of any amount of data at any time. +You can use Cloud Storage for a range of scenarios including serving website content, +storing data for archival and disaster recovery, or distributing large data objects to users via direct download. + .. contents:: :depth: 1 :local: @@ -29,6 +33,9 @@ Prerequisite Tasks .. include::/operators/_partials/prerequisite_tasks.rst +Operators +^^^^^^^^^ + .. _howto/operator:GCSToBigQueryOperator: GCSToBigQueryOperator @@ -111,13 +118,6 @@ More information See Google Cloud Storage insert documentation to `create a ACL entry for ObjectAccess <https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert>`_. -Reference ---------- - -For further information, look at: - -* `Client Library Documentation <https://googleapis.github.io/google-cloud-python/latest/storage/index.html>`__ -* `Product Documentation <https://cloud.google.com/storage/docs/>`__ .. _howto/operator:GCSDeleteBucketOperator: @@ -134,14 +134,60 @@ It is performed through the :start-after: [START howto_operator_gcs_delete_bucket] :end-before: [END howto_operator_gcs_delete_bucket] + You can use :ref:`Jinja templating <jinja-templating>` with :template-fields:`airflow.providers.google.cloud.operators.gcs.GCSDeleteBucketOperator` parameters which allows you to dynamically determine values. 
Reference -^^^^^^^^^ +--------- For further information, look at: * `Client Library Documentation <https://googleapis.dev/python/storage/latest/buckets.html>`__ * `Product Documentation <https://cloud.google.com/storage/docs/json_api/v1/buckets>`__ + +Sensors +^^^^^^^ + +.. _howto/sensor:GCSObjectExistenceSensor: + +GCSObjectExistenceSensor +------------------------ + +Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor` to wait (poll) for the existence of a file in Google Cloud Storage. + +.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs.py + :language: python + :dedent: 4 + :start-after: [START howto_sensor_object_exists_task] + :end-before: [END howto_sensor_object_exists_task] + +.. _howto/sensor:GCSObjectsWithPrefixExistenceSensor: + +GCSObjectsWithPrefixExistenceSensor +----------------------------------- + +Use the :class:`~airflow.providers.google.cloud.sensors.gcs.GCSObjectsWithPrefixExistenceSensor` to wait (poll) for the existence of a file with a specified prefix in Google Cloud Storage. + +.. exampleinclude:: /../../airflow/providers/google/cloud/example_dags/example_gcs.py + :language: python + :dedent: 4 + :start-after: [START howto_sensor_object_with_prefix_exists_task] + :end-before: [END howto_sensor_object_with_prefix_exists_task] + +More information +"""""""""""""""" + +Sensors have different modes that determine the behaviour of resources while the task is executing. +See `Airflow sensors documentation +<https://airflow.apache.org/docs/apache-airflow/stable/concepts.html#sensors>`_ for best practices when using sensors. 
+ + +Reference +^^^^^^^^^ + +For further information, look at: + +* `Client Library Documentation <https://googleapis.github.io/google-cloud-python/latest/storage/index.html>`__ +* `Product Documentation <https://cloud.google.com/storage/docs/>`__ diff --git a/docs/conf.py b/docs/conf.py index d49759d..e3b25a7 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -152,9 +152,7 @@ if PACKAGE_NAME == 'apache-airflow': 'README.rst', ] elif PACKAGE_NAME.startswith('apache-airflow-providers-'): - exclude_patterns = [ - 'operators/_partials', - ] + exclude_patterns = ['operators/_partials'] else: exclude_patterns = [] diff --git a/tests/always/test_project_structure.py b/tests/always/test_project_structure.py index dc2a33c..560f379 100644 --- a/tests/always/test_project_structure.py +++ b/tests/always/test_project_structure.py @@ -169,6 +169,8 @@ class TestGoogleProviderProjectStructure(unittest.TestCase): # Deprecated operator. Ignore it. 'airflow.providers.google.cloud.operators.cloud_storage_transfer_service' '.CloudDataTransferServiceGCSToGCSOperator', + # Deprecated operator. Ignore it. + 'airflow.providers.google.cloud.sensors.gcs.GCSObjectsWtihPrefixExistenceSensor', # Base operator. Ignore it. 'airflow.providers.google.cloud.operators.cloud_sql.CloudSQLBaseOperator', # Deprecated operator. 
Ignore it @@ -198,7 +200,6 @@ class TestGoogleProviderProjectStructure(unittest.TestCase): 'airflow.providers.google.cloud.operators.dlp.CloudDLPCreateDeidentifyTemplateOperator', 'airflow.providers.google.cloud.operators.dlp.CloudDLPCreateDLPJobOperator', 'airflow.providers.google.cloud.operators.dlp.CloudDLPUpdateDeidentifyTemplateOperator', - 'airflow.providers.google.cloud.operators.dlp.CloudDLPDeidentifyContentOperator', 'airflow.providers.google.cloud.operators.dlp.CloudDLPGetDLPJobTriggerOperator', 'airflow.providers.google.cloud.operators.dlp.CloudDLPListDeidentifyTemplatesOperator', 'airflow.providers.google.cloud.operators.dlp.CloudDLPGetDeidentifyTemplateOperator', @@ -218,10 +219,7 @@ class TestGoogleProviderProjectStructure(unittest.TestCase): 'airflow.providers.google.cloud.operators.datastore.CloudDatastoreGetOperationOperator', # Base operator. Ignore it 'airflow.providers.google.cloud.operators.compute.ComputeEngineBaseOperator', - 'airflow.providers.google.cloud.sensors.gcs.GCSObjectExistenceSensor', 'airflow.providers.google.cloud.sensors.gcs.GCSObjectUpdateSensor', - 'airflow.providers.google.cloud.sensors.gcs.GCSObjectsWithPrefixExistenceSensor', - 'airflow.providers.google.cloud.sensors.gcs.GCSObjectsWtihPrefixExistenceSensor', 'airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor', } diff --git a/tests/providers/google/cloud/operators/test_dlp_system.py b/tests/providers/google/cloud/operators/test_dlp_system.py index 12296ae..38f6bcb 100644 --- a/tests/providers/google/cloud/operators/test_dlp_system.py +++ b/tests/providers/google/cloud/operators/test_dlp_system.py @@ -51,3 +51,7 @@ class GcpDLPExampleDagsSystemTest(GoogleSystemTest): @provide_gcp_context(GCP_DLP_KEY) def test_run_example_dlp_job(self): self.run_dag('example_gcp_dlp_job', CLOUD_DAG_FOLDER) + + @provide_gcp_context(GCP_DLP_KEY) + def test_run_example_dlp_deidentify_content(self): + self.run_dag('example_gcp_dlp_deidentify_content', CLOUD_DAG_FOLDER) diff 
--git a/tests/providers/google/cloud/operators/test_gcs_system.py b/tests/providers/google/cloud/operators/test_gcs_system.py index 66f5345..bcff567 100644 --- a/tests/providers/google/cloud/operators/test_gcs_system.py +++ b/tests/providers/google/cloud/operators/test_gcs_system.py @@ -41,3 +41,7 @@ class GoogleCloudStorageExampleDagsTest(GoogleSystemTest): @provide_gcp_context(GCP_GCS_KEY) def test_run_example_dag(self): self.run_dag('example_gcs', CLOUD_DAG_FOLDER) + + @provide_gcp_context(GCP_GCS_KEY) + def test_run_example_gcs_sensor_dag(self): + self.run_dag('example_gcs_sensors', CLOUD_DAG_FOLDER)