lgeiger opened a new issue, #30166:
URL: https://github.com/apache/beam/issues/30166
### What happened?
I am upgrading Beam from version 2.52 to 2.53, but it looks like I am running
into an issue introduced in #25676.
When I call `delete_batch` inside a `beam.Map`, with something like this:
```python
import apache_beam as beam
from apache_beam.io.gcp.gcsio import GcsIO


def teardown(self):
    def cleanup(count):
        client = GcsIO()
        blobs = client.list_prefix(self.resultio.path)
        client.delete_batch(blobs)

    return (
        "Count all elements" >> beam.combiners.Count.Globally()
        | "Delete blobs" >> beam.Map(cleanup)
    )
```
I am seeing the following error with Beam 2.53.0 while Beam 2.52.0 works as
expected:
```
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 300, in _execute
    response = task()
               ^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 375, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 639, in do_instruction
    return getattr(self, request_type)(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker.py", line 677, in process_bundle
    bundle_processor.process_bundle(instruction_id))
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1113, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/bundle_processor.py", line 237, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 570, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 572, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 636, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1526, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 851, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 995, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "apache_beam/runners/common.py", line 1621, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1734, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 266, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 953, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1437, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1547, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1435, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 637, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "/Users/lgeiger/data-infrastructure/.venv/lib/python3.11/site-packages/apache_beam/transforms/core.py", line 1963, in <lambda>
    wrapper = lambda x: [fn(x)]
                         ^^^^^
  File "/Users/lgeiger/data-infrastructure/test.py", line 60, in cleanup
    client.delete_batch(blobs)
  File "/usr/local/lib/python3.11/site-packages/apache_beam/io/gcp/gcsio.py", line 217, in delete_batch
    current_paths = paths[s:s + MAX_BATCH_OPERATION_SIZE]
                    ~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: unhashable type: 'slice'
```
I am running the pipeline on Dataflow using Python 3.11.
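The last frame can be reproduced in plain Python, assuming (as the `unhashable type: 'slice'` message suggests) that `paths` is a dict, such as the path-to-size mapping that `list_prefix` returns. The paths and the batch size below are made up for illustration.

```python
import sys

# Illustrative value; stands in for gcsio's MAX_BATCH_OPERATION_SIZE.
MAX_BATCH_OPERATION_SIZE = 100

# Hypothetical stand-in for the dict returned by list_prefix (path -> size).
paths = {"gs://bucket/prefix/a": 10, "gs://bucket/prefix/b": 20}


def reproduce():
    """Evaluate the failing expression and return the exception's name."""
    s = 0
    try:
        # Same expression as gcsio.py line 217: on a dict this is a key lookup.
        paths[s:s + MAX_BATCH_OPERATION_SIZE]
    except TypeError:
        # Python <= 3.11: slice objects are unhashable.
        return "TypeError"
    except KeyError:
        # Python >= 3.12: the slice is hashable but is not a key of the dict.
        return "KeyError"
    return "no error"


print(sys.version_info[:2], reproduce())
```

On Python 3.11, `reproduce()` returns `"TypeError"`, matching the traceback above; on 3.12 the same lookup still fails, just with a `KeyError`.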
Beam 2.52.0 chunked the paths with `itertools.islice`, which works on any
iterable (including a dict, whose iteration yields its keys) and so avoids
explicit slicing:
https://github.com/apache/beam/blob/7c8a99779bac68dfadbd7077fe6cbb76d26d04b1/sdks/python/apache_beam/io/gcp/gcsio.py#L304
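A minimal sketch of that `islice`-based chunking, assuming the fix only needs to split whatever iterable `delete_batch` receives (a list, a generator, or the dict from `list_prefix`) into batches of at most `MAX_BATCH_OPERATION_SIZE` paths. `chunked` is a hypothetical helper, not Beam API, the batch size of 100 is illustrative, and the linked 2.52.0 code may be structured differently.

```python
import itertools
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")

# Illustrative batch size; stands in for gcsio's MAX_BATCH_OPERATION_SIZE.
MAX_BATCH_OPERATION_SIZE = 100


def chunked(paths: Iterable[T], size: int = MAX_BATCH_OPERATION_SIZE) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items from any iterable.

    Iterating a dict yields its keys, so a path -> size mapping is chunked
    by path without ever being indexed with a slice.
    """
    it = iter(paths)
    while True:
        batch = list(itertools.islice(it, size))
        if not batch:
            return
        yield batch


if __name__ == "__main__":
    blobs = {"gs://bucket/a": 1, "gs://bucket/b": 2, "gs://bucket/c": 3}
    for batch in chunked(blobs, size=2):
        print(batch)  # ['gs://bucket/a', 'gs://bucket/b'], then ['gs://bucket/c']
```

Because `chunked` only iterates over its input, it behaves the same on Python 3.11 and 3.12 and for lists, generators, and dicts alike.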
@BjornPrime I guess something similar here would fix the above issue, or am
I missing something?
Upgrading to Python 3.12 probably wouldn't help either: slices are hashable
there, but since `paths` appears to be a dict, the same lookup would most
likely raise a `KeyError` instead. As far as I know, Beam doesn't yet provide
Python 3.12 containers anyway.
### Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
### Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [X] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner