tvalentyn commented on code in PR #26689: URL: https://github.com/apache/beam/pull/26689#discussion_r1194085347
########## sdks/python/apache_beam/examples/avro_nyc_trips_it_test.py: ########## @@ -0,0 +1,213 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""End-to-end test for the avro New York City Trips example.""" + +# pytype: skip-file + +import logging +import unittest +import uuid +from io import BytesIO + +import fastavro +import pytest + +from apache_beam.examples import avro_nyc_trips +from apache_beam.io.filesystems import FileSystems +from apache_beam.testing.test_pipeline import TestPipeline + + +class AvroNycTripsIT(unittest.TestCase): + SCHEMA = { + 'fields': [ + { + 'name': 'hvfhs_license_num', 'type': ['null', 'string'] + }, + { + 'name': 'request_datetime', + 'logicalType': 'timestamp-millis', + 'type': ['null', 'long'] + }, + { + 'name': 'trip_miles', 'type': ['null', 'double'] + }, + { + 'name': 'trip_time', 'type': ['null', 'long'] + }, + { + 'name': 'base_passenger_fare', 'type': ['null', 'double'] + }, + { + 'name': 'tolls', 'type': ['null', 'double'] + }, + { + 'name': 'bcf', 'type': ['null', 'double'] + }, + { + 'name': 'sales_tax', 'type': ['null', 'double'] + }, + { + 'name': 'congestion_surcharge', 'type': ['null', 'double'] + }, + { + 'name': 'airport_fee', 'type': ['null', 'long'] + }, + { + 'name': 'tips', 'type': ['null', 'double'] + }, + { + 'name': 'driver_pay', 'type': ['null', 'double'] + }, + ], + 'name': 'nyc_fhv_trips', + 'type': 'record', + } + + RECORDS = [ + { + 'hvfhs_license_num': 'HV0003', + 'request_datetime': 1549008086000, + 'trip_miles': 2.45, + 'trip_time': 579, + 'base_passenger_fare': 9.35, + 'tolls': 0.0, + 'bcf': 0.23, + 'sales_tax': 0.83, + 'congestion_surcharge': 0.0, + 'airport_fee': None, + 'tips': 0.0, + 'driver_pay': 7.48 + }, + { + 'hvfhs_license_num': 'HV0003', + 'request_datetime': 1549009568000, + 'trip_miles': 1.71, + 'trip_time': 490, + 'base_passenger_fare': 7.91, + 'tolls': 0.0, + 'bcf': 0.2, + 'sales_tax': 0.7, + 'congestion_surcharge': 0.0, + 'airport_fee': None, + 'tips': 2.0, + 'driver_pay': 7.93 + }, + { + 'hvfhs_license_num': 'HV0005', + 'request_datetime': 1549010613000, + 'trip_miles': 11.24, + 'trip_time': 1739, + 'base_passenger_fare': 29.77, + 'tolls': 0.72, + 'bcf': 0.76, + 'sales_tax': 2.71, + 'congestion_surcharge': 0.0, + 'airport_fee': None, + 'tips': 0.0, + 'driver_pay': 22.09 + }, + { + 'hvfhs_license_num': 'HV0005', + 'request_datetime': 1549010420000, + 'trip_miles': 5.71, + 'trip_time': 1559, + 'base_passenger_fare': 21.69, + 'tolls': 0.24, + 'bcf': 0.55, + 'sales_tax': 1.95, + 'congestion_surcharge': 0.0, + 'airport_fee': None, + 'tips': 0.0, + 'driver_pay': 14.87 + }, + ] + + EXPECTED = [ + { + 'service': 'Lyft', + 'day': 'Fri', + 'total_price': 58.39, + 'total_driver_pay': 36.96, + 'total_trip_miles': 16.95, + 'total_trip_minutes': 54.96666666666667, + 'total_number_of_trips': 2, + 'price_per_trip': 29.195, + 'price_per_mile': 3.4448377581120946, + 'price_per_minute': 1.0622801697998787, + 'driver_pay_per_trip': 18.48, + 'driver_pay_per_mile': 2.1805309734513276, + 'driver_pay_per_minute': 0.6724075197089144, + 'miles_per_hour': 18.502122498483928 + }, + { + 'service': 'Uber', + 'day': 'Fri', + 'total_price': 21.22, + 'total_driver_pay': 17.41, + 'total_trip_miles': 4.16, + 'total_trip_minutes': 17.816666666666666, + 'total_number_of_trips': 2, + 'price_per_trip': 10.61, + 'price_per_mile': 5.100961538461538, + 'price_per_minute': 1.1910196445275958, + 'driver_pay_per_trip': 8.705, + 'driver_pay_per_mile': 4.185096153846153, + 'driver_pay_per_minute': 0.9771749298409729, + 'miles_per_hour': 14.00935453695042 + }, + ] + + # TODO Enable when fixed this tests for Dataflow runner Review Comment: What is missing to make this work? ########## sdks/python/apache_beam/examples/avro_nyc_trips.py: ########## @@ -0,0 +1,282 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""An example that reads New York City for hire vehicle trips. + +An example that reads New York City for hire vehicle trips data from Google +Cloud Storage and calculates various statistics including the passenger's +price per trip, price per mile, and price per minute. + +Data originally downloaded as parquet files from the New York City +government +`website <https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page>`_ +before being converted to avro format. Here is the +`documentation +<https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_hvfhs.pdf>`_ +for the data format and schema. + +To execute this pipeline locally using the DirectRunner, specify an +output prefix on GCS::: + + --output gs://YOUR_OUTPUT_PREFIX + +To execute this pipeline using the Google Cloud Dataflow service, specify +pipeline configuration in addition to the above::: + + --job_name NAME_FOR_YOUR_JOB + --project YOUR_PROJECT_ID + --region GCE_REGION + --staging_location gs://YOUR_STAGING_DIRECTORY + --temp_location gs://YOUR_TEMPORARY_DIRECTORY + --runner DataflowRunner + +The default input is ``gs://dataflow-samples/avro_nyc_trips/*.avro`` and can +be overridden with --input. +""" + +# pytype: skip-file + +import argparse +import datetime +import logging + +import pytz + +import apache_beam as beam +from apache_beam.io import ReadFromAvro +from apache_beam.io import WriteToAvro +from apache_beam.options.pipeline_options import PipelineOptions +from apache_beam.options.pipeline_options import SetupOptions + +SCHEMA = { + 'fields': [ + { + 'name': 'service', 'type': 'string' + }, + { + 'name': 'day', 'type': 'string' + }, + { + 'name': 'total_price', 'type': 'double' + }, + { + 'name': 'total_driver_pay', 'type': 'double' + }, + { + 'name': 'total_trip_miles', 'type': 'double' + }, + { + 'name': 'total_trip_minutes', 'type': 'double' + }, + { + 'name': 'total_number_of_trips', 'type': 'long' + }, + { + 'name': 'price_per_trip', 'type': 'double' + }, + { + 'name': 'price_per_mile', 'type': 'double' + }, + { + 'name': 'price_per_minute', 'type': 'double' + }, + { + 'name': 'driver_pay_per_trip', 'type': 'double' + }, + { + 'name': 'driver_pay_per_mile', 'type': 'double' + }, + { + 'name': 'driver_pay_per_minute', 'type': 'double' + }, + { + 'name': 'miles_per_hour', 'type': 'double' + }, + ], + 'name': 'nyc_trip_prices', + 'type': 'record', +} + + +class CreateKeyWithServiceAndDay(beam.DoFn): + """Creates a key, value group. + + The key is the combination of the HVFHS Code converted to the ride-sharing + company name and the day of the week of the ride. The value is the original + record dictionary object. + """ + def process(self, record: dict): + options = { + 'HV0002': 'Juno', 'HV0003': 'Uber', 'HV0004': 'Via', 'HV0005': 'Lyft' + } + service = options.get(record['hvfhs_license_num']) + if service: + timestamp = None + for k in ('request_datetime', + 'on_scene_datetime', + 'pickup_datetime', + 'dropoff_datetime'): + timestamp = timestamp or record[k] + if timestamp: + break + + day_of_the_week = datetime.datetime.fromtimestamp( + timestamp / 1000.0, + tz=pytz.timezone('America/New_York')).strftime('%a') + + yield (service, day_of_the_week), record + + +class CalculatePricePerAttribute(beam.CombineFn): + """Calculates the price for attribute. + + Calculates the total driver pay, price, miles, minutes, and trips per for + hire vehicle service. And calculates the price per mile, minute, and trip + for both the driver and passenger. + """ + def create_accumulator(self): + total_price = 0.0 + total_driver_pay = 0.0 + total_trip_miles = 0.0 + total_trip_time = 0.0 + total_number_of_trips = 0 + accumulator = ( + total_price, + total_driver_pay, + total_trip_miles, + total_trip_time, + total_number_of_trips, + ) + return accumulator + + def add_input(self, accumulator, record): + ( + total_price, + total_driver_pay, + total_trip_miles, + total_trip_time, + total_number_of_trips, + ) = accumulator Review Comment: Have you considered using `collections.namedtuple()` instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
