IMPALA-5304: reduce transfer of Parquet decompression buffers

The buffers contain the Parquet DataPages, which need to be attached to
the row batch if the rows point to var-len data stored directly in the
page. Otherwise the buffers can be discarded once the values in the
page have been materialized.
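To make the attach-vs-discard decision above concrete, here is a minimal
sketch (all names below -- PageBuffer, FinishPage, row_batch_pool -- are
hypothetical illustrations, not Impala's actual MemPool/RowBatch API):

  #include <cstdint>
  #include <memory>
  #include <utility>
  #include <vector>

  // Hypothetical stand-in for a decompressed Parquet data page buffer.
  struct PageBuffer { std::vector<uint8_t> bytes; };

  // When a data page is exhausted: if returned rows may still point into the
  // page (var-len data read from a plain-encoded page), ownership must move to
  // the row batch; otherwise the buffer can be freed immediately, on the
  // thread that allocated it.
  void FinishPage(std::unique_ptr<PageBuffer> page, bool page_holds_var_len_data,
      std::vector<std::unique_ptr<PageBuffer>>* row_batch_pool) {
    if (page_holds_var_len_data) {
      // Rows reference the page's bytes: keep the buffer alive as long as the batch.
      row_batch_pool->push_back(std::move(page));
    } else {
      // Values were already materialized into tuples: free the buffer right away.
      page.reset();
    }
  }

In the actual change, the equivalent decision is made by
CurrentPageContainsTupleData() in the diff below.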
This reduces the amount of memory transferred between threads, which is
a known TCMalloc anti-pattern. It also allows us to free memory earlier,
which may help reduce memory consumption slightly.

Also fix a latent bug I noticed where needs_conversion_ is not always
initialised in the constructor.

Testing:
Ran exhaustive build. Most of the Parquet tests use compressed Parquet,
which should exercise this code path.

Change-Id: I2dbd749f43078b222ff8e1ddcec840986c466de6
Reviewed-on: http://gerrit.cloudera.org:8080/6876
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins

Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/00510158
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/00510158
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/00510158

Branch: refs/heads/master
Commit: 005101586ac89019f73d6bd6fe02c547fc54115e
Parents: 514e04e
Author: Tim Armstrong <[email protected]>
Authored: Wed May 10 14:56:03 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Mon May 22 10:39:12 2017 +0000

----------------------------------------------------------------------
 be/src/exec/parquet-column-readers.cc | 11 +++++++++--
 be/src/exec/parquet-column-readers.h  | 11 +++++++++--
 2 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00510158/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index cb7a5dd..8f703ec 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -212,6 +212,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
       const SlotDescriptor* slot_desc)
     : BaseScalarColumnReader(parent, node, slot_desc),
       dict_decoder_init_(false),
+      needs_conversion_(false),
       timezone_(NULL),
       is_timestamp_dependent_timezone_(false) {
     if (!MATERIALIZED) {
@@ -1027,8 +1028,14 @@ Status BaseScalarColumnReader::InitDictionary() {
 
 Status BaseScalarColumnReader::ReadDataPage() {
   // We're about to move to the next data page. The previous data page is
-  // now complete, pass along the memory allocated for it.
-  parent_->scratch_batch_->mem_pool()->AcquireData(decompressed_data_pool_.get(), false);
+  // now complete, free up any memory allocated for it. If the data page contained
+  // strings we need to attach it to the returned batch.
+  if (CurrentPageContainsTupleData()) {
+    parent_->scratch_batch_->mem_pool()->AcquireData(
+        decompressed_data_pool_.get(), false);
+  } else {
+    decompressed_data_pool_->FreeAll();
+  }
 
   // Read the next data page, skipping page types we don't care about.
   // We break out of this loop on the non-error case (a data page was found or we read all

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00510158/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 66d8815..593699d 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -346,7 +346,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   }
 
   virtual void Close(RowBatch* row_batch) {
-    if (row_batch != nullptr) {
+    if (row_batch != nullptr && CurrentPageContainsTupleData()) {
       row_batch->tuple_data_pool()->AcquireData(decompressed_data_pool_.get(), false);
     } else {
       decompressed_data_pool_->FreeAll();
@@ -360,7 +360,6 @@ class BaseScalarColumnReader : public ParquetColumnReader {
     if (metadata_ == NULL) return THdfsCompression::NONE;
     return PARQUET_TO_IMPALA_CODEC[metadata_->codec];
   }
-  MemPool* decompressed_data_pool() const { return decompressed_data_pool_.get(); }
 
   /// Reads the next definition and repetition levels for this column. Initializes the
   /// next data page if necessary.
@@ -468,6 +467,14 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// 'size' bytes remaining.
   virtual Status InitDataPage(uint8_t* data, int size) = 0;
 
+  /// Returns true if the current data page may contain strings referenced by returned
+  /// batches. Cases where this is not true are:
+  /// * Dictionary-compressed pages, where any string data lives in 'dictionary_pool_'.
+  /// * Fixed-length slots, where there is no string data.
+  bool CurrentPageContainsTupleData() {
+    return page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY
+        && slot_desc_ != nullptr && slot_desc_->type().IsStringType();
+  }
 };
 
 /// Collections are not materialized directly in parquet files; only scalar values appear
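For context on the TCMalloc anti-pattern the commit message mentions
(background, not part of the change): TCMalloc keeps per-thread free lists,
so memory allocated on a scanner thread but freed on a consumer thread has
to migrate back through the shared central free list under contention,
instead of being recycled cheaply from the allocating thread's local cache.
A minimal sketch of the two patterns (thread roles are illustrative):

  #include <cstdint>
  #include <thread>
  #include <vector>

  // Cross-thread pattern: the buffer is allocated here but destroyed on the
  // consumer thread, so the allocator cannot recycle it through the
  // producer's thread-local cache.
  void CrossThreadFree() {
    auto* buffer = new std::vector<uint8_t>(1 << 20);
    std::thread consumer([buffer] { delete buffer; });
    consumer.join();
  }

  // Same-thread pattern (what freeing eagerly on the scanner thread gives
  // us): allocation and free both hit the same thread-local cache.
  void SameThreadFree() {
    auto* buffer = new std::vector<uint8_t>(1 << 20);
    delete buffer;
  }

This is why discarding non-string pages on the scanner thread, rather than
handing every decompressed buffer to the row batch, is the cheaper option.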
