http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc
b/be/src/exec/parquet-column-readers.cc
deleted file mode 100644
index aec1265..0000000
--- a/be/src/exec/parquet-column-readers.cc
+++ /dev/null
@@ -1,1931 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet-column-readers.h"
-
-#include <boost/scoped_ptr.hpp>
-#include <string>
-#include <sstream>
-#include <gflags/gflags.h>
-#include <gutil/strings/substitute.h>
-
-#include "exec/hdfs-parquet-scanner.h"
-#include "exec/parquet-metadata-utils.h"
-#include "exec/parquet-scratch-tuple-batch.h"
-#include "exec/read-write-util.h"
-#include "exec/scanner-context.inline.h"
-#include "rpc/thrift-util.h"
-#include "runtime/collection-value-builder.h"
-#include "runtime/exec-env.h"
-#include "runtime/io/disk-io-mgr.h"
-#include "runtime/io/request-context.h"
-#include "runtime/tuple-row.h"
-#include "runtime/tuple.h"
-#include "runtime/runtime-state.h"
-#include "runtime/mem-pool.h"
-#include "util/bit-util.h"
-#include "util/codec.h"
-#include "util/debug-util.h"
-#include "util/dict-encoding.h"
-#include "util/rle-encoding.h"
-
-#include "common/names.h"
-
-// Provide a workaround for IMPALA-1658.
-DEFINE_bool(convert_legacy_hive_parquet_utc_timestamps, false,
- "When true, TIMESTAMPs read from files written by Parquet-MR (used by
Hive) will "
- "be converted from UTC to local time. Writes are unaffected.");
-
-// Max dictionary page header size in bytes. This is an estimate and only
needs to be an
-// upper bound.
-static const int MAX_DICT_HEADER_SIZE = 100;
-
-// Max data page header size in bytes. This is an estimate and only needs to
be an upper
-// bound. It is theoretically possible to have a page header of any size due
to string
-// value statistics, but in practice we'll have trouble reading string values
this large.
-// Also, this limit is in place to prevent impala from reading corrupt parquet
files.
-DEFINE_int32(max_page_header_size, 8*1024*1024, "max parquet page header size
in bytes");
-
-// Trigger debug action on every other call of Read*ValueBatch() once at least
128
-// tuples have been produced to simulate failure such as exceeding memory
limit.
-// Triggering it every other call so as not to always fail on the first column
reader
-// when materializing multiple columns. Failing on non-empty row batch tests
proper
-// resources freeing by the Parquet scanner.
-#ifndef NDEBUG
-static int debug_count = 0;
-#define SHOULD_TRIGGER_DEBUG_ACTION(num_tuples) \
- ((debug_count++ % 2) == 1 && num_tuples >= 128)
-#else
-#define SHOULD_TRIGGER_DEBUG_ACTION(x) (false)
-#endif
-
-using namespace impala::io;
-
-using parquet::Encoding;
-
-namespace impala {
-
-const string PARQUET_COL_MEM_LIMIT_EXCEEDED =
- "ParquetColumnReader::$0() failed to allocate $1 bytes for $2.";
-
-Status ParquetLevelDecoder::Init(const string& filename, Encoding::type
encoding,
- MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int*
data_size) {
- DCHECK(*data != nullptr);
- DCHECK_GE(*data_size, 0);
- DCHECK_GT(cache_size, 0);
- cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32);
- encoding_ = encoding;
- max_level_ = max_level;
- filename_ = filename;
- RETURN_IF_ERROR(InitCache(cache_pool, cache_size));
-
- // Return because there is no level data to read, e.g., required field.
- if (max_level == 0) return Status::OK();
-
- int32_t num_bytes = 0;
- switch (encoding) {
- case Encoding::RLE: {
- Status status;
- if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) {
- return status;
- }
- if (num_bytes < 0 || num_bytes > *data_size) {
- return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename,
num_bytes);
- }
- int bit_width = BitUtil::Log2Ceiling64(max_level + 1);
- rle_decoder_.Reset(*data, num_bytes, bit_width);
- break;
- }
- case parquet::Encoding::BIT_PACKED:
- return Status(TErrorCode::PARQUET_BIT_PACKED_LEVELS, filename);
- default: {
- stringstream ss;
- ss << "Unsupported encoding: " << encoding;
- return Status(ss.str());
- }
- }
- if (UNLIKELY(num_bytes < 0 || num_bytes > *data_size)) {
- return Status(Substitute("Corrupt Parquet file '$0': $1 bytes of encoded
levels but "
- "only $2 bytes left in page", filename, num_bytes, *data_size));
- }
- *data += num_bytes;
- *data_size -= num_bytes;
- return Status::OK();
-}
-
-Status ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) {
- num_cached_levels_ = 0;
- cached_level_idx_ = 0;
- // Memory has already been allocated.
- if (cached_levels_ != nullptr) {
- DCHECK_EQ(cache_size_, cache_size);
- return Status::OK();
- }
-
- cached_levels_ = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size));
- if (cached_levels_ == nullptr) {
- return pool->mem_tracker()->MemLimitExceeded(
- nullptr, "Definition level cache", cache_size);
- }
- memset(cached_levels_, 0, cache_size);
- cache_size_ = cache_size;
- return Status::OK();
-}
-
-inline int16_t ParquetLevelDecoder::ReadLevel() {
- if (UNLIKELY(!CacheHasNext())) {
- if (UNLIKELY(!FillCache(cache_size_, &num_cached_levels_))) {
- return HdfsParquetScanner::INVALID_LEVEL;
- }
- DCHECK_GE(num_cached_levels_, 0);
- if (UNLIKELY(num_cached_levels_ == 0)) {
- return HdfsParquetScanner::INVALID_LEVEL;
- }
- }
- return CacheGetNext();
-}
-
-int32_t ParquetLevelDecoder::NextRepeatedRunLength() {
- if (CacheHasNext() || encoding_ != Encoding::RLE) return 0;
- // Treat always-zero levels as an infinitely long run of zeroes. Return the
maximum
- // run length allowed by the Parquet standard.
- if (max_level_ == 0) return numeric_limits<int32_t>::max();
- return rle_decoder_.NextNumRepeats();
-}
-
-uint8_t ParquetLevelDecoder::GetRepeatedValue(uint32_t num_to_consume) {
- DCHECK(!CacheHasNext());
- DCHECK_EQ(encoding_, Encoding::RLE);
- // Treat always-zero levels as an infinitely long run of zeroes.
- if (max_level_ == 0) return 0;
- return rle_decoder_.GetRepeatedValue(num_to_consume);
-}
-
-Status ParquetLevelDecoder::CacheNextBatch(int vals_remaining) {
- /// Fill the cache completely if there are enough values remaining.
- /// Otherwise don't try to read more values than are left.
- int batch_size = min(vals_remaining, cache_size_);
- if (max_level_ > 0) {
- if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_)
- || num_cached_levels_ < batch_size)) {
- return Status(decoding_error_code_, vals_remaining, filename_);
- }
- } else {
- // No levels to read, e.g., because the field is required. The cache was
- // already initialized with all zeros, so we can hand out those values.
- DCHECK_EQ(max_level_, 0);
- cached_level_idx_ = 0;
- num_cached_levels_ = batch_size;
- }
- return Status::OK();
-}
-
-bool ParquetLevelDecoder::FillCache(int batch_size, int* num_cached_levels) {
- DCHECK(!CacheHasNext());
- DCHECK(num_cached_levels != nullptr);
- DCHECK_GE(max_level_, 0);
- DCHECK_GE(*num_cached_levels, 0);
- cached_level_idx_ = 0;
- if (max_level_ == 0) {
- // No levels to read, e.g., because the field is required. The cache was
- // already initialized with all zeros, so we can hand out those values.
- *num_cached_levels = batch_size;
- return true;
- }
- DCHECK_EQ(encoding_, parquet::Encoding::RLE);
- *num_cached_levels = rle_decoder_.GetValues(batch_size, cached_levels_);
- return *num_cached_levels > 0;
-}
-
-/// Per column type reader. InternalType is the datatype that Impala uses
internally to
-/// store tuple data and PARQUET_TYPE is the corresponding primitive datatype
(as defined
-/// in the parquet spec) that is used to store column values in parquet files.
-/// If MATERIALIZED is true, the column values are materialized into the slot
described
-/// by slot_desc. If MATERIALIZED is false, the column values are not
materialized, but
-/// the position can be accessed.
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-class ScalarColumnReader : public BaseScalarColumnReader {
- public:
- ScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
- const SlotDescriptor* slot_desc);
- virtual ~ScalarColumnReader() { }
-
- virtual bool ReadValue(MemPool* pool, Tuple* tuple) override {
- return ReadValue<true>(tuple);
- }
-
- virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override {
- return ReadValue<false>(tuple);
- }
-
- virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
- uint8_t* tuple_mem, int* num_values) override {
- return ReadValueBatch<true>(max_values, tuple_size, tuple_mem, num_values);
- }
-
- virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int
tuple_size,
- uint8_t* tuple_mem, int* num_values) override {
- return ReadValueBatch<false>(max_values, tuple_size, tuple_mem,
num_values);
- }
-
- virtual DictDecoderBase* GetDictionaryDecoder() override {
- return HasDictionaryDecoder() ? &dict_decoder_ : nullptr;
- }
-
- virtual bool NeedsConversion() override { return NeedsConversionInline(); }
- virtual bool NeedsValidation() override { return NeedsValidationInline(); }
-
- protected:
- template <bool IN_COLLECTION>
- inline bool ReadValue(Tuple* tuple);
-
- /// Implementation of the ReadValueBatch() functions specialized for this
- /// column reader type. This function drives the reading of data pages and
- /// caching of rep/def levels. Once a data page and cached levels are
available,
- /// it calls into a more specialized MaterializeValueBatch() for doing the
actual
- /// value materialization using the level caches.
- /// Use RESTRICT so that the compiler knows that it is safe to cache member
- /// variables in registers or on the stack (otherwise gcc's alias analysis
- /// conservatively assumes that buffers like 'tuple_mem', 'num_values' or the
- /// 'def_levels_' 'rep_levels_' buffers may alias 'this', especially with
- /// -fno-strict-alias).
- template <bool IN_COLLECTION>
- bool ReadValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT
tuple_mem,
- int* RESTRICT num_values) RESTRICT;
-
- /// Helper function for ReadValueBatch() above that performs value
materialization.
- /// It assumes a data page with remaining values is available, and that the
def/rep
- /// level caches have been populated. Materializes values into 'tuple_mem'
with a
- /// stride of 'tuple_size' and updates 'num_buffered_values_'. Returns the
number of
- /// values materialized in 'num_values'.
- /// For efficiency, the simple special case of !MATERIALIZED &&
!IN_COLLECTION is not
- /// handled in this function.
- /// Use RESTRICT so that the compiler knows that it is safe to cache member
- /// variables in registers or on the stack (otherwise gcc's alias analysis
- /// conservatively assumes that buffers like 'tuple_mem', 'num_values' or the
- /// 'def_levels_' 'rep_levels_' buffers may alias 'this', especially with
- /// -fno-strict-alias).
- template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
- bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT
tuple_mem,
- int* RESTRICT num_values) RESTRICT;
-
- /// Same as above, but dispatches to the appropriate templated
implementation of
- /// MaterializeValueBatch() based on 'page_encoding_' and
NeedsConversionInline().
- template <bool IN_COLLECTION>
- bool MaterializeValueBatch(int max_values, int tuple_size, uint8_t* RESTRICT
tuple_mem,
- int* RESTRICT num_values) RESTRICT;
-
- /// Fast path for MaterializeValueBatch() that materializes values for a run
of
- /// repeated definition levels. Read up to 'max_values' values into
'tuple_mem',
- /// returning the number of values materialised in 'num_values'.
- bool MaterializeValueBatchRepeatedDefLevel(int max_values, int tuple_size,
- uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT;
-
- /// Read 'num_to_read' values into a batch of tuples starting at 'tuple_mem'.
- bool ReadSlots(
- int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem)
RESTRICT;
-
- /// Read 'num_to_read' values into a batch of tuples starting at
'tuple_mem', when
- /// conversion is needed.
- bool ReadAndConvertSlots(
- int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem)
RESTRICT;
-
- /// Read 'num_to_read' values into a batch of tuples starting at
'tuple_mem', when
- /// conversion is not needed.
- bool ReadSlotsNoConversion(
- int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem)
RESTRICT;
-
- /// Read 'num_to_read' position values into a batch of tuples starting at
'tuple_mem'.
- void ReadPositions(
- int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem)
RESTRICT;
-
- virtual Status CreateDictionaryDecoder(
- uint8_t* values, int size, DictDecoderBase** decoder) override {
- if (!dict_decoder_.template Reset<PARQUET_TYPE>(values, size,
fixed_len_size_)) {
- return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
- slot_desc_->type().DebugString(), "could not decode dictionary");
- }
- dict_decoder_init_ = true;
- *decoder = &dict_decoder_;
- return Status::OK();
- }
-
- virtual bool HasDictionaryDecoder() override {
- return dict_decoder_init_;
- }
-
- virtual void ClearDictionaryDecoder() override {
- dict_decoder_init_ = false;
- }
-
- virtual Status InitDataPage(uint8_t* data, int size) override;
-
- private:
- /// Writes the next value into the appropriate destination slot in 'tuple'.
Returns
- /// false if execution should be aborted for some reason, e.g. parse_error_
is set, the
- /// query is cancelled, or the scan node limit was reached. Otherwise
returns true.
- ///
- /// Force inlining - GCC does not always inline this into hot loops.
- template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
- inline ALWAYS_INLINE bool ReadSlot(Tuple* tuple);
-
- /// Reads position into 'pos' and updates 'pos_current_value_' based on
'rep_level'.
- /// This helper is only used on the batched decoding path where we need to
reset
- /// 'pos_current_value_' to 0 based on 'rep_level'.
- inline ALWAYS_INLINE void ReadPositionBatched(int16_t rep_level, int64_t*
pos);
-
- /// Decode one value from *data into 'val' and advance *data. 'data_end' is
one byte
- /// past the end of the buffer. Return false and set 'parse_error_' if there
is an
- /// error decoding the value.
- template <Encoding::type ENCODING>
- inline ALWAYS_INLINE bool DecodeValue(
- uint8_t** data, const uint8_t* data_end, InternalType* RESTRICT val)
RESTRICT;
-
- /// Decode multiple values into 'out_vals' with a stride of 'stride' bytes.
Return
- /// false and set 'parse_error_' if there is an error decoding any value.
- inline ALWAYS_INLINE bool DecodeValues(
- int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT;
-
- /// Most column readers never require conversion, so we can avoid branches by
- /// returning constant false. Column readers for types that require
conversion
- /// must specialize this function.
- inline bool NeedsConversionInline() const {
- DCHECK(!needs_conversion_);
- return false;
- }
-
- /// Similar to NeedsConversion(), most column readers do not require
validation,
- /// so to avoid branches, we return constant false. In general, types where
not
- /// all possible bit representations of the data type are valid should be
- /// validated.
- inline bool NeedsValidationInline() const {
- return false;
- }
-
- /// Converts and writes 'src' into 'slot' based on desc_->type()
- bool ConvertSlot(const InternalType* src, void* slot) {
- DCHECK(false);
- return false;
- }
-
- /// Checks if 'val' is invalid, e.g. due to being out of the valid value
range. If it
- /// is invalid, logs the error and returns false. If the error should stop
execution,
- /// sets 'parent_->parse_status_'.
- bool ValidateValue(InternalType* val) const {
- DCHECK(false);
- return false;
- }
-
- /// Pull out slow-path Status construction code
- void __attribute__((noinline)) SetDictDecodeError() {
- parent_->parse_status_ = Status(TErrorCode::PARQUET_DICT_DECODE_FAILURE,
filename(),
- slot_desc_->type().DebugString(), stream_->file_offset());
- }
-
- void __attribute__((noinline)) SetPlainDecodeError() {
- parent_->parse_status_ = Status(TErrorCode::PARQUET_CORRUPT_PLAIN_VALUE,
filename(),
- slot_desc_->type().DebugString(), stream_->file_offset());
- }
-
- /// Dictionary decoder for decoding column values.
- DictDecoder<InternalType> dict_decoder_;
-
- /// True if dict_decoder_ has been initialized with a dictionary page.
- bool dict_decoder_init_ = false;
-
- /// true if decoded values must be converted before being written to an
output tuple.
- bool needs_conversion_ = false;
-
- /// The size of this column with plain encoding for FIXED_LEN_BYTE_ARRAY, or
- /// the max length for VARCHAR columns. Unused otherwise.
- int fixed_len_size_;
-
- /// Contains extra data needed for Timestamp decoding.
- ParquetTimestampDecoder timestamp_decoder_;
-
- /// Allocated from parent_->perm_pool_ if NeedsConversion() is true and null
otherwise.
- uint8_t* conversion_buffer_ = nullptr;
-};
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-ScalarColumnReader<InternalType, PARQUET_TYPE,
MATERIALIZED>::ScalarColumnReader(
- HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor*
slot_desc)
- : BaseScalarColumnReader(parent, node, slot_desc),
- dict_decoder_(parent->scan_node_->mem_tracker()) {
- if (!MATERIALIZED) {
- // We're not materializing any values, just counting them. No need (or
ability) to
- // initialize state used to materialize values.
- DCHECK(slot_desc_ == nullptr);
- return;
- }
-
- DCHECK(slot_desc_ != nullptr);
- DCHECK_NE(slot_desc_->type().type, TYPE_BOOLEAN);
- if (slot_desc_->type().type == TYPE_DECIMAL
- && PARQUET_TYPE == parquet::Type::FIXED_LEN_BYTE_ARRAY) {
- fixed_len_size_ = node.element->type_length;
- } else if (slot_desc_->type().type == TYPE_VARCHAR) {
- fixed_len_size_ = slot_desc_->type().len;
- } else {
- fixed_len_size_ = -1;
- }
-
- needs_conversion_ = slot_desc_->type().type == TYPE_CHAR;
-
- if (slot_desc_->type().type == TYPE_TIMESTAMP) {
- timestamp_decoder_ = parent->CreateTimestampDecoder(*node.element);
- dict_decoder_.SetTimestampHelper(timestamp_decoder_);
- needs_conversion_ = timestamp_decoder_.NeedsConversion();
- }
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-Status ScalarColumnReader<InternalType, PARQUET_TYPE,
MATERIALIZED>::InitDataPage(
- uint8_t* data, int size) {
- // Data can be empty if the column contains all NULLs
- DCHECK_GE(size, 0);
- page_encoding_ = current_page_header_.data_page_header.encoding;
- if (page_encoding_ != parquet::Encoding::PLAIN_DICTIONARY &&
- page_encoding_ != parquet::Encoding::PLAIN) {
- return GetUnsupportedDecodingError();
- }
-
- // If slot_desc_ is NULL, we don't need so decode any values so
dict_decoder_ does
- // not need to be initialized.
- if (page_encoding_ == Encoding::PLAIN_DICTIONARY && slot_desc_ != nullptr) {
- if (!dict_decoder_init_) {
- return Status("File corrupt. Missing dictionary page.");
- }
- RETURN_IF_ERROR(dict_decoder_.SetData(data, size));
- }
- // Allocate a temporary buffer to hold InternalType values if we need to
convert
- // before writing to the final slot.
- if (NeedsConversionInline() && conversion_buffer_ == nullptr) {
- int64_t buffer_size = sizeof(InternalType) * parent_->state_->batch_size();
- conversion_buffer_ =
- parent_->perm_pool_->TryAllocateAligned(buffer_size,
alignof(InternalType));
- if (conversion_buffer_ == nullptr) {
- return
parent_->perm_pool_->mem_tracker()->MemLimitExceeded(parent_->state_,
- "Failed to allocate conversion buffer in Parquet scanner",
buffer_size);
- }
- }
- // TODO: Perform filter selectivity checks here.
- return Status::OK();
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-template <bool IN_COLLECTION>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadValue(
- Tuple* tuple) {
- // NextLevels() should have already been called and def and rep levels
should be in
- // valid range.
- DCHECK_GE(rep_level_, 0);
- DCHECK_GE(def_level_, 0);
- DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
- "Caller should have called NextLevels() until we are ready to read a
value";
-
- if (MATERIALIZED) {
- if (def_level_ >= max_def_level()) {
- bool continue_execution;
- if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
- continue_execution = NeedsConversionInline() ?
- ReadSlot<Encoding::PLAIN_DICTIONARY, true>(tuple) :
- ReadSlot<Encoding::PLAIN_DICTIONARY, false>(tuple);
- } else {
- DCHECK_EQ(page_encoding_, Encoding::PLAIN);
- continue_execution = NeedsConversionInline() ?
- ReadSlot<Encoding::PLAIN, true>(tuple) :
- ReadSlot<Encoding::PLAIN, false>(tuple);
- }
- if (!continue_execution) return false;
- } else {
- tuple->SetNull(null_indicator_offset_);
- }
- }
- return NextLevels<IN_COLLECTION>();
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-template <bool IN_COLLECTION>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE,
MATERIALIZED>::ReadValueBatch(
- int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
- int* RESTRICT num_values) RESTRICT {
- // Repetition level is only present if this column is nested in a collection
type.
- if (IN_COLLECTION) {
- DCHECK_GT(max_rep_level(), 0) << slot_desc()->DebugString();
- } else {
- DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
- }
-
- int val_count = 0;
- bool continue_execution = true;
- while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
- DCHECK_GE(num_buffered_values_, 0);
- // Read next page if necessary.
- if (num_buffered_values_ == 0) {
- if (!NextPage()) {
- continue_execution = parent_->parse_status_.ok();
- continue;
- }
- }
-
- // Not materializing anything - skip decoding any levels and rely on the
value
- // count from page metadata to return the correct number of rows.
- if (!MATERIALIZED && !IN_COLLECTION) {
- int vals_to_add = min(num_buffered_values_, max_values - val_count);
- val_count += vals_to_add;
- num_buffered_values_ -= vals_to_add;
- DCHECK_GE(num_buffered_values_, 0);
- continue;
- }
- // Fill the rep level cache if needed. We are flattening out the fields of
the
- // nested collection into the top-level tuple returned by the scan, so we
don't
- // care about the nesting structure unless the position slot is being
populated.
- if (IN_COLLECTION && pos_slot_desc_ != nullptr &&
!rep_levels_.CacheHasNext()) {
- parent_->parse_status_.MergeStatus(
- rep_levels_.CacheNextBatch(num_buffered_values_));
- if (UNLIKELY(!parent_->parse_status_.ok())) return false;
- }
-
- const int remaining_val_capacity = max_values - val_count;
- uint8_t* next_tuple = tuple_mem + val_count * tuple_size;
- if (def_levels_.NextRepeatedRunLength() > 0) {
- // Fast path to materialize a run of values with the same definition
level. This
- // avoids checking for NULL/not-NULL for every value.
- int ret_val_count = 0;
- continue_execution = MaterializeValueBatchRepeatedDefLevel(
- remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
- val_count += ret_val_count;
- } else {
- // We don't have a repeated run - cache def levels and process
value-by-value.
- if (!def_levels_.CacheHasNext()) {
- parent_->parse_status_.MergeStatus(
- def_levels_.CacheNextBatch(num_buffered_values_));
- if (UNLIKELY(!parent_->parse_status_.ok())) return false;
- }
-
- // Read data page and cached levels to materialize values.
- int ret_val_count = 0;
- continue_execution = MaterializeValueBatch<IN_COLLECTION>(
- remaining_val_capacity, tuple_size, next_tuple, &ret_val_count);
- val_count += ret_val_count;
- }
- if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
- continue_execution &= ColReaderDebugAction(&val_count);
- }
- }
- *num_values = val_count;
- return continue_execution;
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-template <bool IN_COLLECTION, Encoding::type ENCODING, bool NEEDS_CONVERSION>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE,
MATERIALIZED>::MaterializeValueBatch(
- int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
- int* RESTRICT num_values) RESTRICT {
- DCHECK(MATERIALIZED || IN_COLLECTION);
- DCHECK_GT(num_buffered_values_, 0);
- DCHECK(def_levels_.CacheHasNext());
- if (IN_COLLECTION && pos_slot_desc_ != nullptr)
DCHECK(rep_levels_.CacheHasNext());
- const int cache_start_idx = def_levels_.CacheCurrIdx();
- uint8_t* curr_tuple = tuple_mem;
- int val_count = 0;
- DCHECK_LE(def_levels_.CacheRemaining(), num_buffered_values_);
- max_values = min(max_values, num_buffered_values_);
- while (def_levels_.CacheHasNext() && val_count < max_values) {
- Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
- int def_level = def_levels_.CacheGetNext();
-
- if (IN_COLLECTION) {
- if (def_level < def_level_of_immediate_repeated_ancestor()) {
- // A containing repeated field is empty or NULL. Skip the value but
- // move to the next repetition level if necessary.
- if (pos_slot_desc_ != nullptr) rep_levels_.CacheSkipLevels(1);
- continue;
- }
- if (pos_slot_desc_ != nullptr) {
- ReadPositionBatched(rep_levels_.CacheGetNext(),
- tuple->GetBigIntSlot(pos_slot_desc_->tuple_offset()));
- }
- }
-
- if (MATERIALIZED) {
- if (def_level >= max_def_level()) {
- bool continue_execution = ReadSlot<ENCODING, NEEDS_CONVERSION>(tuple);
- if (UNLIKELY(!continue_execution)) return false;
- } else {
- tuple->SetNull(null_indicator_offset_);
- }
- }
- curr_tuple += tuple_size;
- ++val_count;
- }
- num_buffered_values_ -= (def_levels_.CacheCurrIdx() - cache_start_idx);
- DCHECK_GE(num_buffered_values_, 0);
- *num_values = val_count;
- return true;
-}
-
-// Note that the structure of this function is very similar to
MaterializeValueBatch()
-// above, except it is unrolled to operate on multiple values at a time.
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE,
- MATERIALIZED>::MaterializeValueBatchRepeatedDefLevel(int max_values, int
tuple_size,
- uint8_t* RESTRICT tuple_mem, int* RESTRICT num_values) RESTRICT {
- DCHECK_GT(num_buffered_values_, 0);
- if (pos_slot_desc_ != nullptr) DCHECK(rep_levels_.CacheHasNext());
- int32_t def_level_repeats = def_levels_.NextRepeatedRunLength();
- DCHECK_GT(def_level_repeats, 0);
- // Peek at the def level. The number of def levels we'll consume depends on
several
- // conditions below.
- uint8_t def_level = def_levels_.GetRepeatedValue(0);
- int32_t num_def_levels_to_consume = 0;
-
- if (def_level < def_level_of_immediate_repeated_ancestor()) {
- DCHECK_GT(max_rep_level_, 0) << "Only possible if in a collection.";
- // A containing repeated field is empty or NULL. We don't need to return
any values
- // but need to advance any rep levels.
- if (pos_slot_desc_ != nullptr) {
- num_def_levels_to_consume =
- min<uint32_t>(def_level_repeats, rep_levels_.CacheRemaining());
- rep_levels_.CacheSkipLevels(num_def_levels_to_consume);
- } else {
- num_def_levels_to_consume = def_level_repeats;
- }
- *num_values = 0;
- } else {
- // Cannot consume more levels than allowed by buffered input values and
output space.
- num_def_levels_to_consume =
- min(num_buffered_values_, min(max_values, def_level_repeats));
- if (pos_slot_desc_ != nullptr) {
- num_def_levels_to_consume =
- min<uint32_t>(num_def_levels_to_consume,
rep_levels_.CacheRemaining());
- ReadPositions(num_def_levels_to_consume, tuple_size, tuple_mem);
- }
- if (MATERIALIZED) {
- if (def_level >= max_def_level()) {
- if (!ReadSlots(num_def_levels_to_consume, tuple_size, tuple_mem)) {
- return false;
- }
- } else {
- Tuple::SetNullIndicators(
- null_indicator_offset_, num_def_levels_to_consume, tuple_size,
tuple_mem);
- }
- }
- *num_values = num_def_levels_to_consume;
- }
- // We now know how many we actually consumed.
- def_levels_.GetRepeatedValue(num_def_levels_to_consume);
- num_buffered_values_ -= num_def_levels_to_consume;
- DCHECK_GE(num_buffered_values_, 0);
- return true;
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-template <bool IN_COLLECTION>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE,
MATERIALIZED>::MaterializeValueBatch(
- int max_values, int tuple_size, uint8_t* RESTRICT tuple_mem,
- int* RESTRICT num_values) RESTRICT {
- // Dispatch to the correct templated implementation of
MaterializeValueBatch().
- if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
- if (NeedsConversionInline()) {
- return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY,
true>(
- max_values, tuple_size, tuple_mem, num_values);
- } else {
- return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN_DICTIONARY,
false>(
- max_values, tuple_size, tuple_mem, num_values);
- }
- } else {
- DCHECK_EQ(page_encoding_, Encoding::PLAIN);
- if (NeedsConversionInline()) {
- return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, true>(
- max_values, tuple_size, tuple_mem, num_values);
- } else {
- return MaterializeValueBatch<IN_COLLECTION, Encoding::PLAIN, false>(
- max_values, tuple_size, tuple_mem, num_values);
- }
- }
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-template <Encoding::type ENCODING, bool NEEDS_CONVERSION>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlot(
- Tuple* RESTRICT tuple) RESTRICT {
- void* slot = tuple->GetSlot(tuple_offset_);
- // Use an uninitialized stack allocation for temporary value to avoid running
- // constructors doing work unnecessarily, e.g. if T == StringValue.
- alignas(InternalType) uint8_t val_buf[sizeof(InternalType)];
- InternalType* val_ptr =
- reinterpret_cast<InternalType*>(NEEDS_CONVERSION ? val_buf : slot);
-
- if (UNLIKELY(!DecodeValue<ENCODING>(&data_, data_end_, val_ptr))) return
false;
- if (UNLIKELY(NeedsValidationInline() && !ValidateValue(val_ptr))) {
- if (UNLIKELY(!parent_->parse_status_.ok())) return false;
- // The value is invalid but execution should continue - set the null
indicator and
- // skip conversion.
- tuple->SetNull(null_indicator_offset_);
- return true;
- }
- if (NEEDS_CONVERSION && UNLIKELY(!ConvertSlot(val_ptr, slot))) return false;
- return true;
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::ReadSlots(
- int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT
{
- if (NeedsConversionInline()) {
- return ReadAndConvertSlots(num_to_read, tuple_size, tuple_mem);
- } else {
- return ReadSlotsNoConversion(num_to_read, tuple_size, tuple_mem);
- }
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE,
MATERIALIZED>::ReadAndConvertSlots(
- int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT
{
- DCHECK(NeedsConversionInline());
- DCHECK(conversion_buffer_ != nullptr);
- InternalType* first_val =
reinterpret_cast<InternalType*>(conversion_buffer_);
- // Decode into the conversion buffer before doing the conversion into the
output tuples.
- if (!DecodeValues(sizeof(InternalType), num_to_read, first_val)) return
false;
-
- InternalType* curr_val = first_val;
- uint8_t* curr_tuple = tuple_mem;
- for (int64_t i = 0; i < num_to_read; ++i, ++curr_val, curr_tuple +=
tuple_size) {
- Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
- if (NeedsValidationInline() && UNLIKELY(!ValidateValue(curr_val))) {
- if (UNLIKELY(!parent_->parse_status_.ok())) return false;
- // The value is invalid but execution should continue - set the null
indicator and
- // skip conversion.
- tuple->SetNull(null_indicator_offset_);
- continue;
- }
- if (UNLIKELY(!ConvertSlot(curr_val, tuple->GetSlot(tuple_offset_)))) {
- return false;
- }
- }
- return true;
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE,
MATERIALIZED>::ReadSlotsNoConversion(
- int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT
{
- DCHECK(!NeedsConversionInline());
- // No conversion needed - decode directly into the output slots.
- InternalType* first_slot = reinterpret_cast<InternalType*>(tuple_mem +
tuple_offset_);
- if (!DecodeValues(tuple_size, num_to_read, first_slot)) return false;
- if (NeedsValidationInline()) {
- // Validate the written slots.
- uint8_t* curr_tuple = tuple_mem;
- for (int64_t i = 0; i < num_to_read; ++i, curr_tuple += tuple_size) {
- Tuple* tuple = reinterpret_cast<Tuple*>(curr_tuple);
- InternalType* val =
static_cast<InternalType*>(tuple->GetSlot(tuple_offset_));
- if (UNLIKELY(!ValidateValue(val))) {
- if (UNLIKELY(!parent_->parse_status_.ok())) return false;
- // The value is invalid but execution should continue - set the null
indicator and
- // skip conversion.
- tuple->SetNull(null_indicator_offset_);
- }
- }
- }
- return true;
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-template <Encoding::type ENCODING>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE, MATERIALIZED>::DecodeValue(
- uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
- InternalType* RESTRICT val) RESTRICT {
- DCHECK_EQ(page_encoding_, ENCODING);
- if (ENCODING == Encoding::PLAIN_DICTIONARY) {
- if (UNLIKELY(!dict_decoder_.GetNextValue(val))) {
- SetDictDecodeError();
- return false;
- }
- } else {
- DCHECK_EQ(ENCODING, Encoding::PLAIN);
- int encoded_len = ParquetPlainEncoder::Decode<InternalType, PARQUET_TYPE>(
- *data, data_end, fixed_len_size_, val);
- if (UNLIKELY(encoded_len < 0)) {
- SetPlainDecodeError();
- return false;
- }
- *data += encoded_len;
- }
- return true;
-}
-
-template <>
-template <Encoding::type ENCODING>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT64,
true>::DecodeValue(
- uint8_t** RESTRICT data, const uint8_t* RESTRICT data_end,
- TimestampValue* RESTRICT val) RESTRICT {
- DCHECK_EQ(page_encoding_, ENCODING);
- if (ENCODING == Encoding::PLAIN_DICTIONARY) {
- if (UNLIKELY(!dict_decoder_.GetNextValue(val))) {
- SetDictDecodeError();
- return false;
- }
- } else {
- DCHECK_EQ(ENCODING, Encoding::PLAIN);
- int encoded_len =
- timestamp_decoder_.Decode<parquet::Type::INT64>(*data, data_end, val);
- if (UNLIKELY(encoded_len < 0)) {
- SetPlainDecodeError();
- return false;
- }
- *data += encoded_len;
- }
- return true;
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-bool ScalarColumnReader<InternalType, PARQUET_TYPE,
MATERIALIZED>::DecodeValues(
- int64_t stride, int64_t count, InternalType* RESTRICT out_vals) RESTRICT {
- if (page_encoding_ == Encoding::PLAIN_DICTIONARY) {
- if (UNLIKELY(!dict_decoder_.GetNextValues(out_vals, stride, count))) {
- SetDictDecodeError();
- return false;
- }
- } else {
- DCHECK_EQ(page_encoding_, Encoding::PLAIN);
- int64_t encoded_len = ParquetPlainEncoder::DecodeBatch<InternalType,
PARQUET_TYPE>(
- data_, data_end_, fixed_len_size_, count, stride, out_vals);
- if (UNLIKELY(encoded_len < 0)) {
- SetPlainDecodeError();
- return false;
- }
- data_ += encoded_len;
- }
- return true;
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-void ScalarColumnReader<InternalType, PARQUET_TYPE,
MATERIALIZED>::ReadPositionBatched(
- int16_t rep_level, int64_t* pos) {
- // Reset position counter if we are at the start of a new parent collection.
- if (rep_level <= max_rep_level() - 1) pos_current_value_ = 0;
- *pos = pos_current_value_++;
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE, bool
MATERIALIZED>
-void ScalarColumnReader<InternalType, PARQUET_TYPE,
MATERIALIZED>::ReadPositions(
- int64_t num_to_read, int tuple_size, uint8_t* RESTRICT tuple_mem) RESTRICT
{
- const int pos_slot_offset = pos_slot_desc()->tuple_offset();
- void* first_slot =
reinterpret_cast<Tuple*>(tuple_mem)->GetSlot(pos_slot_offset);
- StrideWriter<int64_t> out{reinterpret_cast<int64_t*>(first_slot),
tuple_size};
- for (int64_t i = 0; i < num_to_read; ++i) {
- ReadPositionBatched(rep_levels_.CacheGetNext(), out.Advance());
- }
-}
-
-template <>
-inline bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY,
- true>::NeedsConversionInline() const {
- return needs_conversion_;
-}
-
-template <>
-bool ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY,
true>::ConvertSlot(
- const StringValue* src, void* slot) {
- DCHECK(slot_desc() != nullptr);
- DCHECK(slot_desc()->type().type == TYPE_CHAR);
- int char_len = slot_desc()->type().len;
- int unpadded_len = min(char_len, src->len);
- char* dst_char = reinterpret_cast<char*>(slot);
- memcpy(dst_char, src->ptr, unpadded_len);
- StringValue::PadWithSpaces(dst_char, char_len, unpadded_len);
- return true;
-}
-
-template <>
-inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
-::NeedsConversionInline() const {
- return needs_conversion_;
-}
-
-template <>
-inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>
-::NeedsConversionInline() const {
- return needs_conversion_;
-}
-
-template <>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT96,
true>::ConvertSlot(
- const TimestampValue* src, void* slot) {
- // Conversion should only happen when this flag is enabled.
- DCHECK(FLAGS_convert_legacy_hive_parquet_utc_timestamps);
- DCHECK(timestamp_decoder_.NeedsConversion());
- TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
- *dst_ts = *src;
- // TODO: IMPALA-7862: converting timestamps after validating them can move
them out of
- // range. We should either validate after conversion or require conversion
to produce an
- // in-range value.
- timestamp_decoder_.ConvertToLocalTime(dst_ts);
- return true;
-}
-
-template <>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT64,
true>::ConvertSlot(
- const TimestampValue* src, void* slot) {
- DCHECK(timestamp_decoder_.NeedsConversion());
- TimestampValue* dst_ts = reinterpret_cast<TimestampValue*>(slot);
- *dst_ts = *src;
- // TODO: IMPALA-7862: converting timestamps after validating them can move
them out of
- // range. We should either validate after conversion or require conversion
to produce an
- // in-range value.
- timestamp_decoder_.ConvertToLocalTime(static_cast<TimestampValue*>(dst_ts));
- return true;
-}
-
-template <>
-inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>
-::NeedsValidationInline() const {
- return true;
-}
-
-template <>
-inline bool ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>
-::NeedsValidationInline() const {
- return true;
-}
-
-template <>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT96,
true>::ValidateValue(
- TimestampValue* val) const {
- if (UNLIKELY(!TimestampValue::IsValidDate(val->date())
- || !TimestampValue::IsValidTime(val->time()))) {
- // If both are corrupt, invalid time takes precedence over invalid date,
because
- // invalid date may come from a more or less functional encoder that does
not respect
- // the 1400..9999 limit, while an invalid time is a good indicator of
buggy encoder
- // or memory garbage.
- TErrorCode::type errorCode = TimestampValue::IsValidTime(val->time())
- ? TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE
- : TErrorCode::PARQUET_TIMESTAMP_INVALID_TIME_OF_DAY;
- ErrorMsg msg(errorCode, filename(), node_.element->name);
- Status status = parent_->state_->LogOrReturnError(msg);
- if (!status.ok()) parent_->parse_status_ = status;
- return false;
- }
- return true;
-}
-
-template <>
-bool ScalarColumnReader<TimestampValue, parquet::Type::INT64,
true>::ValidateValue(
- TimestampValue* val) const {
- // The range was already checked during the int64_t->TimestampValue
conversion, which
- // sets the date to invalid if it was out of range.
- if (UNLIKELY(!val->HasDate())) {
- ErrorMsg msg(TErrorCode::PARQUET_TIMESTAMP_OUT_OF_RANGE,
- filename(), node_.element->name);
- Status status = parent_->state_->LogOrReturnError(msg);
- if (!status.ok()) parent_->parse_status_ = status;
- return false;
- }
- DCHECK(TimestampValue::IsValidDate(val->date()));
- DCHECK(TimestampValue::IsValidTime(val->time()));
- return true;
-}
-
-class BoolColumnReader : public BaseScalarColumnReader {
- public:
- BoolColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
- const SlotDescriptor* slot_desc)
- : BaseScalarColumnReader(parent, node, slot_desc) {
- if (slot_desc_ != nullptr) DCHECK_EQ(slot_desc_->type().type,
TYPE_BOOLEAN);
- }
-
- virtual ~BoolColumnReader() { }
-
- virtual bool ReadValue(MemPool* pool, Tuple* tuple) override {
- return ReadValue<true>(tuple);
- }
-
- virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override {
- return ReadValue<false>(tuple);
- }
-
- protected:
- virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
- DictDecoderBase** decoder) override {
- DCHECK(false) << "Dictionary encoding is not supported for bools. Should never "
- << "have gotten this far.";
- return Status::OK();
- }
-
- virtual bool HasDictionaryDecoder() override {
- // Decoder should never be created for bools.
- return false;
- }
-
- virtual void ClearDictionaryDecoder() override { }
-
- virtual Status InitDataPage(uint8_t* data, int size) override;
-
- private:
- template <bool IN_COLLECTION>
- inline bool ReadValue(Tuple* tuple);
-
- /// Decodes the next value into 'value'. Returns false and sets
- /// 'parent_->parse_status_' if an error is encountered decoding the
- /// value. Otherwise returns true.
- template <bool IN_COLLECTION>
- inline bool DecodeValue(bool* value);
-
- /// A buffer to store unpacked values. Must be a multiple of 32 size to use
the
- /// batch-oriented interface of BatchedBitReader.
- static const int UNPACKED_BUFFER_LEN = 128;
- bool unpacked_values_[UNPACKED_BUFFER_LEN];
-
- /// The number of valid values in 'unpacked_values_'.
- int num_unpacked_values_ = 0;
-
- /// The next value to return from 'unpacked_values_'.
- int unpacked_value_idx_ = 0;
-
- /// Bit packed decoder, used if 'encoding_' is PLAIN.
- BatchedBitReader bool_values_;
-
- /// RLE decoder, used if 'encoding_' is RLE.
- RleBatchDecoder<bool> rle_decoder_;
-};
-
-Status BoolColumnReader::InitDataPage(uint8_t* data, int size) {
- page_encoding_ = current_page_header_.data_page_header.encoding;
- // Only the relevant decoder is initialized for a given data page.
- switch (page_encoding_) {
- case parquet::Encoding::PLAIN:
- bool_values_.Reset(data, size);
- break;
- case parquet::Encoding::RLE:
- // The first 4 bytes contain the size of the encoded data. This
information is
- // redundant, as this is the last part of the data page, and the number
of
- // remaining bytes is already known.
- rle_decoder_.Reset(data + 4, size - 4, 1);
- break;
- default:
- return GetUnsupportedDecodingError();
- }
- num_unpacked_values_ = 0;
- unpacked_value_idx_ = 0;
- return Status::OK();
-}
-
-template <bool IN_COLLECTION>
-bool BoolColumnReader::ReadValue(Tuple* tuple) {
- DCHECK(slot_desc_ != nullptr);
- // Def and rep levels should be in valid range.
- DCHECK_GE(rep_level_, 0);
- DCHECK_GE(def_level_, 0);
- DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
- "Caller should have called NextLevels() until we are ready to read a value";
-
- if (def_level_ >= max_def_level()) {
- bool* slot = tuple->GetBoolSlot(tuple_offset_);
- if (UNLIKELY(!DecodeValue<IN_COLLECTION>(slot))) return false;
- } else {
- // Null value
- tuple->SetNull(null_indicator_offset_);
- }
- return NextLevels<IN_COLLECTION>();
-}
-
-template <bool IN_COLLECTION>
-bool BoolColumnReader::DecodeValue(bool* value) {
- if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) {
- *value = unpacked_values_[unpacked_value_idx_++];
- } else {
- // Unpack as many values as we can into the buffer. We expect to read at
least one
- // value.
- if (page_encoding_ == parquet::Encoding::PLAIN) {
- num_unpacked_values_ =
- bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN,
&unpacked_values_[0]);
- } else {
- num_unpacked_values_ =
- rle_decoder_.GetValues(UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
- }
-
- if (UNLIKELY(num_unpacked_values_ == 0)) {
- parent_->parse_status_ = Status("Invalid bool column.");
- return false;
- }
- *value = unpacked_values_[0];
- unpacked_value_idx_ = 1;
- }
- return true;
-}
-
-// Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error
handling
-// path doesn't falsely report that the file is corrupted.
-bool ParquetColumnReader::ColReaderDebugAction(int* val_count) {
-#ifndef NDEBUG
- Status status = parent_->ScannerDebugAction();
- if (!status.ok()) {
- if (!status.IsCancelled()) parent_->parse_status_.MergeStatus(status);
- *val_count = 0;
- return false;
- }
-#endif
- return true;
-}
-
-bool ParquetColumnReader::ReadValueBatch(MemPool* pool, int max_values,
- int tuple_size, uint8_t* tuple_mem, int* num_values) {
- // The below loop requires that NextLevels() was called previously to
populate
- // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each
- // row group.
- if (def_level_ == HdfsParquetScanner::INVALID_LEVEL && !NextLevels()) return
false;
-
- int val_count = 0;
- bool continue_execution = true;
- while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
- Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count *
tuple_size);
- if (def_level_ < def_level_of_immediate_repeated_ancestor()) {
- // A containing repeated field is empty or NULL
- continue_execution = NextLevels();
- continue;
- }
- // Fill in position slot if applicable
- if (pos_slot_desc_ != nullptr) {
-
ReadPositionNonBatched(tuple->GetBigIntSlot(pos_slot_desc()->tuple_offset()));
- }
- continue_execution = ReadValue(pool, tuple);
- ++val_count;
- if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
- continue_execution &= ColReaderDebugAction(&val_count);
- }
- }
- *num_values = val_count;
- return continue_execution;
-}
-
-bool ParquetColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
- int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) {
- // The below loop requires that NextLevels() was called previously to
populate
- // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each
- // row group.
- if (def_level_ == HdfsParquetScanner::INVALID_LEVEL && !NextLevels()) return
false;
-
- int val_count = 0;
- bool continue_execution = true;
- while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
- Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count *
tuple_size);
- continue_execution = ReadNonRepeatedValue(pool, tuple);
- ++val_count;
- if (SHOULD_TRIGGER_DEBUG_ACTION(val_count)) {
- continue_execution &= ColReaderDebugAction(&val_count);
- }
- }
- *num_values = val_count;
- return continue_execution;
-}
-
-void ParquetColumnReader::ReadPositionNonBatched(int64_t* pos) {
- // NextLevels() should have already been called
- DCHECK_GE(rep_level_, 0);
- DCHECK_GE(def_level_, 0);
- DCHECK_GE(pos_current_value_, 0);
- DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
- "Caller should have called NextLevels() until we are ready to read a value";
- *pos = pos_current_value_++;
-}
-
-// In 1.1, we had a bug where the dictionary page metadata was not set.
Returns true
-// if this matches those versions and compatibility workarounds need to be
used.
-static bool RequiresSkippedDictionaryHeaderCheck(
- const ParquetFileVersion& v) {
- if (v.application != "impala") return false;
- return v.VersionEq(1,1,0) || (v.VersionEq(1,2,0) && v.is_impala_internal);
-}
-
-Status BaseScalarColumnReader::Reset(const HdfsFileDesc& file_desc,
- const parquet::ColumnChunk& col_chunk, int row_group_idx) {
- // Ensure metadata is valid before using it to initialize the reader.
-
RETURN_IF_ERROR(ParquetMetadataUtils::ValidateRowGroupColumn(parent_->file_metadata_,
- parent_->filename(), row_group_idx, col_idx(), schema_element(),
- parent_->state_));
- num_buffered_values_ = 0;
- data_ = nullptr;
- data_end_ = nullptr;
- stream_ = nullptr;
- io_reservation_ = 0;
- metadata_ = &col_chunk.meta_data;
- num_values_read_ = 0;
- def_level_ = HdfsParquetScanner::INVALID_LEVEL;
- // See ColumnReader constructor.
- rep_level_ = max_rep_level() == 0 ? 0 : HdfsParquetScanner::INVALID_LEVEL;
- pos_current_value_ = HdfsParquetScanner::INVALID_POS;
-
- if (metadata_->codec != parquet::CompressionCodec::UNCOMPRESSED) {
- RETURN_IF_ERROR(Codec::CreateDecompressor(
- nullptr, false, ConvertParquetToImpalaCodec(metadata_->codec),
&decompressor_));
- }
- int64_t col_start = col_chunk.meta_data.data_page_offset;
- if (col_chunk.meta_data.__isset.dictionary_page_offset) {
- // Already validated in ValidateColumnOffsets()
- DCHECK_LT(col_chunk.meta_data.dictionary_page_offset, col_start);
- col_start = col_chunk.meta_data.dictionary_page_offset;
- }
- int64_t col_len = col_chunk.meta_data.total_compressed_size;
- if (col_len <= 0) {
- return Status(Substitute("File '$0' contains invalid column chunk size: $1",
- filename(), col_len));
- }
- int64_t col_end = col_start + col_len;
-
- // Already validated in ValidateColumnOffsets()
- DCHECK_GT(col_end, 0);
- DCHECK_LT(col_end, file_desc.file_length);
- const ParquetFileVersion& file_version = parent_->file_version_;
- if (file_version.application == "parquet-mr" && file_version.VersionLt(1, 2,
9)) {
- // The Parquet MR writer had a bug in 1.2.8 and below where it didn't
include the
- // dictionary page header size in total_compressed_size and
total_uncompressed_size
- // (see IMPALA-694). We pad col_len to compensate.
- int64_t bytes_remaining = file_desc.file_length - col_end;
- int64_t pad = min<int64_t>(MAX_DICT_HEADER_SIZE, bytes_remaining);
- col_len += pad;
- }
-
- // TODO: this will need to change when we have co-located files and the
columns
- // are different files.
- if (!col_chunk.file_path.empty() && col_chunk.file_path != filename()) {
- return Status(Substitute("Expected parquet column file path '$0' to match "
- "filename '$1'", col_chunk.file_path, filename()));
- }
-
- const ScanRange* metadata_range = parent_->metadata_range_;
- int64_t partition_id = parent_->context_->partition_descriptor()->id();
- const ScanRange* split_range =
-
static_cast<ScanRangeMetadata*>(metadata_range->meta_data())->original_split;
- // Determine if the column is completely contained within a local split.
- bool col_range_local = split_range->expected_local()
- && col_start >= split_range->offset()
- && col_end <= split_range->offset() + split_range->len();
- scan_range_ = parent_->scan_node_->AllocateScanRange(metadata_range->fs(),
- filename(), col_len, col_start, partition_id, split_range->disk_id(),
- col_range_local,
- BufferOpts(split_range->try_cache(), file_desc.mtime));
- ClearDictionaryDecoder();
- return Status::OK();
-}
-
-Status BaseScalarColumnReader::StartScan() {
- DCHECK(scan_range_ != nullptr) << "Must Reset() before starting scan.";
- DiskIoMgr* io_mgr = ExecEnv::GetInstance()->disk_io_mgr();
- ScannerContext* context = parent_->context_;
- DCHECK_GT(io_reservation_, 0);
- bool needs_buffers;
- RETURN_IF_ERROR(parent_->scan_node_->reader_context()->StartScanRange(
- scan_range_, &needs_buffers));
- if (needs_buffers) {
- RETURN_IF_ERROR(io_mgr->AllocateBuffersForRange(
- context->bp_client(), scan_range_, io_reservation_));
- }
- stream_ = parent_->context_->AddStream(scan_range_, io_reservation_);
- DCHECK(stream_ != nullptr);
- return Status::OK();
-}
-
-Status BaseScalarColumnReader::ReadPageHeader(bool peek,
- parquet::PageHeader* next_page_header, uint32_t* next_header_size, bool*
eos) {
- DCHECK(stream_ != nullptr);
- *eos = false;
-
- uint8_t* buffer;
- int64_t buffer_size;
- RETURN_IF_ERROR(stream_->GetBuffer(true, &buffer, &buffer_size));
- // check for end of stream
- if (buffer_size == 0) {
- // The data pages contain fewer values than stated in the column metadata.
- DCHECK(stream_->eosr());
- DCHECK_LT(num_values_read_, metadata_->num_values);
- // TODO for 2.3: node_.element->name isn't necessarily useful
- ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
metadata_->num_values,
- num_values_read_, node_.element->name, filename());
- RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
- *eos = true;
- return Status::OK();
- }
-
- // We don't know the actual header size until the thrift object is
deserialized. Loop
- // until we successfully deserialize the header or exceed the maximum header
size.
- uint32_t header_size;
- Status status;
- while (true) {
- header_size = buffer_size;
- status = DeserializeThriftMsg(buffer, &header_size, true,
next_page_header);
- if (status.ok()) break;
-
- if (buffer_size >= FLAGS_max_page_header_size) {
- stringstream ss;
- ss << "ParquetScanner: could not read data page because page header exceeded "
- << "maximum size of "
- << PrettyPrinter::Print(FLAGS_max_page_header_size, TUnit::BYTES);
- status.AddDetail(ss.str());
- return status;
- }
-
- // Didn't read entire header, increase buffer size and try again
- int64_t new_buffer_size = max<int64_t>(buffer_size * 2, 1024);
- status = Status::OK();
- bool success = stream_->GetBytes(
- new_buffer_size, &buffer, &new_buffer_size, &status, /* peek */ true);
- if (!success) {
- DCHECK(!status.ok());
- return status;
- }
- DCHECK(status.ok());
-
- // Even though we increased the allowed buffer size, the number of bytes
- // read did not change. The header is not limited by the buffer space,
- // so it must be incomplete in the file.
- if (buffer_size == new_buffer_size) {
- DCHECK_NE(new_buffer_size, 0);
- return Status(TErrorCode::PARQUET_HEADER_EOF, filename());
- }
- DCHECK_GT(new_buffer_size, buffer_size);
- buffer_size = new_buffer_size;
- }
-
- *next_header_size = header_size;
-
- // Successfully deserialized current_page_header_
- if (!peek && !stream_->SkipBytes(header_size, &status)) return status;
-
- int data_size = next_page_header->compressed_page_size;
- if (UNLIKELY(data_size < 0)) {
- return Status(Substitute("Corrupt Parquet file '$0': negative page size $1 for "
- "column '$2'", filename(), data_size, schema_element().name));
- }
- int uncompressed_size = next_page_header->uncompressed_page_size;
- if (UNLIKELY(uncompressed_size < 0)) {
- return Status(Substitute("Corrupt Parquet file '$0': negative uncompressed page "
- "size $1 for column '$2'", filename(), uncompressed_size,
- schema_element().name));
- }
-
- return Status::OK();
-}
-
-Status BaseScalarColumnReader::InitDictionary() {
- // Peek at the next page header
- bool eos;
- parquet::PageHeader next_page_header;
- uint32_t next_header_size;
- DCHECK(stream_ != nullptr);
- DCHECK(!HasDictionaryDecoder());
-
- RETURN_IF_ERROR(ReadPageHeader(true /* peek */, &next_page_header,
- &next_header_size, &eos));
- if (eos) return Status::OK();
- // The dictionary must be the first data page, so if the first page
- // is not a dictionary, then there is no dictionary.
- if (next_page_header.type != parquet::PageType::DICTIONARY_PAGE) return
Status::OK();
-
- current_page_header_ = next_page_header;
- Status status;
- if (!stream_->SkipBytes(next_header_size, &status)) return status;
-
- int data_size = current_page_header_.compressed_page_size;
- if (slot_desc_ == nullptr) {
- // Skip processing the dictionary page if we don't need to decode any
values. In
- // addition to being unnecessary, we are likely unable to successfully
decode the
- // dictionary values because we don't necessarily create the right type of
scalar
- // reader if there's no slot to read into (see CreateReader()).
- if (!stream_->SkipBytes(data_size, &status)) return status;
- return Status::OK();
- }
-
- if (node_.element->type == parquet::Type::BOOLEAN) {
- return Status("Unexpected dictionary page. Dictionary page is not"
- " supported for booleans.");
- }
-
- const parquet::DictionaryPageHeader* dict_header = nullptr;
- if (current_page_header_.__isset.dictionary_page_header) {
- dict_header = &current_page_header_.dictionary_page_header;
- } else {
- if (!RequiresSkippedDictionaryHeaderCheck(parent_->file_version_)) {
- return Status("Dictionary page does not have dictionary header set.");
- }
- }
- if (dict_header != nullptr &&
- dict_header->encoding != Encoding::PLAIN &&
- dict_header->encoding != Encoding::PLAIN_DICTIONARY) {
- return Status("Only PLAIN and PLAIN_DICTIONARY encodings are supported "
- "for dictionary pages.");
- }
-
- if (!stream_->ReadBytes(data_size, &data_, &status)) return status;
- data_end_ = data_ + data_size;
-
- // The size of dictionary can be 0, if every value is null. The dictionary
still has to
- // be reset in this case.
- DictDecoderBase* dict_decoder;
- if (current_page_header_.uncompressed_page_size == 0) {
- return CreateDictionaryDecoder(nullptr, 0, &dict_decoder);
- }
-
- // There are 3 different cases from the aspect of memory management:
- // 1. If the column type is string, the dictionary will contain pointers to
a buffer,
- // so the buffer's lifetime must be as long as any row batch that
references it.
- // 2. If the column type is not string, and the dictionary page is
compressed, then a
- // temporary buffer is needed for the uncompressed values.
- // 3. If the column type is not string, and the dictionary page is not
compressed,
- // then no buffer is necessary.
- ScopedBuffer uncompressed_buffer(parent_->dictionary_pool_->mem_tracker());
- uint8_t* dict_values = nullptr;
- if (decompressor_.get() != nullptr || slot_desc_->type().IsStringType()) {
- int buffer_size = current_page_header_.uncompressed_page_size;
- if (slot_desc_->type().IsStringType()) {
- dict_values = parent_->dictionary_pool_->TryAllocate(buffer_size); //
case 1.
- } else if (uncompressed_buffer.TryAllocate(buffer_size)) {
- dict_values = uncompressed_buffer.buffer(); // case 2
- }
- if (UNLIKELY(dict_values == nullptr)) {
- string details = Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED,
"InitDictionary",
- buffer_size, "dictionary");
- return parent_->dictionary_pool_->mem_tracker()->MemLimitExceeded(
- parent_->state_, details, buffer_size);
- }
- } else {
- dict_values = data_; // case 3.
- }
-
- if (decompressor_.get() != nullptr) {
- int uncompressed_size = current_page_header_.uncompressed_page_size;
- RETURN_IF_ERROR(decompressor_->ProcessBlock32(true, data_size, data_,
- &uncompressed_size, &dict_values));
- VLOG_FILE << "Decompressed " << data_size << " to " << uncompressed_size;
- if (current_page_header_.uncompressed_page_size != uncompressed_size) {
- return Status(Substitute("Error decompressing dictionary page in file '$0'. "
- "Expected $1 uncompressed bytes but got $2", filename(),
- current_page_header_.uncompressed_page_size,
uncompressed_size));
- }
- } else {
- if (current_page_header_.uncompressed_page_size != data_size) {
- return Status(Substitute("Error reading dictionary page in file '$0'. "
- "Expected $1 bytes but got $2", filename(),
- current_page_header_.uncompressed_page_size,
data_size));
- }
- if (slot_desc_->type().IsStringType()) memcpy(dict_values, data_,
data_size);
- }
-
- RETURN_IF_ERROR(CreateDictionaryDecoder(
- dict_values, current_page_header_.uncompressed_page_size,
&dict_decoder));
- if (dict_header != nullptr &&
- dict_header->num_values != dict_decoder->num_entries()) {
- return Status(TErrorCode::PARQUET_CORRUPT_DICTIONARY, filename(),
- slot_desc_->type().DebugString(),
- Substitute("Expected $0 entries but data contained $1 entries",
- dict_header->num_values,
dict_decoder->num_entries()));
- }
-
- return Status::OK();
-}
-
-Status BaseScalarColumnReader::InitDictionaries(
- const vector<BaseScalarColumnReader*> readers) {
- for (BaseScalarColumnReader* reader : readers) {
- RETURN_IF_ERROR(reader->InitDictionary());
- }
- return Status::OK();
-}
-
-Status BaseScalarColumnReader::ReadDataPage() {
- // We're about to move to the next data page. The previous data page is
- // now complete, free up any memory allocated for it. If the data page
contained
- // strings we need to attach it to the returned batch.
- if (PageContainsTupleData(page_encoding_)) {
- parent_->scratch_batch_->aux_mem_pool.AcquireData(data_page_pool_.get(),
false);
- } else {
- data_page_pool_->FreeAll();
- }
- // We don't hold any pointers to earlier pages in the stream - we can safely
free
- // any I/O or boundary buffer.
- stream_->ReleaseCompletedResources(false);
-
- // Read the next data page, skipping page types we don't care about.
- // We break out of this loop on the non-error case (a data page was found or
we read all
- // the pages).
- while (true) {
- DCHECK_EQ(num_buffered_values_, 0);
- if (num_values_read_ == metadata_->num_values) {
- // No more pages to read
- // TODO: should we check for stream_->eosr()?
- break;
- } else if (num_values_read_ > metadata_->num_values) {
- ErrorMsg msg(TErrorCode::PARQUET_COLUMN_METADATA_INVALID,
- metadata_->num_values, num_values_read_, node_.element->name,
filename());
- RETURN_IF_ERROR(parent_->state_->LogOrReturnError(msg));
- return Status::OK();
- }
-
- bool eos;
- uint32_t header_size;
- RETURN_IF_ERROR(ReadPageHeader(false /* peek */, &current_page_header_,
- &header_size, &eos));
- if (eos) return Status::OK();
-
- if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
- // Any dictionary is already initialized, as InitDictionary has already
- // been called. There are two possibilities:
- // 1. The parquet file has two dictionary pages
- // OR
- // 2. The parquet file does not have the dictionary as the first data
page.
- // Both are errors in the parquet file.
- if (HasDictionaryDecoder()) {
- return Status(Substitute("Corrupt Parquet file '$0': multiple dictionary pages "
- "for column '$1'", filename(), schema_element().name));
- } else {
- return Status(Substitute("Corrupt Parquet file: '$0': dictionary page for "
- "column '$1' is not the first page", filename(),
schema_element().name));
- }
- }
-
- Status status;
- int data_size = current_page_header_.compressed_page_size;
- if (current_page_header_.type != parquet::PageType::DATA_PAGE) {
- // We can safely skip non-data pages
- if (!stream_->SkipBytes(data_size, &status)) {
- DCHECK(!status.ok());
- return status;
- }
- continue;
- }
-
- // Read Data Page
- // TODO: when we start using page statistics, we will need to ignore
certain corrupt
- // statistics. See IMPALA-2208 and PARQUET-251.
- if (!stream_->ReadBytes(data_size, &data_, &status)) {
- DCHECK(!status.ok());
- return status;
- }
- data_end_ = data_ + data_size;
- int num_values = current_page_header_.data_page_header.num_values;
- if (num_values < 0) {
- return Status(Substitute("Error reading data page in Parquet file '$0'. "
- "Invalid number of values in metadata: $1", filename(), num_values));
- }
- num_buffered_values_ = num_values;
- num_values_read_ += num_buffered_values_;
-
- int uncompressed_size = current_page_header_.uncompressed_page_size;
- if (decompressor_.get() != nullptr) {
- SCOPED_TIMER(parent_->decompress_timer_);
- uint8_t* decompressed_buffer;
- RETURN_IF_ERROR(AllocateUncompressedDataPage(
- uncompressed_size, "decompressed data", &decompressed_buffer));
- RETURN_IF_ERROR(decompressor_->ProcessBlock32(true,
- current_page_header_.compressed_page_size, data_, &uncompressed_size,
- &decompressed_buffer));
- VLOG_FILE << "Decompressed " << current_page_header_.compressed_page_size
- << " to " << uncompressed_size;
- if (current_page_header_.uncompressed_page_size != uncompressed_size) {
- return Status(Substitute("Error decompressing data page in file '$0'. "
- "Expected $1 uncompressed bytes but got $2", filename(),
- current_page_header_.uncompressed_page_size, uncompressed_size));
- }
- data_ = decompressed_buffer;
- data_size = current_page_header_.uncompressed_page_size;
- data_end_ = data_ + data_size;
- } else {
- DCHECK_EQ(metadata_->codec, parquet::CompressionCodec::UNCOMPRESSED);
- if (current_page_header_.compressed_page_size != uncompressed_size) {
- return Status(Substitute("Error reading data page in file '$0'. "
- "Expected $1 bytes but got $2", filename(),
- current_page_header_.compressed_page_size, uncompressed_size));
- }
- if (PageContainsTupleData(current_page_header_.data_page_header.encoding)) {
- // In this case returned batches will have pointers into the data page itself.
- // We don't transfer disk I/O buffers out of the scanner so we need to copy
- // the page data so that it can be attached to output batches.
- uint8_t* copy_buffer;
- RETURN_IF_ERROR(AllocateUncompressedDataPage(
- uncompressed_size, "uncompressed variable-length data", &copy_buffer));
- memcpy(copy_buffer, data_, uncompressed_size);
- data_ = copy_buffer;
- data_end_ = data_ + uncompressed_size;
- }
- }
-
- // Initialize the repetition level data
- RETURN_IF_ERROR(rep_levels_.Init(filename(),
- current_page_header_.data_page_header.repetition_level_encoding,
- parent_->perm_pool_.get(), parent_->state_->batch_size(), max_rep_level(), &data_,
- &data_size));
-
- // Initialize the definition level data
- RETURN_IF_ERROR(def_levels_.Init(filename(),
- current_page_header_.data_page_header.definition_level_encoding,
- parent_->perm_pool_.get(), parent_->state_->batch_size(), max_def_level(), &data_,
- &data_size));
-
- // Data can be empty if the column contains all NULLs
- RETURN_IF_ERROR(InitDataPage(data_, data_size));
- break;
- }
-
- return Status::OK();
-}
-
-Status BaseScalarColumnReader::AllocateUncompressedDataPage(int64_t size,
- const char* err_ctx, uint8_t** buffer) {
- *buffer = data_page_pool_->TryAllocate(size);
- if (*buffer == nullptr) {
- string details =
- Substitute(PARQUET_COL_MEM_LIMIT_EXCEEDED, "ReadDataPage", size, err_ctx);
- return data_page_pool_->mem_tracker()->MemLimitExceeded(
- parent_->state_, details, size);
- }
- return Status::OK();
-}
-
-template <bool ADVANCE_REP_LEVEL>
-bool BaseScalarColumnReader::NextLevels() {
- if (!ADVANCE_REP_LEVEL) DCHECK_EQ(max_rep_level(), 0) << slot_desc()->DebugString();
-
- if (UNLIKELY(num_buffered_values_ == 0)) {
- if (!NextPage()) return parent_->parse_status_.ok();
- }
- --num_buffered_values_;
- DCHECK_GE(num_buffered_values_, 0);
-
- // Definition level is not present if column and any containing structs are required.
- def_level_ = max_def_level() == 0 ? 0 : def_levels_.ReadLevel();
- // The compiler can optimize these two conditions into a single branch by treating
- // def_level_ as unsigned.
- if (UNLIKELY(def_level_ < 0 || def_level_ > max_def_level())) {
- SetLevelDecodeError("def", def_level_, max_def_level());
- return false;
- }
-
- if (ADVANCE_REP_LEVEL && max_rep_level() > 0) {
- // Repetition level is only present if this column is nested in any collection type.
- rep_level_ = rep_levels_.ReadLevel();
- if (UNLIKELY(rep_level_ < 0 || rep_level_ > max_rep_level())) {
- SetLevelDecodeError("rep", rep_level_, max_rep_level());
- return false;
- }
- // Reset position counter if we are at the start of a new parent collection.
- if (rep_level_ <= max_rep_level() - 1) pos_current_value_ = 0;
- }
-
- return parent_->parse_status_.ok();
-}
-
-Status BaseScalarColumnReader::GetUnsupportedDecodingError() {
- return Status(Substitute(
- "File '$0' is corrupt: unexpected encoding: $1 for data page of column '$2'.",
- filename(), PrintThriftEnum(page_encoding_), schema_element().name));
-}
-
-bool BaseScalarColumnReader::NextPage() {
- parent_->assemble_rows_timer_.Stop();
- parent_->parse_status_ = ReadDataPage();
- if (UNLIKELY(!parent_->parse_status_.ok())) return false;
- if (num_buffered_values_ == 0) {
- rep_level_ = HdfsParquetScanner::ROW_GROUP_END;
- def_level_ = HdfsParquetScanner::ROW_GROUP_END;
- pos_current_value_ = HdfsParquetScanner::INVALID_POS;
- return false;
- }
- parent_->assemble_rows_timer_.Start();
- return true;
-}
-
-void BaseScalarColumnReader::SetLevelDecodeError(const char* level_name,
- int decoded_level, int max_level) {
- if (decoded_level < 0) {
- DCHECK_EQ(decoded_level, HdfsParquetScanner::INVALID_LEVEL);
- parent_->parse_status_.MergeStatus(Status(Substitute("Corrupt Parquet file '$0': "
- "could not read all $1 levels for column '$2'", filename(),
- level_name, schema_element().name)));
- } else {
- parent_->parse_status_.MergeStatus(Status(Substitute("Corrupt Parquet file '$0': "
- "invalid $1 level $2 > max $1 level $3 for column '$4'", filename(),
- level_name, decoded_level, max_level, schema_element().name)));
- }
-}
-
-bool CollectionColumnReader::NextLevels() {
- DCHECK(!children_.empty());
- DCHECK_LE(rep_level_, new_collection_rep_level());
- for (int c = 0; c < children_.size(); ++c) {
- do {
- // TODO(skye): verify somewhere that all column readers are at end
- if (!children_[c]->NextLevels()) return false;
- } while (children_[c]->rep_level() > new_collection_rep_level());
- }
- UpdateDerivedState();
- return true;
-}
-
-bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
- DCHECK_GE(rep_level_, 0);
- DCHECK_GE(def_level_, 0);
- DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
- "Caller should have called NextLevels() until we are ready to read a value";
-
- if (tuple_offset_ == -1) {
- return CollectionColumnReader::NextLevels();
- } else if (def_level_ >= max_def_level()) {
- return ReadSlot(tuple->GetCollectionSlot(tuple_offset_), pool);
- } else {
- // Null value
- tuple->SetNull(null_indicator_offset_);
- return CollectionColumnReader::NextLevels();
- }
-}
-
-bool CollectionColumnReader::ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
- return CollectionColumnReader::ReadValue(pool, tuple);
-}
-
-bool CollectionColumnReader::ReadSlot(CollectionValue* slot, MemPool* pool) {
- DCHECK(!children_.empty());
- DCHECK_LE(rep_level_, new_collection_rep_level());
-
- // Recursively read the collection into a new CollectionValue.
- *slot = CollectionValue();
- CollectionValueBuilder builder(
- slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
- bool continue_execution = parent_->AssembleCollection(
- children_, new_collection_rep_level(), &builder);
- if (!continue_execution) return false;
-
- // AssembleCollection() advances child readers, so we don't need to call NextLevels()
- UpdateDerivedState();
- return true;
-}
-
-void CollectionColumnReader::UpdateDerivedState() {
- // We don't need to cap our def_level_ at max_def_level(). We always check def_level_
- // >= max_def_level() to check if the collection is defined.
- // TODO(skye): consider capping def_level_ at max_def_level()
- def_level_ = children_[0]->def_level();
- rep_level_ = children_[0]->rep_level();
-
- // All children should have been advanced to the beginning of the next collection
- for (int i = 0; i < children_.size(); ++i) {
- DCHECK_EQ(children_[i]->rep_level(), rep_level_);
- if (def_level_ < max_def_level()) {
- // Collection not defined
- FILE_CHECK_EQ(children_[i]->def_level(), def_level_);
- } else {
- // Collection is defined
- FILE_CHECK_GE(children_[i]->def_level(), max_def_level());
- }
- }
-
- if (RowGroupAtEnd()) {
- // No more values
- pos_current_value_ = HdfsParquetScanner::INVALID_POS;
- } else if (rep_level_ <= max_rep_level() - 2) {
- // Reset position counter if we are at the start of a new parent collection (i.e.,
- // the current collection is the first item in a new parent collection).
- pos_current_value_ = 0;
- }
-}
-
-/// Returns a column reader for decimal types based on its size and parquet type.
-static ParquetColumnReader* CreateDecimalColumnReader(const SchemaNode& node,
- const SlotDescriptor* slot_desc, HdfsParquetScanner* parent) {
- switch (node.element->type) {
- case parquet::Type::FIXED_LEN_BYTE_ARRAY:
- switch (slot_desc->type().GetByteSize()) {
- case 4:
- return new ScalarColumnReader<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY,
- true>(parent, node, slot_desc);
- case 8:
- return new ScalarColumnReader<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY,
- true>(parent, node, slot_desc);
- case 16:
- return new ScalarColumnReader<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY,
- true>(parent, node, slot_desc);
- }
- break;
- case parquet::Type::BYTE_ARRAY:
- switch (slot_desc->type().GetByteSize()) {
- case 4:
- return new ScalarColumnReader<Decimal4Value, parquet::Type::BYTE_ARRAY, true>(
- parent, node, slot_desc);
- case 8:
- return new ScalarColumnReader<Decimal8Value, parquet::Type::BYTE_ARRAY, true>(
- parent, node, slot_desc);
- case 16:
- return new ScalarColumnReader<Decimal16Value, parquet::Type::BYTE_ARRAY, true>(
- parent, node, slot_desc);
- }
- break;
- case parquet::Type::INT32:
- DCHECK_EQ(sizeof(Decimal4Value::StorageType), slot_desc->type().GetByteSize());
- return new ScalarColumnReader<Decimal4Value, parquet::Type::INT32, true>(
- parent, node, slot_desc);
- case parquet::Type::INT64:
- DCHECK_EQ(sizeof(Decimal8Value::StorageType), slot_desc->type().GetByteSize());
- return new ScalarColumnReader<Decimal8Value, parquet::Type::INT64, true>(
- parent, node, slot_desc);
- default:
- DCHECK(false) << "Invalid decimal primitive type";
- }
- DCHECK(false) << "Invalid decimal type";
- return nullptr;
-}
-
-ParquetColumnReader* ParquetColumnReader::Create(const SchemaNode& node,
- bool is_collection_field, const SlotDescriptor* slot_desc,
- HdfsParquetScanner* parent) {
- if (is_collection_field) {
- // Create collection reader (note this handles both NULL and non-NULL 'slot_desc')
- return new CollectionColumnReader(parent, node, slot_desc);
- } else if (slot_desc != nullptr) {
- // Create the appropriate ScalarColumnReader type to read values into 'slot_desc'
- switch (slot_desc->type().type) {
- case TYPE_BOOLEAN:
- return new BoolColumnReader(parent, node, slot_desc);
- case TYPE_TINYINT:
- return new ScalarColumnReader<int8_t, parquet::Type::INT32, true>(parent, node,
- slot_desc);
- case TYPE_SMALLINT:
- return new ScalarColumnReader<int16_t, parquet::Type::INT32, true>(parent, node,
- slot_desc);
- case TYPE_INT:
- return new ScalarColumnReader<int32_t, parquet::Type::INT32, true>(parent, node,
- slot_desc);
- case TYPE_BIGINT:
- switch (node.element->type) {
- case parquet::Type::INT32:
- return new ScalarColumnReader<int64_t, parquet::Type::INT32, true>(parent,
- node, slot_desc);
- default:
- return new ScalarColumnReader<int64_t, parquet::Type::INT64, true>(parent,
- node, slot_desc);
- }
- case TYPE_FLOAT:
- return new ScalarColumnReader<float, parquet::Type::FLOAT, true>(parent, node,
- slot_desc);
- case TYPE_DOUBLE:
- switch (node.element->type) {
- case parquet::Type::INT32:
- return new ScalarColumnReader<double , parquet::Type::INT32, true>(parent,
- node, slot_desc);
- case parquet::Type::FLOAT:
- return new ScalarColumnReader<double, parquet::Type::FLOAT, true>(parent,
- node, slot_desc);
- default:
- return new ScalarColumnReader<double, parquet::Type::DOUBLE, true>(parent,
- node, slot_desc);
- }
- case TYPE_TIMESTAMP:
- return CreateTimestampColumnReader(node, slot_desc, parent);
- case TYPE_STRING:
- case TYPE_VARCHAR:
- case TYPE_CHAR:
- return new ScalarColumnReader<StringValue, parquet::Type::BYTE_ARRAY, true>(
- parent, node, slot_desc);
- case TYPE_DECIMAL:
- return CreateDecimalColumnReader(node, slot_desc, parent);
- default:
- DCHECK(false) << slot_desc->type().DebugString();
- return nullptr;
- }
- } else {
- // Special case for counting scalar values (e.g. count(*), no materialized columns in
- // the file, only materializing a position slot). We won't actually read any values,
- // only the rep and def levels, so it doesn't matter what kind of reader we make.
- return new ScalarColumnReader<int8_t, parquet::Type::INT32, false>(parent, node,
- slot_desc);
- }
-}
-
-ParquetColumnReader* ParquetColumnReader::CreateTimestampColumnReader(
- const SchemaNode& node, const SlotDescriptor* slot_desc,
- HdfsParquetScanner* parent) {
- if (node.element->type == parquet::Type::INT96) {
- return new ScalarColumnReader<TimestampValue, parquet::Type::INT96, true>(
- parent, node, slot_desc);
- }
- else if (node.element->type == parquet::Type::INT64) {
- return new ScalarColumnReader<TimestampValue, parquet::Type::INT64, true>(
- parent, node, slot_desc);
- }
- DCHECK(false) << slot_desc->type().DebugString();
- return nullptr;
-}
-
-}