This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch release-2.35.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.35.0 by this push:
new edcfdc4 [BEAM-13388] Cherry-pick more changes for google cloud dlp
update. (#16268)
edcfdc4 is described below
commit edcfdc40954b05dd47905cbefb13d7a7437ea99b
Author: tvalentyn <[email protected]>
AuthorDate: Thu Dec 16 22:18:33 2021 -0800
[BEAM-13388] Cherry-pick more changes for google cloud dlp update. (#16268)
Co-authored-by: Brian Hulette <[email protected]>
Co-authored-by: Yichi Zhang <[email protected]>
Co-authored-by: Kyle Weaver <[email protected]>
Co-authored-by: Andy Ye <[email protected]>
---
.../apache_beam/io/gcp/pubsub_integration_test.py | 2 ++
sdks/python/apache_beam/ml/gcp/cloud_dlp.py | 22 ++++++++-----
sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py | 37 ++++++++++++++--------
3 files changed, 39 insertions(+), 22 deletions(-)
diff --git a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
index 541bb52..bbb914e 100644
--- a/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
+++ b/sdks/python/apache_beam/io/gcp/pubsub_integration_test.py
@@ -210,6 +210,8 @@ class PubSubIntegrationTest(unittest.TestCase):
@pytest.mark.it_postcommit
def test_streaming_with_attributes(self):
+ if self.runner_name == 'TestDataflowRunner':
+ pytest.skip("BEAM-13218")
self._test_streaming(with_attributes=True)
diff --git a/sdks/python/apache_beam/ml/gcp/cloud_dlp.py
b/sdks/python/apache_beam/ml/gcp/cloud_dlp.py
index 93510c8..e3fddba 100644
--- a/sdks/python/apache_beam/ml/gcp/cloud_dlp.py
+++ b/sdks/python/apache_beam/ml/gcp/cloud_dlp.py
@@ -20,9 +20,11 @@ functionality.
"""
import logging
+from typing import List
from google.cloud import dlp_v2
+from apache_beam import typehints
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.transforms import DoFn
from apache_beam.transforms import ParDo
@@ -35,6 +37,8 @@ _LOGGER = logging.getLogger(__name__)
@experimental()
+@typehints.with_input_types(str)
+@typehints.with_output_types(str)
class MaskDetectedDetails(PTransform):
"""Scrubs sensitive information detected in text.
The ``PTransform`` returns a ``PCollection`` of ``str``
@@ -126,6 +130,8 @@ class MaskDetectedDetails(PTransform):
@experimental()
+@typehints.with_input_types(str)
+@typehints.with_output_types(List[dlp_v2.types.dlp.Finding])
class InspectForDetails(PTransform):
"""Inspects input text for sensitive information.
the ``PTransform`` returns a ``PCollection`` of
@@ -190,13 +196,13 @@ class _DeidentifyFn(DoFn):
self.client = dlp_v2.DlpServiceClient()
self.params = {
'timeout': self.timeout,
- 'parent': self.client.project_path(self.project)
}
- self.params.update(self.config)
+ self.parent = self.client.common_project_path(self.project)
def process(self, element, **kwargs):
- operation = self.client.deidentify_content(
- item={"value": element}, **self.params)
+ request = {'item': {'value': element}, 'parent': self.parent}
+ request.update(self.config)
+ operation = self.client.deidentify_content(request=request, **self.params)
yield operation.item.value
@@ -213,12 +219,12 @@ class _InspectFn(DoFn):
self.client = dlp_v2.DlpServiceClient()
self.params = {
'timeout': self.timeout,
- "parent": self.client.project_path(self.project)
}
- self.params.update(self.config)
+ self.parent = self.client.common_project_path(self.project)
def process(self, element, **kwargs):
- operation = self.client.inspect_content(
- item={"value": element}, **self.params)
+ request = {'item': {'value': element}, 'parent': self.parent}
+ request.update(self.config)
+ operation = self.client.inspect_content(request=request, **self.params)
hits = [x for x in operation.result.findings]
yield hits
diff --git a/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py
b/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py
index 111e5be..d4153e5 100644
--- a/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py
+++ b/sdks/python/apache_beam/ml/gcp/cloud_dlp_test.py
@@ -37,6 +37,7 @@ else:
from apache_beam.ml.gcp.cloud_dlp import MaskDetectedDetails
from apache_beam.ml.gcp.cloud_dlp import _DeidentifyFn
from apache_beam.ml.gcp.cloud_dlp import _InspectFn
+ from google.cloud.dlp_v2.types import dlp
# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
_LOGGER = logging.getLogger(__name__)
@@ -56,6 +57,9 @@ class TestDeidentifyFn(unittest.TestCase):
def test_deidentify_called(self):
class ClientMock(object):
def deidentify_content(self, *args, **kwargs):
+ # Check that we can marshal a valid request.
+ dlp.DeidentifyContentRequest(kwargs['request'])
+
called = Metrics.counter('test_deidentify_text', 'called')
called.inc()
operation = mock.Mock()
@@ -64,27 +68,29 @@ class TestDeidentifyFn(unittest.TestCase):
operation.item = item
return operation
- def project_path(self, *args):
+ def common_project_path(self, *args):
return 'test'
with mock.patch('google.cloud.dlp_v2.DlpServiceClient', ClientMock):
p = TestPipeline()
- deidentify_config = {
- "info_type_transformations": {
- "transformations": [{
- "primitive_transformation": {
- "character_mask_config": {
- "masking_character": '#'
+ config = {
+ "deidentify_config": {
+ "info_type_transformations": {
+ "transformations": [{
+ "primitive_transformation": {
+ "character_mask_config": {
+ "masking_character": '#'
+ }
}
- }
- }]
+ }]
+ }
}
}
# pylint: disable=expression-not-assigned
(
p
| beam.Create(['[email protected]', '[email protected]'])
- | beam.ParDo(_DeidentifyFn(config=deidentify_config)))
+ | beam.ParDo(_DeidentifyFn(config=config)))
result = p.run()
result.wait_until_finish()
called = result.metrics().query()['counters'][0]
@@ -101,10 +107,13 @@ class TestInspectText(unittest.TestCase):
@unittest.skipIf(dlp_v2 is None, 'GCP dependencies are not installed')
-class TestDeidentifyFn(unittest.TestCase):
+class TestInspectFn(unittest.TestCase):
def test_inspect_called(self):
class ClientMock(object):
def inspect_content(self, *args, **kwargs):
+ # Check that we can marshal a valid request.
+ dlp.InspectContentRequest(kwargs['request'])
+
called = Metrics.counter('test_inspect_text', 'called')
called.inc()
operation = mock.Mock()
@@ -112,17 +121,17 @@ class TestDeidentifyFn(unittest.TestCase):
operation.result.findings = [None]
return operation
- def project_path(self, *args):
+ def common_project_path(self, *args):
return 'test'
with mock.patch('google.cloud.dlp_v2.DlpServiceClient', ClientMock):
p = TestPipeline()
- inspect_config = {"info_types": [{"name": "EMAIL_ADDRESS"}]}
+ config = {"inspect_config": {"info_types": [{"name": "EMAIL_ADDRESS"}]}}
# pylint: disable=expression-not-assigned
(
p
| beam.Create(['[email protected]', '[email protected]'])
- | beam.ParDo(_InspectFn(config=inspect_config)))
+ | beam.ParDo(_InspectFn(config=config)))
result = p.run()
result.wait_until_finish()
called = result.metrics().query()['counters'][0]