I think introducing a class hierarchy for extracting metadata from IO connectors might be overkill. What we need to do instead is add new transforms to the various IO connectors that return the associated metadata along with each record. This will be fine performance-wise as well, since current transforms will not be affected. Each source will end up having its own implementation (as needed), but file-based source transforms will end up sharing a lot of code here, since they share the underlying code for handling files. For example, we could add a
*public class ReadAllViaFileBasedSource<T> extends PTransform<PCollection<ReadableFile>, PCollection<KV<String, T>>>*

where *KV<String, T>* represents the filename and the original record. One caveat for file-based sources, though, is that we won't be able to support Read transforms, since we basically feed in a glob and get back a bunch of records (splitting happens within the source, so the composite transform is not aware of the exact files that produce the records, unless we implement a new source). ReadFiles/ReadAll transforms should be more flexible, and we should be able to adapt them to support returning file names (and they'll have the full power of Read transforms after SDF).

Thanks,
Cham

On Tue, Feb 12, 2019 at 12:42 PM Yi Pan <nickpa...@gmail.com> wrote:

> The "general way" is the idea I was hoping to convey (apologies if
> KafkaIO is not a good example of it). More specifically, if KafkaIO
> returns metadata in KafkaRecord, and FileIO somehow returns a file name
> associated with each record, it seems that it would make sense to define a
> general metadata interface for each record across the different Beam IOs,
> as optional envelope information. I.e., IOContext is the interface
> associated with each record, and KafkaIO implements KafkaIOContext,
> FileIO implements FileIOContext, etc.
>
> Best.
>
> On Mon, Feb 11, 2019 at 3:14 AM Alexey Romanenko <aromanenko....@gmail.com>
> wrote:
>
>> Talking about KafkaIO, it’s already possible to have this, since
>> *"apply(KafkaIO.<K, V>read())"* returns *"PCollection<KafkaRecord<K, V>>”*,
>> where *KafkaRecord* contains the message metadata (topic, partition, etc.).
>> Though, it works _only_ if *“withoutMetadata()”* was not used before -
>> in that case it returns a simple *KV<K, V>*.
>>
>> At the same time, I agree that it would be useful to have some general
>> way to obtain the meta information of records across all Beam IOs.
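[Editor's note: a minimal, Beam-free sketch of the *KV<String, T>* output shape proposed above. *KV* and *expandWithFilenames* here are simplified stand-ins for illustration only — the real *KV* is *org.apache.beam.sdk.values.KV*, and no such transform exists in Beam at the time of this thread.]

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReadAllWithFilenamesSketch {

    // Simplified stand-in for Beam's KV (the real one is
    // org.apache.beam.sdk.values.KV).
    public static final class KV<K, V> {
        public final K key;
        public final V value;
        public KV(K key, V value) { this.key = key; this.value = value; }
        @Override public String toString() { return "KV{" + key + ", " + value + "}"; }
    }

    // Models the behavior of the proposed ReadAllViaFileBasedSource:
    // for each file (here, filename -> already-parsed records), emit every
    // record keyed by the filename it came from.
    public static <T> List<KV<String, T>> expandWithFilenames(Map<String, List<T>> files) {
        List<KV<String, T>> out = new ArrayList<>();
        for (Map.Entry<String, List<T>> file : files.entrySet()) {
            for (T record : file.getValue()) {
                out.add(new KV<>(file.getKey(), record));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical filenames for illustration.
        Map<String, List<String>> files = new LinkedHashMap<>();
        files.put("gs://bucket/a.avro", List.of("rec1", "rec2"));
        files.put("gs://bucket/b.avro", List.of("rec3"));
        for (KV<String, String> kv : expandWithFilenames(files)) {
            System.out.println(kv);
        }
    }
}
```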
>>
>> On 7 Feb 2019, at 18:25, Yi Pan <nickpa...@gmail.com> wrote:
>>
>> Shouldn't this apply to the more generic scenario of any Beam IO? For
>> example, I am using KafkaIO and wanted to get the topic and partition
>> from which each message was received. Some IOContext associated with
>> each data unit from a Beam IO may be useful here?
>>
>> -Yi
>>
>> On Thu, Feb 7, 2019 at 6:29 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> This comes up a lot: wanting the file names alongside the data that came
>>> from the files. It is a historical quirk that none of our connectors used
>>> to have the file names. What is the change needed for FileIO + parse Avro
>>> to be really easy to use?
>>>
>>> Kenn
>>>
>>> On Thu, Feb 7, 2019 at 6:18 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>>>
>>>> I haven't needed to do this with Beam before, but I've definitely had
>>>> similar needs in the past. Spark, for example, provides an
>>>> input_file_name function that can be applied to a dataframe to add the
>>>> input file as an additional column. It's not clear to me how that's
>>>> implemented, though.
>>>>
>>>> Perhaps others have suggestions, but I'm not aware of a way to do this
>>>> conveniently in Beam today. To my knowledge, today you would have to use
>>>> FileIO.match() and FileIO.readMatches() to get a collection of
>>>> ReadableFile. You'd then have to FlatMapElements to pull out the metadata
>>>> and the bytes of each file, and you'd be responsible for parsing those
>>>> bytes into Avro records. You'd be able to output something like a
>>>> KV<String, T> that groups the file name together with the parsed Avro
>>>> record.
>>>>
>>>> Seems like something worth providing better support for in Beam itself,
>>>> if this indeed doesn't already exist.
>>>>
>>>> On Thu, Feb 7, 2019 at 7:29 AM Chaim Turkel <ch...@behalf.com> wrote:
>>>>
>>>>> Hi,
>>>>> I am working on a pipeline that listens to a topic on Pub/Sub to get
>>>>> files that have changed in storage.
>>>>> Then I read the Avro files and would like to write them to BigQuery
>>>>> based on the file name (to different tables).
>>>>> My problem is that the transform that reads the Avro does not give
>>>>> me back the file name (as a tuple or something like that). I seem
>>>>> to have this pattern come up a lot.
>>>>> Can you think of any solutions?
>>>>>
>>>>> Chaim
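[Editor's note: a minimal, Beam-free sketch of the per-record metadata envelope Yi Pan raises in the thread. *IOContext*, *KafkaContext*, and *FileContext* are hypothetical names taken from the discussion, not classes that exist in Beam; the fields on *KafkaContext* mirror the metadata that Beam's real *KafkaRecord* carries (topic, partition, offset).]

```java
public class IOContextSketch {

    // Hypothetical marker interface for connector-specific record metadata.
    public interface IOContext {
        String describe();
    }

    // What KafkaIO might attach to each record (cf. KafkaRecord's metadata).
    public static final class KafkaContext implements IOContext {
        public final String topic;
        public final int partition;
        public final long offset;
        public KafkaContext(String topic, int partition, long offset) {
            this.topic = topic; this.partition = partition; this.offset = offset;
        }
        @Override public String describe() {
            return topic + "/" + partition + "@" + offset;
        }
    }

    // What a file-based IO might attach: the file the record came from.
    public static final class FileContext implements IOContext {
        public final String filename;
        public FileContext(String filename) { this.filename = filename; }
        @Override public String describe() { return filename; }
    }

    // A record paired with its connector-specific context.
    public static final class Contextualized<T> {
        public final T record;
        public final IOContext context;
        public Contextualized(T record, IOContext context) {
            this.record = record; this.context = context;
        }
    }

    public static void main(String[] args) {
        Contextualized<String> fromKafka =
            new Contextualized<>("payload", new KafkaContext("events", 3, 42L));
        Contextualized<String> fromFile =
            new Contextualized<>("payload", new FileContext("gs://bucket/a.avro"));
        System.out.println(fromKafka.context.describe());  // events/3@42
        System.out.println(fromFile.context.describe());   // gs://bucket/a.avro
    }
}
```

The same downstream code can then key or route records by *context.describe()* (e.g. to pick a BigQuery table per source file) without knowing which connector produced them.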