http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/hdfs-parquet-table-writer.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/hdfs-parquet-table-writer.h b/be/src/exec/parquet/hdfs-parquet-table-writer.h
new file mode 100644
index 0000000..dd0bd7f
--- /dev/null
+++ b/be/src/exec/parquet/hdfs-parquet-table-writer.h
@@ -0,0 +1,205 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_EXEC_HDFS_PARQUET_TABLE_WRITER_H
+#define IMPALA_EXEC_HDFS_PARQUET_TABLE_WRITER_H
+
+#include "exec/data-sink.h"
+
+#include <hdfs.h>
+#include <map>
+#include <boost/scoped_ptr.hpp>
+
+#include "exec/hdfs-table-writer.h"
+#include "exec/parquet/parquet-common.h"
+#include "runtime/descriptors.h"
+#include "util/compress.h"
+
+#include "gen-cpp/control_service.pb.h"
+
+namespace impala {
+
+class Expr;
+struct OutputPartition;
+class RuntimeState;
+class ThriftSerializer;
+class TupleRow;
+
+/// The writer consumes all rows passed to it and writes the evaluated output_exprs
+/// as a parquet file in hdfs.
+/// TODO: (parts of the format that are not implemented)
+/// - group var encoding
+/// - compression
+/// - multiple row groups per file
+/// TODO: we need a mechanism to pass the equivalent of serde params to this class
+/// from the FE.  This includes:
+/// - compression & codec
+/// - type of encoding to use for each type
+
+class HdfsParquetTableWriter : public HdfsTableWriter {
+ public:
+  HdfsParquetTableWriter(HdfsTableSink* parent, RuntimeState* state,
+      OutputPartition* output_partition, const HdfsPartitionDescriptor* part_desc,
+      const HdfsTableDescriptor* table_desc);
+
+  ~HdfsParquetTableWriter();
+
+  /// Initialize column information.
+  virtual Status Init() override;
+
+  /// Initializes a new file.  This resets the file metadata object and writes
+  /// the file header to the output file.
+  virtual Status InitNewFile() override;
+
+  /// Appends parquet representation of rows in the batch to the current file.
+  virtual Status AppendRows(RowBatch* batch,
+      const std::vector<int32_t>& row_group_indices, bool* new_file) override;
+
+  /// Write out all the data.
+  virtual Status Finalize() override;
+
+  virtual void Close() override;
+
+  /// Returns the target HDFS block size to use.
+  virtual uint64_t default_block_size() const override;
+
+  /// Returns the extension string "parq".
+  virtual std::string file_extension() const override { return "parq"; }
+
+ private:
+  /// Default data page size. In bytes.
+  static const int DEFAULT_DATA_PAGE_SIZE = 64 * 1024;
+
+  /// Max data page size. In bytes.
+  /// TODO: May need to be increased after addressing IMPALA-1619.
+  static const int64_t MAX_DATA_PAGE_SIZE = 1024 * 1024 * 1024;
+
+  /// Default hdfs block size. In bytes.
+  static const int HDFS_BLOCK_SIZE = 256 * 1024 * 1024;
+
+  /// Align block sizes to this constant. In bytes.
+  static const int HDFS_BLOCK_ALIGNMENT = 1024 * 1024;
+
+  /// Default row group size.  In bytes.
+  static const int ROW_GROUP_SIZE = HDFS_BLOCK_SIZE;
+
+  /// Minimum file size.  If the configured size is less, fail.
+  static const int HDFS_MIN_FILE_SIZE = 8 * 1024 * 1024;
+
+  /// Maximum statistics size. If the size of a single thrift parquet::Statistics
+  /// struct for a page or row group exceeds this value, we'll not write it. We use
+  /// the same value as 'parquet-mr'.
+  static const int MAX_COLUMN_STATS_SIZE = 4 * 1024;
+
+  /// In parquet::ColumnIndex we store the min and max values for each page.
+  /// However, we don't want to store very long strings, so we truncate them.
+  /// The value of it must not be too small, since we don't want to truncate
+  /// non-string values.
+  static const int PAGE_INDEX_MAX_STRING_LENGTH = 64;
+
+  /// Per-column information state.  This contains some metadata as well as the
+  /// data buffers.
+  class BaseColumnWriter;
+  friend class BaseColumnWriter;
+
+  template<typename T> class ColumnWriter;
+  template<typename T> friend class ColumnWriter;
+  class BoolColumnWriter;
+  friend class BoolColumnWriter;
+
+  /// Minimum allowable block size in bytes. This is a function of the number of columns
+  /// in the target file.
+  int64_t MinBlockSize(int64_t num_file_cols) const;
+
+  /// Fills in the schema portion of the file metadata, converting the schema in
+  /// table_desc_ into the format in the file metadata
+  Status CreateSchema();
+
+  /// Writes the file header information to the output file.
+  Status WriteFileHeader();
+
+  /// Writes the column index and offset index of each page in the file.
+  /// It also resets the column writers.
+  Status WritePageIndex();
+
+  /// Writes the file metadata and footer.
+  Status WriteFileFooter();
+
+  /// Flushes the current row group to file.  This will compute the final
+  /// offsets of column chunks, updating the file metadata.
+  Status FlushCurrentRowGroup();
+
+  /// Adds a row group to the metadata and updates current_row_group_ to the
+  /// new row group.  current_row_group_ will be flushed.
+  Status AddRowGroup();
+
+  /// Thrift serializer utility object.  Reusing this object allows for
+  /// fewer memory allocations.
+  boost::scoped_ptr<ThriftSerializer> thrift_serializer_;
+
+  /// File metadata thrift description.
+  parquet::FileMetaData file_metadata_;
+
+  /// The current row group being written to.
+  parquet::RowGroup* current_row_group_;
+
+  /// Array of pointers to column information. The column writers are owned by the
+  /// table writer, as there is no reason for the column writers to outlive the table
+  /// writer.
+  std::vector<std::unique_ptr<BaseColumnWriter>> columns_;
+
+  /// Number of rows in current file
+  int64_t row_count_;
+
+  /// Current estimate of the total size of the file.  The file size estimate includes
+  /// the running size of the (uncompressed) dictionary, the size of all finalized
+  /// (compressed) data pages and their page headers.
+  /// If this size exceeds file_size_limit_, the current data is written and a new file
+  /// is started.
+  int64_t file_size_estimate_;
+
+  /// Limit on the total size of the file.
+  int64_t file_size_limit_;
+
+  /// The file location in the current output file.  This is the number of bytes
+  /// that have been written to the file so far.  The metadata uses file offsets
+  /// in a few places.
+  int64_t file_pos_;
+
+  /// Memory for column/block buffers that are reused for the duration of the
+  /// writer (i.e. reused across files).
+  boost::scoped_ptr<MemPool> reusable_col_mem_pool_;
+
+  /// Memory for column/block buffers that is allocated per file.  We need to
+  /// reset this pool after flushing a file.
+  boost::scoped_ptr<MemPool> per_file_mem_pool_;
+
+  /// Current position in the batch being written.  This must be persistent across
+  /// calls since the writer may stop in the middle of a row batch and ask for a new
+  /// file.
+  int row_idx_;
+
+  /// Staging buffer to use to compress data.  This is used only if compression is
+  /// enabled and is reused between all data pages.
+  std::vector<uint8_t> compression_staging_buffer_;
+
+  /// For each column, the on disk size written.
+  ParquetDmlStatsPB parquet_dml_stats_;
+};
+
+}
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-bool-decoder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-bool-decoder.cc b/be/src/exec/parquet/parquet-bool-decoder.cc
new file mode 100644
index 0000000..fdcd1d5
--- /dev/null
+++ b/be/src/exec/parquet/parquet-bool-decoder.cc
@@ -0,0 +1,68 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/parquet/parquet-bool-decoder.h"
+
+#include "util/mem-util.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+// Points the decoder at the 'size' bytes starting at 'data' for the next data page and
+// records 'encoding'. Returns false, without resetting the unpacked-value buffer
+// state, if 'encoding' is neither PLAIN nor RLE; returns true otherwise.
+bool ParquetBoolDecoder::SetData(
+    parquet::Encoding::type encoding, uint8_t* data, int size) {
+  encoding_ = encoding;
+  // Only the relevant decoder is initialized for a given data page.
+  switch (encoding) {
+    case parquet::Encoding::PLAIN:
+      bool_values_.Reset(data, size);
+      break;
+    case parquet::Encoding::RLE:
+      // The first 4 bytes contain the size of the encoded data. This information is
+      // redundant, as this is the last part of the data page, and the number of
+      // remaining bytes is already known.
+      // NOTE(review): 'size' is not checked to be >= 4, so a shorter page passes a
+      // negative length here - confirm RleBatchDecoder::Reset() tolerates that.
+      rle_decoder_.Reset(data + 4, size - 4, 1);
+      break;
+    default:
+      return false;
+  }
+  // Discard anything left over from the previous page.
+  num_unpacked_values_ = 0;
+  unpacked_value_idx_ = 0;
+  return true;
+}
+
+// Runtime dispatcher: picks the DecodeValues<ENCODING>() instantiation that matches
+// the encoding recorded in 'encoding_' and forwards all arguments to it.
+bool ParquetBoolDecoder::DecodeValues(
+    int64_t stride, int64_t count, bool* RESTRICT first_value) RESTRICT {
+  if (encoding_ != parquet::Encoding::PLAIN) {
+    // RLE is the only other encoding expected here.
+    DCHECK_EQ(encoding_, parquet::Encoding::RLE);
+    return DecodeValues<parquet::Encoding::RLE>(stride, count, first_value);
+  }
+  return DecodeValues<parquet::Encoding::PLAIN>(stride, count, first_value);
+}
+
+// Decodes 'count' values one at a time with DecodeValue<ENCODING>(), writing each one
+// through a StrideWriter constructed from 'first_value' and 'stride'. Stops and
+// returns false at the first value that fails to decode; returns true otherwise.
+template <parquet::Encoding::type ENCODING>
+bool ParquetBoolDecoder::DecodeValues(
+    int64_t stride, int64_t count, bool* RESTRICT first_value) RESTRICT {
+  // TODO: we could optimise this further if needed by bypassing 'unpacked_values_'.
+  StrideWriter<bool> out(first_value, stride);
+  for (int64_t i = 0; i < count; ++i) {
+    if (UNLIKELY(!DecodeValue<ENCODING>(out.Advance()))) return false;
+  }
+  return true;
+}
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-bool-decoder.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-bool-decoder.h b/be/src/exec/parquet/parquet-bool-decoder.h
new file mode 100644
index 0000000..6ecdbd2
--- /dev/null
+++ b/be/src/exec/parquet/parquet-bool-decoder.h
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/compiler-util.h"
+#include "exec/parquet/parquet-common.h"
+#include "util/mem-util.h"
+#include "util/rle-encoding.h"
+
+namespace impala {
+
+/// Decoder for RLE and bit-packed boolean-encoded values.
+class ParquetBoolDecoder {
+ public:
+  /// Set the data for the next page to decode. Return true if the encoding is supported,
+  /// false otherwise.
+  bool SetData(parquet::Encoding::type encoding, uint8_t* data, int size);
+
+  /// Decode the next bool value to 'value'. Return true on success or false if there is
+  /// an error decoding, e.g. invalid or truncated data.
+  /// Templated so that callers can avoid the overhead of branching on encoding per row.
+  template <parquet::Encoding::type ENCODING>
+  bool ALWAYS_INLINE DecodeValue(bool* RESTRICT value) RESTRICT;
+
+  /// Batched version of DecodeValue() that decodes multiple values at a time.
+  bool DecodeValues(int64_t stride, int64_t count, bool* RESTRICT first_value) RESTRICT;
+
+ private:
+  /// Implementation of DecodeValues, templated by ENCODING.
+  template <parquet::Encoding::type ENCODING>
+  bool DecodeValues(int64_t stride, int64_t count, bool* RESTRICT first_value) RESTRICT;
+
+  /// Encoding of the current page, as passed to the last SetData() call.
+  parquet::Encoding::type encoding_;
+
+  /// A buffer to store unpacked values. Must be a multiple of 32 size to use the
+  /// batch-oriented interface of BatchedBitReader.
+  static const int UNPACKED_BUFFER_LEN = 128;
+  static_assert(UNPACKED_BUFFER_LEN % 32 == 0,
+      "UNPACKED_BUFFER_LEN must be a multiple of 32");
+  bool unpacked_values_[UNPACKED_BUFFER_LEN];
+
+  /// The number of valid values in 'unpacked_values_'.
+  int num_unpacked_values_ = 0;
+
+  /// The next value to return from 'unpacked_values_'.
+  int unpacked_value_idx_ = 0;
+
+  /// Bit packed decoder, used if 'encoding_' is PLAIN.
+  BatchedBitReader bool_values_;
+
+  /// RLE decoder, used if 'encoding_' is RLE.
+  RleBatchDecoder<bool> rle_decoder_;
+};
+
+/// Returns the next buffered value from 'unpacked_values_' if one is available.
+/// Otherwise refills the buffer from the decoder selected by ENCODING and returns the
+/// first refilled value, or returns false if the refill produced no values.
+template <parquet::Encoding::type ENCODING>
+inline bool ParquetBoolDecoder::DecodeValue(bool* RESTRICT value) RESTRICT {
+  DCHECK_EQ(ENCODING, encoding_);
+  if (LIKELY(unpacked_value_idx_ < num_unpacked_values_)) {
+    *value = unpacked_values_[unpacked_value_idx_++];
+  } else {
+    // Unpack as many values as we can into the buffer. We expect to read at least one
+    // value.
+    if (ENCODING == parquet::Encoding::PLAIN) {
+      num_unpacked_values_ =
+          bool_values_.UnpackBatch(1, UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+    } else {
+      num_unpacked_values_ =
+          rle_decoder_.GetValues(UNPACKED_BUFFER_LEN, &unpacked_values_[0]);
+    }
+
+    if (UNLIKELY(num_unpacked_values_ == 0)) {
+      return false;
+    }
+    // Hand out the first refilled value; the next call continues from index 1.
+    *value = unpacked_values_[0];
+    unpacked_value_idx_ = 1;
+  }
+  return true;
+}
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-collection-column-reader.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-collection-column-reader.cc b/be/src/exec/parquet/parquet-collection-column-reader.cc
new file mode 100644
index 0000000..8c28ab7
--- /dev/null
+++ b/be/src/exec/parquet/parquet-collection-column-reader.cc
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet-collection-column-reader.h"
+
+#include "runtime/collection-value-builder.h"
+
+namespace impala {
+
+// Forwards Close() with the same 'row_batch' to every child reader, in order.
+void CollectionColumnReader::Close(RowBatch* row_batch) {
+  for (auto it = children_.begin(); it != children_.end(); ++it) {
+    (*it)->Close(row_batch);
+  }
+}
+
+// Advances each child at least once and keeps advancing it while its repetition level
+// is above new_collection_rep_level(), then refreshes this reader's derived state.
+// Returns false as soon as any child's NextLevels() returns false.
+bool CollectionColumnReader::NextLevels() {
+  DCHECK(!children_.empty());
+  DCHECK_LE(rep_level_, new_collection_rep_level());
+  for (ParquetColumnReader* child : children_) {
+    // TODO: verify somewhere that all column readers are at end
+    if (!child->NextLevels()) return false;
+    while (child->rep_level() > new_collection_rep_level()) {
+      if (!child->NextLevels()) return false;
+    }
+  }
+  UpdateDerivedState();
+  return true;
+}
+
+// Reads the collection at the current position into 'tuple' and moves on to the next
+// one. Three cases:
+//  - no slot is materialized (tuple_offset_ == -1): only the levels are advanced;
+//  - the collection is defined (def_level_ >= max_def_level()): it is assembled into
+//    the tuple's collection slot via ReadSlot();
+//  - otherwise the slot is marked NULL and the levels are advanced.
+// Returns the result of NextLevels() or ReadSlot(), whichever was called.
+bool CollectionColumnReader::ReadValue(MemPool* pool, Tuple* tuple) {
+  DCHECK_GE(rep_level_, 0);
+  DCHECK_GE(def_level_, 0);
+  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor())
+      << "Caller should have called NextLevels() until we are ready to read a value";
+
+  if (tuple_offset_ == -1) {
+    return CollectionColumnReader::NextLevels();
+  } else if (def_level_ >= max_def_level()) {
+    return ReadSlot(tuple->GetCollectionSlot(tuple_offset_), pool);
+  } else {
+    // Null value
+    tuple->SetNull(null_indicator_offset_);
+    return CollectionColumnReader::NextLevels();
+  }
+}
+
+// Delegates to CollectionColumnReader::ReadValue() with the same arguments.
+bool CollectionColumnReader::ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) {
+  return CollectionColumnReader::ReadValue(pool, tuple);
+}
+
+// Reads up to 'max_values' collections into consecutive tuples of 'tuple_size' bytes
+// starting at 'tuple_mem'. Positions whose definition level is below
+// def_level_of_immediate_repeated_ancestor() are skipped without producing a tuple.
+// On return '*num_values' holds the number of tuples written, except when the initial
+// NextLevels() call fails, in which case '*num_values' is left untouched. Returns
+// false if execution should stop.
+bool CollectionColumnReader::ReadValueBatch(MemPool* pool, int max_values,
+    int tuple_size, uint8_t* tuple_mem, int* num_values) {
+  // The below loop requires that NextLevels() was called previously to populate
+  // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each
+  // row group.
+  if (def_level_ == ParquetLevel::INVALID_LEVEL && !NextLevels()) return false;
+
+  int val_count = 0;
+  bool continue_execution = true;
+  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
+    if (def_level_ < def_level_of_immediate_repeated_ancestor()) {
+      // A containing repeated field is empty or NULL
+      continue_execution = NextLevels();
+      continue;
+    }
+    // Fill in position slot if applicable
+    if (pos_slot_desc_ != nullptr) {
+      ReadPositionNonBatched(tuple->GetBigIntSlot(pos_slot_desc()->tuple_offset()));
+    }
+    continue_execution = ReadValue(pool, tuple);
+    ++val_count;
+    if (SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(val_count)) {
+      continue_execution &= ColReaderDebugAction(&val_count);
+    }
+  }
+  *num_values = val_count;
+  return continue_execution;
+}
+
+// Reads up to 'max_values' collections into consecutive tuples of 'tuple_size' bytes
+// starting at 'tuple_mem', calling ReadNonRepeatedValue() once per tuple. On return
+// '*num_values' holds the number of tuples written, except when the initial
+// NextLevels() call fails, in which case '*num_values' is left untouched. Returns
+// false if execution should stop.
+bool CollectionColumnReader::ReadNonRepeatedValueBatch(MemPool* pool,
+    int max_values, int tuple_size, uint8_t* tuple_mem, int* num_values) {
+  // The below loop requires that NextLevels() was called previously to populate
+  // 'def_level_' and 'rep_level_'. Ensure it is called at the start of each
+  // row group.
+  if (def_level_ == ParquetLevel::INVALID_LEVEL && !NextLevels()) return false;
+
+  int val_count = 0;
+  bool continue_execution = true;
+  while (val_count < max_values && !RowGroupAtEnd() && continue_execution) {
+    Tuple* tuple = reinterpret_cast<Tuple*>(tuple_mem + val_count * tuple_size);
+    continue_execution = ReadNonRepeatedValue(pool, tuple);
+    ++val_count;
+    if (SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(val_count)) {
+      continue_execution &= ColReaderDebugAction(&val_count);
+    }
+  }
+  *num_values = val_count;
+  return continue_execution;
+}
+
+// Resets '*slot' to an empty CollectionValue and fills it by having the parent
+// scanner assemble the collection from 'children_'. Returns false if
+// AssembleCollection() reports that execution should stop; otherwise refreshes this
+// reader's derived state from the children and returns true.
+bool CollectionColumnReader::ReadSlot(CollectionValue* slot, MemPool* pool) {
+  DCHECK(!children_.empty());
+  DCHECK_LE(rep_level_, new_collection_rep_level());
+
+  // Recursively read the collection into a new CollectionValue.
+  *slot = CollectionValue();
+  CollectionValueBuilder builder(
+      slot, *slot_desc_->collection_item_descriptor(), pool, parent_->state_);
+  bool continue_execution =
+      parent_->AssembleCollection(children_, new_collection_rep_level(), &builder);
+  if (!continue_execution) return false;
+
+  // AssembleCollection() advances child readers, so we don't need to call NextLevels()
+  UpdateDerivedState();
+  return true;
+}
+
+// Copies the definition and repetition level of the first child into this reader,
+// checks that the other children agree with them, and updates 'pos_current_value_'.
+void CollectionColumnReader::UpdateDerivedState() {
+  DCHECK(!children_.empty());
+  // We don't need to cap our def_level_ at max_def_level(). We always check def_level_
+  // >= max_def_level() to check if the collection is defined.
+  // TODO: consider capping def_level_ at max_def_level()
+  def_level_ = children_[0]->def_level();
+  rep_level_ = children_[0]->rep_level();
+
+  // All children should have been advanced to the beginning of the next collection
+  for (ParquetColumnReader* child : children_) {
+    DCHECK_EQ(child->rep_level(), rep_level_);
+    if (def_level_ < max_def_level()) {
+      // Collection not defined
+      FILE_CHECK_EQ(child->def_level(), def_level_);
+    } else {
+      // Collection is defined
+      FILE_CHECK_GE(child->def_level(), max_def_level());
+    }
+  }
+
+  if (RowGroupAtEnd()) {
+    // No more values
+    pos_current_value_ = ParquetLevel::INVALID_POS;
+  } else if (rep_level_ <= max_rep_level() - 2) {
+    // Reset position counter if we are at the start of a new parent collection (i.e.,
+    // the current collection is the first item in a new parent collection).
+    pos_current_value_ = 0;
+  }
+}
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-collection-column-reader.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-collection-column-reader.h b/be/src/exec/parquet/parquet-collection-column-reader.h
new file mode 100644
index 0000000..ae23979
--- /dev/null
+++ b/be/src/exec/parquet/parquet-collection-column-reader.h
@@ -0,0 +1,97 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "exec/parquet/parquet-column-readers.h"
+
+namespace impala {
+
+/// Collections are not materialized directly in parquet files; only scalar values appear
+/// in the file. CollectionColumnReader uses the definition and repetition levels of child
+/// column readers to figure out the boundaries of each collection in this column.
+class CollectionColumnReader : public ParquetColumnReader {
+ public:
+  /// 'node' must be a repeated schema node. 'slot_desc' may be nullptr; if it is not,
+  /// it must describe a collection-typed slot.
+  CollectionColumnReader(
+      HdfsParquetScanner* parent, const SchemaNode& node, const SlotDescriptor* slot_desc)
+    : ParquetColumnReader(parent, node, slot_desc) {
+    DCHECK(node_.is_repeated());
+    if (slot_desc != nullptr) DCHECK(slot_desc->type().IsCollectionType());
+  }
+
+  virtual ~CollectionColumnReader() {}
+
+  /// Returns a pointer to the mutable list of child readers.
+  std::vector<ParquetColumnReader*>* children() { return &children_; }
+
+  virtual bool IsCollectionReader() const override { return true; }
+
+  /// The repetition level indicating that the current value is the first in a new
+  /// collection (meaning the last value read was the final item in the previous
+  /// collection).
+  int new_collection_rep_level() const { return max_rep_level() - 1; }
+
+  /// Materializes CollectionValue into tuple slot (if materializing) and advances to next
+  /// value.
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple) override;
+
+  /// Same as ReadValue but does not advance repetition level. Only valid for columns not
+  /// in collections.
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) override;
+
+  /// Implementation of ReadValueBatch for collections.
+  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) override;
+
+  /// Implementation of ReadNonRepeatedValueBatch() for collections.
+  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) override;
+
+  /// Advances all child readers to the beginning of the next collection and updates this
+  /// reader's state.
+  virtual bool NextLevels() override;
+
+  /// This is called once for each row group in the file.
+  void Reset() {
+    def_level_ = ParquetLevel::INVALID_LEVEL;
+    rep_level_ = ParquetLevel::INVALID_LEVEL;
+    pos_current_value_ = ParquetLevel::INVALID_POS;
+  }
+
+  virtual void Close(RowBatch* row_batch) override;
+
+ private:
+  /// Column readers of fields contained within this collection. There is at least one
+  /// child reader per collection reader. Child readers either materialize slots in the
+  /// collection item tuples, or there is a single child reader that does not materialize
+  /// any slot and is only used by this reader to read def and rep levels.
+  std::vector<ParquetColumnReader*> children_;
+
+  /// Updates this reader's def_level_, rep_level_, and pos_current_value_ based on child
+  /// reader's state.
+  void UpdateDerivedState();
+
+  /// Recursively reads from children_ to assemble a single CollectionValue into
+  /// 'slot'. Assembling the collection advances the child readers, so afterwards
+  /// rep_level_ and def_level_ are refreshed via UpdateDerivedState().
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
+  /// true.
+  inline bool ReadSlot(CollectionValue* slot, MemPool* pool);
+};
+} // namespace impala

Reply via email to