On Tue, Oct 10, 2023 at 7:22 AM Byron Ellis via dev <dev@beam.apache.org>
wrote:

> FWIW dbt (which is also YAML and has this problem for other reasons) does
> something like this. It also chooses to assume that everything is a string
> but allows users to use the Jinja templating language to make those strings
> dynamic where needed.
>

Are these only for values that are filled in at runtime (i.e. Jinja is a
pre-processor used before the YAML file is passed to dbt) or can they be
plugged in (possibly on a per-record basis) from the data itself? (FWIW, I
think we also want to allow some kind of templating like this to allow for
parameterized composite PTransforms to be defined in YAML, and additionally
we'll need it for YAML-defined templates (not to be confused with the YAML
template, which is a single flex template whose single parameter is the
YAML file itself).)
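
To make the distinction I'm asking about concrete, here is a minimal Python
sketch of the two substitution times: once as a pre-processing pass over the
YAML text, and once per record from the data itself. The {{ name }}
placeholder syntax, the render helper, and the bucket/region names are all
hypothetical; this is neither dbt's nor Beam's actual implementation.

```python
import re

# Hypothetical placeholder syntax, loosely modeled on Jinja's {{ name }}.
_PLACEHOLDER = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(template: str, values: dict, *, strict: bool = True) -> str:
    """Replace {{ name }} placeholders with values[name].

    When strict is False, unknown placeholders are left untouched so that a
    later pass (e.g. one per record) can still fill them in.
    """
    def substitute(match: re.Match) -> str:
        name = match.group(1)
        if name in values:
            return str(values[name])
        if strict:
            raise KeyError(f"no value for placeholder {name!r}")
        return match.group(0)

    return _PLACEHOLDER.sub(substitute, template)


# Pass 1: pre-processing, before the pipeline is even constructed.
yaml_path = "gs://{{ bucket }}/out/{{ region }}/part"
static_path = render(yaml_path, {"bucket": "my-bucket"}, strict=False)

# Pass 2: per record, using fields of the data itself.
records = [{"region": "us"}, {"region": "eu"}]
per_record_paths = [render(static_path, record) for record in records]
```

The first pass is what a pure pre-processor would give us; the second is
what dynamic destinations would additionally need.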


> Syntactically I think that's a bit nicer to look at than the shell script
> style and saves having to remember the difference between $() and ${}
>

+1


> On Tue, Oct 10, 2023 at 7:10 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> Since I've been in GHA files lately...
>>
>> I think they have a very useful pattern which we could borrow from or
>> learn from, where setting up the variables happens separately, like
>> https://github.com/apache/beam/blob/57821c191d322f9f21c01a34c55e0c40eda44f1e/.github/workflows/build_release_candidate.yml#L270
>>
>> If we called the section "vars", then the config could use the vars in
>> the destination. I'm making this example deliberately a little gross:
>>
>>  - vars:
>>     - USER_REGION: $.user.metadata.region
>>     - USER_GROUP: $.user.groups[0].name
>>  - config:
>>     - path: gs://output-bucket-${vars.USER_REGION}/files/${vars.USER_GROUP}-${fileio.SHARD_NUM}-${fileio.WINDOW}
>>
>> I think it strikes a good balance between arbitrary lambdas and just a
>> prefix/suffix control, giving a really easy place where we can say "the
>> whole value of this YAML field is a path expression into the structured
>> data"
>>
>> Kenn
>>
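
As a rough illustration of how the "vars" idea above could be evaluated,
here is a small Python sketch. It assumes a tiny subset of path-expression
syntax ($.field and [index] steps only) and treats the fileio values as
something the sink would supply; resolve_path, expand, and the sample record
are hypothetical names for illustration, not an existing Beam API.

```python
import re

_STEP = re.compile(r"\.(\w+)|\[(\d+)\]")   # one ".field" or "[index]" step
_REF = re.compile(r"\$\{([\w.]+)\}")       # a "${scope.NAME}" reference


def resolve_path(expr: str, record):
    """Resolve a tiny subset of '$.a.b[0].c' path expressions against a record."""
    if not expr.startswith("$"):
        raise ValueError(f"path expression must start with '$': {expr!r}")
    value, pos = record, 1
    while pos < len(expr):
        step = _STEP.match(expr, pos)
        if step is None:
            raise ValueError(f"cannot parse {expr!r} at offset {pos}")
        field, index = step.groups()
        value = value[field] if field is not None else value[int(index)]
        pos = step.end()
    return value


def expand(template: str, scopes: dict) -> str:
    """Substitute ${scope.NAME} references from a dict of named scopes."""
    def substitute(match: re.Match) -> str:
        scope, _, name = match.group(1).partition(".")
        return str(scopes[scope][name])

    return _REF.sub(substitute, template)


record = {"user": {"metadata": {"region": "us-west1"},
                   "groups": [{"name": "admins"}]}}
var_exprs = {"USER_REGION": "$.user.metadata.region",
             "USER_GROUP": "$.user.groups[0].name"}

# Step 1: the whole value of each var is a path expression into the record.
resolved = {name: resolve_path(expr, record) for name, expr in var_exprs.items()}

# Step 2: the config only ever does plain string substitution.
path = expand(
    "gs://output-bucket-${vars.USER_REGION}/files/"
    "${vars.USER_GROUP}-${fileio.SHARD_NUM}-${fileio.WINDOW}",
    {"vars": resolved, "fileio": {"SHARD_NUM": 3, "WINDOW": "w0"}},
)
```

Keeping the two steps separate is the point of the pattern: only the vars
section needs to understand the structure of the data.
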
>> On Mon, Oct 9, 2023 at 6:09 PM Chamikara Jayalath via dev <
>> dev@beam.apache.org> wrote:
>>
>>> I would say:
>>>
>>>     sink:
>>>       type: WriteToParquet
>>>       config:
>>>         path: /beam/filesystem/dest
>>>         prefix: <my prefix>
>>>         suffix: <my suffix>
>>>
>>> The underlying SDK will add the middle part of the file names to make sure
>>> that files generated by various bundles/windows/shards do not conflict.
>>>
>>> This will satisfy the vast majority of use-cases I believe. Fully
>>> customizing the file pattern sounds like a more advanced use case that can
>>> be left for "real" SDKs.
>>>
>>> For dynamic destinations, I think just making the "path" component
>>> support a lambda that is parameterized by the input should be adequate
>>> since this allows customers to direct files written to different
>>> destination directories.
>>>
>>>     sink:
>>>       type: WriteToParquet
>>>       config:
>>>         path: <destination lambda>
>>>         prefix: <my prefix>
>>>         suffix: <my suffix>
>>>
>>> I'm not sure what would be the best way to specify a lambda here though.
>>> Maybe a regex or the name of a Python callable?
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>> dev@beam.apache.org> wrote:
>>>
>>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Just FYI - the reason why names (including prefixes) in
>>>>> DynamicDestinations were parameterized via a lambda instead of just having
>>>>> the user add it via MapElements is performance. We discussed something
>>>>> along the lines of what you are suggesting (essentially having the user
>>>>> create a KV where the key contained the dynamic information). The problem
>>>>> was that the generated filepath was often much larger (sometimes by
>>>>> two orders of magnitude) than the information in the record, and there
>>>>> was a desire to avoid record blowup. E.g. the record might contain a
>>>>> single integer userid, and the filepath prefix would then be
>>>>> /long/path/to/output/users/<id>. This was especially bad in cases where
>>>>> the data had to be shuffled, and the existing dynamic destinations
>>>>> method allowed extracting the filepath only _after_ the shuffle.
>>>>>
>>>>
>>>> That is a consideration I hadn't thought much of, thanks for
>>>> bringing this up.
>>>>
>>>>
>>>>> Now there may not be any good way to keep this benefit in a
>>>>> declarative approach such as YAML (or at least a good easy way - we could
>>>>> always allow the user to pass in a SQL expression to extract the filename
>>>>> from the record!), but we should keep in mind that this might mean that
>>>>> YAML-generated pipelines will be less efficient for certain use cases.
>>>>>
>>>>
>>>> Yep, it's not as straightforward to do in a declarative way. I would
>>>> like to avoid mixing UDFs (with their associated languages and execution
>>>> environments) if possible. Though I'd like the performance of a
>>>> "straightforward" YAML pipeline to be that which one can get writing
>>>> straight-line Java (and possibly better, if we can leverage the structure
>>>> of schemas everywhere), this is not an absolute requirement for all
>>>> features.
>>>>
>>>> I wonder if separating out a constant prefix vs. the dynamic stuff
>>>> could be sufficient to mitigate the blow-up of pre-computing this in most
>>>> cases (especially in the context of a larger pipeline). Alternatively,
>>>> rather than just a sharding pattern, one could have a full filepattern that
>>>> includes format parameters for dynamically computed bits as well as the
>>>> shard number, windowing info, etc. (There are pros and cons to this.)
>>>>
>>>>
>>>>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> Currently the various file writing configurations take a single
>>>>>> parameter, path, which indicates where the (sharded) output should be
>>>>>> placed. In other words, one can write something like
>>>>>>
>>>>>>   pipeline:
>>>>>>     ...
>>>>>>     sink:
>>>>>>       type: WriteToParquet
>>>>>>       config:
>>>>>>         path: /beam/filesystem/dest
>>>>>>
>>>>>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>>>>>
>>>>>> Of course, in practice file writing is often much more complicated
>>>>>> than this (especially when it comes to Streaming). For reference, I've
>>>>>> included links to our existing offerings in the various SDKs below. I'd
>>>>>> like to start a discussion about what else should go in the "config"
>>>>>> parameter and how it should be expressed in YAML.
>>>>>>
>>>>>> The primary concern is around naming. This can generally be split
>>>>>> into (1) the prefix, which must be provided by the user, (2) the
>>>>>> sharding information, which includes both shard counts (e.g. the
>>>>>> -X-of-N suffix) and windowing information (for streaming pipelines),
>>>>>> the formatting of which we may want to allow the user to customize,
>>>>>> and (3) a suffix like .json or .avro that is useful for both humans
>>>>>> and tooling and can often be inferred but should allow customization
>>>>>> as well.
>>>>>>
>>>>>> An interesting case is that of dynamic destinations, where the prefix
>>>>>> (or other parameters) may themselves be functions of the records. (I
>>>>>> am excluding the case where the format itself is variable--such cases
>>>>>> are probably better handled by explicitly partitioning the data and
>>>>>> doing multiple writes, as this introduces significant complexities and
>>>>>> the set of possible formats is generally finite and known ahead of
>>>>>> time.) I propose that we leverage the fact that we have structured
>>>>>> data to be able to pull out these dynamic parameters. For example, if
>>>>>> we have an input data set with a string column my_col we could allow
>>>>>> something like
>>>>>>
>>>>>>   config:
>>>>>>     path: {dynamic: my_col}
>>>>>>
>>>>>> which would pull this information out at runtime. (With the
>>>>>> MapToFields transform, it is very easy to compute/append additional
>>>>>> fields to existing records.) Generally this field would then be
>>>>>> stripped from the written data, which would only see the subset of
>>>>>> non-dynamically referenced columns (though this could be configurable:
>>>>>> we could add an attribute like {dynamic: my_col, keep: true}, or
>>>>>> require the set of columns to be actually written (or elided) to be
>>>>>> enumerated in the config, or allow/require the actual data to be
>>>>>> written to be in a designated field of the "full" input records as
>>>>>> arranged by a preceding transform). It'd be great to get
>>>>>> input/impressions from a wide range of people here on what would be
>>>>>> the most natural. Often just writing out snippets of various
>>>>>> alternatives can be quite informative (though I'm avoiding putting
>>>>>> them here for the moment to avoid biasing ideas right off the bat).
>>>>>>
>>>>>> For streaming pipelines it is often essential to write data out in a
>>>>>> time-partitioned manner. The typical way to do this is to add the
>>>>>> windowing information into the shard specification itself, and a (set
>>>>>> of) file(s) is written on each window closing. Beam YAML already
>>>>>> supports any transform being given a "windowing" configuration, which
>>>>>> will cause a WindowInto transform to be applied to its input(s) before
>>>>>> application, and which can sit naturally on a sink. We may want to
>>>>>> consider if non-windowed writes make sense as well (though how this
>>>>>> interacts with the watermark and underlying implementations is a large
>>>>>> open question, so this is a larger change that might make sense to
>>>>>> defer).
>>>>>>
>>>>>> Note that I am explicitly excluding "coders" here. All data in YAML
>>>>>> should be schema'd, and writers should know how to write this structured
>>>>>> data. We may want to allow a "schema" field to allow a user to specify
>>>>>> the desired schema in a manner compatible with the sink format itself
>>>>>> (e.g. avro, json, whatever) that could be used both for validation and
>>>>>> possibly resolving ambiguities (e.g. if the sink has an enum format
>>>>>> that is not expressed in the schema of the input PCollection).
>>>>>>
>>>>>> Some other configuration options to consider: an external compression
>>>>>> type, which some formats (especially text-based ones) allow to be
>>>>>> specified (and which may be inferable from the suffix); whether to
>>>>>> write a single shard or no shards at all if the input collection is
>>>>>> empty (an occasional user request that's supported for some Beam sinks
>>>>>> now); whether to allow fixed sharding (generally discouraged, as it
>>>>>> disables things like automatic sharding based on input size, let alone
>>>>>> dynamic work rebalancing, though sometimes this is useful if the input
>>>>>> is known to be small and a single output is desired regardless of the
>>>>>> restriction in parallelism); and other sharding parameters (e.g.
>>>>>> limiting the total number of elements or the (approximate) total
>>>>>> number of bytes per output shard). Some of these options may not be
>>>>>> available/implemented for all formats--consideration should be given
>>>>>> as to how to handle this inconsistency (runtime errors for unsupported
>>>>>> combinations, or simply not allowing them on any until all are
>>>>>> supported).
>>>>>>
>>>>>> A final consideration: we do not anticipate exposing the full
>>>>>> complexity of Beam in the YAML offering. For advanced users, using a
>>>>>> "real" SDK will often be preferable, and we intend to provide a
>>>>>> migration path (codegen) from YAML to a language of your choice. So we
>>>>>> should balance simplicity with completeness and utility here.
>>>>>>
>>>>>> Sure, we could just pick something, but given that the main point of
>>>>>> YAML is not capability, but expressibility and ease-of-use, I think it's
>>>>>> worth trying to get the expression of these concepts right. I'm sure many
>>>>>> of you have written a pipeline to files at some point in time; I'd
>>>>>> welcome any thoughts anyone has on the matter.
>>>>>>
>>>>>> - Robert
>>>>>>
>>>>>>
>>>>>> P.S. A related consideration: how should we distinguish the plain Read
>>>>>> (where the file pattern is given at pipeline construction) from the
>>>>>> ReadAll variants? Should they be separate transforms, or should we
>>>>>> instead allow the same named transform (e.g. ReadFromParquet) to
>>>>>> support both modes, depending on whether an input PCollection or an
>>>>>> explicit file path is given (the two being mutually exclusive, with
>>>>>> exactly one required, and good error messaging of course)?
>>>>>>
>>>>>>
>>>>>> Java:
>>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
>>>>>> Python:
>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
>>>>>> Go:
>>>>>> https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
>>>>>> Typescript:
>>>>>> https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
>>>>>> Scio:
>>>>>> https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
>>>>>>
>>>>>>
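
For anyone who wants to play with the naming and {dynamic: my_col} ideas
from the original proposal, here is a small pure-Python sketch, assuming
records are plain dicts. The shard_name and split_by_dynamic_column helpers
and the keep flag are hypothetical illustrations, not Beam APIs; the
zero-padded shard format follows the -SSSSS-of-NNNNN style of shard
template, which is itself an assumption about what the sink would use.

```python
from collections import defaultdict


def shard_name(prefix: str, shard: int, num_shards: int, suffix: str = "") -> str:
    """Build '<prefix>-SSSSS-of-NNNNN<suffix>' from the three naming parts."""
    return f"{prefix}-{shard:05d}-of-{num_shards:05d}{suffix}"


def split_by_dynamic_column(records, column: str, keep: bool = False) -> dict:
    """Group records by the destination stored in `column`.

    Unless keep is True, the dynamic column is stripped from what is written,
    so the output only sees the non-dynamically referenced columns.
    """
    by_destination = defaultdict(list)
    for record in records:
        destination = record[column]
        payload = dict(record) if keep else {
            key: value for key, value in record.items() if key != column
        }
        by_destination[destination].append(payload)
    return dict(by_destination)


rows = [
    {"my_col": "/out/a", "x": 1},
    {"my_col": "/out/b", "x": 2},
    {"my_col": "/out/a", "x": 3},
]
groups = split_by_dynamic_column(rows, "my_col")

# Pretend each destination is written as a single shard with a .json suffix.
files = {shard_name(dest, 0, 1, ".json"): payload
         for dest, payload in groups.items()}
```

Passing keep=True corresponds to the {dynamic: my_col, keep: true} variant,
where the destination column stays in the written data.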
