On Thu, Oct 12, 2023 at 4:59 PM Robert Bradshaw <rober...@google.com> wrote:

> OK, so how about this for a concrete proposal:
>
>     sink:
>       type: WriteToParquet
>       config:
>         path:
> "/beam/filesystem/{record.my_col}-{timestamp.year}{timestamp.month}{timestamp.day}"
>

This is an example, right? So basically the path can be parameterized
using the fields of the Beam schema of the input elements?
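
To make the question concrete, here is a minimal Python sketch of how such
a path could be expanded per element. It assumes the {}'s behave like
Python str.format fields, which the proposal does not actually specify;
expand_path, the SimpleNamespace record and the sample values are all
hypothetical.

```python
from datetime import datetime
from types import SimpleNamespace

# Template modeled on the proposal above.
TEMPLATE = (
    "/beam/filesystem/"
    "{record.my_col}-{timestamp.year}{timestamp.month}{timestamp.day}"
)


def expand_path(template, record, timestamp):
    """Fill {record.<field>} and {timestamp.<attr>} placeholders.

    Assumption: placeholders follow str.format attribute-access rules;
    the eventual Beam YAML syntax may differ.
    """
    return template.format(record=record, timestamp=timestamp)


# Hypothetical input element with a schema field named my_col.
row = SimpleNamespace(my_col="clicks", value=42)
print(expand_path(TEMPLATE, row, datetime(2023, 10, 12)))
# prints /beam/filesystem/clicks-20231012
```

One thing the sketch surfaces: with unpadded fields, 2023-1-12 and
2023-11-2 both expand to "2023112", so some form of padded formatting
(in str.format terms, something like {timestamp.month:02d}) would
probably be needed.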


>         suffix: ".parquet"
>
> The eventual path would be <path><sharding_info><suffix>. The suffix
> would be optional, and there could be a default for the specific file
> format. A file format could inspect a provided suffix like ".csv.gz" to
> infer compression as well.
>
> Note that this doesn't have any special indicators for being dynamic other
> than the {}'s. Also, my_col would be written as part of the data (but we
> could add an extra "elide" config parameter that takes a list of columns to
> exclude if desired).
>

I think this is already dynamic given that the path can be
parameterized using the input. For example,

path:
"/beam/filesystem/{record.destination_col}/-{timestamp.year}{timestamp.month}{timestamp.day}"
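
As a rough illustration of why this already behaves like dynamic
destinations, the Python sketch below groups records by their expanded
path and then builds the final <path><sharding_info><suffix> name. The
str.format semantics, the zero-padded -X-of-N shard form, and the helper
names route and final_name are assumptions made for the example, not
part of the proposal.

```python
from collections import defaultdict
from datetime import datetime
from types import SimpleNamespace

PATH = (
    "/beam/filesystem/{record.destination_col}/"
    "-{timestamp.year}{timestamp.month}{timestamp.day}"
)
SUFFIX = ".parquet"


def final_name(path, shard, num_shards, suffix):
    """Build <path><sharding_info><suffix>.

    Assumption: sharding info is a zero-padded -X-of-N component.
    """
    return f"{path}-{shard:05d}-of-{num_shards:05d}{suffix}"


def route(records, timestamp):
    """Group records by expanded path, one group per destination."""
    groups = defaultdict(list)
    for record in records:
        groups[PATH.format(record=record, timestamp=timestamp)].append(record)
    return dict(groups)


# Hypothetical input: three records going to two destinations.
rows = [SimpleNamespace(destination_col=d) for d in ("eu", "us", "eu")]
groups = route(rows, datetime(2023, 10, 12))
for path, group in sorted(groups.items()):
    print(final_name(path, 0, 1, SUFFIX), len(group))
# prints /beam/filesystem/eu/-20231012-00000-of-00001.parquet 2
# prints /beam/filesystem/us/-20231012-00000-of-00001.parquet 1
```

Each distinct value of destination_col ends up under its own directory,
while the shard component stays outside the user's control.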


> We could call this "prefix" rather than path. (Path is symmetric with
> reading, but prefix is a bit more direct.) Anyone want to voice
> their opinion here?
>

I'm fine with either.

Thanks,
Cham


>
>
>
>
> On Wed, Oct 11, 2023 at 9:01 AM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>>
>>
>> On Wed, Oct 11, 2023 at 6:55 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> So, top-posting because the threading got to be a lot for me and I think
>>> it forked a bit too... I may even be restating something someone said, so
>>> apologies for that.
>>>
>>> Very very good point about *required* parameters where if you don't use
>>> them then you will end up with two writers writing to the same file. The
>>> easiest example to work with might be if you omitted SHARD_NUM so all
>>> shards end up clobbering the same file.
>>>
>>> I think there's a unifying perspective between prefix/suffix and the
>>> need to be sure to include critical sharding variables. Essentially it is
>>> my point about it being a "big data fileset". It is perhaps unrealistic but
>>> ideally the user names the big data fileset and then the mandatory other
>>> pieces are added outside of their control. For example if I name my big
>>> data fileset "foo" then that implicitly means that "foo" consists of all
>>> the files named "foo/${SHARD_NUM}-of-${SHARD_TOTAL}". And yes now that I
>>> re-read I see you basically said the same thing. In some cases the required
>>> fields will include $WINDOW, $KEY, and $PANE_INDEX, yes? Even though the
>>> user can think of it as a textual template, if we can use a library that
>>> yields an abstract syntax tree for the expression we can easily check these
>>> requirements in a robust way - or we could do it in a non-robust way by
>>> string-scraping ourselves.
>>>
>>
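
A minimal Python sketch of the "parse the template rather than scrape
it" idea: string.Formatter().parse returns the placeholder names of a
str.format-style template, which can then be compared against a required
set. The {NAME} syntax (instead of ${NAME}), the REQUIRED set and the
helper names are assumptions made for illustration.

```python
from string import Formatter

# Hypothetical set of variables a full filename template must mention.
REQUIRED = {"SHARD_NUM", "SHARD_TOTAL"}


def template_fields(template):
    """Return the placeholder names used in a str.format-style template."""
    return {name for _, name, _, _ in Formatter().parse(template) if name}


def missing_fields(template, required=REQUIRED):
    """List required variables the template fails to reference."""
    return sorted(required - template_fields(template))


print(missing_fields("foo/{SHARD_NUM}-of-{SHARD_TOTAL}"))  # prints []
print(missing_fields("foo/output"))  # prints ['SHARD_NUM', 'SHARD_TOTAL']
```

A non-empty result could be turned into a construction-time error, so a
template that would make all shards clobber the same file is rejected
before anything is written.
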
>> Yes. I think we are talking about the same thing. Users should not have
>> full control over the filename since that could lead to conflicts and data
>> loss when data is being written in parallel from multiple workers. Users
>> can refer to the big data fileset being written using the glob "<path>/**".
>> In addition users have control over the filename <prefix> and <suffix>
>> (file extension, for example) which can be useful for some downstream
>> use-cases. The rest of the filename will be filled out by the SDK (window, pane
>> etc.) to make sure that the files written by different workers do not
>> conflict.
>>
>> Thanks,
>> Cham
>>
>>
>>>
>>> We actually are very close to this in FileIO. I think the interpretation
>>> of "prefix" is that it is the filename "foo" as above, and "suffix" is
>>> really something like ".txt" that you stick on the end of everything for
>>> whatever reason.
>>>
>>> Kenn
>>>
>>> On Tue, Oct 10, 2023 at 7:12 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> On Tue, Oct 10, 2023 at 4:05 PM Chamikara Jayalath <
>>>> chamik...@google.com> wrote:
>>>>
>>>>>
>>>>> On Tue, Oct 10, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath <
>>>>>> chamik...@google.com> wrote:
>>>>>>
>>>>>>>
>>>>>>> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> I suspect some simple pattern templating would solve most use
>>>>>>>> cases. We probably would want to support timestamp formatting (e.g.
>>>>>>>> $YYYY $M $D) as well.
>>>>>>>>
>>>>>>>> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw <
>>>>>>>> rober...@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath <
>>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> I would say:
>>>>>>>>>>
>>>>>>>>>>     sink:
>>>>>>>>>>       type: WriteToParquet
>>>>>>>>>>       config:
>>>>>>>>>>         path: /beam/filesystem/dest
>>>>>>>>>>         prefix: <my prefix>
>>>>>>>>>>         suffix: <my suffix>
>>>>>>>>>>
>>>>>>>>>> Underlying SDK will add the middle part of the file names to make
>>>>>>>>>> sure that files generated by various bundles/windows/shards do not
>>>>>>>>>> conflict.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> What's the relationship between path and prefix? Is path the
>>>>>>>>> directory part of the full path, or does prefix precede it?
>>>>>>>>>
>>>>>>>>
>>>>>>> prefix would be the first part of the file name, so each shard
>>>>>>> will be named:
>>>>>>> <path>/<prefix>-<shard components added by the runner>-<suffix>
>>>>>>>
>>>>>>> This is similar to what we do in existing SDKs. For example, Java
>>>>>>> FileIO:
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L187
>>>>>>>
>>>>>>
>>>>>> Yeah, although there's no distinction between path and prefix.
>>>>>>
>>>>>
>>>>> Ah, for FileIO, path comes from the "to" call.
>>>>>
>>>>>
>>>>> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1125
>>>>>
>>>>>
>>>>
>>>> Ah. I guess there's some inconsistency here, e.g. text files are
>>>> written to a filenamePrefix that subsumes both:
>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html#to-java.lang.String-
>>>>
>>>>
>>>>>
>>>>>>
>>>>>>>>>
>>>>>>>>>> This will satisfy the vast majority of use-cases I believe. Fully
>>>>>>>>>> customizing the file pattern sounds like a more advanced use case
>>>>>>>>>> that can be left for "real" SDKs.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Yea, we don't have to do everything.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> For dynamic destinations, I think just making the "path"
>>>>>>>>>> component support a lambda that is parameterized by the input
>>>>>>>>>> should be adequate since this allows customers to direct files
>>>>>>>>>> written to different destination directories.
>>>>>>>>>>
>>>>>>>>>>     sink:
>>>>>>>>>>       type: WriteToParquet
>>>>>>>>>>       config:
>>>>>>>>>>         path: <destination lambda>
>>>>>>>>>>         prefix: <my prefix>
>>>>>>>>>>         suffix: <my suffix>
>>>>>>>>>>
>>>>>>>>>> I'm not sure what would be the best way to specify a lambda here
>>>>>>>>>> though. Maybe a regex or the name of a Python callable?
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I'd rather not require Python for a pure Java pipeline, but some
>>>>>>>>> kind of a pattern template may be sufficient here.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Just FYI - the reason why names (including prefixes) in
>>>>>>>>>>>> DynamicDestinations were parameterized via a lambda instead of
>>>>>>>>>>>> just having the user add it via MapElements is performance. We
>>>>>>>>>>>> discussed something along the lines of what you are suggesting
>>>>>>>>>>>> (essentially having the user create a KV where the key contained
>>>>>>>>>>>> the dynamic information). The problem was that the size of the
>>>>>>>>>>>> generated filepath was often much larger (sometimes by 2 orders
>>>>>>>>>>>> of magnitude) than the information in the record, and there was
>>>>>>>>>>>> a desire to avoid record blowup. E.g. the record might contain a
>>>>>>>>>>>> single integer userid, and the filepath prefix would then be
>>>>>>>>>>>> /long/path/to/output/users/<id>. This was especially bad in cases
>>>>>>>>>>>> where the data had to be shuffled, and the existing dynamic
>>>>>>>>>>>> destinations method allowed extracting the filepath only _after_
>>>>>>>>>>>> the shuffle.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> That is a consideration I hadn't thought much of, thanks for
>>>>>>>>>>> bringing this up.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> Now there may not be any good way to keep this benefit in a
>>>>>>>>>>>> declarative approach such as YAML (or at least a good easy way -
>>>>>>>>>>>> we could always allow the user to pass in a SQL expression to
>>>>>>>>>>>> extract the filename from the record!), but we should keep in
>>>>>>>>>>>> mind that this might mean that YAML-generated pipelines will be
>>>>>>>>>>>> less efficient for certain use cases.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Yep, it's not as straightforward to do in a declarative way. I
>>>>>>>>>>> would like to avoid mixing UDFs (with their associated languages and
>>>>>>>>>>> execution environments) if possible. Though I'd like the
>>>>>>>>>>> performance of a "straightforward" YAML pipeline to be that which
>>>>>>>>>>> one can get writing straight-line Java (and possibly better, if we
>>>>>>>>>>> can leverage the structure of schemas everywhere) this is not an
>>>>>>>>>>> absolute requirement for all features.
>>>>>>>>>>>
>>>>>>>>>>> I wonder if separating out a constant prefix vs. the dynamic
>>>>>>>>>>> stuff could be sufficient to mitigate the blow-up of pre-computing
>>>>>>>>>>> this in most cases (especially in the context of a larger
>>>>>>>>>>> pipeline). Alternatively, rather than just a sharding pattern, one
>>>>>>>>>>> could have a full filepattern that includes format parameters for
>>>>>>>>>>> dynamically computed bits as well as the shard number, windowing
>>>>>>>>>>> info, etc. (There are pros and cons to this.)
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <
>>>>>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Currently the various file writing configurations take a
>>>>>>>>>>>>> single parameter, path, which indicates where the (sharded)
>>>>>>>>>>>>> output should be placed. In other words, one can write something
>>>>>>>>>>>>> like
>>>>>>>>>>>>>
>>>>>>>>>>>>>   pipeline:
>>>>>>>>>>>>>     ...
>>>>>>>>>>>>>     sink:
>>>>>>>>>>>>>       type: WriteToParquet
>>>>>>>>>>>>>       config:
>>>>>>>>>>>>>         path: /beam/filesystem/dest
>>>>>>>>>>>>>
>>>>>>>>>>>>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>>>>>>>>>>>>
>>>>>>>>>>>>> Of course, in practice file writing is often much more
>>>>>>>>>>>>> complicated than this (especially when it comes to Streaming). For
>>>>>>>>>>>>> reference, I've included links to our existing offerings in the
>>>>>>>>>>>>> various SDKs below. I'd like to start a discussion about what else
>>>>>>>>>>>>> should go in the "config" parameter and how it should be expressed
>>>>>>>>>>>>> in YAML.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The primary concern is around naming. This can generally be
>>>>>>>>>>>>> split into (1) the prefix, which must be provided by the user,
>>>>>>>>>>>>> (2) the sharding information, which includes both shard counts
>>>>>>>>>>>>> (e.g. the -X-of-N suffix) and windowing information (for
>>>>>>>>>>>>> streaming pipelines), which we may want to allow the user to
>>>>>>>>>>>>> customize the formatting of, and (3) a suffix like .json or .avro
>>>>>>>>>>>>> that is useful for both humans and tooling and can often be
>>>>>>>>>>>>> inferred but should allow customization as well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> An interesting case is that of dynamic destinations, where the
>>>>>>>>>>>>> prefix (or other parameters) may themselves be functions of the
>>>>>>>>>>>>> records themselves. (I am excluding the case where the format
>>>>>>>>>>>>> itself is variable--such cases are probably better handled by
>>>>>>>>>>>>> explicitly partitioning the data and doing multiple writes, as
>>>>>>>>>>>>> this introduces significant complexities and the set of possible
>>>>>>>>>>>>> formats is generally finite and known ahead of time.) I propose
>>>>>>>>>>>>> that we leverage the fact that we have structured data to be able
>>>>>>>>>>>>> to pull out these dynamic parameters. For example, if we have an
>>>>>>>>>>>>> input data set with a string column my_col we could allow
>>>>>>>>>>>>> something like
>>>>>>>>>>>>>
>>>>>>>>>>>>>   config:
>>>>>>>>>>>>>     path: {dynamic: my_col}
>>>>>>>>>>>>>
>>>>>>>>>>>>> which would pull this information out at runtime. (With the
>>>>>>>>>>>>> MapToFields transform, it is very easy to compute/append
>>>>>>>>>>>>> additional fields to existing records.) Generally this field would
>>>>>>>>>>>>> then be stripped from the written data, which would only see the
>>>>>>>>>>>>> subset of non-dynamically referenced columns (though this could be
>>>>>>>>>>>>> configurable: we could add an attribute like
>>>>>>>>>>>>> {dynamic: my_col, keep: true}, or require the set of columns to be
>>>>>>>>>>>>> actually written (or elided) to be enumerated in the config, or
>>>>>>>>>>>>> allow/require the actual data to be written to be in a designated
>>>>>>>>>>>>> field of the "full" input records as arranged by a preceding
>>>>>>>>>>>>> transform). It'd be great to get input/impressions from a wide
>>>>>>>>>>>>> range of people here on what would be the most natural. Often just
>>>>>>>>>>>>> writing out snippets of various alternatives can be quite
>>>>>>>>>>>>> informative (though I'm avoiding putting them here for the moment
>>>>>>>>>>>>> to avoid biasing ideas right off the bat).
>>>>>>>>>>>>>
>>>>>>>>>>>>> For streaming pipelines it is often essential to write data
>>>>>>>>>>>>> out in a time-partitioned manner. The typical way to do this is to
>>>>>>>>>>>>> add the windowing information into the shard specification itself,
>>>>>>>>>>>>> and a (set of) file(s) is written on each window closing. Beam YAML
>>>>>>>>>>>>> already supports any transform being given a "windowing"
>>>>>>>>>>>>> configuration, which will cause a WindowInto transform to be
>>>>>>>>>>>>> applied to its input(s) before application, and this can sit
>>>>>>>>>>>>> naturally on a sink. We may want to consider if non-windowed
>>>>>>>>>>>>> writes make sense as well (though how this interacts with the
>>>>>>>>>>>>> watermark and underlying implementations is a large open question,
>>>>>>>>>>>>> so this is a larger change that might make sense to defer).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Note that I am explicitly excluding "coders" here. All data in
>>>>>>>>>>>>> YAML should be schema'd, and writers should know how to write this
>>>>>>>>>>>>> structured data. We may want to allow a "schema" field to allow a
>>>>>>>>>>>>> user to specify the desired schema in a manner compatible with the
>>>>>>>>>>>>> sink format itself (e.g. avro, json, whatever) that could be used
>>>>>>>>>>>>> both for validation and possibly resolving ambiguities (e.g. if
>>>>>>>>>>>>> the sink has an enum format that is not expressed in the schema of
>>>>>>>>>>>>> the input PCollection).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Some other configuration options are that some formats
>>>>>>>>>>>>> (especially text-based ones) allow for specification of an
>>>>>>>>>>>>> external compression type (which may be inferable from the
>>>>>>>>>>>>> suffix), whether to write a single shard if the input collection
>>>>>>>>>>>>> is empty or no shards at all (an occasional user request that's
>>>>>>>>>>>>> supported for some Beam sinks now), whether to allow fixed
>>>>>>>>>>>>> sharding (generally discouraged, as it disables things like
>>>>>>>>>>>>> automatic sharding based on input size, let alone dynamic work
>>>>>>>>>>>>> rebalancing, though sometimes this is useful if the input is known
>>>>>>>>>>>>> to be small and a single output is desired regardless of the
>>>>>>>>>>>>> restriction in parallelism), or other sharding parameters (e.g.
>>>>>>>>>>>>> limiting the number of total elements or (approximately) total
>>>>>>>>>>>>> number of bytes per output shard). Some of these options may not
>>>>>>>>>>>>> be available/implemented for all formats--consideration should be
>>>>>>>>>>>>> given as to how to handle this inconsistency (runtime errors for
>>>>>>>>>>>>> unsupported combinations or simply not allowing them on any until
>>>>>>>>>>>>> all are supported).
>>>>>>>>>>>>>
>>>>>>>>>>>>> A final consideration: we do not anticipate exposing the full
>>>>>>>>>>>>> complexity of Beam in the YAML offering. For advanced users, using
>>>>>>>>>>>>> a "real" SDK will often be preferable, and we intend to provide a
>>>>>>>>>>>>> migration path (codegen) from YAML to a language of your choice. So
>>>>>>>>>>>>> we should balance simplicity with completeness and utility here.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Sure, we could just pick something, but given that the
>>>>>>>>>>>>> main point of YAML is not capability, but expressibility and
>>>>>>>>>>>>> ease-of-use, I think it's worth trying to get the expression of
>>>>>>>>>>>>> these concepts right. I'm sure many of you have written a pipeline
>>>>>>>>>>>>> to files at some point in time; I'd welcome any thoughts anyone
>>>>>>>>>>>>> has on the matter.
>>>>>>>>>>>>>
>>>>>>>>>>>>> - Robert
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> P.S. A related consideration: how should we distinguish the plain
>>>>>>>>>>>>> Read (where the file pattern is given at pipeline construction)
>>>>>>>>>>>>> from the ReadAll variants? Should they be separate transforms, or
>>>>>>>>>>>>> should we instead allow the same named transform (e.g.
>>>>>>>>>>>>> ReadFromParquet) to support both modes, depending on whether an
>>>>>>>>>>>>> input PCollection or explicit file path is given (the two being
>>>>>>>>>>>>> mutually exclusive, with exactly one required, and good error
>>>>>>>>>>>>> messaging of course)?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Java:
>>>>>>>>>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
>>>>>>>>>>>>> Python:
>>>>>>>>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
>>>>>>>>>>>>> Go:
>>>>>>>>>>>>> https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
>>>>>>>>>>>>> Typescript:
>>>>>>>>>>>>> https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
>>>>>>>>>>>>> Scio:
>>>>>>>>>>>>> https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
>>>>>>>>>>>>>
>>>>>>>>>>>>>
