Hi Enrico,

Sorry for the late reply. I think your understanding is correct.
The best way to do it is to write your own ParquetBulkWriter and the
corresponding factory.
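
As a minimal sketch of that idea: rather than reimplementing the Parquet logic, the custom writer can delegate to an existing one and extract the sub-object before writing. Everything below is an assumption for illustration. The `BulkWriter` interface is a stand-in that mirrors the shape of Flink's `BulkWriter` and `BulkWriter.Factory` so the code compiles without Flink, and `SerializableFunction`, `ExtractingBulkWriter` and `ExtractingBulkWriterFactory` are hypothetical names, not Flink classes.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.function.Function;

// Stand-in mirroring the shape of Flink's BulkWriter / BulkWriter.Factory.
// In real code use org.apache.flink.api.common.serialization.BulkWriter,
// whose factory receives Flink's FSDataOutputStream instead of OutputStream.
interface BulkWriter<T> {
    void addElement(T element) throws IOException;
    void flush() throws IOException;
    void finish() throws IOException;

    interface Factory<T> extends Serializable {
        BulkWriter<T> create(OutputStream out) throws IOException;
    }
}

// Hypothetical helper: the extractor must be serializable because the
// factory that holds it is serialized along with the sink.
interface SerializableFunction<A, B> extends Function<A, B>, Serializable {}

// Accepts the outer type IN (e.g. myClass) but writes only the extracted
// OUT value (e.g. cityClass) through the wrapped writer.
final class ExtractingBulkWriter<IN, OUT> implements BulkWriter<IN> {
    private final BulkWriter<OUT> delegate;
    private final Function<IN, OUT> extractor;

    ExtractingBulkWriter(BulkWriter<OUT> delegate, Function<IN, OUT> extractor) {
        this.delegate = delegate;
        this.extractor = extractor;
    }

    @Override
    public void addElement(IN element) throws IOException {
        delegate.addElement(extractor.apply(element));
    }

    @Override
    public void flush() throws IOException {
        delegate.flush();
    }

    @Override
    public void finish() throws IOException {
        delegate.finish();
    }
}

// The corresponding factory: wraps a factory for OUT and exposes one for IN.
final class ExtractingBulkWriterFactory<IN, OUT> implements BulkWriter.Factory<IN> {
    private final BulkWriter.Factory<OUT> delegateFactory;
    private final SerializableFunction<IN, OUT> extractor;

    ExtractingBulkWriterFactory(BulkWriter.Factory<OUT> delegateFactory,
                                SerializableFunction<IN, OUT> extractor) {
        this.delegateFactory = delegateFactory;
        this.extractor = extractor;
    }

    @Override
    public BulkWriter<IN> create(OutputStream out) throws IOException {
        return new ExtractingBulkWriter<>(delegateFactory.create(out), extractor);
    }
}
```

With the real Flink interfaces, the wrapped factory would presumably be the one returned by `ParquetAvroWriters.forSpecificRecord(..)` for cityClass. The sink is then typed on myClass, so the BucketAssigner still sees the country field.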

Out of curiosity, I guess that in the BucketingSink you were using the
AvroKeyValueSinkWriter, right?

Cheers,
Kostas

On Fri, Aug 30, 2019 at 10:23 AM Enrico Agnoli
<enrico.agn...@workday.com> wrote:
>
> StreamingFile limitations
>
> Hi community,
>
> I'm working on porting our code from `BucketingSink<>` to
> `StreamingFileSink`.
> In this case we use the sink to write Avro records as Parquet, and the
> suggested implementation of the sink should be something like:
>
> ```
> val parquetWriterFactory = ParquetAvroWriters.forSpecificRecord(mySchemaClass)
> StreamingFileSink
>   .forBulkFormat(basePath, parquetWriterFactory)
>   .withBucketAssigner(dataLakeBucketAssigner)
>   .build()
> ```
>
> In this design the BucketAssigner is chained after the `forBulkFormat(..)` step.
> The problem that I'm having with this design is that I have an object that
> contains information that should be used to construct the path and a
> sub-object that contains the data to serialize. A simple example:
>
> myClass
> |- country
> |- cityClass (extends SpecificRecordBase)
>
> Let's say I receive myClass as a stream and I want to serialize the cityClass 
> data via the logic above. The problem is that the `forBulkFormat(..)` needs 
> to run on a subtype of `SpecificRecordBase`, so myClass doesn't work.
> If I extract cityClass from myClass then I will not have country available in 
> the `withBucketAssigner(..)` to be able to store the data in the right 
> folder...
>
>
> Am I missing something, or do I have to write my own version of the
> `ParquetBulkWriter<T>` class to be able to handle `myClass`?
>
> Thanks for any ideas and suggestions.
> Enrico
