[
https://issues.apache.org/jira/browse/BEAM-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16062003#comment-16062003
]
Guillermo Rodríguez Cano commented on BEAM-2490:
------------------------------------------------
Hello again,
to corroborate my hypothesis that the issue lies in the compression handling, I
added a Metrics counter to my pipeline that counts each line of each input file
after it has been read, and I output the metric values at the end of the
pipeline. These tests were done with the DirectRunner, though.
With the glob operator in question, I clearly get wrong results on the gzip
files, but not on the same files uncompressed. I compare against running the
command-line wc program on the uncompressed files, which is what I expect the
counters to match, since the pipeline just counts the number of lines in the input.
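To get a ground-truth line count that does not depend on Beam at all, a minimal sketch like the one below can be run over both directories. It uses only the Python standard library; the helper names `count_newlines` and `count_newlines_glob` are hypothetical. Like `wc -l`, it counts newline bytes, and it opens `.gz` files through Python's `gzip` module, which also reads every member of a concatenated (multi-member) gzip file.

```python
import glob
import gzip


def count_newlines(path, chunk_size=1 << 20):
    """Count newline bytes in one file, the way `wc -l` does.

    Files whose name ends in .gz are decompressed with the gzip module
    (all members, if the file is several gzip streams concatenated).
    """
    opener = gzip.open if path.endswith(".gz") else open
    total = 0
    with opener(path, "rb") as f:
        # Read in fixed-size chunks so large inputs do not need to fit in memory.
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            total += chunk.count(b"\n")
    return total


def count_newlines_glob(pattern):
    """Sum the newline counts of all files matching a glob pattern."""
    return sum(count_newlines(path) for path in sorted(glob.glob(pattern)))
```

For example, `count_newlines_glob('./input/shortlist-gzip/*')` should report the same total as `wc` on the uncompressed files, assuming the gzip directory holds compressed copies of exactly those files.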
As an example of the 'loss of information' that happens, see below:
{code:none}
» wc input/shortlist/*
1878646 12116709 2025493666 input/shortlist/events_20170620_22.json
1535456 10148786 1678214474 input/shortlist/events_20170620_23.json
1363218 9181848 1507687080 input/shortlist/events_20170621_00.json
1281514 8727816 1428512156 input/shortlist/events_20170621_01.json
1243974 8486567 1387581129 input/shortlist/events_20170621_02.json
1926283 12525017 1953672115 input/shortlist/events_20170621_12.json
1915997 12444970 1943291036 input/shortlist/events_20170621_13.json
11145088 73631713 11924451656 total
» python sessions.py --input './input/shortlist-gzip/*' --output ./output/ --runner DirectRunner
No handlers could be found for logger "oauth2client.contrib.multistore_file"
INFO:root:Running pipeline with DirectRunner.
INFO:root:Running pipeline with DirectRunner.
INFO:root:Number of valid events processed: 177902
INFO:root:Number of invalid events seen: 23353
» python sessions.py --input './input/shortlist/*' --output ./output/ --runner DirectRunner
No handlers could be found for logger "oauth2client.contrib.multistore_file"
INFO:root:Running pipeline with DirectRunner.
INFO:root:Running pipeline with DirectRunner.
INFO:root:Number of valid events processed: 9737763
INFO:root:Number of invalid events seen: 1407325
{code}
Note how, in the latter (uncompressed) case, the sum of the two counters
(9737763 + 1407325 = 11145088) matches the total reported by the wc command.
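The comparison can be checked with a few lines of arithmetic, using only the counter values and the `wc` total printed above:

```python
# Counter values reported above for the two DirectRunner runs.
valid_plain, invalid_plain = 9737763, 1407325
valid_gzip, invalid_gzip = 177902, 23353
wc_total = 11145088  # "total" line count from `wc` on the uncompressed files

plain_total = valid_plain + invalid_plain
gzip_total = valid_gzip + invalid_gzip

# The uncompressed run accounts for every line counted by wc.
assert plain_total == wc_total

# Fraction of the expected lines that the gzip run actually processed.
fraction_seen = gzip_total / wc_total
print(plain_total, gzip_total, f"{fraction_seen:.2%}")
```

The gzip run therefore saw 201255 lines, roughly 1.8% of the 11145088 lines that `wc` reports.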
I look forward to helping further to resolve or clarify this issue.
> ReadFromText function is not taking all data with glob operator (*)
> --------------------------------------------------------------------
>
> Key: BEAM-2490
> URL: https://issues.apache.org/jira/browse/BEAM-2490
> Project: Beam
> Issue Type: Bug
> Components: sdk-py
> Affects Versions: 2.0.0
> Environment: Usage with Google Cloud Platform: Dataflow runner
> Reporter: Olivier NGUYEN QUOC
> Assignee: Chamikara Jayalath
> Fix For: Not applicable
>
>
> I run a very simple pipeline:
> * Read my files from Google Cloud Storage
> * Split them on the '\n' character
> * Write the result to Google Cloud Storage
> I have 8 files that match the pattern:
> * my_files_2016090116_20160902_060051_xxxxxxxxxx.csv.gz (229.25 MB)
> * my_files_2016090117_20160902_060051_xxxxxxxxxx.csv.gz (184.1 MB)
> * my_files_2016090118_20160902_060051_xxxxxxxxxx.csv.gz (171.73 MB)
> * my_files_2016090119_20160902_060051_xxxxxxxxxx.csv.gz (151.34 MB)
> * my_files_2016090120_20160902_060051_xxxxxxxxxx.csv.gz (129.69 MB)
> * my_files_2016090121_20160902_060051_xxxxxxxxxx.csv.gz (151.7 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (346.46 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (222.57 MB)
> This code should take them all:
> {code:python}
> beam.io.ReadFromText(
>     "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>     skip_header_lines=1,
>     compression_type=beam.io.filesystem.CompressionTypes.GZIP
> )
> {code}
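As a quick sanity check that the pattern itself selects every file, the sketch below matches the listed names against the object-name part of the glob using Python's `fnmatch.fnmatchcase`. It assumes that glob matching on Google Cloud Storage treats `*` within a single name the same way `fnmatch` does; the redacted `xxxxxxxxxx` suffixes are kept exactly as in the report.

```python
from fnmatch import fnmatchcase

# File names exactly as listed in the report; the hash suffix is redacted
# there as "xxxxxxxxxx", and the "...2016090122..." name appears twice.
names = [
    "my_files_2016090116_20160902_060051_xxxxxxxxxx.csv.gz",
    "my_files_2016090117_20160902_060051_xxxxxxxxxx.csv.gz",
    "my_files_2016090118_20160902_060051_xxxxxxxxxx.csv.gz",
    "my_files_2016090119_20160902_060051_xxxxxxxxxx.csv.gz",
    "my_files_2016090120_20160902_060051_xxxxxxxxxx.csv.gz",
    "my_files_2016090121_20160902_060051_xxxxxxxxxx.csv.gz",
    "my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz",
    "my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz",
]

# Object-name part of the pattern passed to ReadFromText.
pattern = "my_files_20160901*.csv.gz"

# fnmatchcase: '*' matches any run of characters, '.' is literal,
# and the comparison is case-sensitive.
matched = [n for n in names if fnmatchcase(n, pattern)]
unmatched = [n for n in names if not fnmatchcase(n, pattern)]
print(len(matched), "matched,", len(unmatched), "unmatched")
```

All of the listed names match, so under this assumption the pattern selects every file.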
> It runs without errors, but the pipeline produces only a 288.62 MB output
> file (instead of a 1.5 GB file).
> The whole pipeline code:
> {code:python}
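> import apache_beam as beam
>
> # `p` is the beam.Pipeline object created earlier (not shown in the report).
> data = (p
>         | 'ReadMyFiles' >> beam.io.ReadFromText(
>             "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>             skip_header_lines=1,
>             compression_type=beam.io.filesystem.CompressionTypes.GZIP)
>         # ReadFromText already emits one element per line, so this split
>         # returns each element unchanged.
>         | 'SplitLines' >> beam.FlatMap(lambda x: x.split('\n')))
> output = (
>     data | "Write" >> beam.io.WriteToText('gs://XXX_folder2/test.csv',
>                                           num_shards=1))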
> {code}
> Dataflow reports that the estimated size of the output of the ReadFromText
> step is only 602.29 MB, which corresponds neither to the size of any single
> input file nor to the total size of the files matching the pattern.
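One possible explanation, which this thread does not confirm, is that some of the `.gz` inputs consist of several concatenated gzip members (as produced, for example, by `cat a.gz b.gz > ab.gz`) and that the reader stops decoding after the first member. The sketch below uses only Python's `zlib` and `gzip` modules to show how such a decoder silently drops lines. The helper names are hypothetical, and this is not Beam's actual implementation.

```python
import gzip
import zlib

# wbits = 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
GZIP_WBITS = 16 + zlib.MAX_WBITS


def decompress_first_member_only(data):
    """Decode only the first gzip member, as a naive streaming decoder might."""
    d = zlib.decompressobj(GZIP_WBITS)
    out = d.decompress(data)
    # After the first member ends, d.eof is True and the remaining
    # members are left undecoded in d.unused_data.
    return out, d.eof, d.unused_data


def decompress_all_members(data):
    """Decode every member of a concatenated gzip stream."""
    chunks = []
    while data:
        d = zlib.decompressobj(GZIP_WBITS)
        chunks.append(d.decompress(data))
        chunks.append(d.flush())
        data = d.unused_data  # start of the next member, or b"" at the end
    return b"".join(chunks)


# Two gzip members concatenated into a single blob.
blob = gzip.compress(b"line1\nline2\n") + gzip.compress(b"line3\n")

first, eof, rest = decompress_first_member_only(blob)
print(first.count(b"\n"))                          # lines seen by the naive decoder
print(decompress_all_members(blob).count(b"\n"))  # lines in all members
print(gzip.decompress(blob).count(b"\n"))         # stdlib gzip reads all members too
```

Comparing a per-file count from `gzip.decompress` with the number of lines the pipeline reads for that file would be one way to test this hypothesis.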