damianr13 opened a new issue, #28716:
URL: https://github.com/apache/beam/issues/28716

   ### What happened?
   
   I am running the following code: 
   
   ```
   import argparse
   
   import apache_beam as beam
   import structlog
   from apache_beam.options.pipeline_options import PipelineOptions
   
   logger = structlog.getLogger()
   
   
   def run(argv=None, save_main_session=True):
       parser = argparse.ArgumentParser()
   
       # Parse the arguments from the command line as defined in the options class
       known_args, pipeline_args = parser.parse_known_args(argv)
   
       pipeline_options = PipelineOptions(pipeline_args)
   
       with (beam.Pipeline(options=pipeline_options) as p):
           (
               p
               | "Read Vertex embedder output" >> beam.Create(["a", "b", "c"])
               | "Assign dummy keys" >> beam.Map(lambda x: (None, x))
               | "Batch up to 30 elements" >> beam.GroupIntoBatches(30)
               | "Flat map" >> beam.FlatMap(lambda x: x)
               | "Write to GCS"
               >> beam.io.WriteToText(
                   "gs://my_bucket/apache_beam_test/output",
                   file_name_suffix=".jsonl",
               )
           )
   
   
   if __name__ == "__main__":
       run()
   ```
   
   As you can see, this code generates 3 dummy values. I want to process my elements in batches of 30, so I assign the same dummy key to all of them. After working with the batches, I want to go back to single elements and then write the results to Google Cloud Storage.
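
For illustration only, the intended data flow (key, batch, flatten) can be sketched in plain Python without Beam. The helpers `group_into_batches` and `flatten_batches` are hypothetical stand-ins, not Beam APIs, and the sketch assumes that `beam.GroupIntoBatches` emits `(key, batch)` pairs. It only shows the element shapes at each step and is not claimed to explain the `AssertionError`.

```python
from typing import Any, Dict, Iterable, Iterator, List, Tuple


def group_into_batches(
    keyed: Iterable[Tuple[Any, str]], batch_size: int
) -> Iterator[Tuple[Any, List[str]]]:
    """Hypothetical stand-in: collect values per key, emit (key, batch) pairs."""
    per_key: Dict[Any, List[str]] = {}
    for key, value in keyed:
        per_key.setdefault(key, []).append(value)
    for key, values in per_key.items():
        # Split each key's values into chunks of at most batch_size.
        for start in range(0, len(values), batch_size):
            yield key, values[start:start + batch_size]


def flatten_batches(batches: Iterable[Tuple[Any, List[str]]]) -> Iterator[str]:
    """Drop the dummy key and yield the individual elements of each batch."""
    for _key, batch in batches:
        yield from batch


elements = ["a", "b", "c"]
keyed = [(None, x) for x in elements]          # same dummy key for everything
batches = list(group_into_batches(keyed, 30))  # one batch, since 3 <= 30
flattened = list(flatten_batches(batches))     # back to single elements

# Iterating over each pair itself (what an identity flat-map would do)
# yields the key and the whole batch rather than the single elements.
naive = [item for pair in batches for item in pair]
```

Under that assumption, going back to single elements means iterating over the second item of each pair, while an identity flat-map over the pair yields the dummy key followed by the whole batch.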
   
   Running the pipeline above fails with the following error: 
   ```
   ERROR:apache_beam.runners.direct.executor:Giving up after 4 attempts.
   WARNING:apache_beam.runners.direct.executor:A task failed with exception: 
   Traceback (most recent call last):
     File "/Users/robert-andreidamian/Workspace/panprices/product_image_hasher/after_embeddings.py", line 34, in <module>
       run()
     File "/Users/robert-andreidamian/Workspace/panprices/product_image_hasher/after_embeddings.py", line 18, in run
       with (beam.Pipeline(options=pipeline_options) as p):
     File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/pipeline.py", line 601, in __exit__
       self.result.wait_until_finish()
     File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/direct_runner.py", line 585, in wait_until_finish
       self._executor.await_completion()
     File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 432, in await_completion
       self._executor.await_completion()
     File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 480, in await_completion
       raise update.exception
     File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 370, in call
       self.attempt_call(
     File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/executor.py", line 413, in attempt_call
       evaluator.process_element(value)
     File "/Users/robert-andreidamian/.local/share/virtualenvs/product_image_hasher-uOTOvNp0/lib/python3.11/site-packages/apache_beam/runners/direct/transform_evaluator.py", line 939, in process_element
       assert not self.global_state.get_state(
   AssertionError
   ```
   
   I am running the code in a pipenv environment with Python 3.11. I attached the requirements.
   
[requirements.txt](https://github.com/apache/beam/files/12750960/requirements.txt)
   
   Here is my Pipfile: 
   ```
   [[source]]
   url = "https://pypi.org/simple"
   verify_ssl = true
   name = "pypi"
   
   [packages]
   google-cloud-bigquery-storage = "*"
   google-cloud-bigquery = {extras = ["bqstorage", "pandas"], version = "*"}
   google-cloud-tasks = "*"
   firebase-admin = "*"
   Pillow = "*"
   imagehash = "*"
   structlog = "*"
   functions-framework = "*"
   opencv-python = "*"
   numpy = "*"
   asyncio = "*"
   pydantic = {extras = ["dotenv"], version="~=1.10"}
   psycopg2-binary = "*"
   google-cloud-aiplatform = "*"
   gcloud-aio-storage = "*"
   gcloud-rest-storage = "*"
   validators = "*"
   avif = {extras = ["pillow"], version="*"}
   pillow-avif-plugin = "*"
   wheel = "*"
   apache-beam = {extras = ["gcp"], version = "*"}
   cloud-sql-python-connector = {extras = ["pg8000"], version = "*"}
   
   [dev-packages]
   flask = {extras = ["async"], version="*"}
   
   [requires]
   python_version = "3.11"
   ``` 
   
   
   ### Issue Priority
   
   Priority: 3 (minor)
   
   ### Issue Components
   
   - [X] Component: Python SDK
   - [ ] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

