[https://issues.apache.org/jira/browse/BEAM-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16060177#comment-16060177]
Guillermo Rodríguez Cano commented on BEAM-2490:
------------------------------------------------
I am having a similar issue, if not the same one, although I am using the
DirectRunner instead (I believe that in an earlier trial I was also using the
DataflowRunner, and even gzip files).
In the following pipeline (I am trying to work with sessions):
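{code:python}
import apache_beam as beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.transforms.window import Sessions

# pipeline_options, known_args, ExtractEventAndTimestampDoFn and
# FormatSessionOutputDoFn are defined elsewhere in sessions_manager.py.
with beam.Pipeline(options=pipeline_options) as p:
    raw_events = p | 'Read input' >> ReadFromText(known_args.input)
    sessions = (raw_events
                # GroupByKey needs (key, value) pairs, so this DoFn is
                # assumed to emit timestamped (key, event) pairs.
                | 'Extract event and timestamp' >> beam.ParDo(ExtractEventAndTimestampDoFn())
                | 'Compute sessions window' >> beam.WindowInto(
                    Sessions(gap_size=known_args.session_idle_gap))
                | 'Group by key' >> beam.GroupByKey())
    output = sessions | 'Format output' >> beam.ParDo(FormatSessionOutputDoFn())
    output | 'Write results' >> WriteToText(known_args.output)
{code}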
Here the input is a set of uncompressed JSON files in a single directory. I
get the same output whether I use the glob operator (*) to match all the files
or pass only the first file in that list.
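Because the pipeline uses session windows, reading only a subset of the input changes the results rather than simply shrinking them. Sessions gives each element the window [t, t + gap_size) and merges overlapping windows of the same key, so if one session's events are spread across several files (as can happen when a large log is split into pieces), they only merge into a single session when all of those files are read. The sketch below is a plain-Python illustration of that merging rule for one key; `merge_sessions` is a hypothetical helper, not part of the Beam API, and it ignores Beam details such as watermarks and triggers.

{code:python}
from typing import List, Tuple


def merge_sessions(timestamps: List[int], gap: int) -> List[Tuple[int, int]]:
    """Merge per-element windows [t, t + gap) into session windows for one key.

    Hypothetical helper illustrating the Sessions merging rule; not Beam code.
    """
    # Each element starts out in its own window [t, t + gap).
    windows = sorted((t, t + gap) for t in timestamps)
    merged: List[Tuple[int, int]] = []
    for start, end in windows:
        if merged and start < merged[-1][1]:
            # Overlaps the current session: extend its end if needed.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            # First element, or the gap is too large: start a new session.
            merged.append((start, end))
    return merged
{code}

For example, `merge_sessions([1, 4], 5)` returns `[(1, 9)]`, while `merge_sessions([1], 5)` returns `[(1, 6)]`: losing the file that holds the second event shortens the session instead of just dropping one element.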
When I run the pipeline like this (on files of about 200 MB each):
{{python sessions_manager.py --input ./input/test/* --output ./output/ --runner DirectRunner}}
The process list then shows this command line:
{{python sessions_manager.py --input ./input/test/xaa.json ./input/test/xab.json ./input/test/xac.json ./input/test/xad.json ./input/test/xae.json ./input/test/xaf.json ./input/test/xag --output ./output/ --runner DirectRunner}}
And if I run just this:
{{python sessions_manager.py --input ./input/test/xaa.json --output ./output/ --runner DirectRunner}}
The output is exactly the same as with the glob operator, and quite different
from what I get when I merge the files into one and run the pipeline again on
the merged file.
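One thing worth ruling out: in bash and other POSIX shells, an unquoted `./input/test/*` is expanded by the shell before Python starts, which is why the process list shows every file name after `--input`. The sketch below assumes sessions_manager.py parses its flags with `argparse` and `parse_known_args`, as the Beam example pipelines do; the script itself is not shown, so `parse` and its flags are hypothetical. Under that assumption, `--input` receives only the first file and the remaining paths land among the leftover arguments, which would be consistent with the glob run matching the single-file run.

{code:python}
import argparse
from typing import List, Tuple


def parse(argv: List[str]) -> Tuple[argparse.Namespace, List[str]]:
    # Hypothetical stand-in for the flag parsing in sessions_manager.py.
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True)
    # Unrecognized arguments are returned instead of raising an error.
    return parser.parse_known_args(argv)


# argv as the script receives it after the shell expands ./input/test/*
expanded = ['--input', './input/test/xaa.json', './input/test/xab.json',
            './input/test/xac.json', '--output', './output/',
            '--runner', 'DirectRunner']
known, leftover = parse(expanded)
print(known.input)  # ./input/test/xaa.json (only the first file)
print(leftover)     # the other file names end up here, along with --runner

# Quoting the pattern on the command line keeps it intact, so ReadFromText
# receives the glob itself and does the matching.
quoted_known, _ = parse(['--input', './input/test/*', '--output', './output/'])
print(quoted_known.input)  # ./input/test/*
{code}

In practice that means passing the pattern in quotes, e.g. `--input './input/test/*'`.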
> ReadFromText function is not taking all data with glob operator (*)
> --------------------------------------------------------------------
>
> Key: BEAM-2490
> URL: https://issues.apache.org/jira/browse/BEAM-2490
> Project: Beam
> Issue Type: Bug
> Components: sdk-py
> Affects Versions: 2.0.0
> Environment: Usage with Google Cloud Platform: Dataflow runner
> Reporter: Olivier NGUYEN QUOC
> Assignee: Chamikara Jayalath
> Fix For: 2.1.0
>
>
> I run a very simple pipeline:
> * Read my files from Google Cloud Storage
> * Split on the '\n' character
> * Write it to Google Cloud Storage
> I have 8 files that match the pattern:
> * my_files_2016090116_20160902_060051_xxxxxxxxxx.csv.gz (229.25 MB)
> * my_files_2016090117_20160902_060051_xxxxxxxxxx.csv.gz (184.1 MB)
> * my_files_2016090118_20160902_060051_xxxxxxxxxx.csv.gz (171.73 MB)
> * my_files_2016090119_20160902_060051_xxxxxxxxxx.csv.gz (151.34 MB)
> * my_files_2016090120_20160902_060051_xxxxxxxxxx.csv.gz (129.69 MB)
> * my_files_2016090121_20160902_060051_xxxxxxxxxx.csv.gz (151.7 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (346.46 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (222.57 MB)
> This code should read all of them:
> {code:python}
> beam.io.ReadFromText(
> "gs://XXXX_folder1/my_files_20160901*.csv.gz",
> skip_header_lines=1,
> compression_type=beam.io.filesystem.CompressionTypes.GZIP
> )
> {code}
> It runs without errors, but the pipeline outputs only a 288.62 MB file
> (instead of a 1.5 GB file).
> The whole pipeline code:
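> {code:python}
> import apache_beam as beam
>
> # p is the beam.Pipeline object (Dataflow runner); its creation is not shown.
> data = (p
>         | 'ReadMyFiles' >> beam.io.ReadFromText(
>             "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>             skip_header_lines=1,
>             compression_type=beam.io.filesystem.CompressionTypes.GZIP)
>         # ReadFromText already emits one element per line, so this split
>         # leaves each element unchanged.
>         | 'SplitLines' >> beam.FlatMap(lambda x: x.split('\n')))
> output = (
>     data | "Write" >> beam.io.WriteToText('gs://XXX_folder2/test.csv',
>                                           num_shards=1))
> {code}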
> Dataflow reports that the estimated size of the output of the ReadFromText
> step is only 602.29 MB, which corresponds neither to any single input file's
> size nor to the total size of the files matching the pattern.
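A Beam-independent way to check how many records the pattern in the report above should yield is to decompress the matching files directly and count their lines. This is a minimal sketch assuming the files have been copied to a local directory (Python's `glob` does not understand `gs://` paths); `count_lines` is a hypothetical helper that only approximates what `ReadFromText(..., skip_header_lines=1, compression_type=GZIP)` is expected to return, and it is not Beam's reader.

{code:python}
import glob
import gzip
from typing import Dict


def count_lines(pattern: str, skip_header_lines: int = 0) -> Dict[str, int]:
    """Count data lines in each gzip file matching `pattern`.

    Hypothetical cross-check helper; it does not use Beam.
    """
    counts: Dict[str, int] = {}
    for path in sorted(glob.glob(pattern)):
        # Read in binary mode so the count does not depend on the text encoding.
        with gzip.open(path, 'rb') as f:
            n = sum(1 for _ in f)
        # Drop the header line(s), as skip_header_lines does per file.
        counts[path] = max(n - skip_header_lines, 0)
    return counts
{code}

For example, `sum(count_lines('my_files_20160901*.csv.gz', skip_header_lines=1).values())` gives the expected total number of records, which can then be compared with the number of lines in the pipeline output.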