[ https://issues.apache.org/jira/browse/BEAM-11472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17249822#comment-17249822 ]

Brian Hulette commented on BEAM-11472:
--------------------------------------

This is the full example:

{code}
from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.dataframe.io import read_csv
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
  """Main entry point; defines and runs the taxi_dataframe pipeline.

  Reads the NYC taxi trip CSV with the Beam DataFrame API, sums
  ``passenger_count`` per drop-off location (``DOLocationID``), and writes
  the aggregated result with ``to_csv``.

  Args:
    argv: Command-line arguments to parse. ``--input`` (optional; defaults
      to the public NYC taxi sample on GCS) and ``--output`` (required) are
      consumed here. Any unrecognized arguments are forwarded unchanged to
      ``PipelineOptions``. When None, ``argparse`` parses ``sys.argv[1:]``.
  """
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--input',
      dest='input',
      default=(
          'gs://apache-beam-samples/nyc_taxi/2019/yellow_tripdata_2019-01.csv'),
      help='Input file to process.')
  parser.add_argument(
      '--output',
      dest='output',
      required=True,
      help='Output file to write results to.')
  # parse_known_args (not parse_args) so runner flags such as --runner or
  # --project pass through to PipelineOptions instead of being rejected.
  known_args, pipeline_args = parser.parse_known_args(argv)

  # The pipeline will be run on exiting the with block.
  with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
    # Only the two columns used by the aggregation below are read.
    df = p | read_csv(
        known_args.input, usecols=['passenger_count', 'DOLocationID'])

    # Count the number of passengers dropped off per LocationID.
    agg = df.groupby('DOLocationID').sum()
    agg.to_csv(known_args.output)


if __name__ == '__main__':
  # When executed as a script, lower the root logger's threshold so
  # INFO-level messages are emitted, then launch the pipeline using the
  # process's own command-line arguments (run() is given no argv).
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.INFO)
  run()
{code}

> taxi_dataframe example broken
> -----------------------------
>
>                 Key: BEAM-11472
>                 URL: https://issues.apache.org/jira/browse/BEAM-11472
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py-core
>            Reporter: Brian Hulette
>            Assignee: Robert Bradshaw
>            Priority: P1
>             Fix For: 2.27.0
>
>
> I just tried running the taxi_dataframe example at HEAD on Dataflow and found 
> that it fails with the following error:
> {code}
> INFO:apache_beam.runners.dataflow.dataflow_runner:2020-12-15T16:27:55.653Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1214, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 721, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 753, in apache_beam.runners.common.PerWindowInvoker._should_process_window_for_sdf
>   File "apache_beam/runners/common.py", line 548, in apache_beam.runners.common.DoFnInvoker.invoke_create_tracker
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/dataframe/io.py", line 313, in create_tracker
>     tracker = beam.io.restriction_trackers.OffsetRestrictionTracker(restriction)
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/restriction_trackers.py", line 91, in __init__
>     assert isinstance(offset_range, OffsetRange)
> AssertionError
> {code}
> https://github.com/apache/beam/pull/13443 seems like the most likely culprit,
> and I confirmed that running with Beam before that commit (at
> bd825f574e342cfa83fb09767c7d5a19a3accc55) does not fail.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to