Anecdotal evidence is that most people read the CSV files line-by-line with TextIO and then parse them into columns in a subsequent DoFn, ignoring (or asserting) that quoted newlines won't occur in their data.
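For reference, that pattern looks roughly like the minimal sketch below (the bucket path is a placeholder, and the parsing uses Apache Commons CSV). As discussed in the thread quoted below, it silently mis-splits any record that contains a quoted newline:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

public class LineByLineCsv {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(TextIO.read().from("gs://my-bucket/input*.csv")) // one String per physical line
        .apply(ParDo.of(new DoFn<String, List<String>>() {
          @ProcessElement
          public void processElement(ProcessContext c) throws Exception {
            String line = c.element();
            if (line.isEmpty()) {
              return;
            }
            // Parse the single line as a one-record CSV "file".
            List<CSVRecord> records = CSVParser.parse(line, CSVFormat.RFC4180).getRecords();
            List<String> fields = new ArrayList<>();
            for (String field : records.get(0)) {
              fields.add(field);
            }
            c.output(fields);
          }
        }))
        .setCoder(ListCoder.of(StringUtf8Coder.of())); // a real pipeline would write these rows somewhere
    p.run().waitUntilFinish();
  }
}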
On Mon, Jun 18, 2018 at 11:27 AM Austin Bennett <[email protected]> wrote:

> Hi Beam Users/Dev,
>
> How are people currently handling CSVs as input to Beam (or not really doing so)?
>
> I see the things listed at the start of this thread -- any others?
>
> I have many batch workflows that involve getting multi-GB CSV files from third-party data aggregators (ex: hourly) and ingesting them. Currently this goes to S3/Redshift, and I have written some Spark for S3/Parquet. It'd be great to take the csv.gz and write to BigQuery. Is Beam not up to the task yet (and should I then use something else, transform to newline-delimited JSON, Avro, or Parquet on GCS, and run bq load from there)? Is there much thought on development to support/formalize these workflows?
>
> Thanks for any additional info beyond what is already in this thread (and thanks to Peter for the prelim conversation),
>
> Austin
>
> On Wed, Apr 25, 2018 at 1:01 PM, Peter Brumblay <[email protected]> wrote:
>
>> This blog post was an excellent find. If I had infinite time I'd take a stab at implementing this. They basically outline an algorithm which *might* be appropriate for a generalized solution. It certainly beats my "try to parse 3 records and if you do, pretend you're good" method.
>>
>> Peter
>>
>> On Tue, Apr 24, 2018 at 4:46 PM, Eugene Kirpichov <[email protected]> wrote:
>>
>>> Actually, you're right, this is not a pathological case. If we take a regular 1TB-sized CSV file that actually doesn't have any quotes, and start looking somewhere in the middle of it, there is no way to know whether we're currently inside or outside quotes without scanning the whole file - in theory there might be a quote lurking a few GB back. I suppose this can be addressed by specifying limits on field sizes in bytes: e.g. with a limit of 1kb, if there are no quotes in the preceding 1kb, then we're definitely in an unquoted context. However, if there is a quote, it may be either opening or closing the quoted context. There might be some way to resolve the ambiguity; https://blog.etleap.com/2016/11/27/distributed-csv-parsing/ seems to discuss this in detail.
>>>
>>> On Tue, Apr 24, 2018 at 3:26 PM Eugene Kirpichov <[email protected]> wrote:
>>>
>>>> Robert - you're right, but this is a pathological case. It signals that there *might* be cases where we'll need to scan the whole file; however, for practical purposes it's more important whether we need to scan the whole file in *all* (or most) cases - i.e. whether no amount of backward scanning of a non-pathological file can give us confidence that we're truly located at a record boundary.
>>>>
>>>> On Tue, Apr 24, 2018 at 3:21 PM Robert Bradshaw <[email protected]> wrote:
>>>>
>>>>> On Tue, Apr 24, 2018 at 3:18 PM Eugene Kirpichov <[email protected]> wrote:
>>>>>
>>>>> > I think the first question that has to be answered here is: Is it possible *at all* to implement parallel reading of RFC 4180?
>>>>>
>>>>> No. Consider a multi-record CSV file with no quotes. Placing a quote at the start and end gives a new CSV file with exactly one element.
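Robert's counterexample is easy to reproduce with the Apache Commons CSV parser that comes up later in the thread. In this small sketch (class name and literal data are just for illustration), the same middle bytes parse as three records or as the interior of a single quoted field, depending on a quote that may sit arbitrarily far away:

import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;

public class QuoteAmbiguityDemo {
  public static void main(String[] args) throws Exception {
    // A plain three-record CSV with no quotes at all.
    String plain = "a,b\r\nc,d\r\ne,f";
    // The same bytes with one quote added at the very start and the very end:
    // under RFC 4180 this is now a single record with a single quoted field.
    String wrapped = "\"" + plain + "\"";

    System.out.println(CSVParser.parse(plain, CSVFormat.RFC4180).getRecords().size());   // 3
    System.out.println(CSVParser.parse(wrapped, CSVFormat.RFC4180).getRecords().size()); // 1
  }
}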
>>>>> > I.e., given a start byte offset, is it possible to reliably locate the first record boundary at or after that offset while scanning only a small amount of data?
>>>>> > If it is possible, then that's what the SDF (or BoundedSource, etc.) should do - split into blind byte ranges, and use this algorithm to assign consistent meaning to byte ranges.
>>>>> >
>>>>> > To answer your questions 2 and 3: think of it this way.
>>>>> > The SDF's ProcessElement takes an element and a restriction.
>>>>> > ProcessElement must make only one promise: that it will correctly perform exactly the work associated with this element and restriction.
>>>>> > The challenge is that the restriction can become smaller while ProcessElement runs - in which case, ProcessElement must also do less work. This can happen concurrently with ProcessElement running, so really the guarantee should be rephrased as "By the time ProcessElement completes, it should have performed exactly the work associated with the element and tracker.currentRestriction() at the moment of completion".
>>>>> >
>>>>> > This is all that is asked of ProcessElement. If Beam decides to ask the tracker to split itself into two ranges (making the current one - "primary" - smaller, and producing an additional one - "residual"), Beam of course takes the responsibility for executing the residual restriction somewhere else: it won't be lost.
>>>>> >
>>>>> > E.g. if ProcessElement was invoked with [a, b), but while it was running it was split into [a, b-100) and [b-100, b), then the current ProcessElement call must process [a, b-100), and Beam guarantees that it will fire up another ProcessElement call for [b-100, b). (Of course, both of these calls may end up being recursively split further.)
>>>>> >
>>>>> > I'm not quite sure what you mean by "recombining" - please let me know if the explanation above makes things clear enough or not.
>>>>> >
>>>>> > On Tue, Apr 24, 2018 at 2:55 PM Peter Brumblay <[email protected]> wrote:
>>>>> >
>>>>> >> Hi Eugene, thank you for the feedback!
>>>>> >>
>>>>> >> TextIO.read() can't handle RFC 4180 in full (at least I don't think it does!) - we have a lot of source data with embedded newlines. These records get split improperly because TextIO.read() blindly looks for newline characters. We need something which natively understands embedded newlines in quoted fields ... like so:
>>>>> >>
>>>>> >> foo,bar,"this has an\r\nembedded newline",192928\r\n
>>>>> >>
>>>>> >> As for the other feedback:
>>>>> >>
>>>>> >> 1. Claiming the entire range - yes, I figured this was a major mistake. Thanks for the confirmation.
>>>>> >> 2. The code for initial splitting of the restriction seems very complex...
>>>>> >>
>>>>> >> Follow-up question: if I process (and claim) only a subset of a range, say [a, b - 100), and [b - 100, b) represents an incomplete block, will the Beam SDF dynamically recombine ranges such that [b - 100, b + N) is sent to a worker with a (potentially) complete block?
>>>>> >>
>>>>> >> 3. Fine-tuning the evenness ... if the Beam SDF re-combines ranges for split blocks, then it sounds like arbitrary splits in splitFunction() make more sense.
>>>>> >>
>>>>> >> I'll try to take another pass at this with your feedback in mind.
>>>>> >>
>>>>> >> Peter
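To make the contract described above concrete, here is a minimal sketch of the claim-per-record pattern for an offset-range SDF. This is not the code from the gist: the two abstract helpers are hypothetical stand-ins for real RFC 4180 scanning/parsing, and annotation and parameter details differ between Beam releases. The point is only that each record is claimed by its starting offset before it is output, so a dynamic split simply makes tryClaim fail and the loop stop; the residual range runs in a separate ProcessElement call rather than being recombined.

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;

// Sketch only: helpers below are hypothetical and named for illustration.
public abstract class CsvSdfSketch extends DoFn<String, String> {

  @GetInitialRestriction
  public OffsetRange getInitialRestriction(String fileName) {
    // The whole file as a single byte range; initial splitting can carve it up
    // blindly, and dynamic splitting shrinks ranges further at runtime.
    return new OffsetRange(0, new File(fileName).length());
  }

  @ProcessElement
  public void processElement(ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker)
      throws IOException {
    try (RandomAccessFile file = new RandomAccessFile(c.element(), "r")) {
      // "Reading [a, b)" means: start at the first record boundary at or after a ...
      seekToFirstRecordStartAtOrAfter(file, tracker.currentRestriction().getFrom());
      // ... and claim each record by its starting offset before outputting it.
      // tryClaim fails either at the natural end of the range or after a dynamic
      // split: if [a, b) was split into [a, b-100) and [b-100, b), the first record
      // starting at or after b-100 fails the claim, this call stops, and Beam runs
      // [b-100, b) in a separate ProcessElement call - nothing needs recombining.
      while (tracker.tryClaim(file.getFilePointer())) {
        // May read past the end of the range if the record straddles it.
        c.output(readOneRecordAndAdvance(file));
      }
    }
  }

  // Hypothetical: position the file at the first record boundary at or after offset.
  abstract void seekToFirstRecordStartAtOrAfter(RandomAccessFile file, long offset)
      throws IOException;

  // Hypothetical: read one full RFC 4180 record and leave the file positioned at
  // the start of the next record (or at end of file).
  abstract String readOneRecordAndAdvance(RandomAccessFile file) throws IOException;
}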
>>>>> >> On Tue, Apr 24, 2018 at 3:08 PM, Eugene Kirpichov <[email protected]> wrote:
>>>>> >>
>>>>> >>> Hi Peter,
>>>>> >>>
>>>>> >>> Thanks for experimenting with SDF! However, in this particular case: any reason why you can't just use TextIO.read() and parse each line as CSV? Seems like that would require considerably less code.
>>>>> >>>
>>>>> >>> A few comments on this code per se:
>>>>> >>> - The ProcessElement implementation immediately claims the entire range, which means that there can be no dynamic splitting and the code behaves equivalently to a regular DoFn.
>>>>> >>> - The code for initial splitting of the restriction seems very complex - can you just split it blindly into a bunch of byte ranges of about equal size? Looking at the actual data while splitting should never be necessary - you should be able to just look at the file size (say, 100MB) and split it into a bunch of splits, say, [0, 10MB), [10MB, 20MB), etc.
>>>>> >>> - It seems that the splitting code tries to align splits with record boundaries - this is not useful: it does not matter whether the split boundaries fall onto record boundaries or not; instead, the reading code should be able to read an arbitrary range of bytes in a meaningful way. That typically means that reading [a, b) means "start at the first record boundary located at or after a, end at the first record boundary located at or after b".
>>>>> >>> - Fine-tuning the evenness of initial splitting is also not useful: dynamic splitting will even things out anyway; moreover, even if you are able to achieve an equal amount of data read by different restrictions, it does not translate into equal time to process the data with the ParDos fused into the same bundle (and that time is unpredictable).
>>>>> >>>
>>>>> >>> On Tue, Apr 24, 2018 at 1:24 PM Peter Brumblay <[email protected]> wrote:
>>>>> >>>
>>>>> >>>> Hi All,
>>>>> >>>>
>>>>> >>>> I noticed that there is no support for CSV file reading (e.g. RFC 4180) in Apache Beam - at least no native transform. There's an issue to add this support: https://issues.apache.org/jira/browse/BEAM-51.
>>>>> >>>>
>>>>> >>>> I've seen examples which use the Apache Commons CSV parser. I took a shot at implementing a SplittableDoFn transform. I have the full code and some questions in a gist here: https://gist.github.com/pbrumblay/9474dcc6cd238c3f1d26d869a20e863d.
>>>>> >>>>
>>>>> >>>> I suspect it could be improved quite a bit. If anyone has time to provide feedback I would really appreciate it.
>>>>> >>>>
>>>>> >>>> Regards,
>>>>> >>>>
>>>>> >>>> Peter Brumblay
>>>>> >>>> Fearless Technology Group, Inc.
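Eugene's splitting comments above (split blindly into roughly equal byte ranges; don't align with record boundaries) amount to something like the following standalone sketch. The class name and the 64MB block size are arbitrary, and in an SDF this logic would live in a @SplitRestriction method, whose exact signature also varies between Beam releases.

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.range.OffsetRange;

// "Blind" initial splitting: carve the file's byte range into roughly equal
// pieces without looking at the data. Alignment with record boundaries doesn't
// matter, because reading [a, b) is defined as "from the first record boundary
// at or after a, up to the first record boundary at or after b", so adjacent
// ranges still cover every record exactly once.
public class BlindSplitting {

  public static List<OffsetRange> split(OffsetRange range, long blockSizeBytes) {
    List<OffsetRange> pieces = new ArrayList<>();
    for (long start = range.getFrom(); start < range.getTo(); start += blockSizeBytes) {
      pieces.add(new OffsetRange(start, Math.min(start + blockSizeBytes, range.getTo())));
    }
    return pieces;
  }

  public static void main(String[] args) {
    // A 100MB file split into 64MB blocks -> two pieces: [0, 64MB) and [64MB, 100MB).
    for (OffsetRange piece : split(new OffsetRange(0, 100L << 20), 64L << 20)) {
      System.out.println(piece.getFrom() + " .. " + piece.getTo());
    }
  }
}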
