I think introducing a class hierarchy for extracting metadata from IO
connectors might end up being overkill. What we need to do instead is add
new transforms to the various IO connectors that return the associated
metadata along with each record. This will be fine performance-wise as well,
since the current transforms will not be affected. Each source will end up
having its own implementation (as needed), but file-based source transforms
will end up sharing a bunch of code, since they share the underlying code
for handling files. For example, we could add:

*public class ReadAllViaFileBasedSource<T> extends
PTransform<PCollection<ReadableFile>, PCollection<KV<String, T>>>*

where *KV<String, T>* pairs the file name with the original record.
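
To make the output shape concrete, here is a minimal plain-Java sketch (not
Beam code) of what such a transform would do per file: read every record and
emit it paired with the name of the file it came from. The names
FileNameTaggingSketch and readAllWithFileNames and the one-record-per-line
format are assumptions made up for illustration; Map.Entry<String, T> stands
in for KV<String, T>, and a real implementation would presumably run inside a
DoFn over ReadableFile and reuse the existing file-based source code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical helper, for illustration only; not part of Beam.
final class FileNameTaggingSketch {
  private FileNameTaggingSketch() {}

  /**
   * Reads each file line by line, parses every line into a record, and
   * returns (file name, record) pairs in file order, then line order.
   */
  static <T> List<Map.Entry<String, T>> readAllWithFileNames(
      List<Path> files, Function<String, T> parseLine) throws IOException {
    List<Map.Entry<String, T>> out = new ArrayList<>();
    for (Path file : files) {
      // The file name is known here because we iterate over concrete files,
      // not over a glob that is expanded and split somewhere else.
      String fileName = file.toString();
      for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
        out.add(new SimpleImmutableEntry<>(fileName, parseLine.apply(line)));
      }
    }
    return out;
  }
}
```

Downstream code can then route on the key, for example choosing a
destination table from the file name.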

One caveat for file-based sources, though, is that we won't be able to
support Read transforms, since we basically feed in a glob and get back a
bunch of records (splitting happens within the source, so the composite
transform is not aware of the exact files that produce the records unless we
implement a new source). ReadFiles/ReadAll transforms should be more
flexible, and we should be able to adapt them to return file names (and
they'll have the full power of Read transforms after Splittable DoFn (SDF)).

Thanks,
Cham


On Tue, Feb 12, 2019 at 12:42 PM Yi Pan <nickpa...@gmail.com> wrote:

> The "general way" is the idea I was hoping to convey (apologies if
> KafkaIO is not a good example for that). More specifically, if KafkaIO
> returns metadata in KafkaRecord and FileIO somehow returns a file name
> associated with each record, it would make sense to define a general
> per-record metadata interface across the different Beam IOs, as optional
> envelope information. That is, IOContext would be the interface
> associated with each record, with KafkaIO implementing KafkaIOContext,
> FileIO implementing FileIOContext, etc.
>
> Best.
>
> On Mon, Feb 11, 2019 at 3:14 AM Alexey Romanenko <aromanenko....@gmail.com>
> wrote:
>
>> Talking about KafkaIO, it's already possible to have this, since
>> *apply(KafkaIO.<K, V>read())* returns *PCollection<KafkaRecord<K, V>>*,
>> where *KafkaRecord* contains message metadata (topic, partition, etc.).
>> However, it works _only_ if *withoutMetadata()* was not used before;
>> if it was, the read returns a simple *KV<K, V>*.
>>
>> At the same time, I agree that it would be useful to have some general
>> way to obtain meta information about records across all Beam IOs.
>>
>> On 7 Feb 2019, at 18:25, Yi Pan <nickpa...@gmail.com> wrote:
>>
>> Shouldn't this apply to a more generic scenario, for any BeamIO? For
>> example, I am using KafkaIO and want to get the topic and partition from
>> which the message was received. Some IOContext associated with each data
>> unit from BeamIO may be useful here?
>>
>> -Yi
>>
>> On Thu, Feb 7, 2019 at 6:29 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> This comes up a lot, wanting file names alongside the data that came
>>> from the file. It is a historical quirk that none of our connectors used to
>>> have the file names. What is the change needed for FileIO + parse Avro to
>>> be really easy to use?
>>>
>>> Kenn
>>>
>>> On Thu, Feb 7, 2019 at 6:18 AM Jeff Klukas <jklu...@mozilla.com> wrote:
>>>
>>>> I haven't needed to do this with Beam before, but I've definitely had
>>>> similar needs in the past. Spark, for example, provides an input_file_name
>>>> function that can be applied to a dataframe to add the input file as an
>>>> additional column. It's not clear to me how that's implemented, though.
>>>>
>>>> Perhaps others have suggestions, but I'm not aware of a way to do this
>>>> conveniently in Beam today. To my knowledge, today you would have to use
>>>> FileIO.match() and FileIO.readMatches() to get a collection of
>>>> ReadableFile. You'd then have to FlatMapElements to pull out the metadata
>>>> and the bytes of the file, and you'd be responsible for parsing those bytes
>>>> into Avro records. You'd be able to output something like a KV<String, T>
>>>> that groups the file name together with the parsed Avro record.
>>>>
>>>> Seems like something worth providing better support for in Beam itself
>>>> if this indeed doesn't already exist.
>>>>
>>>> On Thu, Feb 7, 2019 at 7:29 AM Chaim Turkel <ch...@behalf.com> wrote:
>>>>
>>>>> Hi,
>>>>>   I am working on a pipeline that listens to a Pub/Sub topic to get
>>>>> files that have changed in storage. I then read the Avro files and
>>>>> would like to write them to BigQuery, to different tables based on
>>>>> the file name.
>>>>>   My problem is that the transform that reads the Avro does not give
>>>>> me back the file name (as a tuple or something like that). This
>>>>> pattern seems to come up a lot for me.
>>>>> Can you think of any solutions?
>>>>>
>>>>> Chaim
>>>>>
>>>>
>>
