This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch release-2.35.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.35.0 by this push:
     new edcfdc4  [BEAM-13388] Cherry-pick more changes for google cloud dlp update. (#16268)
edcfdc4 is described below

commit edcfdc40954b05dd47905cbefb13d7a7437ea99b
Author: tvalentyn <[email protected]>
AuthorDate: Thu Dec 16 22:18:33 2021 -0800

    [BEAM-13388] Cherry-pick more changes for google cloud dlp update. (#16268)
    
    Co-authored-by: Brian Hulette <[email protected]>
    Co-authored-by: Yichi Zhang <[email protected]>
    Co-authored-by: Kyle Weaver <[email protected]>
    Co-authored-by: Andy Ye <[email protected]>
---
 .../apache_beam/io/gcp/pubsub_integration_test.py  |  2 ++
 sdks/python/apache_beam/ml/gcp/cloud_dlp.py        | 22 ++++++++-----
 sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py   | 37 ++++++++++++++--------
 3 files changed, 39 insertions(+), 22 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index 541bb52..bbb914e 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -210,6 +210,8 @@ class PubSubIntegrationTest(unittest.TestCase):
 
   @pytest.mark.it_postcommit
   def test_streaming_with_attributes(self):
+    if self.runner_name == 'TestDataflowRunner':
+      pytest.skip("BEAM-13218")
     self._test_streaming(with_attributes=True)
 
 
diff --git a/sdks/python/apache_beam/ml/gcp/cloud_dlp.py b/sdks/python/apache_beam/ml/gcp/cloud_dlp.py
index 93510c8..e3fddba 100644
--- a/sdks/python/apache_beam/ml/gcp/cloud_dlp.py
+++ b/sdks/python/apache_beam/ml/gcp/cloud_dlp.py
@@ -20,9 +20,11 @@ functionality.
 """
 
 import logging
+from typing import List
 
 from google.cloud import dlp_v2
 
+from apache_beam import typehints
 from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.transforms import DoFn
 from apache_beam.transforms import ParDo
@@ -35,6 +37,8 @@ _LOGGER = logging.getLogger(__name__)
 
 
 @experimental()
+@typehints.with_input_types(str)
+@typehints.with_output_types(str)
 class MaskDetectedDetails(PTransform):
   """Scrubs sensitive information detected in text.
   The ``PTransform`` returns a ``PCollection`` of ``str``
@@ -126,6 +130,8 @@ class MaskDetectedDetails(PTransform):
 
 
 @experimental()
+@typehints.with_input_types(str)
+@typehints.with_output_types(List[dlp_v2.types.dlp.Finding])
 class InspectForDetails(PTransform):
   """Inspects input text for sensitive information.
   the ``PTransform`` returns a ``PCollection`` of
@@ -190,13 +196,13 @@ class _DeidentifyFn(DoFn):
       self.client = dlp_v2.DlpServiceClient()
     self.params = {
         'timeout': self.timeout,
-        'parent': self.client.project_path(self.project)
     }
-    self.params.update(self.config)
+    self.parent = self.client.common_project_path(self.project)
 
   def process(self, element, **kwargs):
-    operation = self.client.deidentify_content(
-        item={"value": element}, **self.params)
+    request = {'item': {'value': element}, 'parent': self.parent}
+    request.update(self.config)
+    operation = self.client.deidentify_content(request=request, **self.params)
     yield operation.item.value
 
 
@@ -213,12 +219,12 @@ class _InspectFn(DoFn):
       self.client = dlp_v2.DlpServiceClient()
     self.params = {
         'timeout': self.timeout,
-        "parent": self.client.project_path(self.project)
     }
-    self.params.update(self.config)
+    self.parent = self.client.common_project_path(self.project)
 
   def process(self, element, **kwargs):
-    operation = self.client.inspect_content(
-        item={"value": element}, **self.params)
+    request = {'item': {'value': element}, 'parent': self.parent}
+    request.update(self.config)
+    operation = self.client.inspect_content(request=request, **self.params)
     hits = [x for x in operation.result.findings]
     yield hits
diff --git a/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py b/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py
index 111e5be..d4153e5 100644
--- a/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py
+++ b/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py
@@ -37,6 +37,7 @@ else:
   from apache_beam.ml.gcp.cloud_dlp import MaskDetectedDetails
   from apache_beam.ml.gcp.cloud_dlp import _DeidentifyFn
   from apache_beam.ml.gcp.cloud_dlp import _InspectFn
+  from google.cloud.dlp_v2.types import dlp
 # pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
 
 _LOGGER = logging.getLogger(__name__)
@@ -56,6 +57,9 @@ class TestDeidentifyFn(unittest.TestCase):
   def test_deidentify_called(self):
     class ClientMock(object):
       def deidentify_content(self, *args, **kwargs):
+        # Check that we can marshal a valid request.
+        dlp.DeidentifyContentRequest(kwargs['request'])
+
         called = Metrics.counter('test_deidentify_text', 'called')
         called.inc()
         operation = mock.Mock()
@@ -64,27 +68,29 @@ class TestDeidentifyFn(unittest.TestCase):
         operation.item = item
         return operation
 
-      def project_path(self, *args):
+      def common_project_path(self, *args):
         return 'test'
 
     with mock.patch('google.cloud.dlp_v2.DlpServiceClient', ClientMock):
       p = TestPipeline()
-      deidentify_config = {
-          "info_type_transformations": {
-              "transformations": [{
-                  "primitive_transformation": {
-                      "character_mask_config": {
-                          "masking_character": '#'
+      config = {
+          "deidentify_config": {
+              "info_type_transformations": {
+                  "transformations": [{
+                      "primitive_transformation": {
+                          "character_mask_config": {
+                              "masking_character": '#'
+                          }
                       }
-                  }
-              }]
+                  }]
+              }
           }
       }
       # pylint: disable=expression-not-assigned
       (
           p
           | beam.Create(['[email protected]', '[email protected]'])
-          | beam.ParDo(_DeidentifyFn(config=deidentify_config)))
+          | beam.ParDo(_DeidentifyFn(config=config)))
       result = p.run()
       result.wait_until_finish()
     called = result.metrics().query()['counters'][0]
@@ -101,10 +107,13 @@ class TestInspectText(unittest.TestCase):
 
 
 @unittest.skipIf(dlp_v2 is None, 'GCP dependencies are not installed')
-class TestDeidentifyFn(unittest.TestCase):
+class TestInspectFn(unittest.TestCase):
   def test_inspect_called(self):
     class ClientMock(object):
       def inspect_content(self, *args, **kwargs):
+        # Check that we can marshal a valid request.
+        dlp.InspectContentRequest(kwargs['request'])
+
         called = Metrics.counter('test_inspect_text', 'called')
         called.inc()
         operation = mock.Mock()
@@ -112,17 +121,17 @@ class TestDeidentifyFn(unittest.TestCase):
         operation.result.findings = [None]
         return operation
 
-      def project_path(self, *args):
+      def common_project_path(self, *args):
         return 'test'
 
     with mock.patch('google.cloud.dlp_v2.DlpServiceClient', ClientMock):
       p = TestPipeline()
-      inspect_config = {"info_types": [{"name": "EMAIL_ADDRESS"}]}
+      config = {"inspect_config": {"info_types": [{"name": "EMAIL_ADDRESS"}]}}
       # pylint: disable=expression-not-assigned
       (
           p
           | beam.Create(['[email protected]', '[email protected]'])
-          | beam.ParDo(_InspectFn(config=inspect_config)))
+          | beam.ParDo(_InspectFn(config=config)))
       result = p.run()
       result.wait_until_finish()
       called = result.metrics().query()['counters'][0]

Reply via email to