fatemehp commented on code in PR #14142:
URL: https://github.com/apache/arrow/pull/14142#discussion_r1007317749
##########
cpp/src/parquet/column_reader.cc:
##########
@@ -1328,6 +1329,156 @@ class TypedRecordReader : public
ColumnReaderImplBase<DType>,
return records_read;
}
+
+ // Skip records that we have in our buffer. This function is only for
+ // non-repeated fields.
+ int64_t SkipRecordsInBufferNonRepeated(int64_t num_records) {
+ ARROW_DCHECK(this->max_rep_level_ == 0);
+ ARROW_DCHECK(this->has_values_to_process());
+
+ int64_t remaining_records = levels_written_ - levels_position_;
+ int64_t skipped_records = std::min(num_records, remaining_records);
+ int64_t start_levels_position = levels_position_;
+ // Since there is no repetition, number of levels equals number of records.
+ levels_position_ += skipped_records;
+ // We skipped the levels by incrementing 'levels_position_'. For values
+ // we do not have a buffer, so we need to read them and throw them away.
+ // First we need to figure out how many present/not-null values there are.
+ std::shared_ptr<::arrow::ResizableBuffer> valid_bits;
+ valid_bits = AllocateBuffer(this->pool_);
+ PARQUET_THROW_NOT_OK(
+ valid_bits->Resize(bit_util::BytesForBits(skipped_records), true));
+ ValidityBitmapInputOutput validity_io;
+ validity_io.values_read_upper_bound = skipped_records;
+ validity_io.valid_bits = valid_bits->mutable_data();
+ validity_io.valid_bits_offset = 0;
+ DefLevelsToBitmap(def_levels() + start_levels_position,
+ levels_position_ - start_levels_position,
+ this->leaf_info_, &validity_io);
+ int64_t values_to_read = validity_io.values_read - validity_io.null_count;
+ ReadAndThrowAway(values_to_read);
+ // Mark the levels as read in the underlying column reader.
+ this->ConsumeBufferedValues(skipped_records);
+ return skipped_records;
+ }
+
+ // Skip records for repeated fields. Returns number of skipped records.
+ // Skip records for repeated fields. Returns number of skipped records.
+ int64_t SkipRecordsRepeated(int64_t num_records) {
+ ARROW_DCHECK_GT(this->max_rep_level_, 0);
+
+ // For repeated fields, we are technically reading and throwing away the
+ // levels and values since we do not know the record boundaries in advance.
+ // Keep filling the buffer and skipping until we reach the desired number
+ // of records or we run out of values in the column chunk.
+ int64_t skipped_records = 0;
+ int64_t level_batch_size = std::max<int>(kMinLevelBatchSize, num_records);
+ // If 'at_record_start_' is false, but (skip_records == num_records), it
+ // means that for the last record that was counted, we have not seen all
+ // of it's values yet.
+ while (!at_record_start_ || skipped_records < num_records) {
+ // Is there more data to read in this row group?
+ // HasNextInternal() will advance to the next page if necessary.
+ if (!this->HasNextInternal()) {
+ if (!at_record_start_) {
+ // We ended the row group while inside a record that we haven't seen
+ // the end of yet. So increment the record count for the last record
+ // in the row group
+ ++skipped_records;
+ at_record_start_ = true;
+ }
+ break;
+ }
+
+ // Read some more levels.
+ int64_t batch_size =
+ std::min(level_batch_size, available_values_current_page());
+ // No more data in column. This must be an empty page.
+ // If we had exhausted the last page, HasNextInternal() must have
advanced
+ // to the next page. So there must be available values to process.
+ if (batch_size == 0) {
+ break;
+ }
+
+ ReserveLevels(batch_size);
+
+ int16_t* def_levels = this->def_levels() + levels_written_;
Review Comment:
I am bumping it here for correctness. At any point in time levels_written_
shows the end of the levels that are in the buffer. So we update it right here
after we read a batch of levels. Note that we may not throw away all the levels
that we read here. We may only throw away some of them in
DelimitAndSkipRecordsInBuffer. When we throw away levels, we will update
levels_written_ accordingly.
You are bringing up a good point here. We actually can read the values that
we want to skip into a separate buffer and throw them away, which will then
reduce the amount of shifting that we have to do. It can make the code a bit
more complicated though since I need to consume the values from this buffer
first, then read into the scratch buffer, and if anything is left transfer it
over. I will keep this in mind as an optimization on top of this pull request.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]