liferoad commented on issue #21432:
URL: https://github.com/apache/beam/issues/21432#issuecomment-2365290834

   Tried to reproduce this issue with Dataflow. Here is my E2E code:
   ```
   # standard libraries
   import json
   import logging
   
   # third party libraries
   import apache_beam as beam
   import google.auth
   from apache_beam import Map
   from apache_beam.io import ReadFromPubSub
   from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions
   from apache_beam.runners import DataflowRunner
   
   
   # data
   # {'ride_id': 'e7b87b46-2174-4029-bb32-6498d01367c6', 'point_idx': 1298,
   # 'latitude': 40.74747, 'longitude': -73.84869, 'timestamp': '2024-09-21T14:47:29.20965-04:00',
   # 'meter_reading': 30.108835, 'meter_increment': 0.023196328, 'ride_status': 'enroute', 'passenger_count': 1}
   def run():
       class ReadPubSubOptions(PipelineOptions):
           @classmethod
           def _add_argparse_args(cls, parser):
               parser.add_argument(
                   "--topic",
                   # Run on Dataflow or authenticate to not get
                   # 403 PermissionDenied
                   default="projects/pubsub-public-data/topics/taxirides-realtime",
                   help="PubSub topic to read",
               )
   
       options = ReadPubSubOptions(streaming=True)
       _, options.view_as(GoogleCloudOptions).project = google.auth.default()
       # Sets the Google Cloud Region in which Cloud Dataflow runs.
       options.view_as(GoogleCloudOptions).region = "us-central1"
   
       dataflow_gcs_location = "gs://tmp_xqhu/dataflow"
       # Dataflow Staging Location. This location is used to stage the
       # Dataflow Pipeline and SDK binary.
       options.view_as(GoogleCloudOptions).staging_location = (
           "%s/staging" % dataflow_gcs_location
       )
   
       # Dataflow Temp Location. This location is used to store temporary files
       # or intermediate results before finally outputting to the sink.
       options.view_as(GoogleCloudOptions).temp_location = (
           "%s/temp" % dataflow_gcs_location
       )
   
       with beam.Pipeline(options=options, runner=DataflowRunner()) as p:
           # When reading from a topic, a new subscription is created.
           (
               p
               | "Read PubSub topic" >> ReadFromPubSub(topic=options.topic)
               | "Message" >> Map(lambda msg: json.loads(msg.decode("utf-8")))
               | "Convert" >> Map(lambda msg: (msg["timestamp"], 
[msg["passenger_count"]]))
               | "Combine" >> beam.CombineValues(sum)
               | Map(logging.info)
           )
   ```
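
   For reference, the snippet only defines `run()`. If it is saved as a standalone script, an entry point along these lines is needed (a minimal sketch, not part of the pipeline itself; the logging setup is optional):
   ```
   # Minimal sketch of an entry point for running the snippet above as a script.
   if __name__ == "__main__":
       # Show INFO logs during pipeline construction/submission; the
       # Map(logging.info) step itself logs on the Dataflow workers.
       logging.getLogger().setLevel(logging.INFO)
       run()
   ```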
   
   I do not see any errors:
   
   <img width="1630" alt="image" 
src="https://github.com/user-attachments/assets/eaa04e82-0cf4-4f7a-9bfd-f7cc89485a2e";>
   

