Hi,

Initial thoughts were to go for a WAL-based approach where the operator would
first write POJOs to the WAL and a separate thread would then read from the WAL
and write the destination files based on the block size.
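A minimal sketch of that flow, assuming a pluggable WAL abstraction (the
WriteAheadLog interface and class names below are illustrative, not existing
Malhar APIs, and the Apex imports are from memory):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.common.util.BaseOperator;

// Assumed minimal WAL abstraction; the real one would come from the pluggable
// spooling work referenced below, not from this sketch.
interface WriteAheadLog<T>
{
  void append(T tuple);
}

public class WalParquetOutputOperator<T> extends BaseOperator
{
  private long blockSize = 128L * 1024 * 1024;   // target destination-file/block size in bytes
  private transient WriteAheadLog<T> wal;        // assumed to be configured/injected
  private transient ExecutorService walReader;   // separate thread that drains the WAL

  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
  {
    @Override
    public void process(T tuple)
    {
      wal.append(tuple);                         // step 1: write the POJO to the WAL first
    }
  };

  @Override
  public void setup(OperatorContext context)
  {
    walReader = Executors.newSingleThreadExecutor();
    walReader.submit(new Runnable()
    {
      @Override
      public void run()
      {
        // step 2: read POJOs back from the WAL and roll a new destination
        // file whenever roughly 'blockSize' bytes have been accumulated.
        drainWalAndWriteFiles();
      }
    });
  }

  @Override
  public void teardown()
  {
    walReader.shutdownNow();
  }

  private void drainWalAndWriteFiles()
  {
    // read from the WAL -> write the destination file -> truncate the consumed WAL portion
  }
}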
There is a ticket open for a pluggable spooling implementation for output
operators which can be leveraged for this:
https://issues.apache.org/jira/browse/APEXMALHAR-2037

Since work is already being done on that front, we can plug the spooler into
the existing implementation of the ParquetFileWriter at that point and remove
the first operator - ParquetFileOutputOperator.

Thanks,
Dev

On Tue, Jun 14, 2016 at 7:21 PM, Thomas Weise <[email protected]> wrote:

> What's the reason for not considering the WAL based approach?
>
> What are the pros and cons?
>
> On Tue, Jun 14, 2016 at 6:54 PM, Devendra Tagare <[email protected]> wrote:
>
> > Hi All,
> >
> > We can focus on the below 2 problems:
> > 1. Avoid the small-files problem which could arise due to a flush at every
> >    endWindow, since there wouldn't be significant data in a single window.
> > 2. Fault tolerance.
> >
> > *Proposal*: Create a module in which there are 2 operators.
> >
> > *Operator 1: ParquetFileOutputOperator*
> > This operator will be an implementation of the AbstractFileOutputOperator.
> > It will write data to an HDFS location and leverage the fault-tolerance
> > semantics of the AbstractFileOutputOperator.
> >
> > This operator will implement the CheckpointNotificationListener and will
> > emit the finalized files from the beforeCheckpoint method as a
> > Map<windowId, Set<files finalized in the window>>.
> >
> > *Operator 2: ParquetFileWriter*
> > This operator will receive a Set<files finalized in the window> from the
> > ParquetFileOutputOperator on its input port. Once it receives this input,
> > it will do the following:
> >
> > 1. Save the input received to a Map<windowId, Set<InputFiles>> inputFilesMap.
> >
> > 2. Instantiate a new ParquetWriter.
> > 2.a. Get a unique file name.
> > 2.b. Add a configurable writer that extends the ParquetWriter and includes
> >      write support for the various supported formats like Avro, Thrift etc.
> >
> > 3. For each file from the inputFilesMap:
> > 3.a. Read the file and write the records using the writer created in (2).
> > 3.b. Check if the (configurable) block size is reached. If yes, close the
> >      file, add its entry to a Map<windowId, CompletedFiles> completedFilesMap
> >      and remove the entry from inputFilesMap. If the writes fail, the files
> >      can be reprocessed from the inputFilesMap.
> > 3.c. In the committed callback, remove the completed files from the
> >      directory and prune the completedFilesMap for that window.
> >
> > Points to note:
> > 1. The block size check will be approximate since the data is in memory and
> >    ParquetWriter does not expose a flush.
> > 2. This is at best a temporary implementation in the absence of a WAL-based
> >    approach.
> >
> > I would like to take a crack at this operator based on community feedback.
> >
> > Thoughts?
> >
> > Thanks,
> > Dev
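As a reference point, a rough skeleton of the bookkeeping in the proposed
ParquetFileWriter could look like the following; the port type, listener
callback names and Apex imports are assumptions from memory, and the actual
file conversion is only outlined in comments:

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.Operator.CheckpointNotificationListener;
import com.datatorrent.common.util.BaseOperator;

public class ParquetFileWriter extends BaseOperator implements CheckpointNotificationListener
{
  // step 1: files finalized per window that still need to be converted
  private final Map<Long, Set<String>> inputFilesMap = new HashMap<Long, Set<String>>();
  // step 3.b: parquet files completed per window, dropped once committed
  private final Map<Long, Set<String>> completedFilesMap = new HashMap<Long, Set<String>>();

  private long blockSize = 128L * 1024 * 1024;  // approximate check only; data is in memory
  private transient long currentWindowId;

  // receives the Set<files finalized in the window> emitted by ParquetFileOutputOperator
  public final transient DefaultInputPort<Set<String>> input = new DefaultInputPort<Set<String>>()
  {
    @Override
    public void process(Set<String> finalizedFiles)
    {
      inputFilesMap.put(currentWindowId, finalizedFiles);
    }
  };

  @Override
  public void beginWindow(long windowId)
  {
    currentWindowId = windowId;
  }

  @Override
  public void endWindow()
  {
    // steps 2 and 3: for each pending file, read the records, write them through
    // the configured ParquetWriter, and once ~blockSize is reached close the file
    // and move its entry from inputFilesMap to completedFilesMap. A failed write
    // leaves the entry in inputFilesMap so it can be reprocessed.
  }

  @Override
  public void beforeCheckpoint(long windowId)
  {
    // nothing extra needed here; both maps are part of the checkpointed state
  }

  @Override
  public void checkpointed(long windowId)
  {
  }

  @Override
  public void committed(long windowId)
  {
    // step 3.c: the source files can be deleted here; prune completedFilesMap
    for (Iterator<Long> it = completedFilesMap.keySet().iterator(); it.hasNext();) {
      if (it.next() <= windowId) {
        it.remove();
      }
    }
  }
}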
> > On Mon, Apr 25, 2016 at 12:36 PM, Tushar Gosavi <[email protected]> wrote:
> >
> > > Hi Shubham,
> > >
> > > +1 for the Parquet writer.
> > >
> > > I doubt we could leverage the recovery mechanism provided by
> > > AbstractFileOutputOperator, as the Parquet writer does not expose flush
> > > and could write to the underlying stream at any time. To simplify
> > > recovery you can write a single file in each checkpoint duration. If
> > > this is not an option, then you need to make use of a WAL for recovery,
> > > and not use operator checkpointing for storing not-yet-persisted tuples,
> > > as checkpointing huge state every 30 seconds is costly.
> > >
> > > Regards,
> > > -Tushar.
> > >
> > > On Mon, Apr 25, 2016 at 11:38 PM, Shubham Pathak <[email protected]> wrote:
> > >
> > > > Hello Community,
> > > >
> > > > Apache Parquet <https://parquet.apache.org/documentation/latest/> is a
> > > > column-oriented binary file format designed to be extremely efficient
> > > > and interoperable across the Hadoop ecosystem. It has integrations
> > > > with most of the Hadoop processing frameworks (Impala, Hive, Pig,
> > > > Spark, ...) and serialization models (Thrift, Avro, Protobuf), making
> > > > it easy to use in ETL and processing pipelines.
> > > >
> > > > Having an operator to write data to Parquet files would certainly be
> > > > a good addition to the Malhar library.
> > > >
> > > > The underlying implementation
> > > > <http://blog.cloudera.com/blog/2014/05/how-to-convert-existing-data-into-parquet/>
> > > > for writing data as Parquet requires a subclass of
> > > > parquet.hadoop.api.WriteSupport that knows how to take an in-memory
> > > > object and write Parquet primitives through
> > > > parquet.io.api.RecordConsumer. Currently, there are several
> > > > WriteSupport implementations, including ThriftWriteSupport,
> > > > AvroWriteSupport, and ProtoWriteSupport. These WriteSupport
> > > > implementations are then wrapped as ParquetWriter objects for writing.
> > > >
> > > > Parquet writers do not expose a handle to the underlying stream. In
> > > > order to write data to a Parquet file, all the records (that belong
> > > > to the file) must be buffered in memory. These records are then
> > > > compressed and later flushed to the file.
> > > >
> > > > To start with, we could support the following features in the operator:
> > > >
> > > > - *Ability to provide a WriteSupport implementation*: The user should
> > > >   be able to use existing implementations of
> > > >   parquet.hadoop.api.WriteSupport or provide his/her own implementation.
> > > > - *Ability to configure page size*: Refers to the amount of
> > > >   uncompressed data for a single column that is read before it is
> > > >   compressed as a unit and buffered in memory to be written out as a
> > > >   "page". Default value: 1 MB.
> > > > - *Ability to configure Parquet block size*: Refers to the amount of
> > > >   compressed data that should be buffered in memory before a row group
> > > >   is written out to disk. Larger block sizes require more memory to
> > > >   buffer the data; 128 MB / 256 MB is recommended.
> > > > - *Flushing files periodically*: The operator would have to flush
> > > >   files periodically to a specified directory as per the configured
> > > >   block size. This could be time-based / number-of-events-based /
> > > >   size-based.
> > > >
> > > > To implement the operator, here's one approach:
> > > >
> > > > 1. Extend the existing AbstractFileOutputOperator.
> > > > 2. Provide methods to add WriteSupport implementations.
> > > > 3. In the process method, hold the data in memory till we reach a
> > > >    configured size and then flush the contents to a file during
> > > >    endWindow().
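For the Avro case, the "wrap a WriteSupport in a ParquetWriter" step described
above could look roughly like the helper below; the class and method names are
made up for illustration, and the newer org.apache.parquet packages with the
builder API are assumed (other WriteSupport implementations would be wired in
the same way):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class ParquetFlushHelper
{
  // Writes one batch of buffered records to 'file' with the configured block
  // (row group) size and page size, then closes the writer. In the operator,
  // this is what would run at endWindow() / on a size trigger, since the data
  // is only persisted when the writer is closed.
  public static void flush(Path file, Schema schema, Iterable<GenericRecord> buffered,
      int blockSize, int pageSize) throws IOException
  {
    ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(file)
        .withSchema(schema)                                 // AvroWriteSupport under the hood
        .withCompressionCodec(CompressionCodecName.SNAPPY)  // codec choice is illustrative
        .withRowGroupSize(blockSize)                        // e.g. 128 * 1024 * 1024
        .withPageSize(pageSize)                             // e.g. 1 * 1024 * 1024
        .build();
    try {
      for (GenericRecord record : buffered) {
        writer.write(record);
      }
    } finally {
      writer.close();
    }
  }
}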
> > > >
> > > > Please send across your thoughts on this. I would also like to know
> > > > whether we would be able to leverage the recovery mechanisms provided
> > > > by AbstractFileOutputOperator using this approach.
> > > >
> > > > Thanks,
> > > > Shubham
