On Tue, Oct 10, 2023 at 4:05 PM Chamikara Jayalath <chamik...@google.com>
wrote:

>
> On Tue, Oct 10, 2023 at 4:02 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> On Tue, Oct 10, 2023 at 3:53 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>>
>>> On Tue, Oct 10, 2023 at 3:41 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I suspect some simple pattern templating would solve most use cases. We
>>>> probably would want to support timestamp formatting (e.g. $YYYY $M $D) as
>>>> well.
>>>>
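
To make the templating idea concrete, here is a minimal Python sketch of timestamp placeholders. The placeholder set ($YYYY, $M, $D), the zero-padding, and the name expand_pattern are assumptions for illustration only, not an existing Beam API.

```python
from datetime import datetime, timezone

# Hypothetical placeholders, following the "$YYYY $M $D" idea quoted above.
# Zero-padding month and day is an assumption made for this sketch.
_FORMATTERS = {
    "$YYYY": lambda ts: f"{ts.year:04d}",
    "$M": lambda ts: f"{ts.month:02d}",
    "$D": lambda ts: f"{ts.day:02d}",
}


def expand_pattern(pattern: str, ts: datetime) -> str:
    """Replace each known placeholder in `pattern` with a field of `ts`."""
    for token, fmt in _FORMATTERS.items():
        pattern = pattern.replace(token, fmt(ts))
    return pattern


window_end = datetime(2023, 10, 10, 16, 5, tzinfo=timezone.utc)
print(expand_pattern("/beam/filesystem/dest/$YYYY/$M/$D/out", window_end))
# prints /beam/filesystem/dest/2023/10/10/out
```

A real design would also need an escaping rule for literal "$" characters, which this sketch ignores.
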
>>>> On Tue, Oct 10, 2023 at 3:35 PM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>>
>>>>> On Mon, Oct 9, 2023 at 3:09 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> I would say:
>>>>>>
>>>>>>     sink:
>>>>>>       type: WriteToParquet
>>>>>>       config:
>>>>>>         path: /beam/filesystem/dest
>>>>>>         prefix: <my prefix>
>>>>>>         suffix: <my suffix>
>>>>>>
>>>>>> The underlying SDK will add the middle part of the file names to make
>>>>>> sure that files generated by various bundles/windows/shards do not
>>>>>> conflict.
>>>>>>
>>>>>
>>>>> What's the relationship between path and prefix? Is path the
>>>>> directory part of the full path, or does prefix precede it?
>>>>>
>>>>
>>> prefix would be the first part of the file name, so each shard will be
>>> named:
>>> <path>/<prefix>-<shard components added by the runner>-<suffix>
>>>
>>> This is similar to what we do in existing SDKs. For example, Java FileIO:
>>>
>>>
>>> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L187
>>>
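
As a rough illustration of the <path>/<prefix>-<shard components>-<suffix> layout quoted above, a Python sketch might look as follows. The function name shard_name and the zero-padded "SSSSS-of-NNNNN" middle part are assumptions for illustration; the actual shard component would be chosen by the runner/SDK.

```python
import posixpath


def shard_name(path: str, prefix: str, suffix: str,
               shard: int, num_shards: int) -> str:
    """Build one output file name from its user-supplied and generated parts."""
    # Assumed shard component; a streaming write would also add window info.
    middle = f"{shard:05d}-of-{num_shards:05d}"
    return posixpath.join(path, f"{prefix}-{middle}-{suffix}")


for i in range(3):
    print(shard_name("/beam/filesystem/dest", "events", "v1.parquet", i, 3))
# prints
# /beam/filesystem/dest/events-00000-of-00003-v1.parquet
# /beam/filesystem/dest/events-00001-of-00003-v1.parquet
# /beam/filesystem/dest/events-00002-of-00003-v1.parquet
```
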
>>
>> Yeah, although there's no distinction between path and prefix.
>>
>
> Ah, for FileIO, path comes from the "to" call.
>
>
> https://github.com/apache/beam/blob/65eaf45026e9eeb61a9e05412488e5858faec6de/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L1125
>
>

Ah. I guess there's some inconsistency here, e.g. text files are written to
a filenamePrefix that subsumes both:
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html#to-java.lang.String-
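
One way to reconcile the two conventions, sketched in Python under the assumption that "/" is the path separator: a combined filename prefix can be split mechanically into a directory part and a name prefix. The function name split_filename_prefix is hypothetical.

```python
import posixpath


def split_filename_prefix(filename_prefix: str) -> tuple[str, str]:
    """Split a combined prefix into (directory, file-name prefix)."""
    directory, prefix = posixpath.split(filename_prefix)
    return directory, prefix


print(split_filename_prefix("/beam/filesystem/dest/events"))
# prints ('/beam/filesystem/dest', 'events')
```

A trailing slash yields an empty name prefix, so "/beam/filesystem/dest/" would be treated as a directory only.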


>
>>
>>>>>
>>>>>> This will satisfy the vast majority of use-cases I believe. Fully
>>>>>> customizing the file pattern sounds like a more advanced use case that
>>>>>> can be left for "real" SDKs.
>>>>>>
>>>>>
>>>>> Yea, we don't have to do everything.
>>>>>
>>>>>
>>>>>> For dynamic destinations, I think just making the "path" component
>>>>>> support a lambda that is parameterized by the input should be adequate,
>>>>>> since this allows customers to direct files written to different
>>>>>> destination directories.
>>>>>>
>>>>>>     sink:
>>>>>>       type: WriteToParquet
>>>>>>       config:
>>>>>>         path: <destination lambda>
>>>>>>         prefix: <my prefix>
>>>>>>         suffix: <my suffix>
>>>>>>
>>>>>> I'm not sure what would be the best way to specify a lambda here
>>>>>> though. Maybe a regex or the name of a Python callable?
>>>>>>
>>>>>
>>>>> I'd rather not require Python for a pure Java pipeline, but some kind
>>>>> of a pattern template may be sufficient here.
>>>>>
>>>>>
>>>>>> On Mon, Oct 9, 2023 at 2:06 PM Robert Bradshaw via dev <
>>>>>> dev@beam.apache.org> wrote:
>>>>>>
>>>>>>> On Mon, Oct 9, 2023 at 1:49 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Just FYI - the reason why names (including prefixes) in
>>>>>>>> DynamicDestinations were parameterized via a lambda instead of just
>>>>>>>> having the user add it via MapElements is performance. We discussed
>>>>>>>> something along the lines of what you are suggesting (essentially
>>>>>>>> having the user create a KV where the key contained the dynamic
>>>>>>>> information). The problem was that the size of the generated filepath
>>>>>>>> was often much larger (sometimes by 2 orders of magnitude) than the
>>>>>>>> information in the record, and there was a desire to avoid record
>>>>>>>> blowup. E.g. the record might contain a single integer userid, and the
>>>>>>>> filepath prefix would then be /long/path/to/output/users/<id>. This was
>>>>>>>> especially bad in cases where the data had to be shuffled, and the
>>>>>>>> existing dynamic destinations method allowed extracting the filepath
>>>>>>>> only _after_ the shuffle.
>>>>>>>>
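
The point about deferring the path computation can be sketched in plain Python (no Beam APIs; the dictionary grouping merely stands in for a shuffle, and destination_for is a hypothetical helper). Only the small userid travels with each record, and the long path is derived once per group afterwards.

```python
# Constant part of the destination, taken from the example quoted above.
OUTPUT_ROOT = "/long/path/to/output/users"


def destination_for(user_id: int) -> str:
    """Derive the full file path prefix from the compact key."""
    return f"{OUTPUT_ROOT}/{user_id}"


records = [{"userid": 7}, {"userid": 42}, {"userid": 7}]

# Stand-in for the shuffle: group records by the compact key only.
grouped = {}
for record in records:
    grouped.setdefault(record["userid"], []).append(record)

# The long path is computed once per group, after grouping.
writes = {destination_for(uid): rows for uid, rows in grouped.items()}

for path, rows in sorted(writes.items()):
    print(path, len(rows))
# prints
# /long/path/to/output/users/42 1
# /long/path/to/output/users/7 2
```
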
>>>>>>>
>>>>>>> That is a consideration I hadn't thought much of, thanks for
>>>>>>> bringing this up.
>>>>>>>
>>>>>>>
>>>>>>>> Now there may not be any good way to keep this benefit in a
>>>>>>>> declarative approach such as YAML (or at least a good easy way - we
>>>>>>>> could always allow the user to pass in a SQL expression to extract the
>>>>>>>> filename from the record!), but we should keep in mind that this might
>>>>>>>> mean that YAML-generated pipelines will be less efficient for certain
>>>>>>>> use cases.
>>>>>>>>
>>>>>>>
>>>>>>> Yep, it's not as straightforward to do in a declarative way. I would
>>>>>>> like to avoid mixing UDFs (with their associated languages and execution
>>>>>>> environments) if possible. Though I'd like the performance of a
>>>>>>> "straightforward" YAML pipeline to be that which one can get writing
>>>>>>> straight-line Java (and possibly better, if we can leverage the
>>>>>>> structure of schemas everywhere), this is not an absolute requirement
>>>>>>> for all features.
>>>>>>>
>>>>>>> I wonder if separating out a constant prefix vs. the dynamic stuff
>>>>>>> could be sufficient to mitigate the blow-up of pre-computing this in
>>>>>>> most cases (especially in the context of a larger pipeline).
>>>>>>> Alternatively, rather than just a sharding pattern, one could have a
>>>>>>> full filepattern that includes format parameters for dynamically
>>>>>>> computed bits as well as the shard number, windowing info, etc. (There
>>>>>>> are pros and cons to this.)
>>>>>>>
>>>>>>>
>>>>>>>> On Mon, Oct 9, 2023 at 12:37 PM Robert Bradshaw via dev <
>>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>>
>>>>>>>>> Currently the various file writing configurations take a single
>>>>>>>>> parameter, path, which indicates where the (sharded) output should be
>>>>>>>>> placed. In other words, one can write something like
>>>>>>>>>
>>>>>>>>>   pipeline:
>>>>>>>>>     ...
>>>>>>>>>     sink:
>>>>>>>>>       type: WriteToParquet
>>>>>>>>>       config:
>>>>>>>>>         path: /beam/filesystem/dest
>>>>>>>>>
>>>>>>>>> and one gets files like "/beam/filesystem/dest-X-of-N"
>>>>>>>>>
>>>>>>>>> Of course, in practice file writing is often much more complicated
>>>>>>>>> than this (especially when it comes to Streaming). For reference, I've
>>>>>>>>> included links to our existing offerings in the various SDKs below.
>>>>>>>>> I'd like to start a discussion about what else should go in the
>>>>>>>>> "config" parameter and how it should be expressed in YAML.
>>>>>>>>>
>>>>>>>>> The primary concern is around naming. This can generally be split
>>>>>>>>> into (1) the prefix, which must be provided by the user, (2) the
>>>>>>>>> sharding information, which includes both shard counts (e.g. the
>>>>>>>>> -X-of-N suffix) and windowing information (for streaming pipelines),
>>>>>>>>> the formatting of which we may want to allow the user to customize,
>>>>>>>>> and (3) a suffix like .json or .avro that is useful for both humans
>>>>>>>>> and tooling and can often be inferred but should allow customization
>>>>>>>>> as well.
>>>>>>>>>
>>>>>>>>> An interesting case is that of dynamic destinations, where the
>>>>>>>>> prefix (or other parameters) may themselves be functions of the
>>>>>>>>> records. (I am excluding the case where the format itself is
>>>>>>>>> variable--such cases are probably better handled by explicitly
>>>>>>>>> partitioning the data and doing multiple writes, as this introduces
>>>>>>>>> significant complexities and the set of possible formats is generally
>>>>>>>>> finite and known ahead of time.) I propose that we leverage the fact
>>>>>>>>> that we have structured data to be able to pull out these dynamic
>>>>>>>>> parameters. For example, if we have an input data set with a string
>>>>>>>>> column my_col we could allow something like
>>>>>>>>>
>>>>>>>>>   config:
>>>>>>>>>     path: {dynamic: my_col}
>>>>>>>>>
>>>>>>>>> which would pull this information out at runtime. (With the
>>>>>>>>> MapToFields transform, it is very easy to compute/append additional
>>>>>>>>> fields to existing records.) Generally this field would then be
>>>>>>>>> stripped from the written data, which would only see the subset of
>>>>>>>>> non-dynamically referenced columns (though this could be
>>>>>>>>> configurable: we could add an attribute like
>>>>>>>>> {dynamic: my_col, keep: true}, or require the set of columns to be
>>>>>>>>> actually written (or elided) to be enumerated in the config, or
>>>>>>>>> allow/require the actual data to be written to be in a designated
>>>>>>>>> field of the "full" input records as arranged by a preceding
>>>>>>>>> transform). It'd be great to get input/impressions from a wide range
>>>>>>>>> of people here on what would be the most natural. Often just writing
>>>>>>>>> out snippets of various alternatives can be quite informative (though
>>>>>>>>> I'm avoiding putting them here for the moment to avoid biasing ideas
>>>>>>>>> right off the bat).
>>>>>>>>>
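
A minimal Python sketch of the "dynamic column" behaviour described above, treating each record as a plain dict. The function name split_dynamic and its keep flag are hypothetical and only mirror the proposed config shape; nothing here is an existing Beam YAML feature.

```python
def split_dynamic(record: dict, dynamic_col: str, keep: bool = False):
    """Return (destination, payload) for one record.

    The destination comes from `dynamic_col`. By default that column is
    stripped from the payload that would actually be written.
    """
    destination = record[dynamic_col]
    if keep:
        payload = dict(record)
    else:
        payload = {k: v for k, v in record.items() if k != dynamic_col}
    return destination, payload


row = {"my_col": "/beam/filesystem/dest/a", "value": 1}
print(split_dynamic(row, "my_col"))
# prints ('/beam/filesystem/dest/a', {'value': 1})
print(split_dynamic(row, "my_col", keep=True))
# prints ('/beam/filesystem/dest/a', {'my_col': '/beam/filesystem/dest/a', 'value': 1})
```
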
>>>>>>>>> For streaming pipelines it is often essential to write data out in
>>>>>>>>> a time-partitioned manner. The typical way to do this is to add the
>>>>>>>>> windowing information into the shard specification itself, and a (set
>>>>>>>>> of) file(s) is written on each window closing. Beam YAML already
>>>>>>>>> supports any transform being given a "windowing" configuration, which
>>>>>>>>> will cause a WindowInto transform to be applied to its input(s) before
>>>>>>>>> application, and which can sit naturally on a sink. We may want to
>>>>>>>>> consider if non-windowed writes make sense as well (though how this
>>>>>>>>> interacts with the watermark and underlying implementations is a large
>>>>>>>>> open question, so this is a larger change that might make sense to
>>>>>>>>> defer).
>>>>>>>>>
>>>>>>>>> Note that I am explicitly excluding "coders" here. All data in
>>>>>>>>> YAML should be schema'd, and writers should know how to write this
>>>>>>>>> structured data. We may want to allow a "schema" field to allow a
>>>>>>>>> user to specify the desired schema in a manner compatible with the
>>>>>>>>> sink format itself (e.g. avro, json, whatever) that could be used both
>>>>>>>>> for validation and possibly resolving ambiguities (e.g. if the sink
>>>>>>>>> has an enum format that is not expressed in the schema of the input
>>>>>>>>> PCollection).
>>>>>>>>>
>>>>>>>>> Some other configuration options are that some formats (especially
>>>>>>>>> text-based ones) allow for specification of an external compression
>>>>>>>>> type (which may be inferable from the suffix), whether to write a
>>>>>>>>> single shard if the input collection is empty or no shards at all (an
>>>>>>>>> occasional user request that's supported for some Beam sinks now),
>>>>>>>>> whether to allow fixed sharding (generally discouraged, as it disables
>>>>>>>>> things like automatic sharding based on input size, let alone dynamic
>>>>>>>>> work rebalancing, though sometimes this is useful if the input is
>>>>>>>>> known to be small and a single output is desired regardless of the
>>>>>>>>> restriction in parallelism), or other sharding parameters (e.g.
>>>>>>>>> limiting the number of total elements or (approximately) total number
>>>>>>>>> of bytes per output shard). Some of these options may not be
>>>>>>>>> available/implemented for all formats--consideration should be given
>>>>>>>>> as to how to handle this inconsistency (runtime errors for unsupported
>>>>>>>>> combinations or simply not allowing them on any until all are
>>>>>>>>> supported).
>>>>>>>>>
>>>>>>>>> A final consideration: we do not anticipate exposing the full
>>>>>>>>> complexity of Beam in the YAML offering. For advanced users, using a
>>>>>>>>> "real" SDK will often be preferable, and we intend to provide a
>>>>>>>>> migration path (codegen) from YAML to a language of your choice. So we
>>>>>>>>> should balance simplicity with completeness and utility here.
>>>>>>>>>
>>>>>>>>> Sure, we could just pick something, but given that the main point
>>>>>>>>> of YAML is not capability, but expressibility and ease-of-use, I think
>>>>>>>>> it's worth trying to get the expression of these concepts right. I'm
>>>>>>>>> sure many of you have written a pipeline to files at some point in
>>>>>>>>> time; I'd welcome any thoughts anyone has on the matter.
>>>>>>>>>
>>>>>>>>> - Robert
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> P.S. A related consideration: how should we treat the plain Read
>>>>>>>>> (where the file pattern is given at pipeline construction) relative to
>>>>>>>>> the ReadAll variants? Should they be separate transforms, or should we
>>>>>>>>> instead allow the same named transform (e.g. ReadFromParquet) to
>>>>>>>>> support both modes, depending on whether an input PCollection or
>>>>>>>>> explicit file path is given (the two being mutually exclusive, with
>>>>>>>>> exactly one required, and good error messaging of course)?
>>>>>>>>>
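
The "exactly one of an input or an explicit path" rule from the P.S. could be checked roughly as follows. This is a Python sketch under the assumption that a single transform name serves both modes; resolve_read_mode and its parameters are hypothetical and not part of Beam.

```python
from typing import Optional


def resolve_read_mode(path: Optional[str] = None, has_input: bool = False) -> str:
    """Pick "Read" or "ReadAll", requiring exactly one source of file patterns."""
    if (path is not None) == has_input:
        # Covers both "neither given" and "both given".
        raise ValueError(
            "ReadFromParquet requires exactly one of: an explicit path, "
            "or an input PCollection of file patterns."
        )
    return "Read" if path is not None else "ReadAll"


print(resolve_read_mode(path="/beam/filesystem/dest*"))  # prints Read
print(resolve_read_mode(has_input=True))                 # prints ReadAll
```
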
>>>>>>>>>
>>>>>>>>> Java:
>>>>>>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/TextIO.Write.html
>>>>>>>>> Python:
>>>>>>>>> https://beam.apache.org/releases/pydoc/current/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
>>>>>>>>> Go:
>>>>>>>>> https://pkg.go.dev/github.com/apache/beam/sdks/go/pkg/beam/io/textio#Write
>>>>>>>>> Typescript:
>>>>>>>>> https://beam.apache.org/releases/typedoc/current/functions/io_textio.writeToText.html
>>>>>>>>> Scio:
>>>>>>>>> https://spotify.github.io/scio/api/com/spotify/scio/io/TextIO$$WriteParam.html
>>>>>>>>>
>>>>>>>>>
