[
https://issues.apache.org/jira/browse/BEAM-11472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17249822#comment-17249822
]
Brian Hulette commented on BEAM-11472:
--------------------------------------
This is the full example:
{code}
from __future__ import absolute_import
import argparse
import logging
import apache_beam as beam
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.dataframe.convert import to_pcollection
from apache_beam.dataframe.io import read_csv
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions
def run(argv=None):
    """Main entry point; defines and runs the taxi_dataframe pipeline.

    Reads the taxi trip CSV given by ``--input`` using the Beam DataFrame API,
    keeping only the ``passenger_count`` and ``DOLocationID`` columns. It sums
    ``passenger_count`` per ``DOLocationID`` and writes the aggregated frame
    to ``--output`` with ``to_csv``.

    Args:
        argv: Command-line arguments to parse. ``--input`` and ``--output`` are
            consumed here. All remaining arguments are forwarded unchanged to
            ``PipelineOptions``. When ``None``, argparse reads ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        default='gs://apache-beam-samples/nyc_taxi/2019/yellow_tripdata_2019-01.csv',
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=True,
        help='Output file to write results to.')
    # parse_known_args lets runner flags (e.g. --runner) pass through untouched.
    known_args, pipeline_args = parser.parse_known_args(argv)

    # The pipeline will be run on exiting the with block.
    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
        # Read only the two columns the aggregation below needs.
        df = p | read_csv(known_args.input, usecols=['passenger_count',
                                                     'DOLocationID'])
        # Count the number of passengers dropped off per LocationID
        agg = df.groupby('DOLocationID').sum()
        agg.to_csv(known_args.output)
if __name__ == '__main__':
    # Script entry: lower the root logger's threshold to INFO, then launch
    # the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
{code}
> taxi_dataframe example broken
> -----------------------------
>
> Key: BEAM-11472
> URL: https://issues.apache.org/jira/browse/BEAM-11472
> Project: Beam
> Issue Type: Bug
> Components: sdk-py-core
> Reporter: Brian Hulette
> Assignee: Robert Bradshaw
> Priority: P1
> Fix For: 2.27.0
>
>
> I just tried running the taxi_dataframe example at HEAD on Dataflow and found
> that it fails with the following error:
> {code}
> INFO:apache_beam.runners.dataflow.dataflow_runner:2020-12-15T16:27:55.653Z: JOB_MESSAGE_ERROR: Traceback (most recent call last):
>   File "apache_beam/runners/common.py", line 1214, in apache_beam.runners.common.DoFnRunner.process
>   File "apache_beam/runners/common.py", line 721, in apache_beam.runners.common.PerWindowInvoker.invoke_process
>   File "apache_beam/runners/common.py", line 753, in apache_beam.runners.common.PerWindowInvoker._should_process_window_for_sdf
>   File "apache_beam/runners/common.py", line 548, in apache_beam.runners.common.DoFnInvoker.invoke_create_tracker
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/dataframe/io.py", line 313, in create_tracker
>     tracker = beam.io.restriction_trackers.OffsetRestrictionTracker(restriction)
>   File "/usr/local/lib/python3.8/site-packages/apache_beam/io/restriction_trackers.py", line 91, in __init__
>     assert isinstance(offset_range, OffsetRange)
> AssertionError
> {code}
> PR https://github.com/apache/beam/pull/13443 seems like the most likely culprit,
> and I confirmed that running with Beam before that commit (at
> bd825f574e342cfa83fb09767c7d5a19a3accc55) does not fail.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)