gemini-code-assist[bot] commented on code in PR #36321:
URL: https://github.com/apache/beam/pull/36321#discussion_r2402710502


##########
sdks/python/apache_beam/transforms/util.py:
##########
@@ -341,6 +341,44 @@ def generate_secret_bytes() -> bytes:
     """Generates a new secret key."""
     return Fernet.generate_key()
 
+  @staticmethod
+  def parse_secret_option(secret) -> 'Secret':
+    """Parses a secret string and returns the appropriate secret type.
+
+    The secret string should be formatted like:
+    'type:<secret_type>;<secret_param>:<value>'
+
+    For example, 'type:GcpSecret;version_name:my_secret/versions/latest'
+    would return a GcpSecret initialized with 'my_secret/versions/latest'.
+    """
+    param_map = {}
+    for param in secret.split(';'):
+      parts = param.split(':')
+      param_map[parts[0]] = parts[1]

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The current parsing logic `parts = param.split(':')` is not robust. If a 
parameter value contains a colon (e.g., a URL), it will be parsed incorrectly. 
Using `param.split(':', 1)` would be safer. Additionally, it's good practice to 
handle malformed or empty parameters.
   
   ```suggestion
       for param in secret.split(';'):
         if not param:
           continue
         parts = param.split(':', 1)
         if len(parts) != 2:
           raise ValueError(
               f"Invalid parameter format in secret option: '{param}'. "
               "Expected format is '<key>:<value>'.")
         param_map[parts[0]] = parts[1]
   ```



##########
sdks/python/apache_beam/transforms/util.py:
##########
@@ -341,6 +341,44 @@ def generate_secret_bytes() -> bytes:
     """Generates a new secret key."""
     return Fernet.generate_key()
 
+  @staticmethod
+  def parse_secret_option(secret) -> 'Secret':
+    """Parses a secret string and returns the appropriate secret type.
+
+    The secret string should be formatted like:
+    'type:<secret_type>;<secret_param>:<value>'
+
+    For example, 'type:GcpSecret;version_name:my_secret/versions/latest'
+    would return a GcpSecret initialized with 'my_secret/versions/latest'.
+    """
+    param_map = {}
+    for param in secret.split(';'):
+      parts = param.split(':')
+      param_map[parts[0]] = parts[1]
+
+    if 'type' not in param_map:
+      raise RuntimeError('Secret string must contain a valid type parameter')
+
+    secret_type = param_map['type'].lower()
+    del param_map['type']
+    secret_class = None
+    secret_params = None
+    if secret_type == 'gcpsecret':
+      secret_class = GcpSecret
+      secret_params = ['version_name']
+    else:
+      raise RuntimeError(
+          f'Invalid secret type {secret_type}, currently only '
+          'GcpSecret is supported')
+
+    for param_name in param_map.keys():
+      if param_name not in secret_params:
+        raise RuntimeError(
+            f'Invalid secret parameter {param_name}, '
+            f'{secret_type} only supports the following '
+            f'parameters: {secret_params}')

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This method raises `RuntimeError` for invalid user input. It's more 
idiomatic in Python to raise `ValueError` when an argument has the right type 
but an inappropriate value. This would make the error handling more precise.
   
   Additionally, the if/else block for selecting the secret class is not very 
extensible. To make it easier to add new secret types in the future, consider 
using a dictionary to register secret types and their parameters. This would 
make the code more maintainable.



##########
sdks/python/apache_beam/options/pipeline_options.py:
##########
@@ -1716,6 +1716,21 @@ def _add_argparse_args(cls, parser):
         help=(
             'Docker registry url to use for tagging and pushing the prebuilt '
             'sdk worker container image.'))
+    parser.add_argument(
+        '--gbek',
+        default=None,
+        help=(
+            'When set, will replace all GroupByKey transforms in the pipeline '
+            'with EncryptedGroupByKey transforms using the secret passed in '
+            'the option. Beam will infer the secret type and value based on '
+            'secret itself. This guarantees that any data at rest during the '
+            'GBK will be encrypted. Many runners only store data at rest when '
+            'performing a GBK, so this can be used to guarantee that data is '
+            'not unencrypted. Runners with this behavior include the '
+            'Dataflow, Flink, and Spark runners. The option should be '
+            'structured like: '
+            '--encrypt=type:<secret_type>;<secret_param>:<value>, for example '
+            '--encrypt=type:GcpSecret;version_name:my_secret/versions/latest'))

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The help text for the `--gbek` option provides examples using `--encrypt`. 
This is confusing for users. The examples should use `--gbek`.
   
   ```python
               '--gbek=type:<secret_type>;<secret_param>:<value>, for example 
'\n            '--gbek=type:GcpSecret;version_name:my_secret/versions/latest'))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to