liferoad commented on issue #21432:
URL: https://github.com/apache/beam/issues/21432#issuecomment-2365290834
Tried to reproduce this issue with Dataflow. Here is my E2E code:
```
# standard libraries
import json
import logging
# third party libraries
import apache_beam as beam
import google.auth
from apache_beam import Map
from apache_beam.io import ReadFromPubSub
from apache_beam.options.pipeline_options import GoogleCloudOptions,
PipelineOptions
from apache_beam.runners import DataflowRunner
# data
# {'ride_id': 'e7b87b46-2174-4029-bb32-6498d01367c6', 'point_idx': 1298,
# 'latitude': 40.74747, 'longitude': -73.84869,
# 'timestamp': '2024-09-21T14:47:29.20965-04:00',
# 'meter_reading': 30.108835, 'meter_increment': 0.023196328,
# 'ride_status': 'enroute', 'passenger_count': 1}
def run():
    """Run a streaming pipeline on Dataflow that reads taxi rides from Pub/Sub.

    Each Pub/Sub message is decoded as UTF-8 JSON, mapped to a
    ``(timestamp, [passenger_count])`` pair, reduced with
    ``beam.CombineValues(sum)`` and the resulting element is passed to
    ``logging.info``.

    The Google Cloud project is taken from ``google.auth.default()``; the
    region and the GCS staging/temp locations are hard-coded below.
    """

    class ReadPubSubOptions(PipelineOptions):
        """Pipeline options extended with a ``--topic`` argument."""

        @classmethod
        def _add_argparse_args(cls, parser):
            """Register the ``--topic`` argument on ``parser``."""
            parser.add_argument(
                "--topic",
                # Run on Dataflow or authenticate to not get
                # 403 PermissionDenied
                default="projects/pubsub-public-data/topics/taxirides-realtime",
                help="PubSub topic to read",
            )

    options = ReadPubSubOptions(streaming=True)

    # All Google Cloud settings go through one view of the options object.
    gcloud_options = options.view_as(GoogleCloudOptions)

    # google.auth.default() returns (credentials, project_id); only the
    # project id is needed here.
    _, gcloud_options.project = google.auth.default()

    # Sets the Google Cloud Region in which Cloud Dataflow runs.
    gcloud_options.region = "us-central1"

    dataflow_gcs_location = "gs://tmp_xqhu/dataflow"

    # Dataflow Staging Location. This location is used to stage the Dataflow
    # Pipeline and SDK binary.
    gcloud_options.staging_location = f"{dataflow_gcs_location}/staging"

    # Dataflow Temp Location. This location is used to store temporary files
    # or intermediate results before finally outputting to the sink.
    gcloud_options.temp_location = f"{dataflow_gcs_location}/temp"

    with beam.Pipeline(options=options, runner=DataflowRunner()) as p:
        # When reading from a topic, a new subscription is created.
        (
            p
            | "Read PubSub topic" >> ReadFromPubSub(topic=options.topic)
            | "Message" >> Map(lambda msg: json.loads(msg.decode("utf-8")))
            | "Convert" >> Map(
                lambda msg: (msg["timestamp"], [msg["passenger_count"]])
            )
            | "Combine" >> beam.CombineValues(sum)
            | Map(logging.info)
        )
```
I do not see any error:
<img width="1630" alt="image"
src="https://github.com/user-attachments/assets/eaa04e82-0cf4-4f7a-9bfd-f7cc89485a2e">
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]