This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 49f0ab1d0 IMPALA-14794: Implement small string optimization in Parquet 
scanner
49f0ab1d0 is described below

commit 49f0ab1d09541f960cee12e9a5e6aa38ec21565a
Author: Balazs Hevele <[email protected]>
AuthorDate: Thu Mar 5 12:47:04 2026 +0100

    IMPALA-14794: Implement small string optimization in Parquet scanner
    
    When a string is decoded from a Parquet file, the scanner now always
    smallifies it if possible.
    With plain encoding, if a column's data page only contains smallified
    strings, the data page's memory is no longer attached to the tuple,
    since no string points into it.
    With dictionary encoding, string column readers always allocate a copy
    of the dictionary page so strings can point into it. If all strings are
    smallified, this copy is freed after decoding all data, because no
    strings point into it.
    
    Measurements:
    Measured time of a join query where lots of small strings are sent from
    the reader:
      select l1.* from tpch_parquet.lineitem l1
        join tpch_parquet.lineitem l2 on l1.l_shipmode = l2.l_shipmode
        limit 1;
    
    Before:
      KrpcDataStreamSender: SerializeBatchTime: 84.385ms
      HDFS_SCAN_NODE: MaterializeTupleTime: 8.183ms
    After:
      KrpcDataStreamSender: SerializeBatchTime: 67.598ms
      HDFS_SCAN_NODE: MaterializeTupleTime: 8.632ms
    
    Same measurement with a table with a higher scale factor:
      select l1.* from tpch30_parquet_snap.lineitem l1
        join tpch30_parquet_snap.lineitem l2 on l1.l_shipmode = l2.l_shipmode
        limit 1;
    
    Before:
      KrpcDataStreamSender: SerializeBatchTime: 2s359ms
      HDFS_SCAN_NODE: MaterializeTupleTime: 239.267ms
    After:
      KrpcDataStreamSender: SerializeBatchTime: 1s702ms
      HDFS_SCAN_NODE: MaterializeTupleTime: 243.606ms
    
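    This is a ~27% gain in SerializeBatchTime at the higher scale factor
    (~20% in the first measurement), while MaterializeTupleTime is roughly
    unchanged.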
    
    Testing:
    - Added a test to parquet-plain-test.cc that checks that small strings
      are smallified upon decoding and that large strings are not
    
    Change-Id: I16c550d35cd6d3ec259b899b325611294137ccef
    Reviewed-on: http://gerrit.cloudera.org:8080/24063
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/parquet/parquet-column-chunk-reader.cc | 27 +++++-
 be/src/exec/parquet/parquet-column-chunk-reader.h  | 13 +++
 be/src/exec/parquet/parquet-column-readers.cc      | 99 ++++++++++++++++++++++
 be/src/exec/parquet/parquet-common.h               |  7 +-
 be/src/exec/parquet/parquet-plain-test.cc          | 40 +++++++++
 be/src/runtime/smallable-string.h                  |  5 ++
 be/src/runtime/string-value.h                      |  3 +
 be/src/testutil/random-vector-generators.h         |  2 +-
 8 files changed, 190 insertions(+), 6 deletions(-)

diff --git a/be/src/exec/parquet/parquet-column-chunk-reader.cc 
b/be/src/exec/parquet/parquet-column-chunk-reader.cc
index 23edffe54..5c2f9101c 100644
--- a/be/src/exec/parquet/parquet-column-chunk-reader.cc
+++ b/be/src/exec/parquet/parquet-column-chunk-reader.cc
@@ -73,17 +73,26 @@ Status ParquetColumnChunkReader::InitColumnChunk(const 
HdfsFileDesc& file_desc,
 }
 
 void ParquetColumnChunkReader::Close(MemPool* mem_pool) {
-  if (mem_pool != nullptr && value_mem_type_ == ValueMemoryType::VAR_LEN_STR) {
+  if (keep_data_page_pool_ && mem_pool != nullptr
+      && value_mem_type_ == ValueMemoryType::VAR_LEN_STR) {
     mem_pool->AcquireData(data_page_pool_.get(), false);
   } else {
     data_page_pool_->FreeAll();
   }
 
+  if (UNLIKELY(dict_page_pool_.get())) {
+    if (mem_pool != nullptr) {
+      mem_pool->AcquireData(dict_page_pool_.get(), false);
+    } else {
+      dict_page_pool_->FreeAll();
+    }
+  }
+
   if (decompressor_ != nullptr) decompressor_->Close();
 }
 
 void ParquetColumnChunkReader::ReleaseResourcesOfLastPage(MemPool& mem_pool) {
-  if (value_mem_type_ == ValueMemoryType::VAR_LEN_STR) {
+  if (keep_data_page_pool_ && value_mem_type_ == ValueMemoryType::VAR_LEN_STR) 
{
     mem_pool.AcquireData(data_page_pool_.get(), false);
   } else {
     data_page_pool_->FreeAll();
@@ -167,14 +176,18 @@ Status 
ParquetColumnChunkReader::ReadDictionaryData(ScopedBuffer* uncompressed_b
   if (decompressor_.get() != nullptr || copy_buffer) {
     int buffer_size = current_page_header.uncompressed_page_size;
     if (copy_buffer) {
-      *dict_values = parent_->dictionary_pool_->TryAllocate(buffer_size); // 
case 1.
+      // Allocate a buffer from dict_page_pool_. After decoding, if all strings
+      // are smallified, this will be freed. Otherwise, it will be acquired by
+      // the last row batch.
+      dict_page_pool_.reset(new MemPool(parent_->scan_node_->mem_tracker()));
+      *dict_values = dict_page_pool_->TryAllocate(buffer_size); // case 1.
     } else if (uncompressed_buffer->TryAllocate(buffer_size)) {
       *dict_values = uncompressed_buffer->buffer(); // case 2
     }
     if (UNLIKELY(*dict_values == nullptr)) {
       string details = Substitute(PARQUET_PAGE_MEM_LIMIT_EXCEEDED, 
"InitDictionary",
           buffer_size, "dictionary");
-      return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
+      return dict_page_pool_->mem_tracker()->MemLimitExceeded(
                parent_->state_, details, buffer_size);
     }
   } else {
@@ -402,6 +415,12 @@ Status 
ParquetColumnChunkReader::ReadDataPageData(DataPageInfo* page_info) {
     } else {
       data = compressed_data;
     }
+
+    // If the data is a string, the page pool only needs to be kept if there is
+    // at least one string that cannot be smallified.
+    // When such a string is encountered, this flag will be set to true.
+    keep_data_page_pool_ = false;
+
     if (has_slot_desc) {
       // Use original sizes (includes levels in v2) in the profile.
       parent_->scan_node_->UpdateBytesRead(slot_id_, orig_uncompressed_size, 
0);
diff --git a/be/src/exec/parquet/parquet-column-chunk-reader.h 
b/be/src/exec/parquet/parquet-column-chunk-reader.h
index f9d1b1d86..38e955489 100644
--- a/be/src/exec/parquet/parquet-column-chunk-reader.h
+++ b/be/src/exec/parquet/parquet-column-chunk-reader.h
@@ -150,6 +150,8 @@ class ParquetColumnChunkReader {
   }
 
  private:
+  friend class ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, 
true>;
+
   HdfsParquetScanner* parent_;
   std::string schema_name_;
 
@@ -163,6 +165,17 @@ class ParquetColumnChunkReader {
   /// batches.
   boost::scoped_ptr<MemPool> data_page_pool_;
 
+  /// Data page pool has to be kept for strings if there is any string 
pointing into it.
+  /// If all strings could be smallified, there is no need to keep data page 
pool.
+  /// This is reset to false when switching pages, and set to true when a long 
string is
+  /// found that could not be smallified.
+  bool keep_data_page_pool_ = false;
+
+  /// Pool to allocate a copy of the dictionary page. Only used for strings, 
which will
+  /// point to data in the copy. If all strings could be smallified during 
decoding, the
+  /// copy will be freed, otherwise it will be acquired by the last row batch.
+  boost::scoped_ptr<MemPool> dict_page_pool_;
+
   /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. 
Must be set
   /// with set_io_reservation() before 'stream_' is initialized. Reset for 
each row group
   /// by Reset().
diff --git a/be/src/exec/parquet/parquet-column-readers.cc 
b/be/src/exec/parquet/parquet-column-readers.cc
index b131b2f2d..a8969c1c2 100644
--- a/be/src/exec/parquet/parquet-column-readers.cc
+++ b/be/src/exec/parquet/parquet-column-readers.cc
@@ -814,6 +814,71 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, 
MATERIALIZED>::DecodeValue(
   return true;
 }
 
+// For string values, check if all strings after decoding are small:
+// -if all strings are smallified, chunk reader's dict page pool can be freed
+// -otherwise, strings point to the buffer, so it has to be kept
+template<>
+Status ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>
+  ::CreateDictionaryDecoder(
+    uint8_t* values, int size, DictDecoderBase** decoder) {
+  if (!dict_decoder_.template Reset<parquet::Type::BYTE_ARRAY>
+      (values, size, fixed_len_size_)) {
+    return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
+        slot_desc_->type().DebugString(), "could not decode dictionary");
+  }
+  dict_decoder_init_ = true;
+  *decoder = &dict_decoder_;
+
+  bool all_strings_smallified = true;
+  StringValue val;
+  for (int i = 0; i < dict_decoder_.num_entries(); ++i) {
+    dict_decoder_.GetValue(i, &val);
+    if (!val.IsSmall()) {
+      DCHECK(!val.CanBeSmallified());
+      all_strings_smallified = false;
+      break;
+    }
+  }
+
+  if (col_chunk_reader_.dict_page_pool_.get()) {
+    if (all_strings_smallified) {
+      col_chunk_reader_.dict_page_pool_->FreeAll();
+    } else {
+      
parent_->dictionary_pool_->AcquireData(col_chunk_reader_.dict_page_pool_.get(),
+          false);
+    }
+  }
+
+  return Status::OK();
+}
+
+// StringValue PLAIN: Check if string could be smallified.
+// If not, data page pool has to be kept, because string ptr points into it.
+template<>
+template<>
+bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>::
+    DecodeValue<Encoding::PLAIN>(uint8_t** RESTRICT data,
+      const uint8_t* RESTRICT data_end, StringValue* RESTRICT val) RESTRICT {
+  DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+  int encoded_len = ParquetPlainEncoder::Decode<StringValue, 
parquet::Type::BYTE_ARRAY>(
+      *data, data_end, fixed_len_size_, val);
+  if (UNLIKELY(encoded_len < 0)) {
+    SetPlainDecodeError();
+    return false;
+  }
+
+  // If the string couldn't be smallified that means it points into the data 
buffer,
+  // so the buffer has to be kept
+  if (!val->IsSmall()) {
+    DCHECK(!val->CanBeSmallified());
+    col_chunk_reader_.keep_data_page_pool_ = true;
+  }
+
+  *data += encoded_len;
+
+  return true;
+}
+
 // Specialise for decoding INT64 timestamps from PLAIN decoding, which need to 
call
 // out to the timestamp decoder.
 template <>
@@ -877,6 +942,40 @@ bool ScalarColumnReader<InternalType, PARQUET_TYPE, 
MATERIALIZED>::DecodeValues(
   return true;
 }
 
+// StringValue PLAIN: Check if all strings could be smallified.
+// If not, data page pool has to be kept, because string ptr points into it.
+template <>
+template <>
+bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>
+  ::DecodeValues<Encoding::PLAIN>(int64_t stride, int64_t count,
+    StringValue* RESTRICT out_vals) RESTRICT {
+
+    DCHECK_EQ(page_encoding_, Encoding::PLAIN);
+    int64_t encoded_len = ParquetPlainEncoder
+        ::DecodeBatch<StringValue, parquet::Type::BYTE_ARRAY>(
+            data_, data_end_, fixed_len_size_, count, stride, out_vals);
+    if (UNLIKELY(encoded_len < 0)) {
+      SetPlainDecodeError();
+      return false;
+    }
+    data_ += encoded_len;
+
+    // If any string couldn't be smallified that means it points into the data 
buffer,
+    // so the buffer has to be kept
+    for (int i = 0; i < count; ++i) {
+      StringValue* val = reinterpret_cast<StringValue*>
+          (reinterpret_cast<uint8_t*>(out_vals) + stride * i);
+
+      if (!val->IsSmall()) {
+        DCHECK(!val->CanBeSmallified());
+        col_chunk_reader_.keep_data_page_pool_ = true;
+        break;
+      }
+    }
+
+    return true;
+}
+
 // Specialise for decoding INT64 timestamps from PLAIN decoding, which need to 
call
 // out to the timestamp decoder.
 template <>
diff --git a/be/src/exec/parquet/parquet-common.h 
b/be/src/exec/parquet/parquet-common.h
index 72ad18015..2a71d0ce7 100644
--- a/be/src/exec/parquet/parquet-common.h
+++ b/be/src/exec/parquet/parquet-common.h
@@ -575,7 +575,12 @@ inline int ParquetPlainEncoder::Decode<StringValue, 
parquet::Type::BYTE_ARRAY>(
   int byte_size = ByteSize(*v);
   if (UNLIKELY(str_len < 0 || buffer_end - buffer < byte_size)) return -1;
   if (fixed_len_size > 0 && fixed_len_size < str_len) 
v->SetLen(fixed_len_size);
-  // we still read byte_size bytes, even if we truncate
+  // This is safe to smallify since the string points to the buffer's memory,
+  // and the decoding is already done, so the data will be in the buffer, only 
the string
+  // will not point to it, if the smallify succeeds.
+  v->Smallify();
+
+  // we still read byte_size bytes, even if we truncate and/or smallify
   return byte_size;
 }
 
diff --git a/be/src/exec/parquet/parquet-plain-test.cc 
b/be/src/exec/parquet/parquet-plain-test.cc
index df8daf86b..1a04fd092 100644
--- a/be/src/exec/parquet/parquet-plain-test.cc
+++ b/be/src/exec/parquet/parquet-plain-test.cc
@@ -486,5 +486,45 @@ TEST(PlainEncoding, CorruptString) {
   EXPECT_EQ(decoded_size, -1);
 }
 
+/// Test that strings are smallified after decoding
+TEST(PlainEncoding, SmallString) {
+  std::mt19937 gen;
+  
RandTestUtil::SeedRng("PARQUET_PLAIN_ENCODING_SMALL_STRING_TEST_RANDOM_SEED", 
&gen);
+
+  constexpr int NUM_ELEMENTS = 1024;
+
+  constexpr int min_small_str_length = 1;
+  constexpr int max_small_str_length = 10;
+  constexpr int min_large_str_length = 20;
+  constexpr int max_large_str_length = 100;
+
+  uint8_t buffer[sizeof(int32_t) + max_large_str_length];
+  StringValue result;
+
+  // Small enough strings should be smallified
+  const std::vector<std::string> small_str_vec = RandomStrVec(gen, 
NUM_ELEMENTS,
+      max_small_str_length, min_small_str_length);
+  for (const std::string& small_str: small_str_vec) {
+    StringValue small_str_val = StringValue(small_str);
+    int byte_size = sizeof(int32_t) + small_str_val.Len();
+    Encode(small_str_val, byte_size, buffer, parquet::Type::BYTE_ARRAY);
+    ParquetPlainEncoder::Decode<StringValue, parquet::Type::BYTE_ARRAY>(
+      buffer, buffer + byte_size, byte_size, &result);
+    EXPECT_TRUE(result.IsSmall());
+  }
+
+  // Large strings cannot be smallified
+  const std::vector<std::string> large_str_vec = RandomStrVec(gen, 
NUM_ELEMENTS,
+      max_large_str_length, min_large_str_length);
+  for (const std::string& large_str: large_str_vec) {
+    StringValue large_str_val = StringValue(large_str);
+    int byte_size = sizeof(int32_t) + large_str_val.Len();
+    Encode(large_str_val, byte_size, buffer, parquet::Type::BYTE_ARRAY);
+    ParquetPlainEncoder::Decode<StringValue, parquet::Type::BYTE_ARRAY>(
+      buffer, buffer + byte_size, byte_size, &result);
+    EXPECT_FALSE(result.IsSmall());
+  }
+}
+
 }
 
diff --git a/be/src/runtime/smallable-string.h 
b/be/src/runtime/smallable-string.h
index aa2c4a124..430c9a608 100644
--- a/be/src/runtime/smallable-string.h
+++ b/be/src/runtime/smallable-string.h
@@ -120,6 +120,11 @@ class __attribute__((__packed__)) SmallableString {
     return last_char & MSB_CHAR;
   }
 
+  bool CanBeSmallified() const {
+    if (IsSmall()) return true;
+    return rep.long_rep.len <= SMALL_LIMIT;
+  }
+
   bool Smallify() {
     if (IsSmall()) return true;
     if (rep.long_rep.len > SMALL_LIMIT) return false;
diff --git a/be/src/runtime/string-value.h b/be/src/runtime/string-value.h
index aa48a692d..c472c5cad 100644
--- a/be/src/runtime/string-value.h
+++ b/be/src/runtime/string-value.h
@@ -91,6 +91,8 @@ public:
 
   bool IsSmall() const { return string_impl_.IsSmall(); }
 
+  bool CanBeSmallified() const { return string_impl_.CanBeSmallified(); }
+
   int Len() const { return string_impl_.Len(); }
 
   /// Returns the number of bytes needed outside the slot itself:
@@ -194,6 +196,7 @@ public:
 private:
   friend Tuple;
   friend StringValueTest;
+  friend class ParquetPlainEncoder;
   /// !!! THIS IS UNSAFE TO CALL ON EXISTING STRINGVALUE OBJECTS !!!
   /// Please make sure you only invoke it for newly created StringValues, e.g. 
on the
   /// target StringValue object of a deep copy operation.
diff --git a/be/src/testutil/random-vector-generators.h 
b/be/src/testutil/random-vector-generators.h
index 780e9b190..cfb1d88d3 100644
--- a/be/src/testutil/random-vector-generators.h
+++ b/be/src/testutil/random-vector-generators.h
@@ -62,7 +62,7 @@ std::vector<NUM_T> RandomNumberVec(Generator& gen, const int 
length) {
 template <typename Generator = std::mt19937>
 std::vector<std::string> RandomStrVec(Generator& gen, const int length,
     const int max_str_length, const int min_str_length = 0) {
-  std::uniform_int_distribution<int> length_dist(0, max_str_length);
+  std::uniform_int_distribution<int> length_dist(min_str_length, 
max_str_length);
   std::uniform_int_distribution<char> letter_dist('a', 'z');
 
   std::vector<std::string> vec(length);

Reply via email to