[ https://issues.apache.org/jira/browse/SPARK-34859?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Li Xian updated SPARK-34859:
----------------------------
Description:
The current implementation has a problem: the pages returned by
`readNextFilteredRowGroup` for different columns may not be aligned. They can
start at different rows, so some columns may have more rows than others.
Parquet uses `org.apache.parquet.column.impl.SynchronizingColumnReader`
with `rowIndexes` to make sure that rows are aligned.
Currently `VectorizedParquetRecordReader` does no such synchronization among
pages from different columns, so using `readNextFilteredRowGroup` may produce
incorrect results.
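The core idea of row synchronization can be sketched in Scala under a deliberately simplified model (an assumption, not how Parquet or Spark represent pages): each loaded page is described only by the absolute index of its first row and its values, and columns are flat and non-null, so one value corresponds to one row. `Page` and `RowSync.readRows` are hypothetical names for illustration, not the API of `SynchronizingColumnReader` or `VectorizedParquetRecordReader`.

```scala
// Hypothetical, simplified model of a loaded data page: the absolute index of
// its first row plus its values. Assumes a flat, non-null column (one value per row).
final case class Page(firstRowIndex: Long, values: Vector[Long])

object RowSync {
  // Streams through the loaded pages of one column while tracking the absolute
  // row index of every value (page first row index + offset inside the page).
  // Values whose row index is not selected are skipped, so columns whose pages
  // start at different rows still return values for the same selected rows.
  def readRows(pages: Seq[Page], selectedRows: Set[Long]): Seq[Long] =
    for {
      page <- pages
      (value, offset) <- page.values.zipWithIndex
      if selectedRows.contains(page.firstRowIndex + offset)
    } yield value
}
```

For example, a `_1` page starting at row 500 and a `_2` page starting at row 0 both yield the value of row 510 when `Set(510L)` is requested. A real reader would additionally have to handle repetition/definition levels and skip values without decoding them.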
I have attached an example Parquet file. It was generated with
`spark.range(0, 2000).map(i => (i.toLong, i.toInt))`, and its layout is
listed below.
row group 0
--------------------------------------------------------------------------------
_1: INT64 SNAPPY DO:0 FPO:4 SZ:8161/16104/1.97 VC:2000 ENC:PLAIN,BIT_PACKED
[more]...
_2: INT32 SNAPPY DO:0 FPO:8165 SZ:8061/8052/1.00 VC:2000 ENC:PLAIN,BIT_PACKED
[more]...
_1 TV=2000 RL=0 DL=0
----------------------------------------------------------------------------
page 0: DLE:BIT_PACKED RLE:BIT_PACKED VLE:PLAIN ST:[no stats for
[more]... VC:500
page 1: DLE:BIT_PACKED RLE:BIT_PACKED VLE:PLAIN ST:[no stats for
[more]... VC:500
page 2: DLE:BIT_PACKED RLE:BIT_PACKED VLE:PLAIN ST:[no stats for
[more]... VC:500
page 3: DLE:BIT_PACKED RLE:BIT_PACKED VLE:PLAIN ST:[no stats for
[more]... VC:500
_2 TV=2000 RL=0 DL=0
----------------------------------------------------------------------------
page 0: DLE:BIT_PACKED RLE:BIT_PACKED VLE:PLAIN ST:[no stats for
[more]... VC:1000
page 1: DLE:BIT_PACKED RLE:BIT_PACKED VLE:PLAIN ST:[no stats for
[more]... VC:1000
As you can see, in row group 0 column _1 has 4 data pages of 500 values each,
and column _2 has 2 data pages of 1000 values each.
If we filter rows with _1 = 510 using the column index, Parquet returns
page 1 of column _1 and page 0 of column _2. Page 1 of column _1 starts at
row 500, while page 0 of column _2 starts at row 0, so it is incorrect to
simply read the values at the same position in the two pages as one row.
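The page lookup described above is plain arithmetic on the `VC` (value count) fields of the layout dump. A small Scala sketch, assuming flat, non-null columns (consistent with `RL=0 DL=0` above) so that each page's value count equals its row count; `PageLayout` is a hypothetical helper, not part of parquet-tools or Spark:

```scala
object PageLayout {
  // Turns per-page value counts (the VC:... fields) into [firstRow, endRow)
  // ranges. Assumes one value per row, i.e. no nesting and no nulls.
  def rowRanges(valueCounts: Seq[Long]): Seq[(Long, Long)] = {
    val starts = valueCounts.scanLeft(0L)(_ + _) // 0, c0, c0 + c1, ...
    starts.zip(starts.tail)
  }

  // Index of the page whose row range contains `row`, or -1 if none does.
  def pageContaining(valueCounts: Seq[Long], row: Long): Int =
    rowRanges(valueCounts).indexWhere { case (start, end) => row >= start && row < end }
}
```

With `Seq(500L, 500L, 500L, 500L)` for _1 and `Seq(1000L, 1000L)` for _2, row 510 falls in page 1 of _1 (rows [500, 1000)) and page 0 of _2 (rows [0, 1000)). It therefore sits at offset 10 within the _1 page but at offset 510 within the _2 page.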
For example, if you filter with _1 = 510 with column index filtering enabled in
the current version, you get the wrong result:
+---+---+
|_1 |_2 |
+---+---+
|510|10 |
+---+---+
If you turn column index filtering off, you get the correct result:
+---+---+
|_1 |_2 |
+---+---+
|510|510|
+---+---+
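Both results can be reproduced with a toy model of the two selected pages. This sketch assumes the file was written in row order, so both columns hold the row index as their value (which is what `(i.toLong, i.toInt)` produces); `Misalignment`, `naive` and `aligned` are hypothetical names, and this is not Spark's reader code.

```scala
object Misalignment {
  // Page 1 of _1: rows [500, 1000), values equal to the row index.
  val col1FirstRow = 500L
  val col1Page: Vector[Long] = (500L until 1000L).toVector
  // Page 0 of _2: rows [0, 1000), values equal to the row index.
  val col2FirstRow = 0L
  val col2Page: Vector[Int] = (0 until 1000).toVector

  // Unsynchronized: reuse the position inside _1's page as the position in _2's page.
  def naive(target: Long): (Long, Int) = {
    val pos = col1Page.indexOf(target)
    (col1Page(pos), col2Page(pos))
  }

  // Synchronized: map the position to an absolute row index, then to _2's page offset.
  def aligned(target: Long): (Long, Int) = {
    val pos = col1Page.indexOf(target)
    val row = col1FirstRow + pos
    (col1Page(pos), col2Page((row - col2FirstRow).toInt))
  }
}
```

`Misalignment.naive(510L)` pairs 510 with 10, matching the wrong output, while `Misalignment.aligned(510L)` pairs 510 with 510, matching the output with column index filtering off.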
> Vectorized parquet reader needs synchronization among pages for column index
> ----------------------------------------------------------------------------
>
> Key: SPARK-34859
> URL: https://issues.apache.org/jira/browse/SPARK-34859
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.2.0
> Reporter: Li Xian
> Priority: Major
> Attachments:
> part-00000-bee08cae-04cd-491c-9602-4c66791af3d0-c000.snappy.parquet
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)