IMPALA-5304: reduce transfer of Parquet decompression buffers

The buffers contain the decompressed Parquet data pages. A buffer
needs to be attached to the row batch if the returned rows point
to var-len data stored directly in the page; otherwise it can be
discarded once the values in the page have been materialized.
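
As a rough sketch of the idea (the pool type here is a simplified
stand-in for Impala's MemPool, not the real class; the decision
itself mirrors the ReadDataPage() change in the diff below):

    #include <memory>
    #include <vector>

    // Simplified stand-in for Impala's MemPool, for illustration only.
    struct MemPool {
      std::vector<std::unique_ptr<char[]>> chunks;
      // Transfer ownership of all chunks from 'src' into this pool.
      void AcquireData(MemPool* src) {
        for (auto& c : src->chunks) chunks.push_back(std::move(c));
        src->chunks.clear();
      }
      void FreeAll() { chunks.clear(); }
    };

    // Once we move past a data page, its decompression buffer is either
    // attached to the outgoing batch (if returned tuples may still point
    // into it) or freed immediately on the thread that allocated it.
    void FinishPage(MemPool* batch_pool, MemPool* page_pool,
                    bool page_contains_tuple_data) {
      if (page_contains_tuple_data) {
        batch_pool->AcquireData(page_pool);  // buffer travels with the batch
      } else {
        page_pool->FreeAll();  // free right away, on the allocating thread
      }
    }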

This reduces the amount of memory transferred between threads
(allocating memory on one thread and freeing it on another is a
known TCMalloc anti-pattern). It also allows us to free memory
earlier, which may help reduce memory consumption slightly.

Also fix a latent bug I noticed where needs_conversion_ is not
always initialised in the constructor.
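
A minimal illustration of that bug class (a simplified stand-in, not
the actual reader class): a bool member left out of the constructor's
initializer list holds an indeterminate value, so reading it is
undefined behaviour.

    struct ScalarColumnReaderSketch {
      bool dict_decoder_init_;
      bool needs_conversion_;  // pre-fix: not always set in the
                               // constructor, so indeterminate
      ScalarColumnReaderSketch()
        : dict_decoder_init_(false), needs_conversion_(false) {}
    };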

Testing:
Ran exhaustive build. Most of the Parquet tests use compressed Parquet,
which should exercise this code path.

Change-Id: I2dbd749f43078b222ff8e1ddcec840986c466de6
Reviewed-on: http://gerrit.cloudera.org:8080/6876
Reviewed-by: Tim Armstrong <[email protected]>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/00510158
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/00510158
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/00510158

Branch: refs/heads/master
Commit: 005101586ac89019f73d6bd6fe02c547fc54115e
Parents: 514e04e
Author: Tim Armstrong <[email protected]>
Authored: Wed May 10 14:56:03 2017 -0700
Committer: Impala Public Jenkins <[email protected]>
Committed: Mon May 22 10:39:12 2017 +0000

----------------------------------------------------------------------
 be/src/exec/parquet-column-readers.cc | 11 +++++++++--
 be/src/exec/parquet-column-readers.h  | 11 +++++++++--
 2 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00510158/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index cb7a5dd..8f703ec 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -212,6 +212,7 @@ class ScalarColumnReader : public BaseScalarColumnReader {
       const SlotDescriptor* slot_desc)
     : BaseScalarColumnReader(parent, node, slot_desc),
       dict_decoder_init_(false),
+      needs_conversion_(false),
       timezone_(NULL),
       is_timestamp_dependent_timezone_(false) {
     if (!MATERIALIZED) {
@@ -1027,8 +1028,14 @@ Status BaseScalarColumnReader::InitDictionary() {
 
 Status BaseScalarColumnReader::ReadDataPage() {
   // We're about to move to the next data page.  The previous data page is
-  // now complete, pass along the memory allocated for it.
-  parent_->scratch_batch_->mem_pool()->AcquireData(decompressed_data_pool_.get(), false);
+  // now complete, free up any memory allocated for it. If the data page contained
+  // strings we need to attach it to the returned batch.
+  if (CurrentPageContainsTupleData()) {
+    parent_->scratch_batch_->mem_pool()->AcquireData(
+        decompressed_data_pool_.get(), false);
+  } else {
+    decompressed_data_pool_->FreeAll();
+  }
 
   // Read the next data page, skipping page types we don't care about.
  // We break out of this loop on the non-error case (a data page was found or we read all

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/00510158/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
index 66d8815..593699d 100644
--- a/be/src/exec/parquet-column-readers.h
+++ b/be/src/exec/parquet-column-readers.h
@@ -346,7 +346,7 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   }
 
   virtual void Close(RowBatch* row_batch) {
-    if (row_batch != nullptr) {
+    if (row_batch != nullptr && CurrentPageContainsTupleData()) {
      row_batch->tuple_data_pool()->AcquireData(decompressed_data_pool_.get(), false);
     } else {
       decompressed_data_pool_->FreeAll();
@@ -360,7 +360,6 @@ class BaseScalarColumnReader : public ParquetColumnReader {
     if (metadata_ == NULL) return THdfsCompression::NONE;
     return PARQUET_TO_IMPALA_CODEC[metadata_->codec];
   }
-  MemPool* decompressed_data_pool() const { return decompressed_data_pool_.get(); }
 
  /// Reads the next definition and repetition levels for this column. Initializes the
   /// next data page if necessary.
@@ -468,6 +467,14 @@ class BaseScalarColumnReader : public ParquetColumnReader {
   /// 'size' bytes remaining.
   virtual Status InitDataPage(uint8_t* data, int size) = 0;
 
+  /// Returns true if the current data page may contain strings referenced by returned
+  /// batches. Cases where this is not true are:
+  /// * Dictionary-compressed pages, where any string data lives in 'dictionary_pool_'.
+  /// * Fixed-length slots, where there is no string data.
+  bool CurrentPageContainsTupleData() {
+    return page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY
+        && slot_desc_ != nullptr && slot_desc_->type().IsStringType();
+  }
 };
 
 /// Collections are not materialized directly in parquet files; only scalar values appear
