OK, so how about this for a concrete proposal:

    sink:
      type: WriteToParquet
      config:
        path:
"/beam/filesytem/{record.my_col}-{timestamp.year}{timestamp.month}{timestamp.day}"
        suffix: ".parquet"

The eventual path would be <path><sharding_info><suffix>. The suffix
would be optional, and there could be a default for the specific file
format. A file format could inspect a provided suffix like ".csv.gz" to
infer compression as well.

Note that this doesn't have any special indicators for being dynamic other
than the {}'s. Also, my_col would be written as part of the data (but we
could add an extra "elide" config parameter that takes a list of columns to
exclude if desired).

We could call this "prefix" rather than path. (Path is symmetric with
reading, but prefix is a bit more direct.) Anyone want to voice
their opinion here?




On Wed, Oct 11, 2023 at 9:01 AM Chamikara Jayalath <chamik...@google.com>
wrote:

>
>
> On Wed, Oct 11, 2023 at 6:55 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> So, top-posting because the threading got to be a lot for me and I think
>> it forked a bit too... I may even be restating something someone said, so
>> apologies for that.
>>
>> Very very good point about *required* parameters where if you don't use
>> them then you will end up with two writers writing to the same file. The
>> easiest example to work with might be if you omitted SHARD_NUM so all
>> shards end up clobbering the same file.
>>
>> I think there's a unifying perspective between prefix/suffix and the need
>> to be sure to include critical sharding variables. Essentially it is my
>> point about it being a "big data fileset". It is perhaps unrealistic but
>> ideally the user names the big data fileset and then the mandatory other
>> pieces are added outside of their control. For example if I name my big
>> data fileset "foo" then that implicitly means that "foo" consists of all
>> the files named "foo/${SHARD_NUM}-of-${SHARD_TOTAL}". And yes now that I
>> re-read I see you basically said the same thing. In some cases the required
>> fields will include $WINDOW, $KEY, and $PANE_INDEX, yes? Even though the
>> user can think of it as a textual template, if we can use a library that
>> yields an abstract syntax tree for the expression we can easily check these
>> requirements in a robust way - or we could do it in a non-robust way be
>> string-scraping ourselves.
>>
>
> Yes. I think we are talking about the same thing. Users should not have
> full control over the filename since that could lead to conflicts and data
> loss when data is being written in parallel from multiple workers. Users
> can refer to the big data fileset being written using the glob "<path>/**".
> In addition users have control over the filename <prefix> and <suffix>
> (file extension, for example) which can be useful for some downstream
> use-cases. Rest of the filename will be filled out by the SDK (window, pane
> etc.) to make sure that the files written by different workers do not
> conflict.
>
> Thanks,
> Cham
>
>
>>
>> We actually are very close to this in FileIO. I think the interpretation
>> of "prefix" is that it is the filename "foo" as above, and "suffix" is
>> really something like ".txt" that you stick on the end of everything for
>> whatever reason.
>>
>> Kenn
>>
>> On Tue, Oct 10, 2023 at 7:12 PM Robert Bradshaw via dev <
>> dev@beam.apache.org> wrote:
>>
>>> On Tue, Oct 10, 2023 at 4:05 PM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>>
>>>>
>>>> On Tue, Oct 10, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>>
>>>>>> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> I suspect some simple pattern templating would solve most use cases.
>>>>>>> We probably would want to support timestamp formatting (e.g. $YYYY $M 
>>>>>>> $D)
>>>>>>> as well.
>>>>>>>
>>>>>>> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw <rober...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath <
>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>
>>>>>>>>> I would say:
>>>>>>>>>
>>>>>>>>>     sink:
>>>>>>>>>       type: WriteToParquet
>>>>>>>>>       config:
>>>>>>>>>         path: /beam/filesytem/dest
>>>>>>>>>         prefix: <my prefix>
>>>>>>>>>         suffix: <my suffix>
>>>>>>>>>
>>>>>>>>> Underlying SDK will add the middle part of the file names to make
>>>>>>>>> sure that files generated by various bundles/windows/shards do not 
>>>>>>>>> conflict.
>>>>>>>>>
>>>>>>>>
>>>>>>>> What's the relationship between path and prefix? Is path the
>>>>>>>> directory part of the full path, or does prefix precede it?
>>>>>>>>
>>>>>>>
>>>>>> prefix would be the first part of the file name so each shard will be
>>>>>> named.
>>>>>> <path>/<prefix>-<shard components added by the runner>-<suffix>
>>>>>>
>>>>>> This is similar to what we do in existing SDKS. For example, Java
>>>>>> FileIO,
>>>>>>
>>>>>>
>>>>>> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L187
>>>>>>
>>>>>
>>>>> Yeah, although there's no distinction between path and prefix.
>>>>>
>>>>
>>>> Ah, for FIleIO, path comes from the "to" call.
>>>>
>>>>
>>>> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1125
>>>>
>>>>
>>>
>>> Ah. I guess there's some inconsistency here, e.g. text files are written
>>> to a filenamePrefix that subsumes both:
>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html#to-java.lang.String-
>>>
>>>
>>>>
>>>>>
>>>>>>>>
>>>>>>>>> This will satisfy the vast majority of use-cases I believe. Fully
>>>>>>>>> customizing the file pattern sounds like a more advanced use case 
>>>>>>>>> that can
>>>>>>>>> be left for "real" SDKs.
>>>>>>>>>
>>>>>>>>
>>>>>>>> Yea, we don't have to do everything.
>>>>>>>>
>>>>>>>>
>>>>>>>>> For dynamic destinations, I think just making the "path" component
>>>>>>>>> support  a lambda that is parameterized by the input should be 
>>>>>>>>> adequate
>>>>>>>>> since this allows customers to direct files written to different
>>>>>>>>> destination directories.
>>>>>>>>>
>>>>>>>>>     sink:
>>>>>>>>>       type: WriteToParquet
>>>>>>>>>       config:
>>>>>>>>>         path: <destination lambda>
>>>>>>>>>         prefix: <my prefix>
>>>>>>>>>         suffix: <my suffix>
>>>>>>>>>
>>>>>>>>> I'm not sure what would be the best way to specify a lambda here
>>>>>>>>> though. Maybe a regex or the name of a Python callable ?
>>>>>>>>>
>>>>>>>>
>>>>>>>> I'd rather not require Python for a pure Java pipeline, but some
>>>>>>>> kind of a pattern template may be sufficient here.
>>>>>>>>
>>>>>>>>
>>>>>>>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> .On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Just FYI - the reason why names (including prefixes) in
>>>>>>>>>>> DynamicDestinations were parameterized via a lambda instead of just 
>>>>>>>>>>> having
>>>>>>>>>>> the user add it via MapElements is performance. We discussed 
>>>>>>>>>>> something
>>>>>>>>>>> along the lines of what you are suggesting (essentially having the 
>>>>>>>>>>> user
>>>>>>>>>>> create a KV where the key contained the dynamic information). The 
>>>>>>>>>>> problem
>>>>>>>>>>> was that often the size of the generated filepath was often much 
>>>>>>>>>>> larger
>>>>>>>>>>> (sometimes by 2 OOM) than the information in the record, and there 
>>>>>>>>>>> was a
>>>>>>>>>>> desire to avoid record blowup. e.g. the record might contain a 
>>>>>>>>>>> single
>>>>>>>>>>> integer userid, and the filepath prefix would then be
>>>>>>>>>>> /long/path/to/output/users/<id>. This was especially bad in cases 
>>>>>>>>>>> where the
>>>>>>>>>>> data had to be shuffled, and the existing dynamic destinations 
>>>>>>>>>>> method
>>>>>>>>>>> allowed extracting the filepath only _after_  the shuffle.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> That is a consideration I hadn't thought much of, thanks for
>>>>>>>>>> bringing this up.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> Now there may not be any good way to keep this benefit in a
>>>>>>>>>>> declarative approach such as YAML (or at least a good easy way - we 
>>>>>>>>>>> could
>>>>>>>>>>> always allow the user to pass in a SQL expression to extract the 
>>>>>>>>>>> filename
>>>>>>>>>>> from the record!), but we should keep in mind that this might mean 
>>>>>>>>>>> that
>>>>>>>>>>> YAML-generated pipelines will be less efficient for certain use 
>>>>>>>>>>> cases.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Yep, it's not as straightforward to do in a declarative way. I
>>>>>>>>>> would like to avoid mixing UDFs (with their associated languages and
>>>>>>>>>> execution environments) if possible. Though I'd like the performance 
>>>>>>>>>> of a
>>>>>>>>>> "straightforward" YAML pipeline to be that which one can get writing
>>>>>>>>>> straight-line Java (and possibly better, if we can leverage the 
>>>>>>>>>> structure
>>>>>>>>>> of schemas everywhere) this is not an absolute requirement for all
>>>>>>>>>> features.
>>>>>>>>>>
>>>>>>>>>> I wonder if separating out a constant prefix vs. the dynamic
>>>>>>>>>> stuff could be sufficient to mitigate the blow-up of pre-computing 
>>>>>>>>>> this in
>>>>>>>>>> most cases (especially in the context of a larger pipeline). 
>>>>>>>>>> Alternatively,
>>>>>>>>>> rather than just a sharding pattern, one could have a full 
>>>>>>>>>> filepattern that
>>>>>>>>>> includes format parameters for dynamically computed bits as well as 
>>>>>>>>>> the
>>>>>>>>>> shard number, windowing info, etc. (There are pros and cons to this.)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <
>>>>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Currently the various file writing configurations take a single
>>>>>>>>>>>> parameter, path, which indicates where the (sharded) output should 
>>>>>>>>>>>> be
>>>>>>>>>>>> placed. In other words, one can write something like
>>>>>>>>>>>>
>>>>>>>>>>>>   pipeline:
>>>>>>>>>>>>     ...
>>>>>>>>>>>>     sink:
>>>>>>>>>>>>       type: WriteToParquet
>>>>>>>>>>>>       config:
>>>>>>>>>>>>         path: /beam/filesytem/dest
>>>>>>>>>>>>
>>>>>>>>>>>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>>>>>>>>>>>
>>>>>>>>>>>> Of course, in practice file writing is often much more
>>>>>>>>>>>> complicated than this (especially when it comes to Streaming). For
>>>>>>>>>>>> reference, I've included links to our existing offerings in the 
>>>>>>>>>>>> various
>>>>>>>>>>>> SDKs below. I'd like to start a discussion about what else should 
>>>>>>>>>>>> go in the
>>>>>>>>>>>> "config" parameter and how it should be expressed in YAML.
>>>>>>>>>>>>
>>>>>>>>>>>> The primary concern is around naming. This can generally be
>>>>>>>>>>>> split into (1) the prefix, which must be provided by the users (2) 
>>>>>>>>>>>> the
>>>>>>>>>>>> sharing information, includes both shard counts (e.g. (the -X-of-N 
>>>>>>>>>>>> suffix)
>>>>>>>>>>>> but also windowing information (for streaming pipelines) which we 
>>>>>>>>>>>> may want
>>>>>>>>>>>> to allow the user to customize the formatting of, and (3) a suffix 
>>>>>>>>>>>> like
>>>>>>>>>>>> .json or .avro that is useful for both humans and tooling and can 
>>>>>>>>>>>> often be
>>>>>>>>>>>> inferred but should allow customization as well.
>>>>>>>>>>>>
>>>>>>>>>>>> An interesting case is that of dynamic destinations, where the
>>>>>>>>>>>> prefix (or other parameters) may themselves be functions of the 
>>>>>>>>>>>> records
>>>>>>>>>>>> themselves. (I am excluding the case where the format itself is
>>>>>>>>>>>> variable--such cases are probably better handled by explicitly 
>>>>>>>>>>>> partitioning
>>>>>>>>>>>> the data and doing multiple writes, as this introduces significant
>>>>>>>>>>>> complexities and the set of possible formats is generally finite 
>>>>>>>>>>>> and known
>>>>>>>>>>>> ahead of time.) I propose that we leverage the fact that we have 
>>>>>>>>>>>> structured
>>>>>>>>>>>> data to be able to pull out these dynamic parameters. For example, 
>>>>>>>>>>>> if we
>>>>>>>>>>>> have an input data set with a string column my_col we could allow 
>>>>>>>>>>>> something
>>>>>>>>>>>> like
>>>>>>>>>>>>
>>>>>>>>>>>>   config:
>>>>>>>>>>>>     path: {dynamic: my_col}
>>>>>>>>>>>>
>>>>>>>>>>>> which would pull this information out at runtime. (With the
>>>>>>>>>>>> MapToFields transform, it is very easy to compute/append 
>>>>>>>>>>>> additional fields
>>>>>>>>>>>> to existing records.) Generally this field would then be stripped 
>>>>>>>>>>>> from the
>>>>>>>>>>>> written data, which would only see the subset of non-dynamically 
>>>>>>>>>>>> referenced
>>>>>>>>>>>> columns (though this could be configurable: we could add an 
>>>>>>>>>>>> attribute like
>>>>>>>>>>>> {dynamic: my_col, Keep: true} or require the set of columns to be 
>>>>>>>>>>>> actually
>>>>>>>>>>>> written (or elided) to be enumerated in the config or 
>>>>>>>>>>>> allow/require the
>>>>>>>>>>>> actual data to be written to be in a designated field of the 
>>>>>>>>>>>> "full" input
>>>>>>>>>>>> records as arranged by a preceding transform). It'd be great to get
>>>>>>>>>>>> input/impressions from a wide range of people here on what would 
>>>>>>>>>>>> be the
>>>>>>>>>>>> most natural. Often just writing out snippets of various 
>>>>>>>>>>>> alternatives can
>>>>>>>>>>>> be quite informative (though I'm avoiding putting them here for 
>>>>>>>>>>>> the moment
>>>>>>>>>>>> to avoid biasing ideas right off the bat).
>>>>>>>>>>>>
>>>>>>>>>>>> For streaming pipelines it is often essential to write data out
>>>>>>>>>>>> in a time-partitioned manner. The typical way to do this is to add 
>>>>>>>>>>>> the
>>>>>>>>>>>> windowing information into the shard specification itself, and a 
>>>>>>>>>>>> (set of)
>>>>>>>>>>>> file(s) is written on each window closing. Beam YAML already 
>>>>>>>>>>>> supports any
>>>>>>>>>>>> transform being given a "windowing" configuration which will cause 
>>>>>>>>>>>> a
>>>>>>>>>>>> WindowInto transform to be applied to its input(s) before 
>>>>>>>>>>>> application which
>>>>>>>>>>>> can sit naturally on a sink. We may want to consider if 
>>>>>>>>>>>> non-windowed writes
>>>>>>>>>>>> make sense as well (though how this interacts with the watermark 
>>>>>>>>>>>> and
>>>>>>>>>>>> underlying implementations are a large open question, so this is a 
>>>>>>>>>>>> larger
>>>>>>>>>>>> change that might make sense to defer).
>>>>>>>>>>>>
>>>>>>>>>>>> Note that I am explicitly excluding "coders" here. All data in
>>>>>>>>>>>> YAML should be schema'd, and writers should know how to write this
>>>>>>>>>>>> structured data. We may want to allow a "schema" field to allow a 
>>>>>>>>>>>> user to
>>>>>>>>>>>> specify the desired schema in a manner compatible with the sink 
>>>>>>>>>>>> format
>>>>>>>>>>>> itself (e.g. avro, json, whatever) that could be used both for 
>>>>>>>>>>>> validation
>>>>>>>>>>>> and possibly resolving ambiguities (e.g. if the sink has an enum 
>>>>>>>>>>>> format
>>>>>>>>>>>> that is not expressed in the schema of the input PCollection).
>>>>>>>>>>>>
>>>>>>>>>>>> Some other configuration options are that some formats
>>>>>>>>>>>> (especially text-based ones) allow for specification of an external
>>>>>>>>>>>> compression type (which may be inferable from the suffix), whether 
>>>>>>>>>>>> to write
>>>>>>>>>>>> a single shard if the input collection is empty or no shards at 
>>>>>>>>>>>> all (an
>>>>>>>>>>>> occasional user request that's supported for some Beam sinks now), 
>>>>>>>>>>>> whether
>>>>>>>>>>>> to allow fixed sharing (generally discouraged, as it disables 
>>>>>>>>>>>> things like
>>>>>>>>>>>> automatic shading based on input size, let alone dynamic work 
>>>>>>>>>>>> rebalancing,
>>>>>>>>>>>> though sometimes this is useful if the input is known to be small 
>>>>>>>>>>>> and a
>>>>>>>>>>>> single output is desired regardless of the restriction in 
>>>>>>>>>>>> parallelism), or
>>>>>>>>>>>> other sharding parameters (e.g. limiting the number of total 
>>>>>>>>>>>> elements or
>>>>>>>>>>>> (approximately) total number of bytes per output shard). Some of 
>>>>>>>>>>>> these
>>>>>>>>>>>> options may not be available/implemented for all 
>>>>>>>>>>>> formats--consideration
>>>>>>>>>>>> should be given as to how to handle this inconsistency (runtime 
>>>>>>>>>>>> errors for
>>>>>>>>>>>> unsupported combinations or simply not allowing them on any until 
>>>>>>>>>>>> all are
>>>>>>>>>>>> supported).
>>>>>>>>>>>>
>>>>>>>>>>>> A final consideration: we do not anticipate exposing the full
>>>>>>>>>>>> complexity of Beam in the YAML offering. For advanced users using 
>>>>>>>>>>>> a "real"
>>>>>>>>>>>> SDK will often be preferable, and we intend to provide a migration 
>>>>>>>>>>>> path
>>>>>>>>>>>> from YAML to a language of your choice (codegen) as a migration 
>>>>>>>>>>>> path. So we
>>>>>>>>>>>> should balance simplicity with completeness and utility here.
>>>>>>>>>>>>
>>>>>>>>>>>> Sure, we could just pick something, but given that the
>>>>>>>>>>>> main point of YAML is not capability, but expressibility and 
>>>>>>>>>>>> ease-of-use, I
>>>>>>>>>>>> think it's worth trying to get the expression of these concepts 
>>>>>>>>>>>> right. I'm
>>>>>>>>>>>> sure many of you have written a pipeline to files at some point in 
>>>>>>>>>>>> time;
>>>>>>>>>>>> I'd welcome any thoughts anyone has on the matter.
>>>>>>>>>>>>
>>>>>>>>>>>> - Robert
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> P.S. A related consideration: how should we consider the plain
>>>>>>>>>>>> Read (where that file pattern is given at pipeline construction) 
>>>>>>>>>>>> from the
>>>>>>>>>>>> ReadAll variants? Should they be separate transforms, or should we 
>>>>>>>>>>>> instead
>>>>>>>>>>>> allow the same named transform (e.g. ReadFromParquet) support both 
>>>>>>>>>>>> modes,
>>>>>>>>>>>> depending on whether an input PCollection or explicit file path is 
>>>>>>>>>>>> given
>>>>>>>>>>>> (the two being mutually exclusive, with exactly one required, and 
>>>>>>>>>>>> good
>>>>>>>>>>>> error messaging of course)?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Java:
>>>>>>>>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
>>>>>>>>>>>> Python:
>>>>>>>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
>>>>>>>>>>>> Go:
>>>>>>>>>>>> https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
>>>>>>>>>>>> Typescript:
>>>>>>>>>>>> https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
>>>>>>>>>>>> Scio:
>>>>>>>>>>>> https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
>>>>>>>>>>>>
>>>>>>>>>>>>

Reply via email to