Anecdotal evidence is that most people are reading the csv files
line-by-line with TextIO and then parsing into columns in a subsequent
DoFn, ignoring (or asserting) that quoted newlines won't occur in their
data.

On Mon, Jun 18, 2018 at 11:27 AM Austin Bennett <[email protected]>
wrote:

> Hi Beam Users/Dev,
>
> How are people handling currently handling CSVs as input to Beam (or not
> really doing so)?
>
> I see the things listed at the start of this thread -- any others?
>
> I have many batch workflows involve getting multi-GB CSV files from third
> party data aggregators (ex: hourly) and ingesting.  Currently this goes to
> S3/Redshift, and have written some spark so s3/Parquet.  It'd be great to
> take the csv.gz and write to BigQuery.  Is Beam not up to the task yet (and
> then should use something else and transform to newline json, Avro, parquet
> on GS and run bq load from there)?  Is there much thought on development to
> support/formalize these workflows?
>
> Thanks for any additional info beyond what is already in this thread (and
> thanks to Peter for prelim conversation),
>
> Austin
>
>
>
>
> On Wed, Apr 25, 2018 at 1:01 PM, Peter Brumblay <[email protected]>
> wrote:
>
>> This blog post was an excellent find. If I had infinite time I'd take a
>> stab at implementing this. They basically outline an algorithm which
>> *might* be appropriate for a generalized solution. It certainly beats my
>> "try to parse 3 records and if you do pretend you're good" method.
>>
>> Peter
>>
>> On Tue, Apr 24, 2018 at 4:46 PM, Eugene Kirpichov <[email protected]>
>> wrote:
>>
>>> Actually, you're right, this is not a pathological case. If we take a
>>> regular 1TB-sized CSV file that actually doesn't have any quotes, and start
>>> looking somewhere in the middle of it, there is no way to know whether
>>> we're currently inside or outside quotes without scanning the whole file -
>>> in theory there might be a quote lurking a few GB back. I suppose this can
>>> be addressed with specifying limits on field sizes in bytes: e.g. with a
>>> limit of 1kb, if there's no quotes in the preceding 1kb, then we're
>>> definitely in an unquoted context. However, if there is a quote, it may be
>>> either opening or closing the quoted context. There might be some way to
>>> resolve the ambiguity,
>>> https://blog.etleap.com/2016/11/27/distributed-csv-parsing/ seems to
>>> discuss this in detail.
>>>
>>> On Tue, Apr 24, 2018 at 3:26 PM Eugene Kirpichov <[email protected]>
>>> wrote:
>>>
>>>> Robert - you're right, but this is a pathological case. It signals that
>>>> there *might* be cases where we'll need to scan the whole file, however for
>>>> practical purposes it's more important whether we need to scan the whole
>>>> file in *all* (or most) cases - i.e. whether no amount of backward scanning
>>>> of a non-pathological file can give us confidence that we're truly located
>>>> a record boundary.
>>>>
>>>> On Tue, Apr 24, 2018 at 3:21 PM Robert Bradshaw <[email protected]>
>>>> wrote:
>>>>
>>>>> On Tue, Apr 24, 2018 at 3:18 PM Eugene Kirpichov <[email protected]
>>>>> >
>>>>> wrote:
>>>>>
>>>>> > I think the first question that has to be answered here is: Is it
>>>>> possible *at all* to implement parallel reading of RFC 4180?
>>>>>
>>>>> No. Consider a multi-record CSV file with no quotes. Placing a quote
>>>>> at the
>>>>> start and end gives a new CSV file with exactly one element.
>>>>>
>>>>> > I.e., given a start byte offset, is it possible to reliably locate
>>>>> the
>>>>> first record boundary at or after that offset while scanning only a
>>>>> small
>>>>> amount of data?
>>>>> > If it is possible, then that's what the SDF (or BoundedSource, etc.)
>>>>> should do - split into blind byte ranges, and use this algorithm to
>>>>> assign
>>>>> consistent meaning to byte ranges.
>>>>>
>>>>> > To answer your questions 2 and 3: think of it this way.
>>>>> > The SDF's ProcessElement takes an element and a restriction.
>>>>> > ProcessElement must make only one promise: that it will correctly
>>>>> perform
>>>>> exactly the work associated with this element and restriction.
>>>>> > The challenge is that the restriction can become smaller while
>>>>> ProcessElement runs - in which case, ProcessElement must also do fewer
>>>>> work. This can happen concurrently to ProcessElement running, so
>>>>> really the
>>>>> guarantee should be rephrased as "By the time ProcessElement
>>>>> completes, it
>>>>> should have performed exactly the work associated with the element and
>>>>> tracker.currentRestriction() at the moment of completion".
>>>>>
>>>>> > This is all that is asked of ProcessElement. If Beam decides to ask
>>>>> the
>>>>> tracker to split itself into two ranges (making the current one -
>>>>> "primary"
>>>>> - smaller, and producing an additional one - "residual"), Beam of
>>>>> course
>>>>> takes the responsibility for executing the residual restriction
>>>>> somewhere
>>>>> else: it won't be lost.
>>>>>
>>>>> > E.g. if ProcessElement was invoked with [a, b), but while it was
>>>>> invoked
>>>>> it was split into [a, b-100) and [b-100, b), then the current
>>>>> ProcessElement call must process [a, b-100), and Beam guarantees that
>>>>> it
>>>>> will fire up another ProcessElement call for [b-100, b) (Of course,
>>>>> both of
>>>>> these calls may end up being recursively split further).
>>>>>
>>>>> > I'm not quite sure what you mean by "recombining" - please let me
>>>>> know if
>>>>> the explanation above makes things clear enough or not.
>>>>>
>>>>> > On Tue, Apr 24, 2018 at 2:55 PM Peter Brumblay <
>>>>> [email protected]>
>>>>> wrote:
>>>>>
>>>>> >> Hi Eugene, thank you for the feedback!
>>>>>
>>>>> >> TextIO.read() can't handle RFC 4180 in full (at least I don't think
>>>>> it
>>>>> does!) - we have a lot of source data with embedded newlines. These
>>>>> records
>>>>> get split improperly because TextIO.read() blindly looks for newline
>>>>> characters. We need something which natively understands embedded
>>>>> newlines
>>>>> in quoted fields ... like so:
>>>>>
>>>>> >> foo,bar,"this has an\r\nembedded newline",192928\r\n
>>>>>
>>>>> >> As for the other feedback:
>>>>>
>>>>> >> 1. Claiming the entire range - yes, I figured this was a major
>>>>> mistake.
>>>>> Thanks for the confirmation.
>>>>> >> 2. The code for initial splitting of the restriction seems very
>>>>> complex...
>>>>>
>>>>> >> Follow-up question: if I process (and claim) only a subset of a
>>>>> range,
>>>>> say [a, b - 100), and [b - 100, b) represents an incomplete block, will
>>>>> beam SDF dynamically recombine ranges such that [b - 100, b + N) is
>>>>> sent to
>>>>> a worker with a (potentially) complete block?
>>>>>
>>>>> >> 3. Fine-tuning the evenness .... if beam SDF re-combines ranges for
>>>>> split blocks then it sounds like arbitrary splits in splitFunction()
>>>>> makes
>>>>> more sense.
>>>>>
>>>>> >> I'll try to take another pass at this with your feedback in mind.
>>>>>
>>>>> >> Peter
>>>>>
>>>>>
>>>>>
>>>>> >> On Tue, Apr 24, 2018 at 3:08 PM, Eugene Kirpichov <
>>>>> [email protected]>
>>>>> wrote:
>>>>>
>>>>> >>> Hi Peter,
>>>>>
>>>>> >>> Thanks for experimenting with SDF! However, in this particular
>>>>> case:
>>>>> any reason why you can't just use TextIO.read() and parse each line as
>>>>> CSV?
>>>>> Seems like that would require considerably less code.
>>>>>
>>>>> >>> A few comments on this code per se:
>>>>> >>> - The ProcessElement implementation immediately claims the entire
>>>>> range, which means that there can be no dynamic splitting and the code
>>>>> behaves equivalently to a regular DoFn
>>>>> >>> - The code for initial splitting of the restriction seems very
>>>>> complex
>>>>> - can you just split it blindly into a bunch of byte ranges of about
>>>>> equal
>>>>> size? Looking at the actual data while splitting should be never
>>>>> necessary
>>>>> - you should be able to just look at the file size (say, 100MB) and
>>>>> split
>>>>> it into a bunch of splits, say, [0, 10MB), [10MB, 20MB) etc.
>>>>> >>> - It seems that the splitting code tries to align splits with
>>>>> record
>>>>> boundaries - this is not useful: it does not matter whether the split
>>>>> boundaries fall onto record boundaries or not; instead, the reading
>>>>> code
>>>>> should be able to read an arbitrary range of bytes in a meaningful way.
>>>>> That typically means that reading [a, b) means "start at the first
>>>>> record
>>>>> boundary located at or after "a", end at the first record boundary
>>>>> located
>>>>> at or after "b""
>>>>> >>> - Fine-tuning the evenness of initial splitting is also not useful:
>>>>> dynamic splitting will even things out anyway; moreover, even if you
>>>>> are
>>>>> able to achieve an equal amount of data read by different
>>>>> restrictions, it
>>>>> does not translate into equal time to process the data with the ParDo's
>>>>> fused into the same bundle (and that time is unpredictable).
>>>>>
>>>>>
>>>>> >>> On Tue, Apr 24, 2018 at 1:24 PM Peter Brumblay
>>>>> >>> <[email protected]>
>>>>> wrote:
>>>>>
>>>>> >>>> Hi All,
>>>>>
>>>>> >>>> I noticed that there is no support for CSV file reading (e.g.
>>>>> rfc4180)
>>>>> in Apache Beam - at least no native transform. There's an issue to add
>>>>> this
>>>>> support: https://issues.apache.org/jira/browse/BEAM-51.
>>>>>
>>>>> >>>> I've seen examples which use the apache commons csv parser. I
>>>>> took a
>>>>> shot at implementing a SplittableDoFn transform. I have the full code
>>>>> and
>>>>> some questions in a gist here:
>>>>> https://gist.github.com/pbrumblay/9474dcc6cd238c3f1d26d869a20e863d.
>>>>>
>>>>> >>>> I suspect it could be improved quite a bit. If anyone has time to
>>>>> provide feedback I would really appreciate it.
>>>>>
>>>>> >>>> Regards,
>>>>>
>>>>> >>>> Peter Brumblay
>>>>> >>>> Fearless Technology Group, Inc.
>>>>>
>>>>
>>
>

Reply via email to