On Tue, Oct 10, 2023 at 4:05 PM Chamikara Jayalath <chamik...@google.com> wrote:
> On Tue, Oct 10, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com> wrote:
>
>> On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>
>>> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I suspect some simple pattern templating would solve most use cases. We probably would want to support timestamp formatting (e.g. $YYYY $M $D) as well.
>>>>
>>>> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>
>>>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>
>>>>>> I would say:
>>>>>>
>>>>>> sink:
>>>>>>   type: WriteToParquet
>>>>>>   config:
>>>>>>     path: /beam/filesystem/dest
>>>>>>     prefix: <my prefix>
>>>>>>     suffix: <my suffix>
>>>>>>
>>>>>> The underlying SDK will add the middle part of the file names to make sure that files generated by various bundles/windows/shards do not conflict.
>>>>>
>>>>> What's the relationship between path and prefix? Is path the directory part of the full path, or does prefix precede it?
>>>
>>> prefix would be the first part of the file name, so each shard will be named:
>>> <path>/<prefix>-<shard components added by the runner>-<suffix>
>>>
>>> This is similar to what we do in existing SDKs. For example, Java FileIO:
>>> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L187
>>
>> Yeah, although there's no distinction between path and prefix.
>
> Ah, for FileIO, path comes from the "to" call:
> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1125

Ah. I guess there's some inconsistency here, e.g.
text files are written to a filenamePrefix that subsumes both: https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html#to-java.lang.String-

>>>>>> This will satisfy the vast majority of use-cases, I believe. Fully customizing the file pattern sounds like a more advanced use case that can be left for "real" SDKs.
>>>>>
>>>>> Yea, we don't have to do everything.
>>>>>
>>>>>> For dynamic destinations, I think just making the "path" component support a lambda that is parameterized by the input should be adequate, since this allows customers to direct files written to different destination directories.
>>>>>>
>>>>>> sink:
>>>>>>   type: WriteToParquet
>>>>>>   config:
>>>>>>     path: <destination lambda>
>>>>>>     prefix: <my prefix>
>>>>>>     suffix: <my suffix>
>>>>>>
>>>>>> I'm not sure what would be the best way to specify a lambda here, though. Maybe a regex or the name of a Python callable?
>>>>>
>>>>> I'd rather not require Python for a pure Java pipeline, but some kind of a pattern template may be sufficient here.
>>>>>
>>>>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Just FYI - the reason why names (including prefixes) in DynamicDestinations were parameterized via a lambda, instead of just having the user add it via MapElements, is performance. We discussed something along the lines of what you are suggesting (essentially having the user create a KV where the key contained the dynamic information). The problem was that the size of the generated filepath was often much larger (sometimes by 2 OOM) than the information in the record, and there was a desire to avoid record blowup. E.g.
>>>>>>>> the record might contain a single integer userid, and the filepath prefix would then be /long/path/to/output/users/<id>. This was especially bad in cases where the data had to be shuffled, and the existing dynamic destinations method allowed extracting the filepath only _after_ the shuffle.
>>>>>>>
>>>>>>> That is a consideration I hadn't thought much of, thanks for bringing this up.
>>>>>>>
>>>>>>>> Now there may not be any good way to keep this benefit in a declarative approach such as YAML (or at least a good easy way - we could always allow the user to pass in a SQL expression to extract the filename from the record!), but we should keep in mind that this might mean that YAML-generated pipelines will be less efficient for certain use cases.
>>>>>>>
>>>>>>> Yep, it's not as straightforward to do in a declarative way. I would like to avoid mixing in UDFs (with their associated languages and execution environments) if possible. Though I'd like the performance of a "straightforward" YAML pipeline to be that which one can get writing straight-line Java (and possibly better, if we can leverage the structure of schemas everywhere), this is not an absolute requirement for all features.
>>>>>>>
>>>>>>> I wonder if separating out a constant prefix vs. the dynamic stuff could be sufficient to mitigate the blow-up of pre-computing this in most cases (especially in the context of a larger pipeline). Alternatively, rather than just a sharding pattern, one could have a full filepattern that includes format parameters for dynamically computed bits as well as the shard number, windowing info, etc. (There are pros and cons to this.)
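To make the "full filepattern" idea above concrete, here is a rough sketch of how such a template might expand for one record and shard. The $-placeholder syntax, function name, and field names are illustrative assumptions, not an existing Beam API:

```python
# Sketch only: a filepattern mixing dynamically computed record fields
# ($userid) with shard/window placeholders ($shard, $num_shards, $window).
from string import Template


def expand_filepattern(pattern: str, record: dict, shard: int,
                       num_shards: int, window: str) -> str:
    """Fill a filepattern with per-record fields and per-shard metadata."""
    params = dict(record)  # record fields become template parameters
    params.update(shard="%05d" % shard,
                  num_shards="%05d" % num_shards,
                  window=window)
    return Template(pattern).substitute(params)


print(expand_filepattern(
    "/long/path/to/output/users/$userid/part-$window-$shard-of-$num_shards.json",
    record={"userid": "42"}, shard=3, num_shards=25,
    window="2023-10-09T15:00"))
# /long/path/to/output/users/42/part-2023-10-09T15:00-00003-of-00025.json
```

Note this illustrates the trade-off discussed above: the expanded path is much larger than the single userid field it was derived from, which is exactly the record blow-up concern if expansion happens before a shuffle.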
>>>>>>>
>>>>>>>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <dev@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> Currently the various file writing configurations take a single parameter, path, which indicates where the (sharded) output should be placed. In other words, one can write something like
>>>>>>>>>
>>>>>>>>> pipeline:
>>>>>>>>>   ...
>>>>>>>>>   sink:
>>>>>>>>>     type: WriteToParquet
>>>>>>>>>     config:
>>>>>>>>>       path: /beam/filesystem/dest
>>>>>>>>>
>>>>>>>>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>>>>>>>>
>>>>>>>>> Of course, in practice file writing is often much more complicated than this (especially when it comes to streaming). For reference, I've included links to our existing offerings in the various SDKs below. I'd like to start a discussion about what else should go in the "config" parameter and how it should be expressed in YAML.
>>>>>>>>>
>>>>>>>>> The primary concern is around naming. This can generally be split into (1) the prefix, which must be provided by the user, (2) the sharding information, which includes both shard counts (e.g. the -X-of-N suffix) and windowing information (for streaming pipelines), the formatting of which we may want to let the user customize, and (3) a suffix like .json or .avro that is useful for both humans and tooling and can often be inferred, but should allow customization as well.
>>>>>>>>>
>>>>>>>>> An interesting case is that of dynamic destinations, where the prefix (or other parameters) may be functions of the records themselves.
>>>>>>>>> (I am excluding the case where the format itself is variable--such cases are probably better handled by explicitly partitioning the data and doing multiple writes, as this introduces significant complexities and the set of possible formats is generally finite and known ahead of time.) I propose that we leverage the fact that we have structured data to be able to pull out these dynamic parameters. For example, if we have an input data set with a string column my_col, we could allow something like
>>>>>>>>>
>>>>>>>>> config:
>>>>>>>>>   path: {dynamic: my_col}
>>>>>>>>>
>>>>>>>>> which would pull this information out at runtime. (With the MapToFields transform, it is very easy to compute/append additional fields to existing records.) Generally this field would then be stripped from the written data, which would only see the subset of non-dynamically referenced columns. (Though this could be configurable: we could add an attribute like {dynamic: my_col, keep: true}, require the set of columns to be actually written (or elided) to be enumerated in the config, or allow/require the actual data to be written to be placed in a designated field of the "full" input records as arranged by a preceding transform.) It'd be great to get input/impressions from a wide range of people here on what would be the most natural. Often just writing out snippets of various alternatives can be quite informative (though I'm avoiding putting them here for the moment to avoid biasing ideas right off the bat).
>>>>>>>>>
>>>>>>>>> For streaming pipelines it is often essential to write data out in a time-partitioned manner.
>>>>>>>>> The typical way to do this is to add the windowing information into the shard specification itself, and a (set of) file(s) is written on each window closing. Beam YAML already supports any transform being given a "windowing" configuration, which will cause a WindowInto transform to be applied to its input(s) before application; this can sit naturally on a sink. We may want to consider whether non-windowed writes make sense as well (though how this interacts with the watermark and underlying implementations is a large open question, so this is a larger change that might make sense to defer).
>>>>>>>>>
>>>>>>>>> Note that I am explicitly excluding "coders" here. All data in YAML should be schema'd, and writers should know how to write this structured data. We may want to allow a "schema" field to let a user specify the desired schema in a manner compatible with the sink format itself (e.g. avro, json, whatever) that could be used both for validation and possibly for resolving ambiguities (e.g. if the sink has an enum format that is not expressed in the schema of the input PCollection).
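As a rough sketch (not Beam's actual implementation) of the naming scheme discussed in this thread -- a path/prefix/suffix config with a runner-supplied middle part, folding windowing information into the shard specification for streaming writes:

```python
# Illustrative only: assemble shard file names from the proposed
# path/prefix/suffix config. The window string format is an assumption.
def shard_filename(path, prefix, suffix, shard, num_shards, window=None):
    """Build <path>/<prefix>-<shard components>-<suffix> for one shard."""
    middle = "%05d-of-%05d" % (shard, num_shards)
    if window is not None:
        # Streaming: prepend the window's identifier to the shard component.
        middle = window + "-" + middle
    return "%s/%s-%s%s" % (path, prefix, middle, suffix)


print(shard_filename("/beam/filesystem/dest", "out", ".parquet", 0, 10))
# /beam/filesystem/dest/out-00000-of-00010.parquet
print(shard_filename("/beam/filesystem/dest", "out", ".parquet", 0, 10,
                     window="2023-10-09T15:00"))
# /beam/filesystem/dest/out-2023-10-09T15:00-00000-of-00010.parquet
```

The zero-padded shard component is what keeps files from different bundles/windows/shards from colliding; the user never specifies it directly.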
>>>>>>>>>
>>>>>>>>> Some other configuration options: some formats (especially text-based ones) allow specification of an external compression type (which may be inferable from the suffix); whether to write a single shard or no shards at all if the input collection is empty (an occasional user request that's supported for some Beam sinks now); whether to allow a fixed sharding (generally discouraged, as it disables things like automatic sharding based on input size, let alone dynamic work rebalancing, though sometimes this is useful if the input is known to be small and a single output is desired regardless of the restriction in parallelism); or other sharding parameters (e.g. limiting the total number of elements or the (approximate) total number of bytes per output shard). Some of these options may not be available/implemented for all formats--consideration should be given as to how to handle this inconsistency (runtime errors for unsupported combinations, or simply not allowing them on any format until all are supported).
>>>>>>>>>
>>>>>>>>> A final consideration: we do not anticipate exposing the full complexity of Beam in the YAML offering. For advanced users, using a "real" SDK will often be preferable, and we intend to provide a migration path from YAML to a language of your choice (codegen). So we should balance simplicity with completeness and utility here.
>>>>>>>>>
>>>>>>>>> Sure, we could just pick something, but given that the main point of YAML is not capability, but expressibility and ease-of-use, I think it's worth trying to get the expression of these concepts right.
>>>>>>>>> I'm sure many of you have written a pipeline to files at some point in time; I'd welcome any thoughts anyone has on the matter.
>>>>>>>>>
>>>>>>>>> - Robert
>>>>>>>>>
>>>>>>>>> P.S. A related consideration: how should we distinguish the plain Read (where the file pattern is given at pipeline construction) from the ReadAll variants? Should they be separate transforms, or should we instead allow the same named transform (e.g. ReadFromParquet) to support both modes, depending on whether an input PCollection or an explicit file path is given (the two being mutually exclusive, with exactly one required, and good error messaging of course)?
>>>>>>>>>
>>>>>>>>> Java: https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
>>>>>>>>> Python: https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
>>>>>>>>> Go: https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
>>>>>>>>> Typescript: https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
>>>>>>>>> Scio: https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
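For the P.S. above, the "exactly one source, with good error messaging" behavior could be validated along these lines. This is a hypothetical config-validation sketch, not actual Beam YAML code; the function and mode names are made up for illustration:

```python
# Sketch: one transform name (e.g. ReadFromParquet) supporting both the
# plain-Read mode (explicit path in config) and the ReadAll mode (paths
# arriving on an input PCollection), mutually exclusive.
def resolve_read_mode(config: dict, has_input: bool) -> str:
    """Return 'read' or 'read_all'; exactly one source must be supplied."""
    has_path = "path" in config
    if has_path and has_input:
        raise ValueError(
            "Provide either a path in the config or an input of paths, "
            "not both.")
    if not has_path and not has_input:
        raise ValueError(
            "Either a path in the config or an input of paths is required.")
    return "read" if has_path else "read_all"
```

With this shape, the single transform name stays user-friendly while the error cases are caught at pipeline-construction time rather than at runtime.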