http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/hdfs-parquet-table-writer.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.h b/be/src/exec/parquet/hdfs-parquet-table-writer.h new file mode 100644 index 0000000..dd0bd7f --- /dev/null +++ b/be/src/exec/parquet/hdfs-parquet-table-writer.h @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +#ifndef IMPALA_EXEC_HDFS_PARQUET_TABLE_WRITER_H +#define IMPALA_EXEC_HDFS_PARQUET_TABLE_WRITER_H + +#include "exec/data-sink.h" + +#include <hdfs.h> +#include <map> +#include <boost/scoped_ptr.hpp> + +#include "exec/hdfs-table-writer.h" +#include "exec/parquet/parquet-common.h" +#include "runtime/descriptors.h" +#include "util/compress.h" + +#include "gen-cpp/control_service.pb.h" + +namespace impala { + +class Expr; +struct OutputPartition; +class RuntimeState; +class ThriftSerializer; +class TupleRow; + +/// The writer consumes all rows passed to it and writes the evaluated output_exprs +/// as a parquet file in hdfs. +/// TODO: (parts of the format that are not implemented) +/// - group var encoding +/// - compression +/// - multiple row groups per file +/// TODO: we need a mechanism to pass the equivalent of serde params to this class +/// from the FE. This includes: +/// - compression & codec +/// - type of encoding to use for each type + +class HdfsParquetTableWriter : public HdfsTableWriter { + public: + HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state, + OutputPartition* output_partition, const HdfsPartitionDescriptor* part_desc, + const HdfsTableDescriptor* table_desc); + + ~HdfsParquetTableWriter(); + + /// Initialize column information. + virtual Status Init() override; + + /// Initializes a new file. This resets the file metadata object and writes + /// the file header to the output file. + virtual Status InitNewFile() override; + + /// Appends parquet representation of rows in the batch to the current file. + virtual Status AppendRows(RowBatch* batch, + const std::vector<int32_t>& row_group_indices, bool* new_file) override; + + /// Write out all the data. + virtual Status Finalize() override; + + virtual void Close() override; + + /// Returns the target HDFS block size to use. + virtual uint64_t default_block_size() const override; + + virtual std::string file_extension() const override { return "parq"; } + + private: + /// Default data page size. In bytes. + static const int DEFAULT_DATA_PAGE_SIZE = 64 * 1024; + + /// Max data page size. In bytes. + /// TODO: May need to be increased after addressing IMPALA-1619. + static const int64_t MAX_DATA_PAGE_SIZE = 1024 * 1024 * 1024; + + /// Default hdfs block size. In bytes. + static const int HDFS_BLOCK_SIZE = 256 * 1024 * 1024; + + /// Align block sizes to this constant. In bytes. + static const int HDFS_BLOCK_ALIGNMENT = 1024 * 1024; + + /// Default row group size. In bytes. + static const int ROW_GROUP_SIZE = HDFS_BLOCK_SIZE; + + /// Minimum file size. If the configured size is less, fail. + static const int HDFS_MIN_FILE_SIZE = 8 * 1024 * 1024; + + /// Maximum statistics size. If the size of a single thrift parquet::Statistics struct + /// for a page or row group exceed this value, we'll not write it. We use the same value + /// as 'parquet-mr'. + static const int MAX_COLUMN_STATS_SIZE = 4 * 1024; + + /// In parquet::ColumnIndex we store the min and max values for each page. + /// However, we don't want to store very long strings, so we truncate them. + /// The value of it must not be too small, since we don't want to truncate + /// non-string values. + static const int PAGE_INDEX_MAX_STRING_LENGTH = 64; + + /// Per-column information state. This contains some metadata as well as the + /// data buffers. + class BaseColumnWriter; + friend class BaseColumnWriter; + + template<typename T> class ColumnWriter; + template<typename T> friend class ColumnWriter; + class BoolColumnWriter; + friend class BoolColumnWriter; + + /// Minimum allowable block size in bytes. This is a function of the number of columns + /// in the target file. + int64_t MinBlockSize(int64_t num_file_cols) const; + + /// Fills in the schema portion of the file metadata, converting the schema in + /// table_desc_ into the format in the file metadata + Status CreateSchema(); + + /// Writes the file header information to the output file. + Status WriteFileHeader(); + + /// Writes the column index and offset index of each page in the file. + /// It also resets the column writers. + Status WritePageIndex(); + + /// Writes the file metadata and footer. + Status WriteFileFooter(); + + /// Flushes the current row group to file. This will compute the final + /// offsets of column chunks, updating the file metadata. + Status FlushCurrentRowGroup(); + + /// Adds a row group to the metadata and updates current_row_group_ to the + /// new row group. current_row_group_ will be flushed. + Status AddRowGroup(); + + /// Thrift serializer utility object. Reusing this object allows for + /// fewer memory allocations. + boost::scoped_ptr<ThriftSerializer> thrift_serializer_; + + /// File metdata thrift description. + parquet::FileMetaData file_metadata_; + + /// The current row group being written to. + parquet::RowGroup* current_row_group_; + + /// Array of pointers to column information. The column writers are owned by the + /// table writer, as there is no reason for the column writers to outlive the table + /// writer. + std::vector<std::unique_ptr<BaseColumnWriter>> columns_; + + /// Number of rows in current file + int64_t row_count_; + + /// Current estimate of the total size of the file. The file size estimate includes + /// the running size of the (uncompressed) dictionary, the size of all finalized + /// (compressed) data pages and their page headers. + /// If this size exceeds file_size_limit_, the current data is written and a new file + /// is started. + int64_t file_size_estimate_; + + /// Limit on the total size of the file. + int64_t file_size_limit_; + + /// The file location in the current output file. This is the number of bytes + /// that have been written to the file so far. The metadata uses file offsets + /// in a few places. + int64_t file_pos_; + + /// Memory for column/block buffers that are reused for the duration of the + /// writer (i.e. reused across files). + boost::scoped_ptr<MemPool> reusable_col_mem_pool_; + + /// Memory for column/block buffers that is allocated per file. We need to + /// reset this pool after flushing a file. + boost::scoped_ptr<MemPool> per_file_mem_pool_; + + /// Current position in the batch being written. This must be persistent across + /// calls since the writer may stop in the middle of a row batch and ask for a new + /// file. + int row_idx_; + + /// Staging buffer to use to compress data. This is used only if compression is + /// enabled and is reused between all data pages. + std::vector<uint8_t> compression_staging_buffer_; + + /// For each column, the on disk size written. + ParquetDmlStatsPB parquet_dml_stats_; +}; + +} +#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-bool-decoder.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/parquet-bool-decoder.cc b/be/src/exec/parquet/parquet-bool-decoder.cc new file mode 100644 index 0000000..fdcd1d5 --- /dev/null +++ b/be/src/exec/parquet/parquet-bool-decoder.cc @@ -0,0 +1,68 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/parquet/parquet-bool-decoder.h" + +#include "util/mem-util.h" + +#include "common/names.h" + +namespace impala { + +bool ParquetBoolDecoder::SetData( + parquet::Encoding::type encoding, uint8_t* data, int size) { + encoding_ = encoding; + // Only the relevant decoder is initialized for a given data page. + switch (encoding) { + case parquet::Encoding::PLAIN: + bool_values_.Reset(data, size); + break; + case parquet::Encoding::RLE: + // The first 4 bytes contain the size of the encoded data. This information is + // redundant, as this is the last part of the data page, and the number of + // remaining bytes is already known. + rle_decoder_.Reset(data + 4, size - 4, 1); + break; + default: + return false; + } + num_unpacked_values_ = 0; + unpacked_value_idx_ = 0; + return true; +} + +bool ParquetBoolDecoder::DecodeValues( + int64_t stride, int64_t count, bool* RESTRICT first_value) RESTRICT { + if (encoding_ == parquet::Encoding::PLAIN) { + return DecodeValues<parquet::Encoding::PLAIN>(stride, count, first_value); + } else { + DCHECK_EQ(encoding_, parquet::Encoding::RLE); + return DecodeValues<parquet::Encoding::RLE>(stride, count, first_value); + } +} + +template <parquet::Encoding::type ENCODING> +bool ParquetBoolDecoder::DecodeValues( + int64_t stride, int64_t count, bool* RESTRICT first_value) RESTRICT { + // TODO: we could optimise this further if needed by bypassing 'unpacked_values_'. + StrideWriter<bool> out(first_value, stride); + for (int64_t i = 0; i < count; ++i) { + if (UNLIKELY(!DecodeValue<ENCODING>(out.Advance()))) return false; + } + return true; +} +} // namespace impala http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-bool-decoder.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/parquet-bool-decoder.h b/be/src/exec/parquet/parquet-bool-decoder.h new file mode 100644 index 0000000..6ecdbd2 --- /dev/null +++ b/be/src/exec/parquet/parquet-bool-decoder.h @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "common/compiler-util.h" +#include "exec/parquet/parquet-common.h" +#include "util/mem-util.h" +#include "util/rle-encoding.h" + +namespace impala { + +/// Decoder for RLE and bit-packed boolean-encoded values. +class ParquetBoolDecoder { + public: + /// Set the data for the next page to decode. Return true if the encoding is supported, + /// false otherwise. + bool SetData(parquet::Encoding::type encoding, uint8_t* data, int size); + + /// Decode the next bool value to 'value'. Return true on success or false if there is + /// an error decoding, e.g. invalid or truncated data. + /// Templated so that callers can avoid the overhead of branching on encoding per row. + template <parquet::Encoding::type ENCODING> + bool ALWAYS_INLINE DecodeValue(bool* RESTRICT value) RESTRICT; + + /// Batched version of DecodeValue() that decodes multiple values at a time. + bool DecodeValues(int64_t stride, int64_t count, bool* RESTRICT first_value) RESTRICT; + + private: + /// Implementation of DecodeValues, templated by ENCODING. + template <parquet::Encoding::type ENCODING> + bool DecodeValues(int64_t stride, int64_t count, bool* RESTRICT first_value) RESTRICT; + + parquet::Encoding::type encoding_; + + /// A buffer to store unpacked values. Must be a multiple of 32 size to use the + /// batch-oriented interface of BatchedBitReader. + static const int UNPACKED_BUFFER_LEN = 128; + bool unpacked_values_[UNPACKED_BUFFER_LEN]; + + /// The number of valid values in 'unpacked_values_'. + int num_unpacked_values_ = 0; + + /// The next value to return from 'unpacked_values_'. + int unpacked_value_idx_ = 0; + + /// Bit packed decoder, used if 'encoding_' is PLAIN. + BatchedBitReader bool_values_; + + /// RLE decoder, used if 'encoding_' is RLE. + RleBatchDecoder<bool> rle_decoder_; +}; + +template <parquet::Encoding::type ENCODING> +inline bool ParquetBoolDecoder::DecodeValue(bool* RESTRICT value) RESTRICT { + DCHECK_EQ(ENCODING, encoding_); + if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) { + *value = unpacked_values_[unpacked_value_idx_++]; + } else { + // Unpack as many values as we can into the buffer. We expect to read at least one + // value. + if (ENCODING == parquet::Encoding::PLAIN) { + num_unpacked_values_ = + bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]); + } else { + num_unpacked_values_ = + rle_decoder_.GetValues(UNPACKED_BUFFER_LEN, &unpacked_values_[0]); + } + + if (UNLIKELY(num_unpacked_values_ == 0)) { + return false; + } + *value = unpacked_values_[0]; + unpacked_value_idx_ = 1; + } + return true; +} +} // namespace impala http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-collection-column-reader.cc ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/parquet-collection-column-reader.cc b/be/src/exec/parquet/parquet-collection-column-reader.cc new file mode 100644 index 0000000..8c28ab7 --- /dev/null +++ b/be/src/exec/parquet/parquet-collection-column-reader.cc @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet-collection-column-reader.h" + +#include "runtime/collection-value-builder.h" + +namespace impala { + +void CollectionColumnReader::Close(RowBatch* row_batch) { + for (ParquetColumnReader* child_reader : children_) { + child_reader->Close(row_batch); + } +} + +bool CollectionColumnReader::NextLevels() { + DCHECK(!children_.empty()); + DCHECK_LE(rep_level_, new_collection_rep_level()); + for (int c = 0; c < children_.size(); ++c) { + do { + // TODO: verify somewhere that all column readers are at end + if (!children_[c]->NextLevels()) return false; + } while (children_[c]->rep_level() > new_collection_rep_level()); + } + UpdateDerivedState(); + return true; +} + +bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) { + DCHECK_GE(rep_level_, 0); + DCHECK_GE(def_level_, 0); + DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) + << "Caller should have called NextLevels() until we are ready to read a value"; + + if (tuple_offset_ == -1) { + return CollectionColumnReader::NextLevels(); + } else if (def_level_ >= max_def_level()) { + return ReadSlot(tuple->GetCollectionSlot(tuple_offset_), pool); + } else { + // Null value + tuple->SetNull(null_indicator_offset_); + return CollectionColumnReader::NextLevels(); + } +} + +bool CollectionColumnReader::ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) { + return CollectionColumnReader::ReadValue(pool, tuple); +} + +bool CollectionColumnReader::ReadValueBatch(MemPool* pool, int max_values, + int tuple_size, uint8_t* tuple_mem, int* num_values) { + // The below loop requires that NextLevels() was called previously to populate + // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each + // row group. + if (def_level_ == ParquetLevel::INVALID_LEVEL && !NextLevels()) return false; + + int val_count = 0; + bool continue_execution = true; + while (val_count < max_values && !RowGroupAtEnd() && continue_execution) { + Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size); + if (def_level_ < def_level_of_immediate_repeated_ancestor()) { + // A containing repeated field is empty or NULL + continue_execution = NextLevels(); + continue; + } + // Fill in position slot if applicable + if (pos_slot_desc_ != nullptr) { + ReadPositionNonBatched(tuple->GetBigIntSlot(pos_slot_desc()->tuple_offset())); + } + continue_execution = ReadValue(pool, tuple); + ++val_count; + if (SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(val_count)) { + continue_execution &= ColReaderDebugAction(&val_count); + } + } + *num_values = val_count; + return continue_execution; +} + +bool CollectionColumnReader::ReadNonRepeatedValueBatch(MemPool* pool, + int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) { + // The below loop requires that NextLevels() was called previously to populate + // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each + // row group. + if (def_level_ == ParquetLevel::INVALID_LEVEL && !NextLevels()) return false; + + int val_count = 0; + bool continue_execution = true; + while (val_count < max_values && !RowGroupAtEnd() && continue_execution) { + Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size); + continue_execution = ReadNonRepeatedValue(pool, tuple); + ++val_count; + if (SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(val_count)) { + continue_execution &= ColReaderDebugAction(&val_count); + } + } + *num_values = val_count; + return continue_execution; +} + +bool CollectionColumnReader::ReadSlot(CollectionValue* slot, MemPool* pool) { + DCHECK(!children_.empty()); + DCHECK_LE(rep_level_, new_collection_rep_level()); + + // Recursively read the collection into a new CollectionValue. + *slot = CollectionValue(); + CollectionValueBuilder builder( + slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_); + bool continue_execution = + parent_->AssembleCollection(children_, new_collection_rep_level(), &builder); + if (!continue_execution) return false; + + // AssembleCollection() advances child readers, so we don't need to call NextLevels() + UpdateDerivedState(); + return true; +} + +void CollectionColumnReader::UpdateDerivedState() { + // We don't need to cap our def_level_ at max_def_level(). We always check def_level_ + // >= max_def_level() to check if the collection is defined. + // TODO: consider capping def_level_ at max_def_level() + def_level_ = children_[0]->def_level(); + rep_level_ = children_[0]->rep_level(); + + // All children should have been advanced to the beginning of the next collection + for (int i = 0; i < children_.size(); ++i) { + DCHECK_EQ(children_[i]->rep_level(), rep_level_); + if (def_level_ < max_def_level()) { + // Collection not defined + FILE_CHECK_EQ(children_[i]->def_level(), def_level_); + } else { + // Collection is defined + FILE_CHECK_GE(children_[i]->def_level(), max_def_level()); + } + } + + if (RowGroupAtEnd()) { + // No more values + pos_current_value_ = ParquetLevel::INVALID_POS; + } else if (rep_level_ <= max_rep_level() - 2) { + // Reset position counter if we are at the start of a new parent collection (i.e., + // the current collection is the first item in a new parent collection). + pos_current_value_ = 0; + } +} +} // namespace impala http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-collection-column-reader.h ---------------------------------------------------------------------- diff --git a/be/src/exec/parquet/parquet-collection-column-reader.h b/be/src/exec/parquet/parquet-collection-column-reader.h new file mode 100644 index 0000000..ae23979 --- /dev/null +++ b/be/src/exec/parquet/parquet-collection-column-reader.h @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <vector> + +#include "exec/parquet/parquet-column-readers.h" + +namespace impala { + +/// Collections are not materialized directly in parquet files; only scalar values appear +/// in the file. CollectionColumnReader uses the definition and repetition levels of child +/// column readers to figure out the boundaries of each collection in this column. +class CollectionColumnReader : public ParquetColumnReader { + public: + CollectionColumnReader( + HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc) + : ParquetColumnReader(parent, node, slot_desc) { + DCHECK(node_.is_repeated()); + if (slot_desc != nullptr) DCHECK(slot_desc->type().IsCollectionType()); + } + + virtual ~CollectionColumnReader() {} + + vector<ParquetColumnReader*>* children() { return &children_; } + + virtual bool IsCollectionReader() const override { return true; } + + /// The repetition level indicating that the current value is the first in a new + /// collection (meaning the last value read was the final item in the previous + /// collection). + int new_collection_rep_level() const { return max_rep_level() - 1; } + + /// Materializes CollectionValue into tuple slot (if materializing) and advances to next + /// value. + virtual bool ReadValue(MemPool* pool, Tuple* tuple) override; + + /// Same as ReadValue but does not advance repetition level. Only valid for columns not + /// in collections. + virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override; + + /// Implementation of ReadValueBatch for collections. + virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size, + uint8_t* tuple_mem, int* num_values) override; + + /// Implementation of ReadNonRepeatedValueBatch() for collections. + virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size, + uint8_t* tuple_mem, int* num_values) override; + + /// Advances all child readers to the beginning of the next collection and updates this + /// reader's state. + virtual bool NextLevels() override; + + /// This is called once for each row group in the file. + void Reset() { + def_level_ = ParquetLevel::INVALID_LEVEL; + rep_level_ = ParquetLevel::INVALID_LEVEL; + pos_current_value_ = ParquetLevel::INVALID_POS; + } + + virtual void Close(RowBatch* row_batch) override; + + private: + /// Column readers of fields contained within this collection. There is at least one + /// child reader per collection reader. Child readers either materialize slots in the + /// collection item tuples, or there is a single child reader that does not materialize + /// any slot and is only used by this reader to read def and rep levels. + vector<ParquetColumnReader*> children_; + + /// Updates this reader's def_level_, rep_level_, and pos_current_value_ based on child + /// reader's state. + void UpdateDerivedState(); + + /// Recursively reads from children_ to assemble a single CollectionValue into + /// 'slot'. Also advances rep_level_ and def_level_ via NextLevels(). + /// + /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is + /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns + /// true. + inline bool ReadSlot(CollectionValue* slot, MemPool* pool); +}; +} // namespace impala
