nyoungstudios commented on issue #25917:
URL: https://github.com/apache/beam/issues/25917#issuecomment-1546782591

   @liferoad @tvalentyn 
   
I have linked a PR to add a new Avro example and remove the old one. I found some New York City trip data (Uber/Lyft/etc. ride data). Here is a script I wrote to download the Parquet files.
   
   ```bash
   #!/bin/bash
   
   # Downloads parquet data from nyc.gov
   # Parquet data contains the trip data for NYC ride sharing services (HVFHV)
   # The source website is: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
   
   set -eu
   
   START_YEAR=2019
   START_MONTH=9
   
   END_YEAR=2023
   END_MONTH=2
   
   OUTPUT_FOLDER="${1:-}"
   
   if [[ -z "$OUTPUT_FOLDER" ]]; then
     echo "Must provide output folder as command line argument"
     exit 1
   fi
   
   mkdir -p "$OUTPUT_FOLDER"
   
   
   for year in $(seq "$START_YEAR" "$END_YEAR"); do
     for month in $(seq 1 12); do
       case $year in
         $START_YEAR)
           if [[ $START_YEAR -eq $END_YEAR ]]; then
             if [[ $month -ge $START_MONTH && $month -le $END_MONTH ]]; then
               flag=1
             else
               flag=0
             fi
           else
             if [[ $month -ge $START_MONTH ]]; then
               flag=1
             else
               flag=0
             fi
           fi
           ;;
   
         $END_YEAR)
           if [[ $month -le $END_MONTH ]]; then
             flag=1
           else
             flag=0
           fi
           ;;
   
         *)
           flag=1
           ;;
       esac
   
       if [[ "$flag" -eq 1 ]]; then
         name="fhvhv_tripdata_$year-$(printf '%02d' "$month").parquet"
         url="https://d37ci6vzurychx.cloudfront.net/trip-data/$name"
         # -f makes curl exit non-zero on HTTP errors instead of saving an error page
         curl -f "$url" -o "$OUTPUT_FOLDER/$name"
       fi
     done
   done
   ```
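   The nested `case` logic above only decides whether a given `(year, month)` falls inside the inclusive download window. For reference, the same window can be expressed as a tuple comparison; this Python sketch (not part of the PR) mirrors the bash logic:

   ```python
   # Enumerate (year, month) pairs from 2019-09 through 2023-02 inclusive,
   # mirroring the case/esac logic in the bash script above.
   START_YEAR, START_MONTH = 2019, 9
   END_YEAR, END_MONTH = 2023, 2

   months = [
       (year, month)
       for year in range(START_YEAR, END_YEAR + 1)
       for month in range(1, 13)
       if (START_YEAR, START_MONTH) <= (year, month) <= (END_YEAR, END_MONTH)
   ]
   ```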
   
   And here is the Beam code to convert the parquet files to Avro files:
   ```python
   import argparse
   import logging
   
   import apache_beam as beam
   
   from apache_beam.io import ReadFromParquet, WriteToAvro
   from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
   
   
   # Per the Avro spec, a logicalType annotates a type, not a field; for a
   # nullable timestamp it must go on the long branch of the union.
   TIMESTAMP_MILLIS = {'type': 'long', 'logicalType': 'timestamp-millis'}

   SCHEMA = {
       'fields': [
           {'name': 'hvfhs_license_num', 'type': ['null', 'string']},
           {'name': 'dispatching_base_num', 'type': ['null', 'string']},
           {'name': 'originating_base_num', 'type': ['null', 'string']},
           {'name': 'request_datetime', 'type': ['null', TIMESTAMP_MILLIS]},
           {'name': 'on_scene_datetime', 'type': ['null', TIMESTAMP_MILLIS]},
           {'name': 'pickup_datetime', 'type': ['null', TIMESTAMP_MILLIS]},
           {'name': 'dropoff_datetime', 'type': ['null', TIMESTAMP_MILLIS]},
           {'name': 'PULocationID', 'type': ['null', 'long']},
           {'name': 'DOLocationID', 'type': ['null', 'long']},
           {'name': 'trip_miles', 'type': ['null', 'double']},
           {'name': 'trip_time', 'type': ['null', 'long']},
           {'name': 'base_passenger_fare', 'type': ['null', 'double']},
           {'name': 'tolls', 'type': ['null', 'double']},
           {'name': 'bcf', 'type': ['null', 'double']},
           {'name': 'sales_tax', 'type': ['null', 'double']},
           {'name': 'congestion_surcharge', 'type': ['null', 'double']},
           {'name': 'airport_fee', 'type': ['null', 'long']},
           {'name': 'tips', 'type': ['null', 'double']},
           {'name': 'driver_pay', 'type': ['null', 'double']},
           {'name': 'shared_request_flag', 'type': ['null', 'string']},
           {'name': 'shared_match_flag', 'type': ['null', 'string']},
           {'name': 'access_a_ride_flag', 'type': ['null', 'string']},
           {'name': 'wav_request_flag', 'type': ['null', 'string']},
           {'name': 'wav_match_flag', 'type': ['null', 'long']},
       ],
       'name': 'nyc_fhv_trips',
       'type': 'record',
   }
   
   
   class ConvertTimestampsDoFn(beam.DoFn):
       def process(self, record: dict):
           for timestamp_key in ('request_datetime', 'on_scene_datetime',
                                 'pickup_datetime', 'dropoff_datetime'):
               if record[timestamp_key] is not None:
                   # convert the datetime object to epoch milliseconds
                   record[timestamp_key] = int(record[timestamp_key].timestamp() * 1000)

           yield record
   
   
   def run(argv=None):
       parser = argparse.ArgumentParser()
       parser.add_argument(
           '--input',
           dest='input',
           help='Input file to process.',
           required=True,
       )
       parser.add_argument(
           '--output',
           dest='output',
           help='Output file to write results to.',
           required=True,
       )
       known_args, pipeline_args = parser.parse_known_args(argv)
   
       pipeline_options = PipelineOptions(pipeline_args)
       pipeline_options.view_as(SetupOptions).save_main_session = True
   
       with beam.Pipeline(options=pipeline_options) as p:
           (
               p
               | ReadFromParquet(known_args.input)
               | beam.ParDo(ConvertTimestampsDoFn())
               | WriteToAvro(known_args.output, SCHEMA, file_name_suffix='.avro')
           )
   
   
   if __name__ == '__main__':
       logging.getLogger().setLevel(logging.INFO)
       run()
   ```
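   One caveat on `ConvertTimestampsDoFn`: `datetime.timestamp()` interprets naive datetimes in the machine's local timezone, so the pipeline should be run with a consistent timezone (or the values made timezone-aware) for reproducible output. A quick standalone check of the millisecond conversion, using a UTC-aware datetime so the result is machine-independent:

   ```python
   from datetime import datetime, timezone

   # 2023-02-01 12:30:00 UTC as epoch milliseconds, matching what the
   # DoFn writes into the timestamp-millis fields.
   dt = datetime(2023, 2, 1, 12, 30, 0, tzinfo=timezone.utc)
   millis = int(dt.timestamp() * 1000)  # 1675254600000
   ```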
   
   If you could run these scripts and upload the converted Avro files to one of the public-facing Google Cloud Storage buckets, that would be great. I can then update the GCS URI in my PR.
   

