damccorm commented on code in PR #36321:
URL: https://github.com/apache/beam/pull/36321#discussion_r2402827388


##########
sdks/python/apache_beam/options/pipeline_options.py:
##########
@@ -1716,6 +1716,21 @@ def _add_argparse_args(cls, parser):
         help=(
             'Docker registry url to use for tagging and pushing the prebuilt '
             'sdk worker container image.'))
+    parser.add_argument(
+        '--gbek',
+        default=None,
+        help=(
+            'When set, will replace all GroupByKey transforms in the pipeline '
+            'with EncryptedGroupByKey transforms using the secret passed in '
+            'the option. Beam will infer the secret type and value based on '
+            'secret itself. This guarantees that any data at rest during the '
+            'GBK will be encrypted. Many runners only store data at rest when '
+            'performing a GBK, so this can be used to guarantee that data is '
+            'not unencrypted. Runners with this behavior include the '
+            'Dataflow, Flink, and Spark runners. The option should be '
+            'structured like: '
+            '--encrypt=type:<secret_type>;<secret_param>:<value>, for example '
+            '--encrypt=type:GcpSecret;version_name:my_secret/versions/latest'))

Review Comment:
   Fixed



##########
sdks/python/apache_beam/transforms/util.py:
##########
@@ -341,6 +341,44 @@ def generate_secret_bytes() -> bytes:
     """Generates a new secret key."""
     return Fernet.generate_key()
 
+  @staticmethod
+  def parse_secret_option(secret) -> 'Secret':
+    """Parses a secret string and returns the appropriate secret type.
+
+    The secret string should be formatted like:
+    'type:<secret_type>;<secret_param>:<value>'
+
+    For example, 'type:GcpSecret;version_name:my_secret/versions/latest'
+    would return a GcpSecret initialized with 'my_secret/versions/latest'.
+    """
+    param_map = {}
+    for param in secret.split(';'):
+      parts = param.split(':')
+      param_map[parts[0]] = parts[1]
+
+    if 'type' not in param_map:
+      raise RuntimeError('Secret string must contain a valid type parameter')
+
+    secret_type = param_map['type'].lower()
+    del param_map['type']
+    secret_class = None
+    secret_params = None
+    if secret_type == 'gcpsecret':
+      secret_class = GcpSecret
+      secret_params = ['version_name']
+    else:
+      raise RuntimeError(
+          f'Invalid secret type {secret_type}, currently only '
+          'GcpSecret is supported')
+
+    for param_name in param_map.keys():
+      if param_name not in secret_params:
+        raise RuntimeError(
+            f'Invalid secret parameter {param_name}, '
+            f'{secret_type} only supports the following '
+            f'parameters: {secret_params}')

Review Comment:
   Changed this to raise ValueError instead of RuntimeError.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to