lgeiger opened a new issue, #30166:
URL: https://github.com/apache/beam/issues/30166

   ### What happened?
   
   I am upgrading Beam from version 2.52 to 2.53, and it looks like I am running into an issue introduced in #25676.
   
   I call `delete_batch` from inside `beam.Map`, roughly like this:
   ```python
       def teardown(self):
           def cleanup(count):
               client = GcsIO()
               blobs = client.list_prefix(self.resultio.path)
               client.delete_batch(blobs)
   
           return (
               "Count all elements" >> beam.combiners.Count.Globally()
               | "Delete blobs" >> beam.Map(cleanup)
           )
   ```
   
   I am seeing the following error with Beam 2.53.0, while Beam 2.52.0 works as expected:
   ```
   Traceback (most recent call last):
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 300, in _execute
       response = task()
                  ^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 375, in <lambda>
       lambda: self.create_worker().do_instruction(request), request)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 639, in do_instruction
       return getattr(self, request_type)(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py",
 line 677, in process_bundle
       bundle_processor.process_bundle(instruction_id))
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 1113, in process_bundle
       input_op_by_transform_id[element.transform_id].process_encoded(
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py",
 line 237, in process_encoded
       self.output(decoded_value)
     File "apache_beam/runners/worker/operations.py", line 570, in 
apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 572, in 
apache_beam.runners.worker.operations.Operation.output
     File "apache_beam/runners/worker/operations.py", line 263, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 266, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 953, in 
apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 954, in 
apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1437, in 
apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1526, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1435, in 
apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 636, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam/runners/common.py", line 1621, in 
apache_beam.runners.common._OutputHandler.handle_process_outputs
     File "apache_beam/runners/common.py", line 1734, in 
apache_beam.runners.common._OutputHandler._write_value_to_tag
     File "apache_beam/runners/worker/operations.py", line 266, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 953, in 
apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 954, in 
apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1437, in 
apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1526, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1435, in 
apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 636, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
     File "apache_beam/runners/common.py", line 1621, in 
apache_beam.runners.common._OutputHandler.handle_process_outputs
     File "apache_beam/runners/common.py", line 1734, in 
apache_beam.runners.common._OutputHandler._write_value_to_tag
     File "apache_beam/runners/worker/operations.py", line 266, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 953, in 
apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 954, in 
apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1437, in 
apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1526, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1435, in 
apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 851, in 
apache_beam.runners.common.PerWindowInvoker.invoke_process
     File "apache_beam/runners/common.py", line 995, in 
apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
     File "apache_beam/runners/common.py", line 1621, in 
apache_beam.runners.common._OutputHandler.handle_process_outputs
     File "apache_beam/runners/common.py", line 1734, in 
apache_beam.runners.common._OutputHandler._write_value_to_tag
     File "apache_beam/runners/worker/operations.py", line 266, in 
apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
     File "apache_beam/runners/worker/operations.py", line 953, in 
apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/worker/operations.py", line 954, in 
apache_beam.runners.worker.operations.DoOperation.process
     File "apache_beam/runners/common.py", line 1437, in 
apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 1547, in 
apache_beam.runners.common.DoFnRunner._reraise_augmented
     File "apache_beam/runners/common.py", line 1435, in 
apache_beam.runners.common.DoFnRunner.process
     File "apache_beam/runners/common.py", line 637, in 
apache_beam.runners.common.SimpleInvoker.invoke_process
     File 
"/Users/lgeiger/data-infrastructure/.venv/lib/python3.11/site-packages/apache_beam/transforms/core.py",
 line 1963, in <lambda>
       wrapper = lambda x: [fn(x)]
                            ^^^^^
     File "/Users/lgeiger/data-infrastructure/test.py", line 60, in cleanup
       client.delete_batch(blobs)
     File 
"/usr/local/lib/python3.11/site-packages/apache_beam/io/gcp/gcsio.py", line 
217, in delete_batch
       current_paths = paths[s:s + MAX_BATCH_OPERATION_SIZE]
                       ~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   TypeError: unhashable type: 'slice'
   ```
   
   I am running the pipeline on Dataflow using Python 3.11.
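
The `TypeError` in the traceback is what Python raises when a dict, rather than a list, is indexed with a slice. As far as I can tell, `list_prefix` returns a mapping of path to size, which would explain it. Below is a minimal sketch that reproduces the behaviour without Beam. The `blobs` dict, the bucket name, the value of `MAX_BATCH_OPERATION_SIZE`, and the `slice_error` helper are all stand-ins made up for illustration, not Beam code.

```python
# Stand-in for gcsio.MAX_BATCH_OPERATION_SIZE; the real value may differ.
MAX_BATCH_OPERATION_SIZE = 100

# Hypothetical stand-in for the result of list_prefix: a dict of path -> size.
blobs = {f"gs://example-bucket/results/part-{i}": 10 * i for i in range(3)}


def slice_error(paths):
    """Return the name of the exception raised by slicing `paths`, or None."""
    try:
        paths[0:MAX_BATCH_OPERATION_SIZE]
    except (TypeError, KeyError) as exc:
        # TypeError where slices are unhashable; KeyError where they are hashable.
        return type(exc).__name__
    return None


print(slice_error(blobs))        # slicing the dict fails
print(slice_error(list(blobs)))  # slicing a list of the keys succeeds: None
```

If that assumption holds, passing `list(blobs)` to `delete_batch` may serve as a workaround until the chunking is fixed, though I have not verified this against 2.53.0.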
   
   Beam 2.52.0 chunked the paths with `itertools.islice`, which works on any iterable and so avoids explicit slicing:
https://github.com/apache/beam/blob/7c8a99779bac68dfadbd7077fe6cbb76d26d04b1/sdks/python/apache_beam/io/gcp/gcsio.py#L304
   
   @BjornPrime I guess something similar here would fix the above issue, or am 
I missing something?
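
To make the suggestion concrete, here is a rough sketch of `islice`-based chunking that accepts any iterable, including a dict of paths. It is not the 2.52.0 implementation; the `chunked` helper and the example paths are hypothetical and only illustrate the idea.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield lists of at most `size` items from any iterable."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


# Iterating over a dict yields its keys, so no slicing of the dict is needed.
paths = {f"gs://example-bucket/results/part-{i}": 10 * i for i in range(5)}
batches = list(chunked(paths, 2))
print([len(batch) for batch in batches])
```

Because `islice` only consumes an iterator, the same helper would work for lists, dicts, and generators alike.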
   
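   Python 3.12 makes slices hashable, so the error would likely change there rather than go away: if `paths` is a dict, as the error suggests, indexing it with a slice would presumably raise a `KeyError` instead. In any case, as far as I know Beam doesn't yet provide Python 3.12 containers.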
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [X] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

