[
https://issues.apache.org/jira/browse/DRILL-5266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15868909#comment-15868909
]
Paul Rogers commented on DRILL-5266:
------------------------------------
The code that determines how many variable-length records fit into a record
batch is seriously flawed. Consider the execution flow:
{code}
public class ParquetRecordReader extends AbstractRecordReader {
  ...
  public int next() {
    ...
    recordsToRead = Math.min(recordsToRead, numRecordsToRead);
    if (allFieldsFixedLength) {
      readAllFixedFields(recordsToRead);
    } else { // variable length columns
      long fixedRecordsToRead = varLengthReader.readFields(recordsToRead, firstColumnStatus);
      readAllFixedFields(fixedRecordsToRead);
    }
{code}
This suggests that we read all the variable-length fields first, then the fixed-length
ones, and that we tell the reader how many records to read.
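Stated as a contract, the idea looks something like the following minimal standalone sketch (hypothetical names, not Drill's actual code): the variable-length pass decides how many records fit in the batch, and the fixed-length pass then reads exactly that many.
{code}
// Hypothetical sketch of the intended two-phase contract; names are made up.
interface VarLenPass {
  // Sizes and reads variable-length columns; returns how many records
  // actually fit, which may be fewer than requested.
  long readFields(long requested);
}

interface FixedLenPass {
  // Reads exactly 'count' values for every fixed-length column.
  void readAllFixedFields(long count);
}

class TwoPhaseBatchFill {
  static long next(long requested, VarLenPass varLen, FixedLenPass fixed) {
    long actual = varLen.readFields(requested);  // var-len pass sets the record count
    fixed.readAllFixedFields(actual);            // fixed-len pass follows that count
    return actual;
  }
}
{code}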
Then:
{code}
public class VarLenBinaryReader {
  ...
  public long readFields(long recordsToReadInThisPass, ColumnReader<?> firstColumnStatus) throws IOException {
    long recordsReadInCurrentPass = 0;
    ...
    recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass);
    if (useAsyncTasks) {
      readRecordsParallel(recordsReadInCurrentPass);
    } else {
      readRecordsSerial(recordsReadInCurrentPass);
    }
{code}
Here, we determine how many records to read, then read them. OK.
{code}
private long determineSizesSerial(long recordsToReadInThisPass) throws IOException {
  ...
  int recordsReadInCurrentPass = 0;
  top: do {
    for (VarLengthColumn<?> columnReader : columns) {
      // Return status is "done reading", meaning stop if true.
      if (columnReader.determineSize(recordsReadInCurrentPass, 0 /* unused */)) {
        break top;
      }
  ...
{code}
This says that we determine the size of each column, one record at a time,
stopping when the batch is full. This is NOT what happens. Let's follow down:
{code}
public boolean determineSize(long recordsReadInCurrentPass, Integer lengthVarFieldsInCurrentRecord) throws IOException {
  ...
  if (processPageData((int) recordsReadInCurrentPass)) {
    return true;
  }

public abstract class VarLengthColumn<V extends ValueVector> extends ColumnReader<V> {
  ...
  protected boolean processPageData(int recordsToReadInThisPass) throws IOException {
    return readAndStoreValueSizeInformation();
  }

public abstract class NullableVarLengthValuesColumn<V extends ValueVector> extends VarLengthValuesColumn<V> {
  ...
  return ! setSafe(valuesReadInCurrentPass + pageReader.valuesReadyToRead, pageReader.pageData,
      (int) pageReader.readyToReadPosInBytes + 4, dataTypeLengthInBits);

public static class NullableVarCharColumn extends NullableVarLengthValuesColumn<NullableVarCharVector> {
  ...
  public boolean setSafe(int index, DrillBuf value, int start, int length) {
    if (index >= vector.getValueCapacity()) {
      return false;
    }
    if (usingDictionary) {
      ByteBuffer buf = currDictValToWrite.toByteBuffer();
      mutator.setSafe(index, buf, buf.position(), currDictValToWrite.length());
    } else {
      mutator.setSafe(index, 1, start, start + length, value);
    }
    return true;
  }
{code}
The intent of the above is to check whether the data about to be read would exceed
the allocated vector length. But that's not what it does: it actually checks whether
the *number of values* exceeds the vector's value capacity. The data portion of the
vector is grown automatically as needed. We don't need a value-count check here; we
already do that elsewhere. The whole point of this code chain is to check data
capacity, which is why we go down this path at all rather than simply doing a count
check earlier in the stack.
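To make the distinction concrete, here is a minimal standalone sketch (a made-up class, not Drill's vector API) of the two different checks: the value-count check that setSafe() actually performs versus the byte-capacity check this call chain is meant to perform.
{code}
// Stand-in for a variable-width vector; hypothetical, not Drill's API.
class FakeVarWidthVector {
  private final int valueCapacity;  // number of value slots allocated up front
  private int byteCapacity;         // bytes currently allocated for the data buffer
  private int bytesUsed;            // bytes already written

  FakeVarWidthVector(int valueCapacity, int byteCapacity) {
    this.valueCapacity = valueCapacity;
    this.byteCapacity = byteCapacity;
  }

  // What setSafe() actually tests: does the value COUNT still fit?
  boolean countHasRoom(int index) {
    return index < valueCapacity;
  }

  // What the chain is supposed to test: would the next value's DATA still fit
  // in the bytes allocated so far?
  boolean bytesHaveRoom(int nextValueLength) {
    return bytesUsed + nextValueLength <= byteCapacity;
  }

  // What happens on write today: the data buffer simply doubles as needed,
  // so a byte-based limit is never hit.
  void write(int valueLength) {
    while (bytesUsed + valueLength > byteCapacity) {
      byteCapacity *= 2;
    }
    bytesUsed += valueLength;
  }
}
{code}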
But since capacity is always extended, we will never stop reading variable-length
data based on the amount of data, only on the number of values. Crazy...
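For the record, any fix has to cap the bytes written per batch, not just the value count. A rough sketch of the kind of stop condition the sizing loop would need (hypothetical names and byte budget, not a proposed patch):
{code}
// Rough sketch only: stop sizing once accumulated variable-length bytes would
// exceed a per-batch budget, instead of relying on setSafe()'s count check.
// The names and the 8 MB budget are assumptions for illustration.
class VarLenBatchSizer {
  private static final long BATCH_BYTE_BUDGET = 8 * 1024 * 1024;

  // fieldLengths[i] = bytes of variable-length data in record i.
  static int recordsThatFit(int[] fieldLengths, int maxRecordCount) {
    long bytes = 0;
    int records = 0;
    for (int length : fieldLengths) {
      if (records >= maxRecordCount || bytes + length > BATCH_BYTE_BUDGET) {
        break;  // stop on the byte budget, not only on the value count
      }
      bytes += length;
      records++;
    }
    return records;
  }
}
{code}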
> Parquet Reader produces "low density" record batches
> ----------------------------------------------------
>
> Key: DRILL-5266
> URL: https://issues.apache.org/jira/browse/DRILL-5266
> Project: Apache Drill
> Issue Type: Bug
> Components: Storage - Parquet
> Affects Versions: 1.10
> Reporter: Paul Rogers
>
> Testing with the managed sort revealed that, for at least one file, Parquet
> produces "low-density" batches: batches in which only 5% of each value vector
> contains actual data, with the rest being unused space. When fed into the
> sort, we end up buffering 95% of wasted space, using only 5% of available
> memory to hold actual query data. The result is poor performance of the sort
> as it must spill far more frequently than expected.
> The managed sort analyzes incoming batches to prepare good memory use
> estimates. The following is the output from the Parquet file in question:
> {code}
> Actual batch schema & sizes {
> T1¦¦cs_sold_date_sk(std col. size: 4, actual col. size: 4, total size: 196608, vector size: 131072, data size: 4516, row capacity: 32768, density: 4)
> T1¦¦cs_sold_time_sk(std col. size: 4, actual col. size: 4, total size: 196608, vector size: 131072, data size: 4516, row capacity: 32768, density: 4)
> T1¦¦cs_ship_date_sk(std col. size: 4, actual col. size: 4, total size: 196608, vector size: 131072, data size: 4516, row capacity: 32768, density: 4)
> ...
> c_email_address(std col. size: 54, actual col. size: 27, total size: 53248, vector size: 49152, data size: 30327, row capacity: 4095, density: 62)
> Records: 1129, Total size: 32006144, Row width:28350, Density:5}
> {code}