In use cases that actually need the filename / topic name / etc, it mandatory information. It isn't overhead or a performance hit.
Before SDF, FileIO was somewhat of a special case because it read globs and directories. Most other IOs knew the names of their data source statically anyhow so reifying it in the elements didn't add anything that you couldn't do another way. It is SDF that makes this a universally-relevant feature. Kenn On Mon, Feb 11, 2019 at 3:14 AM Alexey Romanenko <aromanenko....@gmail.com> wrote: > Talking about KafkaIO, it’s already possible to have this since* > "apply(KafkaIO.<K, V>read())"* returns *"PCollection<KafkaRecord<K, V>>”* > where *KafkaRecord* contains message metadata (topic, partition, etc). > Though, it works _only_ if* “withoutMetadata()”* was not used before - > in this case it will return simple *KV<K, V>*. > > In the same time, I agree that it would be useful to have some general way > to obtain meta information of records across all Beam IOs. > > On 7 Feb 2019, at 18:25, Yi Pan <nickpa...@gmail.com> wrote: > > Shouldn't this apply to more generic scenario for any BeamIO? For example, > I am using KafkaIO and wanted to get the topic and partition from which the > message was received. Some IOContext associated with each data unit from > BeamIO may be useful here? > > -Yi > > On Thu, Feb 7, 2019 at 6:29 AM Kenneth Knowles <k...@apache.org> wrote: > >> This comes up a lot, wanting file names alongside the data that came from >> the file. It is a historical quirk that none of our connectors used to have >> the file names. What is the change needed for FileIO + parse Avro to be >> really easy to use? >> >> Kenn >> >> On Thu, Feb 7, 2019 at 6:18 AM Jeff Klukas <jklu...@mozilla.com> wrote: >> >>> I haven't needed to do this with Beam before, but I've definitely had >>> similar needs in the past. Spark, for example, provides an input_file_name >>> function that can be applied to a dataframe to add the input file as an >>> additional column. It's not clear to me how that's implemented, though. >>> >>> Perhaps others have suggestions, but I'm not aware of a way to do this >>> conveniently in Beam today. To my knowledge, today you would have to use >>> FileIO.match() and FileIO.readMatches() to get a collection of >>> ReadableFile. You'd then have to FlatMapElements to pull out the metadata >>> and the bytes of the file, and you'd be responsible for parsing those bytes >>> into avro records. You'd be able to output something like a KV<String, T> >>> that groups the file name together with the parsed avro record. >>> >>> Seems like something worth providing better support for in Beam itself >>> if this indeed doesn't already exist. >>> >>> On Thu, Feb 7, 2019 at 7:29 AM Chaim Turkel <ch...@behalf.com> wrote: >>> >>>> Hi, >>>> I am working on a pipeline that listens to a topic on pubsub to get >>>> files that have changes in the storage. Then i read avro files, and >>>> would like to write them to bigquery based on the file name (to >>>> different tables). >>>> My problem is that the transformer that reads the avro does not give >>>> me back the files name (like a tuple or something like that). I seem >>>> to have this pattern come back a lot. >>>> Can you think of any solutions? >>>> >>>> Chaim >>>> >>>> -- >>>> >>>> >>>> Loans are funded by >>>> FinWise Bank, a Utah-chartered bank located in Sandy, >>>> Utah, member FDIC, Equal >>>> Opportunity Lender. Merchant Cash Advances are >>>> made by Behalf. For more >>>> information on ECOA, click here >>>> <https://www.behalf.com/legal/ecoa/>. For important information about >>>> opening a new >>>> account, review Patriot Act procedures here >>>> <https://www.behalf.com/legal/patriot/>. >>>> Visit Legal >>>> <https://www.behalf.com/legal/> to >>>> review our comprehensive program terms, >>>> conditions, and disclosures. >>>> >>> >