[ 
https://issues.apache.org/jira/browse/PARQUET-1549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16800553#comment-16800553
 ] 

Gustavo Figueiredo edited comment on PARQUET-1549 at 3/25/19 10:30 AM:
-----------------------------------------------------------------------

[~gszadovszky], thanks for pointing it out.

I've submitted a PR for this Jira with a solution for this case. All the 
information we actually need is the filename used for each PARQUET file being 
created and the schema; we don't need to derive this information again from 
the configuration. I just had to append something extra to the filename to 
keep the filenames distinct, so that there would be no conflicts with files 
being created by other concurrent tasks. I believe that with this approach we 
can cover all the use cases (e.g. different files being created in different 
tasks, or different files being created in different locations by means of 
MultipleOutputs, etc.).
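As a minimal sketch of the naming idea described above (the helper and the exact suffix layout are hypothetical, not the actual parquet-mr naming scheme): appending the task attempt id plus a per-task file counter keeps names unique both across concurrent tasks and across rolled-over files within one task.

```java
public class UniqueParquetNames {
    // Append the task attempt id and a running file counter so that
    // concurrent tasks (and successive files within one task) never collide.
    public static String uniqueName(String baseName, String taskAttemptId, int fileCounter) {
        return baseName + "-" + taskAttemptId + "-" + fileCounter + ".parquet";
    }

    public static void main(String[] args) {
        System.out.println(uniqueName("part-m-00000", "attempt_1_m_000000_0", 0));
        System.out.println(uniqueName("part-m-00000", "attempt_1_m_000000_0", 1));
    }
}
```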

It would become completely transparent to the MapReduce application.

The reason for doing this is that sometimes I see high skew across task 
loads, making some tasks produce very large PARQUET files compared to the 
others. In many scenarios it's possible to fix this by choosing proper input 
splits or by rearranging the input data, but sometimes we just have to deal 
with the skewed input data by other means.

I agree with you that it should not matter whether we have several files with 
one row group each or one file with several row groups, each aligned to an 
HDFS block, since in most cases the data split is block based. Nonetheless, 
I've seen poor performance with the second scenario when using Impala. I 
believe many users, like me, come to Parquet through Impala, so it seems a 
good idea to help this audience with something that can significantly improve 
query performance later on.



> Option for one block per file in MapReduce output
> -------------------------------------------------
>
>                 Key: PARQUET-1549
>                 URL: https://issues.apache.org/jira/browse/PARQUET-1549
>             Project: Parquet
>          Issue Type: New Feature
>          Components: parquet-mr
>    Affects Versions: 1.10.0
>            Reporter: Gustavo Figueiredo
>            Priority: Minor
>
> When we create PARQUET files using a MapReduce application with current 
> ParquetOutputFormat implementation, we don't have any option to reliably 
> limit the number of blocks (row groups) we want to generate per file.
> The implemented configuration option 'parquet.block.size' 
> (ParquetOutputFormat.BLOCK_SIZE) refers to the amount of data that goes into 
> one block of data, but there are no guarantees that this will be the only 
> block in a file. If one sets this configuration option to a very high value, 
> it's likely there will be a single block per PARQUET file. However, this 
> approach might lead to undesirably big files, so this would not be a good 
> option in some scenarios.
> This behaviour can't be achieved by the client's 'mapper' either. Although 
> there are some helpful classes in the Hadoop API, such as 'MultipleOutputs', 
> we don't have enough information available in 'mapper' code to exercise this 
> kind of control, unless one uses unsafe 'hacks' to gather information from 
> private fields.
> For instance, suppose we have an ETL application that loads data from HBASE 
> regions (might be one or more MAPs per region) and produces PARQUET files to 
> be consumed in IMPALA tables (might be one or more PARQUET files per MAP 
> task). To simplify, let's say there is no 'REDUCE' task in this application.
> For concreteness, let's say one could use 
> 'org.apache.hadoop.hbase.mapreduce.TableInputFormat' as the input format and 
> 'org.apache.parquet.hadoop.ParquetOutputFormat' as the output format for 
> such a job.
> Following the guidelines for maximum query performance in Impala queries in 
> HADOOP ecosystem, each PARQUET file should be approximately equal in size to 
> an HDFS block and there should be only a single block of data (row group) in 
> each of them (see 
> https://impala.apache.org/docs/build/html/topics/impala_perf_cookbook.html#perf_cookbook__perf_cookbook_parquet_block_size).
> Currently we are only able to do this by trial and error with different 
> configuration options.
> It would be nice to have a new boolean configuration option (let's call it 
> 'parquet.split.file.per.block') related to the existing 
> 'parquet.block.size'. If it's set to false (the default), we would keep the 
> current behaviour. If it's set to true, a different PARQUET file would be 
> generated for each 'block' created, all coming from the same 
> ParquetRecordWriter.
> In doing so, we would only have to worry about tuning the 
> 'parquet.block.size' parameter in order to generate PARQUET files with a 
> single block per file and a size close to the configured HDFS block size.
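To make the proposal concrete, here is a hedged sketch of how a job might set the two keys. 'parquet.split.file.per.block' is only the name proposed in this issue and does not exist in parquet-mr; java.util.Properties stands in for org.apache.hadoop.conf.Configuration so the sketch stays dependency-free.

```java
import java.util.Properties;

public class ProposedOptionSketch {
    // Stand-in for org.apache.hadoop.conf.Configuration; the keys mirror
    // the proposal above ('parquet.split.file.per.block' is hypothetical).
    public static Properties proposedJobConf() {
        Properties conf = new Properties();
        // Size one row group to roughly one HDFS block (128 MB here).
        conf.setProperty("parquet.block.size", String.valueOf(128L * 1024 * 1024));
        // Proposed switch: roll over to a new file after each row group.
        conf.setProperty("parquet.split.file.per.block", "true");
        return conf;
    }

    public static void main(String[] args) {
        System.out.println(proposedJobConf().getProperty("parquet.split.file.per.block"));
    }
}
```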
>  
> In order to implement this new feature, we only need to change a few classes 
> in 'org.apache.parquet.hadoop' package, namely:
>  InternalParquetRecordWriter
>  ParquetFileWriter
>  ParquetOutputFormat
>  ParquetRecordWriter
> Briefly, these are the changes needed:
>  InternalParquetRecordWriter:
>  The field 'ParquetFileWriter parquetFileWriter' should no longer be 
> 'final', since we want to be able to replace it during the task.
>  The method 'checkBlockSizeReached' should call a new function 'startNewFile' 
> just after a call to 'flushRowGroupToStore'.
>  The new method 'startNewFile' should have all the logic for closing the 
> current file and starting a new one at the same location with a proper 
> filename.
>  
>  ParquetFileWriter
>  The constructor argument 'OutputFile file' should be persisted as a new 
> member field and exposed by a new public method. This information is 
> useful for the 'startNewFile' implementation mentioned above.
>  The field 'MessageType schema' should be exposed by a new public method. 
> This information is also useful for the 'startNewFile' implementation.
>  
>  ParquetOutputFormat
>  The existing private method 'getMaxPaddingSize' should be made 'public' or 
> at least package-private. This information is useful for the 
> 'startNewFile' implementation mentioned above.
>  The new configuration option 'parquet.split.file.per.block' should be 
> specified here like the other ones. The new behaviour in 
> 'InternalParquetRecordWriter' is conditioned on this configuration option.
>  
>  ParquetRecordWriter
>  Simply pass the configuration option through to the internal 
> InternalParquetRecordWriter instance.
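The control flow sketched in the change list above can be illustrated with a self-contained simulation. Class and method names only mirror the parquet-mr ones (checkBlockSizeReached, startNewFile); StringWriter stands in for a real output file, and this is an assumption-laden sketch, not the actual implementation.

```java
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;

// After each record the writer checks the accumulated size; once the
// row-group threshold is crossed, it flushes and rolls over to a brand-new
// file instead of starting another row group in the same file.
public class RollingRecordWriterSketch {
    private final long blockSize;
    private final List<StringWriter> files = new ArrayList<>();
    private StringWriter current;
    private long bytesInCurrentBlock = 0;

    public RollingRecordWriterSketch(long blockSize) {
        this.blockSize = blockSize;
        startNewFile();
    }

    // Analogue of InternalParquetRecordWriter.checkBlockSizeReached.
    public void write(String record) {
        current.write(record);
        bytesInCurrentBlock += record.length();
        if (bytesInCurrentBlock >= blockSize) {
            // flushRowGroupToStore() would happen here, then:
            startNewFile();
        }
    }

    // Analogue of the proposed startNewFile(): close the current file and
    // open a new one with a distinct name at the same location.
    private void startNewFile() {
        current = new StringWriter();
        files.add(current);
        bytesInCurrentBlock = 0;
    }

    public int fileCount() { return files.size(); }

    public static void main(String[] args) {
        RollingRecordWriterSketch w = new RollingRecordWriterSketch(10);
        for (int i = 0; i < 5; i++) w.write("record"); // 6 bytes each
        // 30 bytes against a 10-byte threshold -> rolls after records 2 and 4
        System.out.println(w.fileCount()); // → 3
    }
}
```

The point of the sketch is that rollover lives entirely inside the record writer, which is why the change stays transparent to the MapReduce application.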



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
