damianr13 opened a new issue, #28716:
URL: https://github.com/apache/beam/issues/28716
### What happened?
I am running the following code:
```
import argparse

import apache_beam as beam
import structlog
from apache_beam.options.pipeline_options import PipelineOptions

logger = structlog.getLogger()


def run(argv=None, save_main_session=True):
    parser = argparse.ArgumentParser()
    # Parse the arguments from the command line as defined in the options class
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    with (beam.Pipeline(options=pipeline_options) as p):
        (
            p
            | "Read Vertex embedder output" >> beam.Create(["a", "b", "c"])
            | "Assign dummy keys" >> beam.Map(lambda x: (None, x))
            | "Batch up to 30 elements" >> beam.GroupIntoBatches(30)
            | "Flat map" >> beam.FlatMap(lambda x: x)
            | "Write to GCS"
            >> beam.io.WriteToText(
                "gs://my_bucket/apache_beam_test/output",
                file_name_suffix=".jsonl",
            )
        )


if __name__ == "__main__":
    run()
```
As you can see, this code generates three dummy values. I want to process my
elements in batches of up to 30, so I assign the same dummy key to all of them.
After working with the batches, I want to go back to single elements and then
write the results to Google Cloud Storage.
Running the code above fails with the following error:
```
ERROR:apache_beam.runners.direct.executor:Giving up after 4 attempts.
WARNING:apache_beam.runners.direct.executor:A task failed with exception:
Traceback (most recent call last):
  File "/Users/robert-andreidamian/Workspace/panprices/product_image_hasher/after_embeddings.py", line 34, in <module>
    run()
  File "/Users/robert-andreidamian/Workspace/panprices/product_image_hasher/after_embeddings.py", line 18, in run
    with (beam.Pipeline(options=pipeline_options) as p):
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/pipeline.py", line 601, in __exit__
    self.result.wait_until_finish()
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/direct_runner.py", line 585, in wait_until_finish
    self._executor.await_completion()
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 432, in await_completion
    self._executor.await_completion()
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 480, in await_completion
    raise update.exception
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 370, in call
    self.attempt_call(
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 413, in attempt_call
    evaluator.process_element(value)
  File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 939, in process_element
    assert not self.global_state.get_state(
AssertionError
```
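To make the intended data flow concrete (assign a dummy key, batch, then flatten back to single elements), here is a minimal plain-Python sketch. It does not use Beam and does not reproduce or explain the `AssertionError` above. The helper `group_into_batches` is a hypothetical stand-in written for illustration, and it assumes that `GroupIntoBatches` emits `(key, batch)` pairs. Under that assumption, an identity flat map would yield the key and the whole batch rather than the individual values, so unpacking the batch explicitly may be closer to what the pipeline intends.

```python
from typing import Any, Dict, Hashable, Iterable, Iterator, List, Tuple


def group_into_batches(
    keyed: Iterable[Tuple[Hashable, Any]], batch_size: int
) -> Iterator[Tuple[Hashable, List[Any]]]:
    """Hypothetical model of per-key batching (not the Beam implementation).

    Buffers values per key and emits a (key, batch) pair whenever a buffer
    reaches batch_size; leftover partial batches are emitted at the end.
    """
    buffers: Dict[Hashable, List[Any]] = {}
    for key, value in keyed:
        buf = buffers.setdefault(key, [])
        buf.append(value)
        if len(buf) == batch_size:
            yield key, buf
            buffers[key] = []  # start a fresh buffer for this key
    for key, buf in buffers.items():
        if buf:
            yield key, buf


elements = ["a", "b", "c"]

# Models: beam.Map(lambda x: (None, x))
keyed = [(None, x) for x in elements]

# Models: beam.GroupIntoBatches(30), assuming (key, batch) output pairs
batches = list(group_into_batches(keyed, 30))

# Models: beam.FlatMap(lambda x: x) applied to (key, batch) pairs.
# Iterating a pair yields the key and then the whole batch.
flat_identity = [item for pair in batches for item in pair]

# Models an assumed alternative: beam.FlatMap(lambda kv: kv[1]),
# which yields the individual values of each batch.
flat_values = [value for _key, batch in batches for value in batch]

print(batches)
print(flat_identity)
print(flat_values)
```

In this model, only the second flattening step recovers the original single elements; whether the same holds on a given runner depends on the assumption about the output shape stated above.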
I am running the code in a pipenv environment with Python 3.11. I
attached the requirements.
[requirements.txt](https://github.com/apache/beam/files/12750960/requirements.txt)
Here is my Pipfile:
```
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
google-cloud-bigquery-storage = "*"
google-cloud-bigquery = {extras = ["bqstorage", "pandas"], version = "*"}
google-cloud-tasks = "*"
firebase-admin = "*"
Pillow = "*"
imagehash = "*"
structlog = "*"
functions-framework = "*"
opencv-python = "*"
numpy = "*"
asyncio = "*"
pydantic = {extras = ["dotenv"], version="~=1.10"}
psycopg2-binary = "*"
google-cloud-aiplatform = "*"
gcloud-aio-storage = "*"
gcloud-rest-storage = "*"
validators = "*"
avif = {extras = ["pillow"], version="*"}
pillow-avif-plugin = "*"
wheel = "*"
apache-beam = {extras = ["gcp"], version = "*"}
cloud-sql-python-connector = {extras = ["pg8000"], version = "*"}
[dev-packages]
flask = {extras = ["async"], version="*"}
[requires]
python_version = "3.11"
```
### Issue Priority
Priority: 3 (minor)
### Issue Components
- [X] Component: Python SDK
- [ ] Component: Java SDK
- [ ] Component: Go SDK
- [ ] Component: Typescript SDK
- [ ] Component: IO connector
- [ ] Component: Beam YAML
- [ ] Component: Beam examples
- [ ] Component: Beam playground
- [ ] Component: Beam katas
- [ ] Component: Website
- [ ] Component: Spark Runner
- [ ] Component: Flink Runner
- [ ] Component: Samza Runner
- [ ] Component: Twister2 Runner
- [ ] Component: Hazelcast Jet Runner
- [ ] Component: Google Cloud Dataflow Runner