OK, so how about this for a concrete proposal:

sink:
  type: WriteToParquet
  config:
    path: "/beam/filesystem/{record.my_col}-{timestamp.year}{timestamp.month}{timestamp.day}"
    suffix: ".parquet"
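To make the proposed substitution semantics concrete, here is a minimal Python sketch of how such a template could be expanded. This is not the Beam YAML implementation; the helper name, the regex-based parsing, and the zero-padding of date parts are assumptions for illustration only.

```python
import re
from datetime import datetime


def expand_path_template(template, record, timestamp):
    """Illustrative expansion of {record.col} / {timestamp.part} references.

    Hypothetical helper, not a Beam API: it only sketches the proposed
    substitution semantics for a single record and timestamp.
    """
    def substitute(match):
        source, attr = match.group(1).split(".", 1)
        if source == "record":
            return str(record[attr])
        if source == "timestamp":
            # Zero-pad month/day (assumption) so paths sort lexicographically.
            return str(timestamp.year) if attr == "year" else "%02d" % getattr(timestamp, attr)
        raise ValueError("Unknown template source: " + source)
    return re.sub(r"\{([^}]+)\}", substitute, template)


path = expand_path_template(
    "/beam/filesystem/{record.my_col}-{timestamp.year}{timestamp.month}{timestamp.day}",
    record={"my_col": "events"},
    timestamp=datetime(2023, 10, 11))
# path == "/beam/filesystem/events-20231011"
```

Zero-padding the month and day keeps date-partitioned paths lexicographically sortable, which is usually what downstream tooling expects.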
The eventual path would be <path><sharding_info><suffix>. The suffix would be optional, and there could be a default for the specific file format. A file format could inspect a provided suffix like ".csv.gz" to infer compression as well. Note that this doesn't have any special indicators for being dynamic other than the {}'s. Also, my_col would be written as part of the data (but we could add an extra "elide" config parameter that takes a list of columns to exclude if desired). We could call this "prefix" rather than path. (Path is symmetric with reading, but prefix is a bit more direct.) Anyone want to voice their opinion here? On Wed, Oct 11, 2023 at 9:01 AM Chamikara Jayalath <chamik...@google.com> wrote: > > > On Wed, Oct 11, 2023 at 6:55 AM Kenneth Knowles <k...@apache.org> wrote: > >> So, top-posting because the threading got to be a lot for me and I think >> it forked a bit too... I may even be restating something someone said, so >> apologies for that. >> >> Very very good point about *required* parameters where if you don't use >> them then you will end up with two writers writing to the same file. The >> easiest example to work with might be if you omitted SHARD_NUM so all >> shards end up clobbering the same file. >> >> I think there's a unifying perspective between prefix/suffix and the need >> to be sure to include critical sharding variables. Essentially it is my >> point about it being a "big data fileset". It is perhaps unrealistic but >> ideally the user names the big data fileset and then the mandatory other >> pieces are added outside of their control. For example if I name my big >> data fileset "foo" then that implicitly means that "foo" consists of all >> the files named "foo/${SHARD_NUM}-of-${SHARD_TOTAL}". And yes now that I >> re-read I see you basically said the same thing. In some cases the required >> fields will include $WINDOW, $KEY, and $PANE_INDEX, yes? 
Even though the >> user can think of it as a textual template, if we can use a library that >> yields an abstract syntax tree for the expression we can easily check these >> requirements in a robust way - or we could do it in a non-robust way by >> string-scraping ourselves. >> > > Yes. I think we are talking about the same thing. Users should not have > full control over the filename since that could lead to conflicts and data > loss when data is being written in parallel from multiple workers. Users > can refer to the big data fileset being written using the glob "<path>/**". > In addition users have control over the filename <prefix> and <suffix> > (file extension, for example) which can be useful for some downstream > use-cases. The rest of the filename will be filled out by the SDK (window, pane > etc.) to make sure that the files written by different workers do not > conflict. > > Thanks, > Cham > > >> >> We actually are very close to this in FileIO. I think the interpretation >> of "prefix" is that it is the filename "foo" as above, and "suffix" is >> really something like ".txt" that you stick on the end of everything for >> whatever reason. >> >> Kenn >> >> On Tue, Oct 10, 2023 at 7:12 PM Robert Bradshaw via dev < >> dev@beam.apache.org> wrote: >> >>> On Tue, Oct 10, 2023 at 4:05 PM Chamikara Jayalath <chamik...@google.com> >>> wrote: >>> >>>> >>>> On Tue, Oct 10, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com> >>>> wrote: >>>> >>>>> On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath < >>>>> chamik...@google.com> wrote: >>>>> >>>>>> >>>>>> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax <re...@google.com> wrote: >>>>>> >>>>>>> I suspect some simple pattern templating would solve most use cases. >>>>>>> We probably would want to support timestamp formatting (e.g. $YYYY $M >>>>>>> $D) >>>>>>> as well.
>>>>>>> >>>>>>> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw <rober...@google.com> >>>>>>> wrote: >>>>>>> >>>>>>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath < >>>>>>>> chamik...@google.com> wrote: >>>>>>>> >>>>>>>>> I would say: >>>>>>>>> >>>>>>>>> sink: >>>>>>>>> type: WriteToParquet >>>>>>>>> config: >>>>>>>>> path: /beam/filesystem/dest >>>>>>>>> prefix: <my prefix> >>>>>>>>> suffix: <my suffix> >>>>>>>>> >>>>>>>>> The underlying SDK will add the middle part of the file names to make >>>>>>>>> sure that files generated by various bundles/windows/shards do not >>>>>>>>> conflict. >>>>>>>>> >>>>>>>> >>>>>>>> What's the relationship between path and prefix? Is path the >>>>>>>> directory part of the full path, or does prefix precede it? >>>>>>>> >>>>>>> >>>>>> prefix would be the first part of the file name so each shard will be >>>>>> named: >>>>>> <path>/<prefix>-<shard components added by the runner>-<suffix> >>>>>> >>>>>> This is similar to what we do in existing SDKs. For example, Java >>>>>> FileIO, >>>>>> >>>>>> >>>>>> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L187 >>>>>> >>>>> >>>>> Yeah, although there's no distinction between path and prefix. >>>>> >>>> >>>> Ah, for FileIO, path comes from the "to" call. >>>> >>>> >>>> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1125 >>>> >>>> >>> >>> Ah. I guess there's some inconsistency here, e.g. text files are written >>> to a filenamePrefix that subsumes both: >>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html#to-java.lang.String- >>> >>> >>>> >>>>> >>>>>>>> >>>>>>>>> This will satisfy the vast majority of use-cases I believe. Fully >>>>>>>>> customizing the file pattern sounds like a more advanced use case >>>>>>>>> that can >>>>>>>>> be left for "real" SDKs.
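As a concrete illustration of the <path>/<prefix>-<sharding>-<suffix> layout discussed above, a hedged sketch follows. The helper is hypothetical: the exact separators, padding width, and middle section are SDK-controlled details, shown here only to make the naming scheme tangible.

```python
def shard_filename(path, prefix, shard, num_shards, suffix):
    # The "-SSSSS-of-NNNNN" middle section stands in for the
    # runner-added shard components; the 5-digit padding is illustrative.
    return "%s/%s-%05d-of-%05d%s" % (path, prefix, shard, num_shards, suffix)


print(shard_filename("/beam/filesystem/dest", "out", 2, 10, ".parquet"))
# /beam/filesystem/dest/out-00002-of-00010.parquet
```

With this layout, the whole fileset is addressable via the glob "<path>/**" while users still control the human-relevant prefix and suffix.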
>>>>>>>>> >>>>>>>> >>>>>>>> Yea, we don't have to do everything. >>>>>>>> >>>>>>>> >>>>>>>>> For dynamic destinations, I think just making the "path" component >>>>>>>>> support a lambda that is parameterized by the input should be >>>>>>>>> adequate >>>>>>>>> since this allows customers to direct files written to different >>>>>>>>> destination directories. >>>>>>>>> >>>>>>>>> sink: >>>>>>>>> type: WriteToParquet >>>>>>>>> config: >>>>>>>>> path: <destination lambda> >>>>>>>>> prefix: <my prefix> >>>>>>>>> suffix: <my suffix> >>>>>>>>> >>>>>>>>> I'm not sure what would be the best way to specify a lambda here >>>>>>>>> though. Maybe a regex or the name of a Python callable? >>>>>>>>> >>>>>>>> >>>>>>>> I'd rather not require Python for a pure Java pipeline, but some >>>>>>>> kind of a pattern template may be sufficient here. >>>>>>>> >>>>>>>> >>>>>>>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev < >>>>>>>>> dev@beam.apache.org> wrote: >>>>>>>>> >>>>>>>>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax <re...@google.com> >>>>>>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Just FYI - the reason why names (including prefixes) in >>>>>>>>>>> DynamicDestinations were parameterized via a lambda instead of just >>>>>>>>>>> having >>>>>>>>>>> the user add it via MapElements is performance. We discussed >>>>>>>>>>> something >>>>>>>>>>> along the lines of what you are suggesting (essentially having the >>>>>>>>>>> user >>>>>>>>>>> create a KV where the key contained the dynamic information). The >>>>>>>>>>> problem >>>>>>>>>>> was that the size of the generated filepath was often much >>>>>>>>>>> larger >>>>>>>>>>> (sometimes by 2 OOM) than the information in the record, and there >>>>>>>>>>> was a >>>>>>>>>>> desire to avoid record blowup. E.g. the record might contain a >>>>>>>>>>> single >>>>>>>>>>> integer userid, and the filepath prefix would then be >>>>>>>>>>> /long/path/to/output/users/<id>.
This was especially bad in cases >>>>>>>>>>> where the >>>>>>>>>>> data had to be shuffled, and the existing dynamic destinations >>>>>>>>>>> method >>>>>>>>>>> allowed extracting the filepath only _after_ the shuffle. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> That is a consideration I hadn't thought much of, thanks for >>>>>>>>>> bringing this up. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> Now there may not be any good way to keep this benefit in a >>>>>>>>>>> declarative approach such as YAML (or at least a good easy way - we >>>>>>>>>>> could >>>>>>>>>>> always allow the user to pass in a SQL expression to extract the >>>>>>>>>>> filename >>>>>>>>>>> from the record!), but we should keep in mind that this might mean >>>>>>>>>>> that >>>>>>>>>>> YAML-generated pipelines will be less efficient for certain use >>>>>>>>>>> cases. >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Yep, it's not as straightforward to do in a declarative way. I >>>>>>>>>> would like to avoid mixing UDFs (with their associated languages and >>>>>>>>>> execution environments) if possible. Though I'd like the performance >>>>>>>>>> of a >>>>>>>>>> "straightforward" YAML pipeline to be that which one can get writing >>>>>>>>>> straight-line Java (and possibly better, if we can leverage the >>>>>>>>>> structure >>>>>>>>>> of schemas everywhere) this is not an absolute requirement for all >>>>>>>>>> features. >>>>>>>>>> >>>>>>>>>> I wonder if separating out a constant prefix vs. the dynamic >>>>>>>>>> stuff could be sufficient to mitigate the blow-up of pre-computing >>>>>>>>>> this in >>>>>>>>>> most cases (especially in the context of a larger pipeline). >>>>>>>>>> Alternatively, >>>>>>>>>> rather than just a sharding pattern, one could have a full >>>>>>>>>> filepattern that >>>>>>>>>> includes format parameters for dynamically computed bits as well as >>>>>>>>>> the >>>>>>>>>> shard number, windowing info, etc. (There are pros and cons to this.) 
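The mitigation floated above, separating the constant prefix from the small dynamic fragment so that only the fragment travels with each record through the shuffle, could look roughly like the following. The helper names are hypothetical; the prefix is the example path from the discussion.

```python
# Sketch: keep only the small dynamic fragment with each record and
# attach the large constant prefix at write time (after any shuffle).
CONSTANT_PREFIX = "/long/path/to/output/users/"  # example path from the thread


def destination_fragment(record):
    # Small: this is all that needs to travel with the record.
    return str(record["userid"])


def full_path(fragment):
    # The constant prefix is applied only when the file is opened.
    return CONSTANT_PREFIX + fragment


print(full_path(destination_fragment({"userid": 42})))
# /long/path/to/output/users/42
```

The per-record state shrinks from a long path to a few bytes, which is the point of the two-orders-of-magnitude concern raised above.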
>>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev < >>>>>>>>>>> dev@beam.apache.org> wrote: >>>>>>>>>>> >>>>>>>>>>>> Currently the various file writing configurations take a single >>>>>>>>>>>> parameter, path, which indicates where the (sharded) output should >>>>>>>>>>>> be >>>>>>>>>>>> placed. In other words, one can write something like >>>>>>>>>>>> >>>>>>>>>>>> pipeline: >>>>>>>>>>>> ... >>>>>>>>>>>> sink: >>>>>>>>>>>> type: WriteToParquet >>>>>>>>>>>> config: >>>>>>>>>>>> path: /beam/filesystem/dest >>>>>>>>>>>> >>>>>>>>>>>> and one gets files like "/beam/filesystem/dest-X-of-N" >>>>>>>>>>>> >>>>>>>>>>>> Of course, in practice file writing is often much more >>>>>>>>>>>> complicated than this (especially when it comes to streaming). For >>>>>>>>>>>> reference, I've included links to our existing offerings in the >>>>>>>>>>>> various >>>>>>>>>>>> SDKs below. I'd like to start a discussion about what else should >>>>>>>>>>>> go in the >>>>>>>>>>>> "config" parameter and how it should be expressed in YAML. >>>>>>>>>>>> >>>>>>>>>>>> The primary concern is around naming. This can generally be >>>>>>>>>>>> split into (1) the prefix, which must be provided by the user, (2) >>>>>>>>>>>> the >>>>>>>>>>>> sharding information, which includes both shard counts (e.g. the -X-of-N >>>>>>>>>>>> suffix) >>>>>>>>>>>> and windowing information (for streaming pipelines) which we >>>>>>>>>>>> may want >>>>>>>>>>>> to allow the user to customize the formatting of, and (3) a suffix >>>>>>>>>>>> like >>>>>>>>>>>> .json or .avro that is useful for both humans and tooling and can >>>>>>>>>>>> often be >>>>>>>>>>>> inferred but should allow customization as well. >>>>>>>>>>>> >>>>>>>>>>>> An interesting case is that of dynamic destinations, where the >>>>>>>>>>>> prefix (or other parameters) may themselves be functions of the >>>>>>>>>>>> records >>>>>>>>>>>> themselves.
(I am excluding the case where the format itself is >>>>>>>>>>>> variable--such cases are probably better handled by explicitly >>>>>>>>>>>> partitioning >>>>>>>>>>>> the data and doing multiple writes, as this introduces significant >>>>>>>>>>>> complexities and the set of possible formats is generally finite >>>>>>>>>>>> and known >>>>>>>>>>>> ahead of time.) I propose that we leverage the fact that we have >>>>>>>>>>>> structured >>>>>>>>>>>> data to be able to pull out these dynamic parameters. For example, >>>>>>>>>>>> if we >>>>>>>>>>>> have an input data set with a string column my_col we could allow >>>>>>>>>>>> something >>>>>>>>>>>> like >>>>>>>>>>>> >>>>>>>>>>>> config: >>>>>>>>>>>> path: {dynamic: my_col} >>>>>>>>>>>> >>>>>>>>>>>> which would pull this information out at runtime. (With the >>>>>>>>>>>> MapToFields transform, it is very easy to compute/append >>>>>>>>>>>> additional fields >>>>>>>>>>>> to existing records.) Generally this field would then be stripped >>>>>>>>>>>> from the >>>>>>>>>>>> written data, which would only see the subset of non-dynamically >>>>>>>>>>>> referenced >>>>>>>>>>>> columns (though this could be configurable: we could add an >>>>>>>>>>>> attribute like >>>>>>>>>>>> {dynamic: my_col, keep: true} or require the set of columns to be >>>>>>>>>>>> actually >>>>>>>>>>>> written (or elided) to be enumerated in the config or >>>>>>>>>>>> allow/require the >>>>>>>>>>>> actual data to be written to be in a designated field of the >>>>>>>>>>>> "full" input >>>>>>>>>>>> records as arranged by a preceding transform). It'd be great to get >>>>>>>>>>>> input/impressions from a wide range of people here on what would >>>>>>>>>>>> be the >>>>>>>>>>>> most natural. Often just writing out snippets of various >>>>>>>>>>>> alternatives can >>>>>>>>>>>> be quite informative (though I'm avoiding putting them here for >>>>>>>>>>>> the moment >>>>>>>>>>>> to avoid biasing ideas right off the bat).
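One way to picture what the {dynamic: my_col} semantics could mean operationally, including an optional keep behavior, is sketched below. The helper is hypothetical, not a Beam API; it only illustrates pulling the destination out of a structured record and stripping it from the written data by default.

```python
def split_destination(record, dynamic_col, keep=False):
    """Hypothetical helper sketching {dynamic: my_col} semantics.

    Returns (destination, row_to_write): the destination is read from the
    record and, unless keep=True, removed from the data actually written.
    """
    dest = record[dynamic_col]
    if keep:
        return dest, dict(record)
    return dest, {k: v for k, v in record.items() if k != dynamic_col}


dest, row = split_destination({"my_col": "2023/10", "value": 7}, "my_col")
# dest == "2023/10"; row == {"value": 7}
```

A keep=True variant corresponds to the suggested {dynamic: my_col, keep: true} attribute, where the routing column also appears in the output.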
>>>>>>>>>>>> >>>>>>>>>>>> For streaming pipelines it is often essential to write data out >>>>>>>>>>>> in a time-partitioned manner. The typical way to do this is to add >>>>>>>>>>>> the >>>>>>>>>>>> windowing information into the shard specification itself, and a >>>>>>>>>>>> (set of) >>>>>>>>>>>> file(s) is written on each window closing. Beam YAML already >>>>>>>>>>>> supports any >>>>>>>>>>>> transform being given a "windowing" configuration which will cause >>>>>>>>>>>> a >>>>>>>>>>>> WindowInto transform to be applied to its input(s) before >>>>>>>>>>>> application, which >>>>>>>>>>>> can sit naturally on a sink. We may want to consider if >>>>>>>>>>>> non-windowed writes >>>>>>>>>>>> make sense as well (though how this interacts with the watermark >>>>>>>>>>>> and >>>>>>>>>>>> underlying implementations is a large open question, so this is a >>>>>>>>>>>> larger >>>>>>>>>>>> change that might make sense to defer). >>>>>>>>>>>> >>>>>>>>>>>> Note that I am explicitly excluding "coders" here. All data in >>>>>>>>>>>> YAML should be schema'd, and writers should know how to write this >>>>>>>>>>>> structured data. We may want to allow a "schema" field that lets a >>>>>>>>>>>> user >>>>>>>>>>>> specify the desired schema in a manner compatible with the sink >>>>>>>>>>>> format >>>>>>>>>>>> itself (e.g. avro, json, whatever) that could be used both for >>>>>>>>>>>> validation >>>>>>>>>>>> and possibly resolving ambiguities (e.g. if the sink has an enum >>>>>>>>>>>> format >>>>>>>>>>>> that is not expressed in the schema of the input PCollection).
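For reference, a time-partitioned sink using the generic "windowing" configuration mentioned above might look like the following sketch. The exact field names of the windowing spec are an assumption here, not a confirmed part of the Beam YAML schema.

```yaml
# Sketch: fixed one-minute windows applied to the sink's input, so a set
# of files is written on each window closing. Field names are illustrative.
sink:
  type: WriteToParquet
  windowing:
    type: fixed
    size: 60s
  config:
    path: /beam/filesystem/dest
    suffix: ".parquet"
```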
>>>>>>>>>>>> >>>>>>>>>>>> Some other configuration options are that some formats >>>>>>>>>>>> (especially text-based ones) allow for specification of an external >>>>>>>>>>>> compression type (which may be inferable from the suffix), whether >>>>>>>>>>>> to write >>>>>>>>>>>> a single shard if the input collection is empty or no shards at >>>>>>>>>>>> all (an >>>>>>>>>>>> occasional user request that's supported for some Beam sinks now), >>>>>>>>>>>> whether >>>>>>>>>>>> to allow fixed sharding (generally discouraged, as it disables >>>>>>>>>>>> things like >>>>>>>>>>>> automatic sharding based on input size, let alone dynamic work >>>>>>>>>>>> rebalancing, >>>>>>>>>>>> though sometimes this is useful if the input is known to be small >>>>>>>>>>>> and a >>>>>>>>>>>> single output is desired regardless of the restriction in >>>>>>>>>>>> parallelism), or >>>>>>>>>>>> other sharding parameters (e.g. limiting the number of total >>>>>>>>>>>> elements or >>>>>>>>>>>> (approximately) total number of bytes per output shard). Some of >>>>>>>>>>>> these >>>>>>>>>>>> options may not be available/implemented for all >>>>>>>>>>>> formats--consideration >>>>>>>>>>>> should be given as to how to handle this inconsistency (runtime >>>>>>>>>>>> errors for >>>>>>>>>>>> unsupported combinations or simply not allowing them on any until >>>>>>>>>>>> all are >>>>>>>>>>>> supported). >>>>>>>>>>>> >>>>>>>>>>>> A final consideration: we do not anticipate exposing the full >>>>>>>>>>>> complexity of Beam in the YAML offering. For advanced users, using >>>>>>>>>>>> a "real" >>>>>>>>>>>> SDK will often be preferable, and we intend to provide a migration >>>>>>>>>>>> path >>>>>>>>>>>> from YAML to a language of your choice (codegen). So we >>>>>>>>>>>> should balance simplicity with completeness and utility here.
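Inferring the compression type from a compound suffix like ".csv.gz", as mentioned above, could be as simple as the following sketch. The helper and its extension map are hypothetical, shown only to make the idea concrete.

```python
def infer_compression(suffix):
    # Illustrative mapping from a trailing extension to a compression
    # type; the set of recognized extensions is an assumption.
    for ext, compression in ((".gz", "gzip"), (".bz2", "bzip2"), (".zst", "zstd")):
        if suffix.endswith(ext):
            return compression
    return None  # uncompressed / format-internal compression


print(infer_compression(".csv.gz"))   # gzip
print(infer_compression(".parquet"))  # None
```

Note that columnar formats like Parquet handle compression internally, so a scheme like this would mainly apply to text-based sinks.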
>>>>>>>>>>>> >>>>>>>>>>>> Sure, we could just pick something, but given that the >>>>>>>>>>>> main point of YAML is not capability, but expressibility and >>>>>>>>>>>> ease-of-use, I >>>>>>>>>>>> think it's worth trying to get the expression of these concepts >>>>>>>>>>>> right. I'm >>>>>>>>>>>> sure many of you have written a pipeline to files at some point in >>>>>>>>>>>> time; >>>>>>>>>>>> I'd welcome any thoughts anyone has on the matter. >>>>>>>>>>>> >>>>>>>>>>>> - Robert >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> P.S. A related consideration: how should we distinguish the plain >>>>>>>>>>>> Read (where the file pattern is given at pipeline construction) >>>>>>>>>>>> from the >>>>>>>>>>>> ReadAll variants? Should they be separate transforms, or should we >>>>>>>>>>>> instead >>>>>>>>>>>> allow the same named transform (e.g. ReadFromParquet) to support both >>>>>>>>>>>> modes, >>>>>>>>>>>> depending on whether an input PCollection or explicit file path is >>>>>>>>>>>> given >>>>>>>>>>>> (the two being mutually exclusive, with exactly one required, and >>>>>>>>>>>> good >>>>>>>>>>>> error messaging of course)? >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> Java: >>>>>>>>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html >>>>>>>>>>>> Python: >>>>>>>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText >>>>>>>>>>>> Go: >>>>>>>>>>>> https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write >>>>>>>>>>>> Typescript: >>>>>>>>>>>> https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html >>>>>>>>>>>> Scio: >>>>>>>>>>>> https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html >>>>>>>>>>>>