Valliappa Lakshmanan created BEAM-11513:
-------------------------------------------

             Summary: Unable to load Pandas dataframe from BigQuery
                 Key: BEAM-11513
                 URL: https://issues.apache.org/jira/browse/BEAM-11513
             Project: Beam
          Issue Type: Bug
          Components: beam-model
    Affects Versions: 2.26.0
            Reporter: Valliappa Lakshmanan


Doing this:
{code:python}
query = """
SELECT
  date, airline,
  departure_airport,
  arrival_airport,
  departure_delay,
  arrival_delay
FROM `bigquery-samples.airline_ontime_data.flights`
"""
with beam.Pipeline() as p:
    tbl = p | 'read table' >> beam.io.ReadFromBigQuery(query=query)
    tbl = tbl | 'assign ts' >> beam.Map(
        lambda x: beam.window.TimestampedValue(x, to_unixtime(x['date'])))
    daily = tbl | 'daily windows' >> beam.WindowInto(beam.window.FixedWindows(60*60*24))
    df = to_dataframe(daily)
    result = df.groupby('airline').apply(get_delay_at_top_airports)
    result.to_csv('output.csv')
{code}
fails at pipeline construction, on the to_dataframe call, with this error:
{noformat}
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-41-f47b2674428f> in <module>
     13         lambda x: beam.window.TimestampedValue(x, to_unixtime(x['date'])))
     14     daily = tbl | 'daily windows' >> beam.WindowInto(beam.window.FixedWindows(60*60*24))
---> 15     df = to_dataframe(daily)
     16     result = df.groupby('airline').apply(get_delay_at_top_airports)
     17     result.to_csv('output.csv')

/opt/conda/lib/python3.7/site-packages/apache_beam/dataframe/convert.py in to_dataframe(pcoll, proxy, label)
     69       # the name of these variables in the calling context.
     70       label = 'BatchElements(%s)' % _var_name(pcoll, 2)
---> 71     proxy = schemas.generate_proxy(pcoll.element_type)
     72     pcoll = pcoll | label >> schemas.BatchRowsAsDataFrame(proxy=proxy)
     73   return frame_base.DeferredFrame.wrap(

/opt/conda/lib/python3.7/site-packages/apache_beam/dataframe/schemas.py in generate_proxy(element_type)
    178 
    179   else:
--> 180     fields = named_fields_from_element_type(element_type)
    181     proxy = pd.DataFrame(columns=[name for name, _ in fields])
    182     for name, typehint in fields:

/opt/conda/lib/python3.7/site-packages/apache_beam/typehints/schemas.py in named_fields_from_element_type(element_type)
    298 def named_fields_from_element_type(
    299     element_type):  # (type) -> typing.List[typing.Tuple[unicode, type]]
--> 300   return named_fields_from_schema(schema_from_element_type(element_type))
    301 
    302 

/opt/conda/lib/python3.7/site-packages/apache_beam/typehints/schemas.py in schema_from_element_type(element_type)
    293     raise TypeError(
    294         "Attempted to determine schema for unsupported type '%s'" %
--> 295         element_type)
    296 
    297 

TypeError: Attempted to determine schema for unsupported type 'Any'
{noformat}
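
A possible workaround, sketched under assumptions: ReadFromBigQuery emits each row as a plain Python dict, so the PCollection's element_type is Any and generate_proxy has no named fields to build a DataFrame proxy from. Mapping the rows into a typing.NamedTuple registered with beam.coders.RowCoder gives to_dataframe a schema to work with. Flight, to_flight, and flights_to_dataframe are hypothetical names; the field types (strings for date/airline/airports, floats for the delays) are assumptions about the table, and rows with NULL delays are dropped for simplicity.

{code:python}
import typing

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe


# Hypothetical schema type mirroring the columns selected by the query.
# A typing.NamedTuple gives Beam named, typed fields, which generate_proxy
# needs and which a PCollection of plain dicts (element type Any) lacks.
class Flight(typing.NamedTuple):
    date: str
    airline: str
    departure_airport: str
    arrival_airport: str
    departure_delay: float
    arrival_delay: float


# Encode Flight elements with their schema instead of pickling them.
beam.coders.registry.register_coder(Flight, beam.coders.RowCoder)


def to_flight(row):
    """Copy one BigQuery row (a dict) into a Flight, coercing each field."""
    return Flight(
        date=str(row['date']),
        airline=str(row['airline']),
        departure_airport=str(row['departure_airport']),
        arrival_airport=str(row['arrival_airport']),
        departure_delay=float(row['departure_delay']),
        arrival_delay=float(row['arrival_delay']),
    )


def flights_to_dataframe(rows):
    """Give a PCollection of BigQuery dict rows a schema, then convert it."""
    flights = (
        rows
        # Assumption: rows with NULL delays are dropped, because the Flight
        # delay fields are declared as non-nullable floats.
        | 'drop null delays' >> beam.Filter(
            lambda r: r['departure_delay'] is not None
            and r['arrival_delay'] is not None)
        # Declare the schema'd element type explicitly instead of relying
        # on type inference to work it out from to_flight.
        | 'to Flight' >> beam.Map(to_flight).with_output_types(Flight))
    return to_dataframe(flights)
{code}

In the repro, df = flights_to_dataframe(daily) would stand in for df = to_dataframe(daily). beam.Filter and beam.Map keep each element's timestamp and window, so the daily windows assigned earlier are carried through to the DataFrame conversion.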



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
