This is an automated email from the ASF dual-hosted git repository. mwalenia pushed a commit to branch BEAM-9723-java-dlp in repository https://gitbox.apache.org/repos/asf/beam.git
commit a6c0dc3024f1defc1f659b2371f516925bc9d7f2 Author: Michal Walenia <[email protected]> AuthorDate: Mon Apr 27 10:04:12 2020 +0200 [BEAM-9723] Add DLP integration transforms --- sdks/java/extensions/ml/build.gradle | 5 +- .../beam/sdk/extensions/ml/BatchRequestForDLP.java | 119 +++++++++++++ .../beam/sdk/extensions/ml/DLPDeidentifyText.java | 186 +++++++++++++++++++++ .../beam/sdk/extensions/ml/DLPInspectText.java | 147 ++++++++++++++++ .../beam/sdk/extensions/ml/DLPReidentifyText.java | 179 ++++++++++++++++++++ .../sdk/extensions/ml/DLPTextOperationsIT.java | 121 ++++++++++++++ 6 files changed, 756 insertions(+), 1 deletion(-) diff --git a/sdks/java/extensions/ml/build.gradle b/sdks/java/extensions/ml/build.gradle index 274c074..6f9b567 100644 --- a/sdks/java/extensions/ml/build.gradle +++ b/sdks/java/extensions/ml/build.gradle @@ -26,10 +26,13 @@ description = 'Apache Beam :: SDKs :: Java :: Extensions :: ML' dependencies { compile project(path: ":sdks:java:core", configuration: "shadow") compile project(":sdks:java:expansion-service") - testCompile project(path: ':sdks:java:core', configuration: 'shadowTest') compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0' + compile 'com.google.cloud:google-cloud-dlp:1.1.1' + testCompile project(path: ':sdks:java:core', configuration: 'shadowTest') testCompile library.java.mockito_core testCompile 'com.google.cloud:google-cloud-video-intelligence:1.2.0' + testCompile 'com.google.cloud:google-cloud-dlp:1.1.1' + testCompile project(path: ":sdks:java:extensions:google-cloud-platform-core", configuration: "testRuntime") testCompile library.java.junit testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") testRuntimeOnly project(":runners:google-cloud-dataflow-java") diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java new file mode 100644 
index 0000000..9a2d4d2 --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForDLP.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Instant; + +/** + * DoFn batching the input PCollection into bigger requests in order to better utilize the Cloud DLP + * service. 
+ */ +class BatchRequestForDLP extends DoFn<KV<String, String>, KV<String, String>> { + private final Integer batchSize; + public static final Integer DLP_PAYLOAD_LIMIT = 52400; + + public BatchRequestForDLP(Integer batchSize) { + if (batchSize > DLP_PAYLOAD_LIMIT) { + throw new IllegalArgumentException( + "DLP batch size exceeds payload limit.\n" + + "Batch size should be smaller than " + + DLP_PAYLOAD_LIMIT); + } + this.batchSize = batchSize; + } + + @StateId("elementsBag") + private final StateSpec<BagState<KV<String, String>>> elementsBag = StateSpecs.bag(); + + @StateId("elementsSize") + private final StateSpec<ValueState<Integer>> elementsSize = StateSpecs.value(); + + @TimerId("eventTimer") + private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME); + + @ProcessElement + public void process( + @Element KV<String, String> element, + @StateId("elementsBag") BagState<KV<String, String>> elementsBag, + @StateId("elementsSize") ValueState<Integer> elementsSize, + @Timestamp Instant elementTs, + @TimerId("eventTimer") Timer eventTimer, + OutputReceiver<KV<String, String>> output) { + eventTimer.set(elementTs); + Integer currentElementSize = + (element.getValue() == null) ? 0 : element.getValue().getBytes(UTF_8).length; + Integer currentBufferSize = (elementsSize.read() == null) ? 
0 : elementsSize.read(); + boolean clearBuffer = (currentElementSize + currentBufferSize) > batchSize; + if (clearBuffer) { + KV<String, String> inspectBufferedData = emitResult(elementsBag.read()); + output.output(inspectBufferedData); + DLPInspectText.LOG.info( + "****CLEAR BUFFER Key {} **** Current Content Size {}", + inspectBufferedData.getKey(), + inspectBufferedData.getValue().getBytes(UTF_8).length); + clearState(elementsBag, elementsSize); + } else { + elementsBag.add(element); + elementsSize.write(currentElementSize + currentBufferSize); + } + } + + @OnTimer("eventTimer") + public void onTimer( + @StateId("elementsBag") BagState<KV<String, String>> elementsBag, + @StateId("elementsSize") ValueState<Integer> elementsSize, + OutputReceiver<KV<String, String>> output) { + // Process left over records less than batch size + KV<String, String> inspectBufferedData = emitResult(elementsBag.read()); + output.output(inspectBufferedData); + DLPInspectText.LOG.info( + "****Timer Triggered Key {} **** Current Content Size {}", + inspectBufferedData.getKey(), + inspectBufferedData.getValue().getBytes(UTF_8).length); + clearState(elementsBag, elementsSize); + } + + private static KV<String, String> emitResult(Iterable<KV<String, String>> bufferData) { + StringBuilder builder = new StringBuilder(); + String fileName = + (bufferData.iterator().hasNext()) ? 
bufferData.iterator().next().getKey() : "UNKNOWN_FILE"; + bufferData.forEach( + e -> { + builder.append(e.getValue()); + }); + return KV.of(fileName, builder.toString()); + } + + private static void clearState( + BagState<KV<String, String>> elementsBag, ValueState<Integer> elementsSize) { + elementsBag.clear(); + elementsSize.clear(); + } +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java new file mode 100644 index 0000000..b7e037a --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPDeidentifyText.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.auto.value.AutoValue; +import com.google.cloud.dlp.v2.DlpServiceClient; +import com.google.privacy.dlp.v2.ContentItem; +import com.google.privacy.dlp.v2.DeidentifyConfig; +import com.google.privacy.dlp.v2.DeidentifyContentRequest; +import com.google.privacy.dlp.v2.DeidentifyContentResponse; +import com.google.privacy.dlp.v2.InspectConfig; +import com.google.privacy.dlp.v2.ProjectName; +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PTransform} connecting to Cloud DLP and deidentifying text according to provided + * settings. + * + * <p>Either inspectTemplateName (String) or inspectConfig ({@link InspectConfig}) needs to be set. + * The same applies to deidentifyTemplateName and deidentifyConfig ({@link DeidentifyConfig}). + * + * <p>Batch size defines, in bytes, how big the batches sent to DLP at once are. 
+ */ +@Experimental +@AutoValue +public abstract class DLPDeidentifyText + extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> { + + public static final Logger LOG = LoggerFactory.getLogger(DLPInspectText.class); + + public static final Integer DLP_PAYLOAD_LIMIT = 52400; + + @Nullable + public abstract String inspectTemplateName(); + + @Nullable + public abstract String deidentifyTemplateName(); + + @Nullable + public abstract InspectConfig inspectConfig(); + + @Nullable + public abstract DeidentifyConfig deidentifyConfig(); + + public abstract Integer batchSize(); + + public abstract String projectId(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setInspectTemplateName(String inspectTemplateName); + + public abstract Builder setBatchSize(Integer batchSize); + + public abstract Builder setProjectId(String projectId); + + public abstract Builder setDeidentifyTemplateName(String deidentifyTemplateName); + + public abstract Builder setInspectConfig(InspectConfig inspectConfig); + + public abstract Builder setDeidentifyConfig(DeidentifyConfig deidentifyConfig); + + public abstract DLPDeidentifyText build(); + } + + public static DLPDeidentifyText.Builder newBuilder() { + return new AutoValue_DLPDeidentifyText.Builder(); + } + + /** + * The transform batches the contents of input PCollection and then calls Cloud DLP service to + * perform the deidentification. 
+ * + * @param input input PCollection + * @return PCollection after transformations + */ + @Override + public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) { + return input + .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize()))) + .apply( + "DLPDeidentify", + ParDo.of( + new DeidentifyText( + projectId(), + inspectTemplateName(), + deidentifyTemplateName(), + inspectConfig(), + deidentifyConfig()))); + } + + static class DeidentifyText extends DoFn<KV<String, String>, KV<String, String>> { + private final String projectId; + private final String inspectTemplateName; + private final String deidentifyTemplateName; + private final InspectConfig inspectConfig; + private final DeidentifyConfig deidentifyConfig; + private transient DeidentifyContentRequest.Builder requestBuilder; + + @Setup + public void setup() throws IOException { + requestBuilder = + DeidentifyContentRequest.newBuilder().setParent(ProjectName.of(projectId).toString()); + if (inspectTemplateName != null) { + requestBuilder.setInspectTemplateName(inspectTemplateName); + } + if (inspectConfig != null) { + requestBuilder.setInspectConfig(inspectConfig); + } + if (inspectConfig == null && inspectTemplateName == null) { + throw new IllegalArgumentException( + "Either inspectConfig or inspectTemplateName need to be set!"); + } + if (deidentifyConfig != null) { + requestBuilder.setDeidentifyConfig(deidentifyConfig); + } + if (deidentifyTemplateName != null) { + requestBuilder.setDeidentifyTemplateName(deidentifyTemplateName); + } + if (deidentifyConfig == null && deidentifyTemplateName == null) { + throw new IllegalArgumentException( + "Either deidentifyConfig or deidentifyTemplateName need to be set!"); + } + } + + public DeidentifyText( + String projectId, + String inspectTemplateName, + String deidentifyTemplateName, + InspectConfig inspectConfig, + DeidentifyConfig deidentifyConfig) { + this.projectId = projectId; + this.inspectTemplateName = 
inspectTemplateName; + this.deidentifyTemplateName = deidentifyTemplateName; + this.inspectConfig = inspectConfig; + this.deidentifyConfig = deidentifyConfig; + } + + @ProcessElement + public void processElement(ProcessContext c) throws IOException { + try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) { + if (!c.element().getValue().isEmpty()) { + ContentItem contentItem = + ContentItem.newBuilder().setValue(c.element().getValue()).build(); + this.requestBuilder.setItem(contentItem); + if (this.requestBuilder.build().getSerializedSize() > DLP_PAYLOAD_LIMIT) { + String errorMessage = + String.format( + "Payload Size %s Exceeded Batch Size %s", + this.requestBuilder.build().getSerializedSize(), DLP_PAYLOAD_LIMIT); + LOG.error(errorMessage); + } else { + DeidentifyContentResponse response = + dlpServiceClient.deidentifyContent(this.requestBuilder.build()); + response.getItem().getValue(); + c.output(KV.of(c.element().getKey(), response.getItem().getValue())); + } + } + } + } + } +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java new file mode 100644 index 0000000..e63bfe3 --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPInspectText.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.auto.value.AutoValue; +import com.google.cloud.dlp.v2.DlpServiceClient; +import com.google.privacy.dlp.v2.ContentItem; +import com.google.privacy.dlp.v2.Finding; +import com.google.privacy.dlp.v2.InspectConfig; +import com.google.privacy.dlp.v2.InspectContentRequest; +import com.google.privacy.dlp.v2.InspectContentResponse; +import com.google.privacy.dlp.v2.ProjectName; +import java.io.IOException; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PTransform} connecting to Cloud DLP and inspecting text for identifying data according + * to provided settings. + * + * <p>Either inspectTemplateName (String) or inspectConfig ({@link InspectConfig}) needs to be set. + * + * <p>Batch size defines, in bytes, how big the batches sent to DLP at once are. 
+ */ +@Experimental +@AutoValue +public abstract class DLPInspectText + extends PTransform<PCollection<KV<String, String>>, PCollection<List<Finding>>> { + public static final Logger LOG = LoggerFactory.getLogger(DLPInspectText.class); + + public static final Integer DLP_PAYLOAD_LIMIT = 52400; + + @Nullable + public abstract String inspectTemplateName(); + + @Nullable + public abstract InspectConfig inspectConfig(); + + public abstract Integer batchSize(); + + public abstract String projectId(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setInspectTemplateName(String inspectTemplateName); + + public abstract Builder setInspectConfig(InspectConfig inspectConfig); + + public abstract Builder setBatchSize(Integer batchSize); + + public abstract Builder setProjectId(String projectId); + + public abstract DLPInspectText build(); + } + + public static Builder newBuilder() { + return new AutoValue_DLPInspectText.Builder(); + } + + @Override + public PCollection<List<Finding>> expand(PCollection<KV<String, String>> input) { + return input + .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize()))) + .apply( + "DLPInspect", + ParDo.of(new InspectData(projectId(), inspectTemplateName(), inspectConfig()))); + } + + public static class InspectData extends DoFn<KV<String, String>, List<Finding>> { + private final String projectId; + private final String inspectTemplateName; + private final InspectConfig inspectConfig; + private transient InspectContentRequest.Builder requestBuilder; + private final Counter numberOfBytesInspected = + Metrics.counter(InspectData.class, "NumberOfBytesInspected"); + + public InspectData(String projectId, String inspectTemplateName, InspectConfig inspectConfig) { + this.projectId = projectId; + this.inspectTemplateName = inspectTemplateName; + this.inspectConfig = inspectConfig; + } + + @Setup + public void setup() { + this.requestBuilder = + 
InspectContentRequest.newBuilder().setParent(ProjectName.of(this.projectId).toString()); + if (inspectTemplateName != null) { + requestBuilder.setInspectTemplateName(this.inspectTemplateName); + } + if (inspectConfig != null) { + requestBuilder.setInspectConfig(inspectConfig); + } + if (inspectTemplateName == null && inspectConfig == null) { + throw new IllegalArgumentException(""); + } + } + + @ProcessElement + public void processElement(ProcessContext c) throws IOException { + try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) { + if (!c.element().getValue().isEmpty()) { + ContentItem contentItem = + ContentItem.newBuilder().setValue(c.element().getValue()).build(); + this.requestBuilder.setItem(contentItem); + if (this.requestBuilder.build().getSerializedSize() > DLP_PAYLOAD_LIMIT) { + String errorMessage = + String.format( + "Payload Size %s Exceeded Batch Size %s", + this.requestBuilder.build().getSerializedSize(), DLP_PAYLOAD_LIMIT); + LOG.error(errorMessage); + } else { + InspectContentResponse response = + dlpServiceClient.inspectContent(this.requestBuilder.build()); + List<Finding> findingsList = response.getResult().getFindingsList(); + c.output(findingsList); + numberOfBytesInspected.inc(contentItem.getSerializedSize()); + } + } + } + } + } +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java new file mode 100644 index 0000000..e841d9a --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/DLPReidentifyText.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.auto.value.AutoValue; +import com.google.cloud.dlp.v2.DlpServiceClient; +import com.google.privacy.dlp.v2.ContentItem; +import com.google.privacy.dlp.v2.DeidentifyConfig; +import com.google.privacy.dlp.v2.InspectConfig; +import com.google.privacy.dlp.v2.ProjectName; +import com.google.privacy.dlp.v2.ReidentifyContentRequest; +import com.google.privacy.dlp.v2.ReidentifyContentResponse; +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A {@link PTransform} connecting to Cloud DLP and reidentifying text according + * to provided settings. + * + * <p>Either inspectTemplateName (String) or inspectConfig ({@link InspectConfig}) needs to be set; the + * same goes for reidentifyTemplateName or reidentifyConfig. + * + * <p>Batch size defines, in bytes, how big the batches sent to DLP at once are. 
+ */ +@Experimental +@AutoValue +public abstract class DLPReidentifyText + extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> { + + public static final Logger LOG = LoggerFactory.getLogger(DLPInspectText.class); + + public static final Integer DLP_PAYLOAD_LIMIT = 52400; + + @Nullable + public abstract String inspectTemplateName(); + + @Nullable + public abstract String reidentifyTemplateName(); + + @Nullable + public abstract InspectConfig inspectConfig(); + + @Nullable + public abstract DeidentifyConfig reidentifyConfig(); + + public abstract Integer batchSize(); + + public abstract String projectId(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setInspectTemplateName(String inspectTemplateName); + + public abstract Builder setInspectConfig(InspectConfig inspectConfig); + + public abstract Builder setReidentifyConfig(DeidentifyConfig deidentifyConfig); + + public abstract Builder setReidentifyTemplateName(String deidentifyTemplateName); + + public abstract Builder setBatchSize(Integer batchSize); + + public abstract Builder setProjectId(String projectId); + + public abstract DLPReidentifyText build(); + } + + public static DLPReidentifyText.Builder newBuilder() { + return new AutoValue_DLPReidentifyText.Builder(); + } + + @Override + public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) { + return input + .apply("Batch Contents", ParDo.of(new BatchRequestForDLP(batchSize()))) + .apply( + "DLPDeidentify", + ParDo.of( + new ReidentifyText( + projectId(), + inspectTemplateName(), + reidentifyTemplateName(), + inspectConfig(), + reidentifyConfig()))); + } + + public static class ReidentifyText extends DoFn<KV<String, String>, KV<String, String>> { + private final String projectId; + private final String inspectTemplateName; + private final String reidentifyTemplateName; + private final InspectConfig inspectConfig; + private final DeidentifyConfig 
reidentifyConfig; + private transient ReidentifyContentRequest.Builder requestBuilder; + + @Setup + public void setup() throws IOException { + requestBuilder = + ReidentifyContentRequest.newBuilder().setParent(ProjectName.of(projectId).toString()); + if (inspectTemplateName != null) { + requestBuilder.setInspectTemplateName(inspectTemplateName); + } + if (inspectConfig != null) { + requestBuilder.setInspectConfig(inspectConfig); + } + if (inspectConfig == null && inspectTemplateName == null) { + throw new IllegalArgumentException( + "Either inspectConfig or inspectTemplateName need to be set!"); + } + if (reidentifyConfig != null) { + requestBuilder.setReidentifyConfig(reidentifyConfig); + } + if (reidentifyTemplateName != null) { + requestBuilder.setReidentifyTemplateName(reidentifyTemplateName); + } + if (reidentifyConfig == null && reidentifyTemplateName == null) { + throw new IllegalArgumentException( + "Either reidentifyConfig or reidentifyTemplateName need to be set!"); + } + } + + public ReidentifyText( + String projectId, + String inspectTemplateName, + String reidentifyTemplateName, + InspectConfig inspectConfig, + DeidentifyConfig reidentifyConfig) { + this.projectId = projectId; + this.inspectTemplateName = inspectTemplateName; + this.reidentifyTemplateName = reidentifyTemplateName; + this.inspectConfig = inspectConfig; + this.reidentifyConfig = reidentifyConfig; + } + + @ProcessElement + public void processElement(ProcessContext c) throws IOException { + try (DlpServiceClient dlpServiceClient = DlpServiceClient.create()) { + if (!c.element().getValue().isEmpty()) { + ContentItem contentItem = + ContentItem.newBuilder().setValue(c.element().getValue()).build(); + this.requestBuilder.setItem(contentItem); + if (this.requestBuilder.build().getSerializedSize() > DLP_PAYLOAD_LIMIT) { + String errorMessage = + String.format( + "Payload Size %s Exceeded Batch Size %s", + this.requestBuilder.build().getSerializedSize(), DLP_PAYLOAD_LIMIT); + 
LOG.error(errorMessage); + } else { + ReidentifyContentResponse response = + dlpServiceClient.reidentifyContent(this.requestBuilder.build()); + response.getItem().getValue(); + c.output(KV.of(c.element().getKey(), response.getItem().getValue())); + } + } + } + } + } +} diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java new file mode 100644 index 0000000..5401105 --- /dev/null +++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DLPTextOperationsIT.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ml; + +import static org.junit.Assert.assertEquals; + +import com.google.privacy.dlp.v2.CharacterMaskConfig; +import com.google.privacy.dlp.v2.DeidentifyConfig; +import com.google.privacy.dlp.v2.Finding; +import com.google.privacy.dlp.v2.InfoType; +import com.google.privacy.dlp.v2.InfoTypeTransformations; +import com.google.privacy.dlp.v2.InspectConfig; +import com.google.privacy.dlp.v2.Likelihood; +import com.google.privacy.dlp.v2.PrimitiveTransformation; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class DLPTextOperationsIT { + @Rule public TestPipeline testPipeline = TestPipeline.create(); + + private static final String IDENTIFYING_TEXT = "[email protected]"; + private static InfoType emailAddress = InfoType.newBuilder().setName("EMAIL_ADDRESS").build();; + private static InspectConfig inspectConfig = + InspectConfig.newBuilder() + .addInfoTypes(emailAddress) + .setMinLikelihood(Likelihood.LIKELY) + .build(); + + @Test + public void inspectsText() { + String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject(); + PCollection<List<Finding>> inspectionResult = + testPipeline + .apply(Create.of(KV.of("", IDENTIFYING_TEXT))) + .apply( + DLPInspectText.newBuilder() + .setBatchSize(52400) + .setProjectId(projectId) + .setInspectConfig(inspectConfig) + .build()); + PAssert.that(inspectionResult).satisfies(new VerifyInspectionResult()); + testPipeline.run().waitUntilFinish(); + } + + @Test + public void 
deidentifiesText() { + emailAddress = InfoType.newBuilder().setName("EMAIL_ADDRESS").build(); + String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject(); + + PCollection<KV<String, String>> deidentificationResult = + testPipeline + .apply(Create.of(KV.of("", IDENTIFYING_TEXT))) + .apply( + DLPDeidentifyText.newBuilder() + .setBatchSize(52400) + .setProjectId(projectId) + .setInspectConfig(inspectConfig) + .setDeidentifyConfig(getDeidentifyConfig()) + .build()); + PAssert.that(deidentificationResult).containsInAnyOrder(KV.of("", "####################")); + testPipeline.run().waitUntilFinish(); + } + + private DeidentifyConfig getDeidentifyConfig() { + CharacterMaskConfig characterMaskConfig = + CharacterMaskConfig.newBuilder().setMaskingCharacter("#").build(); + PrimitiveTransformation primitiveTransformation = + PrimitiveTransformation.newBuilder().setCharacterMaskConfig(characterMaskConfig).build(); + InfoTypeTransformations.InfoTypeTransformation infoTypeTransformation = + InfoTypeTransformations.InfoTypeTransformation.newBuilder() + .addInfoTypes(emailAddress) + .setPrimitiveTransformation(primitiveTransformation) + .build(); + return DeidentifyConfig.newBuilder() + .setInfoTypeTransformations( + InfoTypeTransformations.newBuilder().addTransformations(infoTypeTransformation).build()) + .build(); + } + + private static class VerifyInspectionResult + implements SerializableFunction<Iterable<List<Finding>>, Void> { + @Override + public Void apply(Iterable<List<Finding>> input) { + List<Boolean> matches = new ArrayList<>(); + input.forEach( + resultList -> + matches.add( + resultList.stream() + .anyMatch(finding -> finding.getInfoType().equals(emailAddress)))); + assertEquals(Boolean.TRUE, matches.contains(Boolean.TRUE)); + return null; + } + } +}
