nyoungstudios commented on code in PR #26689:
URL: https://github.com/apache/beam/pull/26689#discussion_r1194482614


##########
sdks/python/apache_beam/examples/avro_nyc_trips.py:
##########
@@ -0,0 +1,282 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""An example that reads New York City for hire vehicle trips.
+
+An example that reads New York City for hire vehicle trips data from Google
+Cloud Storage and calculates various statistics including the passenger's
+price per trip, price per mile, and price per minute.
+
+Data originally downloaded as parquet files from the New York City
+government
+`website <https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page>`_
+before being converted to avro format. Here is the
+`documentation
+<https://www.nyc.gov/assets/tlc/downloads/pdf/data_dictionary_trip_records_hvfhs.pdf>`_
+for the data format and schema.
+
+To execute this pipeline locally using the DirectRunner, specify an
+output prefix on GCS:::
+
+  --output gs://YOUR_OUTPUT_PREFIX
+
+To execute this pipeline using the Google Cloud Dataflow service, specify
+pipeline configuration in addition to the above:::
+
+  --job_name NAME_FOR_YOUR_JOB
+  --project YOUR_PROJECT_ID
+  --region GCE_REGION
+  --staging_location gs://YOUR_STAGING_DIRECTORY
+  --temp_location gs://YOUR_TEMPORARY_DIRECTORY
+  --runner DataflowRunner
+
+The default input is ``gs://dataflow-samples/avro_nyc_trips/*.avro`` and can
+be overridden with --input.
+"""
+
+# pytype: skip-file
+
+import argparse
+import datetime
+import logging
+
+import pytz
+
+import apache_beam as beam
+from apache_beam.io import ReadFromAvro
+from apache_beam.io import WriteToAvro
+from apache_beam.options.pipeline_options import PipelineOptions
+from apache_beam.options.pipeline_options import SetupOptions
+
+SCHEMA = {
+    'fields': [
+        {
+            'name': 'service', 'type': 'string'
+        },
+        {
+            'name': 'day', 'type': 'string'
+        },
+        {
+            'name': 'total_price', 'type': 'double'
+        },
+        {
+            'name': 'total_driver_pay', 'type': 'double'
+        },
+        {
+            'name': 'total_trip_miles', 'type': 'double'
+        },
+        {
+            'name': 'total_trip_minutes', 'type': 'double'
+        },
+        {
+            'name': 'total_number_of_trips', 'type': 'long'
+        },
+        {
+            'name': 'price_per_trip', 'type': 'double'
+        },
+        {
+            'name': 'price_per_mile', 'type': 'double'
+        },
+        {
+            'name': 'price_per_minute', 'type': 'double'
+        },
+        {
+            'name': 'driver_pay_per_trip', 'type': 'double'
+        },
+        {
+            'name': 'driver_pay_per_mile', 'type': 'double'
+        },
+        {
+            'name': 'driver_pay_per_minute', 'type': 'double'
+        },
+        {
+            'name': 'miles_per_hour', 'type': 'double'
+        },
+    ],
+    'name': 'nyc_trip_prices',
+    'type': 'record',
+}
+
+
+class CreateKeyWithServiceAndDay(beam.DoFn):
+  """Creates a key, value group.
+
+  The key is the combination of the HVFHS Code converted to the ride-sharing
+  company name and the day of the week of the ride. The value is the original
+  record dictionary object.
+  """
+  def process(self, record: dict):
+    options = {
+        'HV0002': 'Juno', 'HV0003': 'Uber', 'HV0004': 'Via', 'HV0005': 'Lyft'
+    }
+    service = options.get(record['hvfhs_license_num'])
+    if service:
+      timestamp = None
+      for k in ('request_datetime',
+                'on_scene_datetime',
+                'pickup_datetime',
+                'dropoff_datetime'):
+        timestamp = timestamp or record[k]
+        if timestamp:
+          break
+
+      day_of_the_week = datetime.datetime.fromtimestamp(
+          timestamp / 1000.0,
+          tz=pytz.timezone('America/New_York')).strftime('%a')
+
+      yield (service, day_of_the_week), record
+
+
+class CalculatePricePerAttribute(beam.CombineFn):
+  """Calculates the price for attribute.
+
+  Calculates the total driver pay, price, miles, minutes, and trips per for
+  hire vehicle service. And calculates the price per mile, minute, and trip
+  for both the driver and passenger.
+  """
+  def create_accumulator(self):
+    total_price = 0.0
+    total_driver_pay = 0.0
+    total_trip_miles = 0.0
+    total_trip_time = 0.0
+    total_number_of_trips = 0
+    accumulator = (
+        total_price,
+        total_driver_pay,
+        total_trip_miles,
+        total_trip_time,
+        total_number_of_trips,
+    )
+    return accumulator
+
+  def add_input(self, accumulator, record):
+    (
+        total_price,
+        total_driver_pay,
+        total_trip_miles,
+        total_trip_time,
+        total_number_of_trips,
+    ) = accumulator

Review Comment:
   I just tried setting the `create_accumulator` function to use 
`collections.namedtuple()` instead. However, it didn't quite work as I was 
expecting. Here is what I did:
   ```python
   def create_accumulator(self):
       accumulator = collections.namedtuple(
           'Accumulator',
           (
               'total_price',
               'total_driver_pay',
               'total_trip_miles',
               'total_trip_time',
               'total_number_of_trips',
           ),
           defaults=[0.0, 0.0, 0.0, 0.0, 0],
       )()
       return accumulator
   ```
   
   But upon accessing the accumulator in the other functions, it was of type 
`tuple` rather than an instance of the `namedtuple` object.
   
   ```python
   def add_input(self, accumulator, record):
       # prints <class 'tuple'>
       print(type(accumulator))
   
       # rather than <class '__main__.Accumulator'>
       # this statement does not work
       print(accumulator.total_price)
   ```
   
   If you would like me to use `collections.namedtuple` in the 
`create_accumulator` function, but still get the values by indexing the 
tuple object in the other functions, I can do that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to