nyoungstudios commented on issue #25917:
URL: https://github.com/apache/beam/issues/25917#issuecomment-1546782591
@liferoad @tvalentyn I have linked a PR that adds a new Avro example and removes the old one. I found a public dataset of New York City for-hire vehicle trip data (Uber/Lyft/etc. rides). Here is a script I wrote to download the parquet files:

```bash
#!/bin/bash
# Downloads parquet data from nyc.gov
# Parquet data contains the trip data for NYC ride sharing services (HVFHV)
# The source website is: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
set -eu

START_YEAR=2019
START_MONTH=9
END_YEAR=2023
END_MONTH=2

OUTPUT_FOLDER="${1:-}"
if [[ -z "$OUTPUT_FOLDER" ]]; then
    echo "Must provide output folder as command line argument"
    exit 1
fi

mkdir -p "$OUTPUT_FOLDER"

for year in $(seq $START_YEAR $END_YEAR); do
    for month in $(seq 1 12); do
        # determine whether $year-$month falls inside the download range
        case $year in
            $START_YEAR)
                if [[ $START_YEAR -eq $END_YEAR ]]; then
                    if [[ $month -ge $START_MONTH && $month -le $END_MONTH ]]; then
                        flag=1
                    else
                        flag=0
                    fi
                else
                    if [[ $month -ge $START_MONTH ]]; then
                        flag=1
                    else
                        flag=0
                    fi
                fi
                ;;
            $END_YEAR)
                if [[ $month -le $END_MONTH ]]; then
                    flag=1
                else
                    flag=0
                fi
                ;;
            *)
                flag=1
                ;;
        esac

        if [[ "$flag" -eq 1 ]]; then
            name="fhvhv_tripdata_$year-$(printf "%02d" $month).parquet"
            url="https://d37ci6vzurychx.cloudfront.net/trip-data/$name"
            curl "$url" -o "$OUTPUT_FOLDER/$name"
        fi
    done
done
```

And here is the Beam code to convert the parquet files to Avro files:

```python
import argparse
import logging

import apache_beam as beam
from apache_beam.io import ReadFromParquet, WriteToAvro
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# Avro schema for the NYC HVFHV trip records. Note that per the Avro spec,
# 'logicalType' must be attached to the type itself (not the field),
# otherwise it is silently ignored.
SCHEMA = {
    'fields': [
        {'name': 'hvfhs_license_num', 'type': ['null', 'string']},
        {'name': 'dispatching_base_num', 'type': ['null', 'string']},
        {'name': 'originating_base_num', 'type': ['null', 'string']},
        {'name': 'request_datetime', 'type': ['null', {'type': 'long', 'logicalType': 'timestamp-millis'}]},
        {'name': 'on_scene_datetime', 'type': ['null', {'type': 'long', 'logicalType': 'timestamp-millis'}]},
        {'name': 'pickup_datetime', 'type': ['null', {'type': 'long', 'logicalType': 'timestamp-millis'}]},
        {'name': 'dropoff_datetime', 'type': ['null', {'type': 'long', 'logicalType': 'timestamp-millis'}]},
        {'name': 'PULocationID', 'type': ['null', 'long']},
        {'name': 'DOLocationID', 'type': ['null', 'long']},
        {'name': 'trip_miles', 'type': ['null', 'double']},
        {'name': 'trip_time', 'type': ['null', 'long']},
        {'name': 'base_passenger_fare', 'type': ['null', 'double']},
        {'name': 'tolls', 'type': ['null', 'double']},
        {'name': 'bcf', 'type': ['null', 'double']},
        {'name': 'sales_tax', 'type': ['null', 'double']},
        {'name': 'congestion_surcharge', 'type': ['null', 'double']},
        {'name': 'airport_fee', 'type': ['null', 'long']},
        {'name': 'tips', 'type': ['null', 'double']},
        {'name': 'driver_pay', 'type': ['null', 'double']},
        {'name': 'shared_request_flag', 'type': ['null', 'string']},
        {'name': 'shared_match_flag', 'type': ['null', 'string']},
        {'name': 'access_a_ride_flag', 'type': ['null', 'string']},
        {'name': 'wav_request_flag', 'type': ['null', 'string']},
        {'name': 'wav_match_flag', 'type': ['null', 'long']},
    ],
    'name': 'nyc_fhv_trips',
    'type': 'record',
}


class ConvertTimestampsDoFn(beam.DoFn):
    def process(self, record: dict):
        for timestamp_key in ('request_datetime', 'on_scene_datetime', 'pickup_datetime', 'dropoff_datetime'):
            if record[timestamp_key]:
                # convert datetime object to milliseconds since the epoch
                record[timestamp_key] = int(record[timestamp_key].timestamp() * 1000)
        yield record


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        help='Input file to process.',
        required=True,
    )
    parser.add_argument(
        '--output',
        dest='output',
        help='Output file to write results to.',
        required=True,
    )
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:
        (
            p
            | ReadFromParquet(known_args.input)
            | beam.ParDo(ConvertTimestampsDoFn())
            | WriteToAvro(known_args.output, SCHEMA, file_name_suffix='.avro')
        )


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
```

If you could run these scripts and upload the converted Avro files to one of those public-facing Google Cloud Storage buckets, that would be great; I can then update the GCS URI in my PR.
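For anyone running this, here is roughly how I invoke the two scripts locally (the script file names below are just placeholders for wherever you save them; `--runner DirectRunner` selects Beam's local runner):

```bash
# download all of the parquet files into ./parquet_data
./download_nyc_tripdata.sh parquet_data

# convert them to Avro locally with the DirectRunner; Beam names the
# output shards like avro_data/fhvhv_tripdata-00000-of-NNNNN.avro
mkdir -p avro_data
python parquet_to_avro.py \
    --input "parquet_data/*.parquet" \
    --output "avro_data/fhvhv_tripdata" \
    --runner DirectRunner
```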
