On Thu, Oct 12, 2023 at 4:59 PM Robert Bradshaw <rober...@google.com> wrote:
> OK, so how about this for a concrete proposal:
>
> sink:
>   type: WriteToParquet
>   config:
>     path: "/beam/filesystem/{record.my_col}-{timestamp.year}{timestamp.month}{timestamp.day}"

This is an example, right? So basically the path can be parameterized using the fields of the Beam schema of input elements?

>     suffix: ".parquet"
>
> The eventual path would be <path><sharding_info><suffix>. The suffix would be optional, and there could be a default for the specific file format. A file format could inspect a provided suffix like ".csv.gz" to infer compression as well.
>
> Note that this doesn't have any special indicators for being dynamic other than the {}'s. Also, my_col would be written as part of the data (but we could add an extra "elide" config parameter that takes a list of columns to exclude if desired).

I think this is already dynamic given that the path can be parameterized using the input. For example,

path: "/beam/filesystem/{record.destination_col}/-{timestamp.year}{timestamp.month}{timestamp.day}"

> We could call this "prefix" rather than path. (Path is symmetric with reading, but prefix is a bit more direct.) Anyone want to voice their opinion here?

I'm fine with either.

Thanks,
Cham

> On Wed, Oct 11, 2023 at 9:01 AM Chamikara Jayalath <chamik...@google.com> wrote:
>
>> On Wed, Oct 11, 2023 at 6:55 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> So, top-posting because the threading got to be a lot for me and I think it forked a bit too... I may even be restating something someone said, so apologies for that.
>>>
>>> Very very good point about *required* parameters where if you don't use them then you will end up with two writers writing to the same file. The easiest example to work with might be if you omitted SHARD_NUM so all shards end up clobbering the same file.
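[Editorial sketch] The `{record.…}`/`{timestamp.…}` path templating proposed above could be implemented along these lines. This is a hypothetical illustration, not Beam's implementation: the helper name and regex are assumptions, and real support would presumably zero-pad months and days.

```python
import datetime
import re

def expand_path_template(template, record, timestamp):
    # Hypothetical helper (not a Beam API): substitute {record.<field>} with
    # values from the input element and {timestamp.<attr>} with attributes of
    # the element's timestamp.
    def substitute(match):
        scope, name = match.groups()
        if scope == "record":
            return str(record[name])
        return str(getattr(timestamp, name))
    return re.sub(r"\{(record|timestamp)\.(\w+)\}", substitute, template)

path = expand_path_template(
    "/beam/filesystem/{record.my_col}-{timestamp.year}{timestamp.month}{timestamp.day}",
    record={"my_col": "clicks"},
    timestamp=datetime.date(2023, 10, 12))
print(path)  # -> /beam/filesystem/clicks-20231012
```

Note that nothing but the `{}`'s marks the path as dynamic, matching the proposal.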
>>>
>>> I think there's a unifying perspective between prefix/suffix and the need to be sure to include critical sharding variables. Essentially it is my point about it being a "big data fileset". It is perhaps unrealistic, but ideally the user names the big data fileset and then the mandatory other pieces are added outside of their control. For example, if I name my big data fileset "foo" then that implicitly means that "foo" consists of all the files named "foo/${SHARD_NUM}-of-${SHARD_TOTAL}". And yes, now that I re-read I see you basically said the same thing. In some cases the required fields will include $WINDOW, $KEY, and $PANE_INDEX, yes? Even though the user can think of it as a textual template, if we can use a library that yields an abstract syntax tree for the expression we can easily check these requirements in a robust way - or we could do it in a non-robust way by string-scraping ourselves.
>>>
>>
>> Yes. I think we are talking about the same thing. Users should not have full control over the filename since that could lead to conflicts and data loss when data is being written in parallel from multiple workers. Users can refer to the big data fileset being written using the glob "<path>/**". In addition, users have control over the filename <prefix> and <suffix> (file extension, for example), which can be useful for some downstream use-cases. The rest of the filename will be filled out by the SDK (window, pane, etc.) to make sure that the files written by different workers do not conflict.
>>
>> Thanks,
>> Cham
>>
>>> We actually are very close to this in FileIO. I think the interpretation of "prefix" is that it is the filename "foo" as above, and "suffix" is really something like ".txt" that you stick on the end of everything for whatever reason.
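[Editorial sketch] The robust template check described above could look something like the following. The regex here stands in for a real template-AST library, and the required-variable names are assumptions for illustration.

```python
import re

# Variables that must appear in a filename template so parallel writers never
# collide; for windowed or keyed writes this set would also include WINDOW,
# KEY, and PANE_INDEX. (Illustrative names, not Beam's actual variables.)
REQUIRED_VARS = frozenset({"SHARD_NUM", "SHARD_TOTAL"})

def check_sharding_vars(template, required=REQUIRED_VARS):
    # Extract the ${VAR} references. A real implementation might walk an AST
    # produced by a template library rather than string-scraping with a regex.
    present = set(re.findall(r"\$\{(\w+)\}", template))
    missing = required - present
    if missing:
        raise ValueError(
            f"template {template!r} omits {sorted(missing)}; "
            "all shards would clobber the same file(s)")

check_sharding_vars("foo/${SHARD_NUM}-of-${SHARD_TOTAL}")  # OK
# check_sharding_vars("foo")  # would raise ValueError
```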
>>>
>>> Kenn
>>>
>>> On Tue, Oct 10, 2023 at 7:12 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
>>>
>>>> On Tue, Oct 10, 2023 at 4:05 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>
>>>>> On Tue, Oct 10, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>
>>>>>> On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>
>>>>>>> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> I suspect some simple pattern templating would solve most use cases. We probably would want to support timestamp formatting (e.g. $YYYY $M $D) as well.
>>>>>>>>
>>>>>>>> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> I would say:
>>>>>>>>>>
>>>>>>>>>> sink:
>>>>>>>>>>   type: WriteToParquet
>>>>>>>>>>   config:
>>>>>>>>>>     path: /beam/filesystem/dest
>>>>>>>>>>     prefix: <my prefix>
>>>>>>>>>>     suffix: <my suffix>
>>>>>>>>>>
>>>>>>>>>> Underlying SDK will add the middle part of the file names to make sure that files generated by various bundles/windows/shards do not conflict.
>>>>>>>>>
>>>>>>>>> What's the relationship between path and prefix? Is path the directory part of the full path, or does prefix precede it?
>>>>>>>
>>>>>>> prefix would be the first part of the file name, so each shard will be named
>>>>>>> <path>/<prefix>-<shard components added by the runner>-<suffix>
>>>>>>>
>>>>>>> This is similar to what we do in existing SDKs.
>>>>>>> For example, Java FileIO:
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L187
>>>>>>
>>>>>> Yeah, although there's no distinction between path and prefix.
>>>>>
>>>>> Ah, for FileIO, path comes from the "to" call.
>>>>>
>>>>> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1125
>>>>
>>>> Ah. I guess there's some inconsistency here, e.g. text files are written to a filenamePrefix that subsumes both:
>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html#to-java.lang.String-
>>>>
>>>>>>>>>> This will satisfy the vast majority of use-cases, I believe. Fully customizing the file pattern sounds like a more advanced use case that can be left for "real" SDKs.
>>>>>>>>>
>>>>>>>>> Yea, we don't have to do everything.
>>>>>>>>>
>>>>>>>>>> For dynamic destinations, I think just making the "path" component support a lambda that is parameterized by the input should be adequate since this allows customers to direct files written to different destination directories.
>>>>>>>>>>
>>>>>>>>>> sink:
>>>>>>>>>>   type: WriteToParquet
>>>>>>>>>>   config:
>>>>>>>>>>     path: <destination lambda>
>>>>>>>>>>     prefix: <my prefix>
>>>>>>>>>>     suffix: <my suffix>
>>>>>>>>>>
>>>>>>>>>> I'm not sure what would be the best way to specify a lambda here though. Maybe a regex or the name of a Python callable?
>>>>>>>>>
>>>>>>>>> I'd rather not require Python for a pure Java pipeline, but some kind of a pattern template may be sufficient here.
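[Editorial sketch] The `<path>/<prefix>-<shard components added by the runner>-<suffix>` naming convention discussed above, sketched over plain strings. The exact layout of the runner-added middle part (zero-padded shard numbers) is an assumption for illustration, not the SDK's actual format.

```python
def shard_file_name(path, prefix, suffix, shard_num, num_shards):
    # Assemble <path>/<prefix>-<runner-added shard components><suffix>.
    # Only the SDK controls the middle part, so concurrently written shards
    # cannot collide.
    middle = f"{shard_num:05d}-of-{num_shards:05d}"
    return f"{path}/{prefix}-{middle}{suffix}"

print(shard_file_name("/beam/filesystem/dest", "out", ".parquet", 3, 10))
# -> /beam/filesystem/dest/out-00003-of-00010.parquet
```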
>>>>>>>>>
>>>>>>>>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Just FYI - the reason why names (including prefixes) in DynamicDestinations were parameterized via a lambda instead of just having the user add it via MapElements is performance. We discussed something along the lines of what you are suggesting (essentially having the user create a KV where the key contained the dynamic information). The problem was that the size of the generated filepath was often much larger (sometimes by 2 OOM) than the information in the record, and there was a desire to avoid record blowup; e.g., the record might contain a single integer userid, and the filepath prefix would then be /long/path/to/output/users/<id>. This was especially bad in cases where the data had to be shuffled, and the existing dynamic destinations method allowed extracting the filepath only _after_ the shuffle.
>>>>>>>>>>>
>>>>>>>>>>> That is a consideration I hadn't thought much of, thanks for bringing this up.
>>>>>>>>>>>
>>>>>>>>>>>> Now there may not be any good way to keep this benefit in a declarative approach such as YAML (or at least a good easy way - we could always allow the user to pass in a SQL expression to extract the filename from the record!), but we should keep in mind that this might mean that YAML-generated pipelines will be less efficient for certain use cases.
>>>>>>>>>>>
>>>>>>>>>>> Yep, it's not as straightforward to do in a declarative way. I would like to avoid mixing in UDFs (with their associated languages and execution environments) if possible. Though I'd like the performance of a "straightforward" YAML pipeline to be that which one can get writing straight-line Java (and possibly better, if we can leverage the structure of schemas everywhere), this is not an absolute requirement for all features.
>>>>>>>>>>>
>>>>>>>>>>> I wonder if separating out a constant prefix vs. the dynamic stuff could be sufficient to mitigate the blow-up of pre-computing this in most cases (especially in the context of a larger pipeline). Alternatively, rather than just a sharding pattern, one could have a full filepattern that includes format parameters for dynamically computed bits as well as the shard number, windowing info, etc. (There are pros and cons to this.)
>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Currently the various file writing configurations take a single parameter, path, which indicates where the (sharded) output should be placed. In other words, one can write something like
>>>>>>>>>>>>>
>>>>>>>>>>>>> pipeline:
>>>>>>>>>>>>>   ...
>>>>>>>>>>>>>   sink:
>>>>>>>>>>>>>     type: WriteToParquet
>>>>>>>>>>>>>     config:
>>>>>>>>>>>>>       path: /beam/filesystem/dest
>>>>>>>>>>>>>
>>>>>>>>>>>>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>>>>>>>>>>>>
>>>>>>>>>>>>> Of course, in practice file writing is often much more complicated than this (especially when it comes to Streaming).
>>>>>>>>>>>>> For reference, I've included links to our existing offerings in the various SDKs below. I'd like to start a discussion about what else should go in the "config" parameter and how it should be expressed in YAML.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The primary concern is around naming. This can generally be split into (1) the prefix, which must be provided by the users, (2) the sharding information, which includes both shard counts (e.g. the -X-of-N suffix) and windowing information (for streaming pipelines), which we may want to allow the user to customize the formatting of, and (3) a suffix like .json or .avro that is useful for both humans and tooling and can often be inferred but should allow customization as well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> An interesting case is that of dynamic destinations, where the prefix (or other parameters) may themselves be functions of the records themselves. (I am excluding the case where the format itself is variable--such cases are probably better handled by explicitly partitioning the data and doing multiple writes, as this introduces significant complexities and the set of possible formats is generally finite and known ahead of time.) I propose that we leverage the fact that we have structured data to be able to pull out these dynamic parameters. For example, if we have an input data set with a string column my_col we could allow something like
>>>>>>>>>>>>>
>>>>>>>>>>>>> config:
>>>>>>>>>>>>>   path: {dynamic: my_col}
>>>>>>>>>>>>>
>>>>>>>>>>>>> which would pull this information out at runtime.
>>>>>>>>>>>>> (With the MapToFields transform, it is very easy to compute/append additional fields to existing records.) Generally this field would then be stripped from the written data, which would only see the subset of non-dynamically referenced columns (though this could be configurable: we could add an attribute like {dynamic: my_col, keep: true}, or require the set of columns to be actually written (or elided) to be enumerated in the config, or allow/require the actual data to be written to be in a designated field of the "full" input records as arranged by a preceding transform). It'd be great to get input/impressions from a wide range of people here on what would be the most natural. Often just writing out snippets of various alternatives can be quite informative (though I'm avoiding putting them here for the moment to avoid biasing ideas right off the bat).
>>>>>>>>>>>>>
>>>>>>>>>>>>> For streaming pipelines it is often essential to write data out in a time-partitioned manner. The typical way to do this is to add the windowing information into the shard specification itself, and a (set of) file(s) is written on each window closing. Beam YAML already supports any transform being given a "windowing" configuration, which will cause a WindowInto transform to be applied to its input(s) before application, which can sit naturally on a sink.
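[Editorial sketch] The `{dynamic: my_col}` behavior described above (route on the column's value; strip the column from the written data unless a keep-style attribute is set) could be sketched over plain dicts like so. The function and parameter names are hypothetical.

```python
def split_dynamic_destination(record, dynamic_col, keep=False):
    # Pull the destination out of the record. By default the dynamic column
    # is stripped from the data actually written, mirroring the proposal's
    # default; keep=True corresponds to the suggested {dynamic: my_col, keep: true}.
    destination = record[dynamic_col]
    written = dict(record) if keep else {
        k: v for k, v in record.items() if k != dynamic_col}
    return destination, written

dest, row = split_dynamic_destination({"my_col": "us-east", "value": 42}, "my_col")
print(dest, row)  # -> us-east {'value': 42}
```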
>>>>>>>>>>>>> We may want to consider if non-windowed writes make sense as well (though how this interacts with the watermark and underlying implementations is a large open question, so this is a larger change that might make sense to defer).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Note that I am explicitly excluding "coders" here. All data in YAML should be schema'd, and writers should know how to write this structured data. We may want to allow a "schema" field to allow a user to specify the desired schema in a manner compatible with the sink format itself (e.g. avro, json, whatever) that could be used both for validation and possibly resolving ambiguities (e.g. if the sink has an enum format that is not expressed in the schema of the input PCollection).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Some other configuration options are that some formats (especially text-based ones) allow for specification of an external compression type (which may be inferable from the suffix), whether to write a single shard if the input collection is empty or no shards at all (an occasional user request that's supported for some Beam sinks now), whether to allow fixed sharding (generally discouraged, as it disables things like automatic sharding based on input size, let alone dynamic work rebalancing, though sometimes this is useful if the input is known to be small and a single output is desired regardless of the restriction in parallelism), or other sharding parameters (e.g.
>>>>>>>>>>>>> limiting the number of total elements or (approximately) the total number of bytes per output shard). Some of these options may not be available/implemented for all formats--consideration should be given as to how to handle this inconsistency (runtime errors for unsupported combinations, or simply not allowing them on any until all are supported).
>>>>>>>>>>>>>
>>>>>>>>>>>>> A final consideration: we do not anticipate exposing the full complexity of Beam in the YAML offering. For advanced users, using a "real" SDK will often be preferable, and we intend to provide a migration path from YAML to a language of your choice (codegen). So we should balance simplicity with completeness and utility here.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Sure, we could just pick something, but given that the main point of YAML is not capability, but expressibility and ease-of-use, I think it's worth trying to get the expression of these concepts right. I'm sure many of you have written a pipeline to files at some point in time; I'd welcome any thoughts anyone has on the matter.
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Robert
>>>>>>>>>>>>>
>>>>>>>>>>>>> P.S. A related consideration: how should we distinguish the plain Read (where the file pattern is given at pipeline construction) from the ReadAll variants? Should they be separate transforms, or should we instead allow the same named transform (e.g.
>>>>>>>>>>>>> ReadFromParquet) support both modes, depending on whether an input PCollection or explicit file path is given (the two being mutually exclusive, with exactly one required, and good error messaging of course)?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Java:
>>>>>>>>>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
>>>>>>>>>>>>> Python:
>>>>>>>>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
>>>>>>>>>>>>> Go:
>>>>>>>>>>>>> https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
>>>>>>>>>>>>> Typescript:
>>>>>>>>>>>>> https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
>>>>>>>>>>>>> Scio:
>>>>>>>>>>>>> https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
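[Editorial sketch] One of the sharding parameters mentioned earlier in the thread, limiting the number of elements per output shard, could be sketched like this. A real sink would likely also track (approximate) bytes and operate per bundle rather than over a plain iterable.

```python
import itertools

def shards_by_element_count(records, max_elements):
    # Yield successive shards of at most max_elements records each.
    it = iter(records)
    while True:
        shard = list(itertools.islice(it, max_elements))
        if not shard:
            return
        yield shard

print(list(shards_by_element_count(range(7), 3)))
# -> [[0, 1, 2], [3, 4, 5], [6]]
```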