Abacn commented on code in PR #36346:
URL: https://github.com/apache/beam/pull/36346#discussion_r2409014302


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/options/PipelineOptions.java:
##########
@@ -413,6 +414,40 @@ public Long create(PipelineOptions options) {
 
   void setUserAgent(String userAgent);
 
+  /**
+   * A string defining whether GroupByKey transforms should be replaced by GroupByEncryptedKey.
+   *
+   * <p>Beam infers the secret type and its parameters from this string. This guarantees that any
+   * data at rest while performing a GBK is encrypted, so this can be used to guarantee that data is
+   * never left unencrypted. Runners with this behavior include the Dataflow, Flink, and Spark
+   * runners. The option should be structured like:
+   *
+   * <pre><code>
+   * --gbek=type:<secret_type>;<secret_param>:<value>
+   * </code></pre>
+   *
+   * for example:
+   *
+   * <pre><code>
+   * --gbek=type:GcpSecret;version_name:my_secret/versions/latest
+   * </code></pre>
+   *
+   * All variables should use snake case to allow consistency across languages.

Review Comment:
   IIRC, "GBEK" will convert to "g_b_e_k" in a Python pipeline (e.g., Java `--tempLocation` corresponds to Python `--temp_location`).
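The reviewer's point can be illustrated with a naive conversion rule that inserts an underscore before every uppercase letter. This is a hypothetical sketch (`OptionNameCase` and `camelToSnake` are illustrative names, not Beam APIs and not Beam's actual conversion code) that reproduces both examples in the comment; under the same rule, an all-lowercase name such as `gbek` is left unchanged.

```java
public class OptionNameCase {
  // Naive camelCase -> snake_case rule: every uppercase letter after the
  // first character starts a new "word". Illustrative only; this is not
  // Beam's actual cross-language option-name conversion code.
  static String camelToSnake(String name) {
    StringBuilder out = new StringBuilder();
    for (int i = 0; i < name.length(); i++) {
      char c = name.charAt(i);
      if (Character.isUpperCase(c)) {
        if (i > 0) {
          out.append('_');
        }
        out.append(Character.toLowerCase(c));
      } else {
        out.append(c);
      }
    }
    return out.toString();
  }

  public static void main(String[] args) {
    System.out.println(camelToSnake("tempLocation")); // temp_location
    System.out.println(camelToSnake("GBEK")); // g_b_e_k
    System.out.println(camelToSnake("gbek")); // gbek
  }
}
```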



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java:
##########
@@ -117,6 +140,20 @@ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
           "the keyCoder of a DataflowGroupByKey must be deterministic", e);
     }
 
+    PipelineOptions options = input.getPipeline().getOptions();
+    String gbekOveride = options.getGBEK();
+    if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) {

Review Comment:
   Does `org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings.isNullOrEmpty(...)` fit here?
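One detail worth weighing: Guava's `Strings.isNullOrEmpty` only checks for `null` or a zero-length string and does not trim, so it is not an exact drop-in for `gbekOveride.trim().isEmpty()`. The sketch below uses local stand-ins (hypothetical helpers in plain JDK Java, so it runs without Guava) to show where the two checks diverge: a whitespace-only value.

```java
public class GbekOptionCheck {
  // Same semantics as Guava's Strings.isNullOrEmpty: null or zero-length
  // only, with no trimming.
  static boolean isNullOrEmpty(String s) {
    return s == null || s.isEmpty();
  }

  // Semantics of the check in the diff: a whitespace-only value is also
  // treated as "option not set".
  static boolean isNullOrBlank(String s) {
    return s == null || s.trim().isEmpty();
  }

  public static void main(String[] args) {
    String[] samples = {null, "", "   ", "type:GcpSecret"};
    for (String s : samples) {
      System.out.println(
          "[" + s + "] isNullOrEmpty=" + isNullOrEmpty(s) + " isNullOrBlank=" + isNullOrBlank(s));
    }
  }
}
```

If whitespace-only values should keep being treated as unset, the trim (or a small helper like `isNullOrBlank`) is still needed on top of `Strings.isNullOrEmpty`.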



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/Secret.java:
##########
@@ -33,4 +38,48 @@ public interface Secret extends Serializable {
    * @return The secret as a byte array.
    */
   byte[] getSecretBytes();
+
+  static Secret parseSecretOption(String secretOption) {
+    Map<String, String> paramMap = new HashMap<>();
+    for (String param : secretOption.split(";", -1)) {
+      String[] parts = param.split(":", 2);
+      if (parts.length == 2) {
+        paramMap.put(parts[0], parts[1]);
+      }
+    }
+
+    if (!paramMap.containsKey("type")) {
+      throw new RuntimeException("Secret string must contain a valid type parameter");
+    }
+
+    String secretType = paramMap.get("type");
+    paramMap.remove("type");
+
+    if (secretType == null) {

Review Comment:
   Do you mean `Preconditions.checkNotNull(secretType, "Secret string must contain a valid value for type parameter");`?
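A minimal sketch of how the suggestion could fold the `containsKey` check and the `null` check into one step, assuming the parsing loop stays as in the diff. It uses the JDK's `Objects.requireNonNull` as a dependency-free stand-in for Guava's `Preconditions.checkNotNull`; `SecretOptionParser` and `ParsedSecretOption` are hypothetical names, and unlike the diff it reports a missing `type` as a `NullPointerException` rather than a plain `RuntimeException`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class SecretOptionParser {
  // Hypothetical holder for the parsed option; not a Beam type.
  static final class ParsedSecretOption {
    final String type;
    final Map<String, String> params;

    ParsedSecretOption(String type, Map<String, String> params) {
      this.type = type;
      this.params = params;
    }
  }

  static ParsedSecretOption parse(String secretOption) {
    Map<String, String> paramMap = new HashMap<>();
    for (String param : secretOption.split(";", -1)) {
      // Split on the first ':' only, so a value may itself contain ':'.
      String[] parts = param.split(":", 2);
      if (parts.length == 2) {
        paramMap.put(parts[0], parts[1]);
      }
    }
    // Map.remove returns null when "type" is absent, so one null check
    // replaces the separate containsKey/get/remove/null-check sequence.
    // Objects.requireNonNull throws NullPointerException with this message,
    // as Guava's Preconditions.checkNotNull(reference, message) does.
    String secretType =
        Objects.requireNonNull(
            paramMap.remove("type"), "Secret string must contain a valid value for type parameter");
    return new ParsedSecretOption(secretType, paramMap);
  }

  public static void main(String[] args) {
    ParsedSecretOption p = parse("type:GcpSecret;version_name:my_secret/versions/latest");
    System.out.println(p.type); // GcpSecret
    System.out.println(p.params); // {version_name=my_secret/versions/latest}
  }
}
```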



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java:
##########
@@ -171,10 +208,22 @@ public String getUrn() {
       return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
     }
 
+    @Override
+    public String getUrn(DataflowGroupByKey<?, ?> transform) {
+      if (transform.surroundsGBEK()) {
+        return "beam:transform:group_by_key_wrapper:v1";
+      }
+      return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
+    }
+
     @Override
     @SuppressWarnings("nullness")
     public RunnerApi.FunctionSpec translate(
         AppliedPTransform<?, ?, DataflowGroupByKey<?, ?>> transform, SdkComponents components) {
+      if (transform.getTransform().surroundsGBEK()) {

Review Comment:
   Shall we change the return type of `translate` to `RunnerApi.@Nullable FunctionSpec`? The base interface already returns a `@Nullable` value.
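For reference, `@Nullable` from the Checker Framework is a type-use annotation, and on a static nested type such as `RunnerApi.FunctionSpec` Java's type-annotation rules place it directly before the simple name, which is why the suggestion is spelled `RunnerApi.@Nullable FunctionSpec` rather than `@Nullable RunnerApi.FunctionSpec`. The self-contained sketch below uses a local stand-in `@Nullable` annotation and a stand-in `RunnerApi` class (both hypothetical), and assumes the wrapper branch of `translate` returns `null`, which is what the suggestion implies.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

public class NullableNestedTypeDemo {
  // Hypothetical stand-in for the Checker Framework's type-use @Nullable.
  @Target({ElementType.TYPE_USE, ElementType.TYPE_PARAMETER})
  @Retention(RetentionPolicy.CLASS)
  @interface Nullable {}

  // Hypothetical stand-in for RunnerApi, whose FunctionSpec is a static nested class.
  static final class RunnerApi {
    static final class FunctionSpec {
      final String urn;

      FunctionSpec(String urn) {
        this.urn = urn;
      }
    }
  }

  // The type-use annotation goes right before the simple name of the nested type.
  static RunnerApi.@Nullable FunctionSpec translate(boolean surroundsGbek) {
    if (surroundsGbek) {
      return null; // assumed: the wrapper transform has no spec of its own
    }
    return new RunnerApi.FunctionSpec("beam:transform:group_by_key:v1");
  }

  public static void main(String[] args) {
    System.out.println(translate(true) == null); // true
    System.out.println(translate(false).urn); // beam:transform:group_by_key:v1
  }
}
```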



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java:
##########
@@ -171,10 +208,22 @@ public String getUrn() {
       return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
     }
 
+    @Override
+    public String getUrn(DataflowGroupByKey<?, ?> transform) {
+      if (transform.surroundsGBEK()) {
+        return "beam:transform:group_by_key_wrapper:v1";

Review Comment:
   Shall we make this a constant in PTransformTranslation as well?
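A sketch of the suggested refactor, assuming the new URN sits next to the existing `GROUP_BY_KEY_TRANSFORM_URN`. `GroupByKeyUrns`, `GROUP_BY_KEY_WRAPPER_TRANSFORM_URN`, and `urnFor` are hypothetical names; the wrapper literal is the one repeated in the `getUrn` overrides in this PR. Keeping it in one place means `DataflowGroupByKey` and `GroupByKeyTranslation` cannot drift apart.

```java
public class GroupByKeyUrns {
  // Value of PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN in Beam.
  public static final String GROUP_BY_KEY_TRANSFORM_URN = "beam:transform:group_by_key:v1";

  // Hypothetical constant name for the wrapper URN introduced in this PR.
  public static final String GROUP_BY_KEY_WRAPPER_TRANSFORM_URN =
      "beam:transform:group_by_key_wrapper:v1";

  // Both translators could share this selection instead of repeating the literal.
  static String urnFor(boolean surroundsGbek) {
    return surroundsGbek ? GROUP_BY_KEY_WRAPPER_TRANSFORM_URN : GROUP_BY_KEY_TRANSFORM_URN;
  }

  public static void main(String[] args) {
    System.out.println(urnFor(true));
    System.out.println(urnFor(false));
  }
}
```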



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/GroupByKeyTranslation.java:
##########
@@ -43,9 +43,21 @@ public String getUrn() {
       return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
     }
 
+    @Override
+    public String getUrn(GroupByKey<?, ?> transform) {
+      if (transform.surroundsGBEK()) {
+        return "beam:transform:group_by_key_wrapper:v1";

Review Comment:
   same, see above (define constant, nullable)



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/internal/DataflowGroupByKey.java:
##########
@@ -117,6 +140,20 @@ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
           "the keyCoder of a DataflowGroupByKey must be deterministic", e);
     }
 
+    PipelineOptions options = input.getPipeline().getOptions();
+    String gbekOveride = options.getGBEK();
+    if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) {
+      this.surroundsGBEK = true;
+      Secret hmacSecret = Secret.parseSecretOption(gbekOveride);

Review Comment:
   Just to confirm: is access to the actual secret required at pipeline expansion time? (Reading the code, it seems the answer is no, which would be great.) We have seen use cases where the pipeline submission machine doesn't have access to certain resources (databases for IO, etc.), which creates hurdles for users.
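The property the reviewer is hoping for can be sketched as a `Secret` that stores only a reference (such as a secret version name) when it is constructed during pipeline expansion, and contacts the secret store the first time `getSecretBytes()` is called, i.e. on the worker. `LazySecret`, `SecretFetcher`, and the in-memory fetcher are hypothetical; the sketch mirrors only the `Secret` interface shape from the diff (`Serializable`, `getSecretBytes()`) and does not describe how the PR's implementation works.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicInteger;

public class LazySecretDemo {
  // Mirrors the shape of Beam's Secret interface shown in the diff.
  interface Secret extends Serializable {
    byte[] getSecretBytes();
  }

  // Hypothetical fetcher abstraction; a real one would call a secret manager.
  interface SecretFetcher extends Serializable {
    byte[] fetch(String versionName);
  }

  static final class LazySecret implements Secret {
    private final String versionName;
    private final SecretFetcher fetcher;
    // Not serialized, so a deserialized copy fetches again on first use.
    private transient byte[] cached;

    LazySecret(String versionName, SecretFetcher fetcher) {
      // Only the reference is stored; no secret-store access at construction.
      this.versionName = versionName;
      this.fetcher = fetcher;
    }

    @Override
    public byte[] getSecretBytes() {
      if (cached == null) {
        cached = fetcher.fetch(versionName); // first access, e.g. on a worker
      }
      return cached;
    }
  }

  static final AtomicInteger FETCHES = new AtomicInteger();

  public static void main(String[] args) {
    Secret s =
        new LazySecret(
            "my_secret/versions/latest",
            name -> {
              FETCHES.incrementAndGet();
              return ("value-of-" + name).getBytes(StandardCharsets.UTF_8);
            });
    System.out.println(FETCHES.get()); // 0
    s.getSecretBytes();
    s.getSecretBytes();
    System.out.println(FETCHES.get()); // 1
  }
}
```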



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java:
##########
@@ -244,6 +264,20 @@ public PCollection<KV<K, Iterable<V>>> expand(PCollection<KV<K, V>> input) {
       throw new IllegalStateException("the keyCoder of a GroupByKey must be deterministic", e);
     }
 
+    PipelineOptions options = input.getPipeline().getOptions();
+    String gbekOveride = options.getGBEK();
+    if (!this.insideGBEK && gbekOveride != null && !gbekOveride.trim().isEmpty()) {

Review Comment:
   same, consider `Strings.isNullOrEmpty`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
