http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-column-readers.h 
b/be/src/exec/parquet/parquet-column-readers.h
new file mode 100644
index 0000000..cde4fe1
--- /dev/null
+++ b/be/src/exec/parquet/parquet-column-readers.h
@@ -0,0 +1,476 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_PARQUET_COLUMN_READERS_H
+#define IMPALA_PARQUET_COLUMN_READERS_H
+
+#include <boost/scoped_ptr.hpp>
+
+#include "exec/parquet/hdfs-parquet-scanner.h"
+#include "exec/parquet/parquet-level-decoder.h"
+#include "util/bit-stream-utils.h"
+#include "util/codec.h"
+
+namespace impala {
+
+class DictDecoderBase;
+class Tuple;
+class MemPool;
+
+/// Base class for reading a Parquet column. Reads a logical column, not 
necessarily a
+/// column materialized in the file (e.g. collections). The two subclasses are
+/// BaseScalarColumnReader and CollectionColumnReader. Column readers read one 
def and rep
+/// level pair at a time. The current def and rep level are exposed to the 
user, and the
+/// corresponding value (if defined) can optionally be copied into a slot via
+/// ReadValue(). Can also write position slots.
+///
+/// The constructor adds the object to the obj_pool of the parent 
HdfsParquetScanner.
+class ParquetColumnReader {
+ public:
+  /// Creates a column reader for 'node' and associates it with the given 
parent scanner.
+  /// The constructor of column readers add the new object to the parent's 
object pool.
+  /// 'slot_desc' may be NULL, in which case the returned column reader can 
only be used
+  /// to read def/rep levels.
+  /// 'is_collection_field' should be set to true if the returned reader is 
reading a
+  /// collection. This cannot be determined purely by 'node' because a 
repeated scalar
+  /// node represents both an array and the array's items (in this case
+  /// 'is_collection_field' should be true if the reader reads one value per 
array, and
+  /// false if it reads one value per item).  The reader is added to the 
runtime state's
+  /// object pool. Does not create child readers for collection readers; these 
must be
+  /// added by the caller.
+  ///
+  /// It supports the following primitive type widening that does not have any 
loss of
+  /// precision.
+  /// - tinyint (INT32) -> smallint (INT32), int (INT32), bigint (INT64), 
double (DOUBLE)
+  /// - smallint (INT32) -> int (INT32), bigint (INT64), double (DOUBLE)
+  /// - int (INT32) -> bigint (INT64), double (DOUBLE)
+  /// - float (FLOAT) -> double (DOUBLE)
+  static ParquetColumnReader* Create(const SchemaNode& node, bool 
is_collection_field,
+      const SlotDescriptor* slot_desc, HdfsParquetScanner* parent);
+
+  static ParquetColumnReader* CreateTimestampColumnReader(const SchemaNode& 
node,
+      const SlotDescriptor* slot_desc, HdfsParquetScanner* parent);
+
+  virtual ~ParquetColumnReader() { }
+
+  int def_level() const { return def_level_; }
+  int rep_level() const { return rep_level_; }
+
+  const SlotDescriptor* slot_desc() const { return slot_desc_; }
+  const parquet::SchemaElement& schema_element() const { return 
*node_.element; }
+  int16_t max_def_level() const { return max_def_level_; }
+  int16_t max_rep_level() const { return max_rep_level_; }
+  int def_level_of_immediate_repeated_ancestor() const {
+    return node_.def_level_of_immediate_repeated_ancestor;
+  }
+  const SlotDescriptor* pos_slot_desc() const { return pos_slot_desc_; }
+  void set_pos_slot_desc(const SlotDescriptor* pos_slot_desc) {
+    DCHECK(pos_slot_desc_ == NULL);
+    pos_slot_desc_ = pos_slot_desc;
+  }
+
+  /// Returns true if this reader materializes collections (i.e. 
CollectionValues).
+  virtual bool IsCollectionReader() const = 0;
+
+  const char* filename() const { return parent_->filename(); }
+
+  /// Read the current value (or null) into 'tuple' for this column. This 
should only be
+  /// called when a value is defined, i.e., def_level() >=
+  /// def_level_of_immediate_repeated_ancestor() (since empty or NULL 
collections produce
+  /// no output values), otherwise NextLevels() should be called instead.
+  ///
+  /// Advances this column reader to the next value (i.e. NextLevels() doesn't 
need to be
+  /// called after calling ReadValue()).
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. 
parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. 
Otherwise returns
+  /// true.
+  ///
+  /// NextLevels() must be called on this reader before calling ReadValue() 
for the first
+  /// time. This is to initialize the current value that ReadValue() will read.
+  ///
+  /// TODO: this is the function that needs to be codegen'd (e.g. 
CodegenReadValue())
+  /// The codegened functions from all the materialized cols will then be 
combined
+  /// into one function.
+  /// TODO: another option is to materialize col by col for the entire row 
batch in
+  /// one call.  e.g. MaterializeCol would write out 1024 values.  Our row 
batches
+  /// are currently dense so we'll need to figure out something there.
+  virtual bool ReadValue(MemPool* pool, Tuple* tuple) = 0;
+
+  /// Same as ReadValue() but does not advance repetition level. Only valid 
for columns
+  /// not in collections.
+  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0;
+
+  /// Batched version of ReadValue() that reads up to max_values at once and 
materializes
+  /// them into tuples in tuple_mem. Returns the number of values actually 
materialized
+  /// in *num_values. The return value, error behavior and state changes are 
generally
+  /// the same as in ReadValue(). For example, if an error occurs in the 
middle of
+  /// materializing a batch then false is returned, and num_values, tuple_mem, 
as well as
+  /// this column reader are left in an undefined state, assuming that the 
caller will
+  /// immediately abort execution. NextLevels() does *not* need to be called 
before
+  /// ReadValueBatch(), unlike ReadValue().
+  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
+      uint8_t* tuple_mem, int* num_values) = 0;
+
+  /// Batched version of ReadNonRepeatedValue() that reads up to max_values at 
once and
+  /// materializes them into tuples in tuple_mem.
+  /// The return value and error behavior are the same as in ReadValueBatch().
+  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int 
tuple_size,
+      uint8_t* tuple_mem, int* num_values) = 0;
+
+  /// Advances this column reader's def and rep levels to the next logical 
value, i.e. to
+  /// the next scalar value or the beginning of the next collection, without 
attempting to
+  /// read the value. This is used to skip past def/rep levels that don't 
materialize a
+  /// value, such as the def/rep levels corresponding to an empty containing 
collection.
+  ///
+  /// NextLevels() must be called on this reader before calling ReadValue() 
for the first
+  /// time. This is to initialize the current value that ReadValue() will read.
+  ///
+  /// Returns false if execution should be aborted for some reason, e.g. 
parse_error_ is
+  /// set, the query is cancelled, or the scan node limit was reached. 
Otherwise returns
+  /// true.
+  virtual bool NextLevels() = 0;
+
+  /// Writes pos_current_value_ (i.e. "reads" the synthetic position field of 
the
+  /// parent collection) to 'pos' and increments pos_current_value_. Only 
valid to
+  /// call when doing non-batched reading, i.e. NextLevels() must have been 
called
+  /// before each call to this function to advance to the next element in the
+  /// collection.
+  inline void ReadPositionNonBatched(int64_t* pos);
+
+  /// Returns true if this column reader has reached the end of the row group.
+  inline bool RowGroupAtEnd() {
+    DCHECK_EQ(rep_level_ == ParquetLevel::ROW_GROUP_END,
+        def_level_ == ParquetLevel::ROW_GROUP_END);
+    return rep_level_ == ParquetLevel::ROW_GROUP_END;
+  }
+
+  /// If 'row_batch' is non-NULL, transfers the remaining resources backing 
tuples to it,
+  /// and frees up other resources. If 'row_batch' is NULL frees all resources 
instead.
+  virtual void Close(RowBatch* row_batch) = 0;
+
+ protected:
+  HdfsParquetScanner* parent_;
+  const SchemaNode& node_;
+  const SlotDescriptor* const slot_desc_;
+
+  /// The slot descriptor for the position field of the tuple, if there is 
one. NULL if
+  /// there's not. Only one column reader for a given tuple desc will have 
this set.
+  const SlotDescriptor* pos_slot_desc_;
+
+  /// The next value to write into the position slot, if there is one. 64-bit 
int because
+  /// the pos slot is always a BIGINT Set to ParquetLevel::INVALID_POS when 
this column
+  /// reader does not have a current rep and def level (i.e. before the first 
NextLevels()
+  /// call or after the last value in the column has been read).
+  int64_t pos_current_value_;
+
+  /// The current repetition and definition levels of this reader. Advanced via
+  /// ReadValue() and NextLevels(). Set to ParquetLevel:: INVALID_LEVEL before 
the first
+  /// NextLevels() call for a row group or if an error is encountered decoding 
a level.
+  /// Set to ROW_GROUP_END after the last value in the column has been read). 
If this is
+  /// not inside a collection, rep_level_ is always 0, 
ParquetLevel::INVALID_LEVEL or
+  /// ParquetLevel::ROW_GROUP_END.
+  /// int16_t is large enough to hold the valid levels 0-255 and negative 
sentinel values
+  /// ParquetLevel::INVALID_LEVEL and ParquetLevel::ROW_GROUP_END. The maximum 
values are
+  /// cached here because they are accessed in inner loops.
+  int16_t rep_level_;
+  const int16_t max_rep_level_;
+  int16_t def_level_;
+  const int16_t max_def_level_;
+
+  // Cache frequently accessed members of slot_desc_ for perf.
+
+  /// slot_desc_->tuple_offset(). -1 if slot_desc_ is NULL.
+  const int tuple_offset_;
+
+  /// slot_desc_->null_indicator_offset(). Invalid if slot_desc_ is NULL.
+  const NullIndicatorOffset null_indicator_offset_;
+
+  ParquetColumnReader(
+      HdfsParquetScanner* parent, const SchemaNode& node, const 
SlotDescriptor* slot_desc)
+    : parent_(parent),
+      node_(node),
+      slot_desc_(slot_desc),
+      pos_slot_desc_(NULL),
+      pos_current_value_(ParquetLevel::INVALID_POS),
+      rep_level_(ParquetLevel::INVALID_LEVEL),
+      max_rep_level_(node_.max_rep_level),
+      def_level_(ParquetLevel::INVALID_LEVEL),
+      max_def_level_(node_.max_def_level),
+      tuple_offset_(slot_desc == NULL ? -1 : slot_desc->tuple_offset()),
+      null_indicator_offset_(slot_desc == NULL ? NullIndicatorOffset() :
+                                                 
slot_desc->null_indicator_offset()) {
+    DCHECK(parent != nullptr);
+    parent->obj_pool_.Add(this);
+
+    DCHECK_GE(node_.max_rep_level, 0);
+    DCHECK_LE(node_.max_rep_level, std::numeric_limits<int16_t>::max());
+    DCHECK_GE(node_.max_def_level, 0);
+    DCHECK_LE(node_.max_def_level, std::numeric_limits<int16_t>::max());
+    // rep_level_ is always valid and equal to 0 if col not in collection.
+    if (max_rep_level() == 0) rep_level_ = 0;
+  }
+
+  /// Called in the middle of creating a scratch tuple batch to simulate 
failures
+  /// such as exceeding memory limit or cancellation. Returns false if the 
debug
+  /// action deems that the parquet column reader should halt execution. 
'val_count'
+  /// is the counter which the column reader uses to track the number of tuples
+  /// produced so far. If the column reader should halt execution, 
'parse_status_'
+  /// is updated with the error status and 'val_count' is set to 0.
+  inline bool ColReaderDebugAction(int* val_count);
+};
+
+/// Reader for a single column from the parquet file.  It's associated with a
+/// ScannerContext::Stream and is responsible for decoding the data.  Super 
class for
+/// per-type column readers. This contains most of the logic, the type 
specific functions
+/// must be implemented in the subclass.
+class BaseScalarColumnReader : public ParquetColumnReader {
+ public:
+  BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
+      const SlotDescriptor* slot_desc)
+    : ParquetColumnReader(parent, node, slot_desc),
+      data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
+    DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
+  }
+
+  virtual ~BaseScalarColumnReader() { }
+
+  virtual bool IsCollectionReader() const { return false; }
+
+  /// Resets the reader for each row group in the file and creates the scan
+  /// range for the column, but does not start it. To start scanning,
+  /// set_io_reservation() must be called to assign reservation to this
+  /// column, followed by StartScan().
+  Status Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& 
col_chunk,
+    int row_group_idx);
+
+  /// Starts the column scan range. The reader must be Reset() and have a
+  /// reservation assigned via set_io_reservation(). This must be called
+  /// before any of the column data can be read (including dictionary and
+  /// data pages). Returns an error status if there was an error starting the
+  /// scan or allocating buffers for it.
+  Status StartScan();
+
+  /// Helper to start scans for multiple columns at once.
+  static Status StartScans(const std::vector<BaseScalarColumnReader*> readers) 
{
+    for (BaseScalarColumnReader* reader : readers) 
RETURN_IF_ERROR(reader->StartScan());
+    return Status::OK();
+  }
+
+  virtual void Close(RowBatch* row_batch);
+
+  io::ScanRange* scan_range() const { return scan_range_; }
+  int64_t total_len() const { return metadata_->total_compressed_size; }
+  int col_idx() const { return node_.col_idx; }
+  THdfsCompression::type codec() const {
+    if (metadata_ == NULL) return THdfsCompression::NONE;
+    return ConvertParquetToImpalaCodec(metadata_->codec);
+  }
+  void set_io_reservation(int bytes) { io_reservation_ = bytes; }
+
+  /// Reads the next definition and repetition levels for this column. 
Initializes the
+  /// next data page if necessary.
+  virtual bool NextLevels() { return NextLevels<true>(); }
+
+  /// Check the data stream to see if there is a dictionary page. If there is,
+  /// use that page to initialize dict_decoder_ and advance the data stream
+  /// past the dictionary page.
+  Status InitDictionary();
+
+  /// Convenience function to initialize multiple dictionaries.
+  static Status InitDictionaries(const std::vector<BaseScalarColumnReader*> 
readers);
+
+  // Returns the dictionary or NULL if the dictionary doesn't exist
+  virtual DictDecoderBase* GetDictionaryDecoder() { return nullptr; }
+
+  // Returns whether the datatype for this column requires conversion from the 
on-disk
+  // format for correctness. For example, timestamps can require an offset to 
be
+  // applied.
+  virtual bool NeedsConversion() { return false; }
+
+  // Returns whether the datatype for this column requires validation. For 
example,
+  // the timestamp format has certain bit combinations that are invalid, and 
these
+  // need to be validated when read from disk.
+  virtual bool NeedsValidation() { return false; }
+
+  // TODO: Some encodings might benefit a lot from a SkipValues(int num_rows) 
if
+  // we know this row can be skipped. This could be very useful with stats and 
big
+  // sections can be skipped. Implement that when we can benefit from it.
+
+ protected:
+  // Friend parent scanner so it can perform validation (e.g. 
ValidateEndOfRowGroup())
+  friend class HdfsParquetScanner;
+
+  // Class members that are accessed for every column should be included up 
here so they
+  // fit in as few cache lines as possible.
+
+  /// Pointer to start of next value in data page
+  uint8_t* data_ = nullptr;
+
+  /// End of the data page.
+  const uint8_t* data_end_ = nullptr;
+
+  /// Decoder for definition levels.
+  ParquetLevelDecoder def_levels_{true};
+
+  /// Decoder for repetition levels.
+  ParquetLevelDecoder rep_levels_{false};
+
+  /// Page encoding for values of the current data page. Cached here for perf. 
Set in
+  /// InitDataPage().
+  parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
+
+  /// Num values remaining in the current data page
+  int num_buffered_values_ = 0;
+
+  // Less frequently used members that are not accessed in inner loop should 
go below
+  // here so they do not occupy precious cache line space.
+
+  /// The number of values seen so far. Updated per data page.
+  int64_t num_values_read_ = 0;
+
+  /// Metadata for the column for the current row group.
+  const parquet::ColumnMetaData* metadata_ = nullptr;
+
+  boost::scoped_ptr<Codec> decompressor_;
+
+  /// The scan range for the column's data. Initialized for each row group by 
Reset().
+  io::ScanRange* scan_range_ = nullptr;
+
+  // Stream used to read data from 'scan_range_'. Initialized by StartScan().
+  ScannerContext::Stream* stream_ = nullptr;
+
+  /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. 
Must be set
+  /// with set_io_reservation() before 'stream_' is initialized. Reset for 
each row group
+  /// by Reset().
+  int64_t io_reservation_ = 0;
+
+  /// Pool to allocate storage for data pages from - either decompression 
buffers for
+  /// compressed data pages or copies of the data page with var-len data to 
attach to
+  /// batches.
+  boost::scoped_ptr<MemPool> data_page_pool_;
+
+  /// Header for current data page.
+  parquet::PageHeader current_page_header_;
+
+  /// Reads the next page header into next_page_header/next_header_size.
+  /// If the stream reaches the end before reading a complete page header,
+  /// eos is set to true. If peek is false, the stream position is advanced
+  /// past the page header. If peek is true, the stream position is not moved.
+  /// Returns an error status if the next page header could not be read.
+  Status ReadPageHeader(bool peek, parquet::PageHeader* next_page_header,
+      uint32_t* next_header_size, bool* eos);
+
+  /// Read the next data page. If a dictionary page is encountered, that will 
be read and
+  /// this function will continue reading the next data page.
+  Status ReadDataPage();
+
+  /// Try to move the the next page and buffer more values. Return false and
+  /// sets rep_level_, def_level_ and pos_current_value_ to -1 if no more 
pages or an
+  /// error encountered.
+  bool NextPage();
+
+  /// Implementation for NextLevels().
+  template <bool ADVANCE_REP_LEVEL>
+  bool NextLevels();
+
+  /// Creates a dictionary decoder from values/size. 'decoder' is set to point 
to a
+  /// dictionary decoder stored in this object. Subclass must implement this. 
Returns
+  /// an error status if the dictionary values could not be decoded 
successfully.
+  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
+      DictDecoderBase** decoder) = 0;
+
+  /// Return true if the column has a dictionary decoder. Subclass must 
implement this.
+  virtual bool HasDictionaryDecoder() = 0;
+
+  /// Clear the dictionary decoder so HasDictionaryDecoder() will return 
false. Subclass
+  /// must implement this.
+  virtual void ClearDictionaryDecoder() = 0;
+
+  /// Initializes the reader with the data contents. This is the content for 
the entire
+  /// decompressed data page. Decoders can initialize state from here. The 
caller must
+  /// validate the input such that 'size' is non-negative and that 'data' has 
at least
+  /// 'size' bytes remaining.
+  virtual Status InitDataPage(uint8_t* data, int size) = 0;
+
+  /// Allocate memory for the uncompressed contents of a data page of 'size' 
bytes from
+  /// 'data_page_pool_'. 'err_ctx' provides context for error messages. On 
success,
+  /// 'buffer' points to the allocated memory. Otherwise an error status is 
returned.
+  Status AllocateUncompressedDataPage(
+      int64_t size, const char* err_ctx, uint8_t** buffer);
+
+  /// Returns true if a data page for this column with the specified 
'encoding' may
+  /// contain strings referenced by returned batches. Cases where this is not 
true are:
+  /// * Dictionary-compressed pages, where any string data lives in 
'dictionary_pool_'.
+  /// * Fixed-length slots, where there is no string data.
+  bool PageContainsTupleData(parquet::Encoding::type page_encoding) {
+    return page_encoding != parquet::Encoding::PLAIN_DICTIONARY
+        && slot_desc_ != nullptr && slot_desc_->type().IsVarLenStringType();
+  }
+
+  /// Slow-path status construction code for def/rep decoding errors. 
'level_name' is
+  /// either "rep" or "def", 'decoded_level' is the value returned from
+  /// ParquetLevelDecoder::ReadLevel() and 'max_level' is the maximum allowed 
value.
+  void __attribute__((noinline)) SetLevelDecodeError(const char* level_name,
+      int decoded_level, int max_level);
+
+  // Returns a detailed error message about unsupported encoding.
+  Status GetUnsupportedDecodingError();
+};
+
+// Inline to allow inlining into collection and scalar column reader.
+inline void ParquetColumnReader::ReadPositionNonBatched(int64_t* pos) {
+  // NextLevels() should have already been called
+  DCHECK_GE(rep_level_, 0);
+  DCHECK_GE(def_level_, 0);
+  DCHECK_GE(pos_current_value_, 0);
+  DCHECK_GE(def_level_, def_level_of_immediate_repeated_ancestor()) <<
+      "Caller should have called NextLevels() until we are ready to read a 
value";
+  *pos = pos_current_value_++;
+}
+
+// Change 'val_count' to zero to exercise IMPALA-5197. This verifies the error 
handling
+// path doesn't falsely report that the file is corrupted.
+// Inlined to avoid overhead in release builds.
+inline bool ParquetColumnReader::ColReaderDebugAction(int* val_count) {
+#ifndef NDEBUG
+  Status status = parent_->ScannerDebugAction();
+  if (!status.ok()) {
+    if (!status.IsCancelled()) parent_->parse_status_.MergeStatus(status);
+    *val_count = 0;
+    return false;
+  }
+#endif
+  return true;
+}
+
+// Trigger debug action on every other call of Read*ValueBatch() once at least 
128
+// tuples have been produced to simulate failure such as exceeding memory 
limit.
+// Triggering it every other call so as not to always fail on the first column 
reader
+// when materializing multiple columns. Failing on non-empty row batch tests 
proper
+// resources freeing by the Parquet scanner.
+#ifndef NDEBUG
+extern int parquet_column_reader_debug_count;
+#define SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(num_tuples) \
+    ((parquet_column_reader_debug_count++ % 2) == 1 && num_tuples >= 128)
+#else
+#define SHOULD_TRIGGER_COL_READER_DEBUG_ACTION(x) (false)
+#endif
+
+} // namespace impala
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-column-stats.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-column-stats.cc 
b/be/src/exec/parquet/parquet-column-stats.cc
new file mode 100644
index 0000000..478bba4
--- /dev/null
+++ b/be/src/exec/parquet/parquet-column-stats.cc
@@ -0,0 +1,193 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet-column-stats.inline.h"
+
+#include <algorithm>
+#include <cmath>
+#include <limits>
+
+#include "common/names.h"
+
+namespace impala {
+
+bool ColumnStatsReader::ReadFromThrift(StatsField stats_field, void* slot) 
const {
+  if (!(col_chunk_.__isset.meta_data && 
col_chunk_.meta_data.__isset.statistics)) {
+    return false;
+  }
+  const parquet::Statistics& stats = col_chunk_.meta_data.statistics;
+
+  // Try to read the requested stats field. If it is not set, we may fall back 
to reading
+  // the old stats, based on the column type.
+  const string* stat_value = nullptr;
+  switch (stats_field) {
+    case StatsField::MIN:
+      if (stats.__isset.min_value && CanUseStats()) {
+        stat_value = &stats.min_value;
+        break;
+      }
+      if (stats.__isset.min && CanUseDeprecatedStats()) {
+        stat_value = &stats.min;
+      }
+      break;
+    case StatsField::MAX:
+      if (stats.__isset.max_value && CanUseStats()) {
+        stat_value = &stats.max_value;
+        break;
+      }
+      if (stats.__isset.max && CanUseDeprecatedStats()) {
+        stat_value = &stats.max;
+      }
+      break;
+    default:
+      DCHECK(false) << "Unsupported statistics field requested";
+  }
+  if (stat_value == nullptr) return false;
+
+  switch (col_type_.type) {
+    case TYPE_BOOLEAN:
+      return ColumnStats<bool>::DecodePlainValue(*stat_value, slot,
+          parquet::Type::BOOLEAN);
+    case TYPE_TINYINT: {
+      // parquet::Statistics encodes INT_8 values using 4 bytes.
+      int32_t col_stats;
+      bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, 
&col_stats,
+          parquet::Type::INT32);
+      if (!ret || col_stats < std::numeric_limits<int8_t>::min() ||
+          col_stats > std::numeric_limits<int8_t>::max()) {
+        return false;
+      }
+      *static_cast<int8_t*>(slot) = col_stats;
+      return true;
+    }
+    case TYPE_SMALLINT: {
+      // parquet::Statistics encodes INT_16 values using 4 bytes.
+      int32_t col_stats;
+      bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, 
&col_stats,
+          parquet::Type::INT32);
+      if (!ret || col_stats < std::numeric_limits<int16_t>::min() ||
+          col_stats > std::numeric_limits<int16_t>::max()) {
+        return false;
+      }
+      *static_cast<int16_t*>(slot) = col_stats;
+      return true;
+    }
+    case TYPE_INT:
+      return ColumnStats<int32_t>::DecodePlainValue(*stat_value, slot, 
element_.type);
+    case TYPE_BIGINT:
+      return ColumnStats<int64_t>::DecodePlainValue(*stat_value, slot, 
element_.type);
+    case TYPE_FLOAT:
+      // IMPALA-6527, IMPALA-6538: ignore min/max stats if NaN
+      return ColumnStats<float>::DecodePlainValue(*stat_value, slot, 
element_.type)
+          && !std::isnan(*reinterpret_cast<float*>(slot));
+    case TYPE_DOUBLE:
+      // IMPALA-6527, IMPALA-6538: ignore min/max stats if NaN
+      return ColumnStats<double>::DecodePlainValue(*stat_value, slot, 
element_.type)
+          && !std::isnan(*reinterpret_cast<double*>(slot));
+    case TYPE_TIMESTAMP:
+      return DecodeTimestamp(*stat_value, stats_field,
+          static_cast<TimestampValue*>(slot));
+    case TYPE_STRING:
+    case TYPE_VARCHAR:
+      return ColumnStats<StringValue>::DecodePlainValue(*stat_value, slot, 
element_.type);
+    case TYPE_CHAR:
+      /// We don't read statistics for CHAR columns, since CHAR support is 
broken in
+      /// Impala (IMPALA-1652).
+      return false;
+    case TYPE_DECIMAL:
+      switch (col_type_.GetByteSize()) {
+        case 4:
+          return ColumnStats<Decimal4Value>::DecodePlainValue(*stat_value, 
slot,
+              element_.type);
+        case 8:
+          return ColumnStats<Decimal8Value>::DecodePlainValue(*stat_value, 
slot,
+              element_.type);
+        case 16:
+          return ColumnStats<Decimal16Value>::DecodePlainValue(*stat_value, 
slot,
+              element_.type);
+        }
+      DCHECK(false) << "Unknown decimal byte size: " << 
col_type_.GetByteSize();
+    default:
+      DCHECK(false) << col_type_.DebugString();
+  }
+  return false;
+}
+
+bool ColumnStatsReader::DecodeTimestamp(const std::string& stat_value,
+    ColumnStatsReader::StatsField stats_field, TimestampValue* slot) const {
+  bool stats_read = false;
+  if (element_.type == parquet::Type::INT96) {
+    stats_read =
+        ColumnStats<TimestampValue>::DecodePlainValue(stat_value, slot, 
element_.type);
+  } else if (element_.type == parquet::Type::INT64) {
+    int64_t tmp;
+    stats_read = ColumnStats<int64_t>::DecodePlainValue(stat_value, &tmp, 
element_.type);
+    if (stats_read) *slot = timestamp_decoder_.Int64ToTimestampValue(tmp);
+  } else {
+    DCHECK(false) << element_.name;
+    return false;
+  }
+
+  if (stats_read && timestamp_decoder_.NeedsConversion()) {
+    if (stats_field == ColumnStatsReader::StatsField::MIN) {
+      timestamp_decoder_.ConvertMinStatToLocalTime(slot);
+    } else {
+      timestamp_decoder_.ConvertMaxStatToLocalTime(slot);
+    }
+  }
+  return stats_read && slot->HasDateAndTime();
+}
+
+bool ColumnStatsReader::ReadNullCountStat(int64_t* null_count) const {
+  if (!(col_chunk_.__isset.meta_data && 
col_chunk_.meta_data.__isset.statistics)) {
+    return false;
+  }
+  const parquet::Statistics& stats = col_chunk_.meta_data.statistics;
+  if (stats.__isset.null_count) {
+    *null_count = stats.null_count;
+    return true;
+  }
+  return false;
+}
+
+Status ColumnStatsBase::CopyToBuffer(StringBuffer* buffer, StringValue* value) 
{
+  if (value->ptr == buffer->buffer()) return Status::OK();
+  buffer->Clear();
+  RETURN_IF_ERROR(buffer->Append(value->ptr, value->len));
+  value->ptr = buffer->buffer();
+  return Status::OK();
+}
+
+bool ColumnStatsReader::CanUseStats() const {
+  // If column order is not set, only statistics for numeric types can be 
trusted.
+  if (col_order_ == nullptr) {
+    return col_type_.IsBooleanType() || col_type_.IsIntegerType()
+        || col_type_.IsFloatingPointType();
+  }
+  // Stats can be used if the column order is TypeDefinedOrder (see 
parquet.thrift).
+  return col_order_->__isset.TYPE_ORDER;
+}
+
+bool ColumnStatsReader::CanUseDeprecatedStats() const {
+  // If column order is set to something other than TypeDefinedOrder, we shall 
not use the
+  // stats (see parquet.thrift).
+  if (col_order_ != nullptr && !col_order_->__isset.TYPE_ORDER) return false;
+  return col_type_.IsBooleanType() || col_type_.IsIntegerType()
+      || col_type_.IsFloatingPointType();
+}
+
+}  // end ns impala

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-column-stats.h 
b/be/src/exec/parquet/parquet-column-stats.h
new file mode 100644
index 0000000..c259c7a
--- /dev/null
+++ b/be/src/exec/parquet/parquet-column-stats.h
@@ -0,0 +1,299 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_PARQUET_COLUMN_STATS_H
+#define IMPALA_EXEC_PARQUET_COLUMN_STATS_H
+
+#include <string>
+#include <type_traits>
+
+#include "exec/parquet/parquet-common.h"
+#include "runtime/decimal-value.h"
+#include "runtime/string-buffer.h"
+#include "runtime/timestamp-value.h"
+#include "runtime/types.h"
+
+#include "gen-cpp/parquet_types.h"
+
+namespace impala {
+
+/// This class, together with its derivatives, is used to update column 
statistics when
+/// writing parquet files. It provides an interface to populate a 
parquet::Statistics
+/// object and attach it to an object supplied by the caller. It can also be 
used to
+/// decode parquet::Statistics into slots.
+///
+/// We currently support writing the 'min_value' and 'max_value' fields in
+/// parquet::Statistics. The other two statistical values - 'null_count' and
+/// 'distinct_count' - are not tracked or populated. We do not populate the 
deprecated
+/// 'min' and 'max' fields.
+///
+/// Regarding the ordering of values, we follow the parquet-format 
specification for
+/// logical types (LogicalTypes.md in parquet-format):
+///
+/// - Numeric values (BOOLEAN, INT, FLOAT, DOUBLE, DECIMAL) are ordered by 
their numeric
+///   value (as opposed to their binary representation).
+///
+/// - Strings are ordered using bytewise, unsigned comparison.
+///
+/// - Timestamps are compared by numerically comparing the points in time they 
represent.
+///
+/// NULL values are not considered for min/max statistics, and if a column 
consists only
+/// of NULL values, then no min/max statistics are written.
+///
+/// Updating the statistics is handled in derived classes to alleviate the 
need for
+/// virtual function calls.
+///
+/// TODO: Populate null_count and distinct_count.
+class ColumnStatsBase {
+ public:
+  /// min and max functions for types that are not floating point numbers
+  template <typename T, typename Enable = void>
+  struct MinMaxTrait {
+    static decltype(auto) MinValue(const T& a, const T& b) { return 
std::min(a, b); }
+    static decltype(auto) MaxValue(const T& a, const T& b) { return 
std::max(a, b); }
+    static int Compare(const T& a, const T& b) {
+      if (a < b) return -1;
+      if (a > b) return 1;
+      return 0;
+    }
+  };
+
+  /// min and max functions for floating point types
+  template <typename T>
+  struct MinMaxTrait<T, std::enable_if_t<std::is_floating_point<T>::value>> {
+    static decltype(auto) MinValue(const T& a, const T& b) { return 
std::fmin(a, b); }
+    static decltype(auto) MaxValue(const T& a, const T& b) { return 
std::fmax(a, b); }
+    static int Compare(const T& a, const T& b) {
+      //TODO: Should be aligned with PARQUET-1222, once resolved
+      if (a == b) return 0;
+      if (std::isnan(a) && std::isnan(b)) return 0;
+      if (MaxValue(a, b) == a) return 1;
+      return -1;
+    }
+  };
+
+  ColumnStatsBase() : has_min_max_values_(false), null_count_(0) {}
+  virtual ~ColumnStatsBase() {}
+
+  /// Merges this statistics object with values from 'other'. If other has not 
been
+  /// initialized, then this object will not be changed. It maintains internal 
state that
+  /// tracks whether the min/max values are ordered.
+  virtual void Merge(const ColumnStatsBase& other) = 0;
+
+  /// Copies the contents of this object's statistics values to internal 
buffers. Some
+  /// data types (e.g. StringValue) need to be copied at the end of processing 
a row
+  /// batch, since the batch memory will be released. Overwrite this method in 
derived
+  /// classes to provide the functionality.
+  virtual Status MaterializeStringValuesToInternalBuffers() WARN_UNUSED_RESULT 
{
+    return Status::OK();
+  }
+
+  /// Returns the number of bytes needed to encode the current statistics into 
a
+  /// parquet::Statistics object.
+  virtual int64_t BytesNeeded() const = 0;
+
+  /// Encodes the current values into a Statistics thrift message.
+  virtual void EncodeToThrift(parquet::Statistics* out) const = 0;
+
+  /// Resets the state of this object.
+  void Reset();
+
+  /// Update the statistics by incrementing the null_count. It is called each 
time a null
+  /// value is appended to the column or the statistics are merged.
+  void IncrementNullCount(int64_t count) { null_count_ += count; }
+
+  /// Returns the boundary order of the pages. That is, whether the lists of 
min/max
+  /// elements inside the ColumnIndex are ordered and if so, in which 
direction.
+  /// If both 'ascending_boundary_order_' and 'descending_boundary_order_' is 
true,
+  /// it means all elements are equal, we choose ascending order in this case.
+  /// If only one flag is true, or both of them is false, then we return the 
identified
+  /// ordering, or unordered.
+  parquet::BoundaryOrder::type GetBoundaryOrder() const {
+    if (ascending_boundary_order_) return parquet::BoundaryOrder::ASCENDING;
+    if (descending_boundary_order_) return parquet::BoundaryOrder::DESCENDING;
+    return parquet::BoundaryOrder::UNORDERED;
+  }
+
+ protected:
+  // Copies the memory of 'value' into 'buffer' and make 'value' point to 
'buffer'.
+  // 'buffer' is reset before making the copy.
+  static Status CopyToBuffer(StringBuffer* buffer, StringValue* value) 
WARN_UNUSED_RESULT;
+
+  /// Stores whether the min and max values of the current object have been 
initialized.
+  bool has_min_max_values_;
+
+  // Number of null values since the last call to Reset().
+  int64_t null_count_;
+
+  // If true, min/max values are ascending.
+  // We assume the values are ascending, so start with true and only make it 
false when
+  // we find a descending value. If not all values are equal, then at least 
one of
+  // 'ascending_boundary_order_' and 'descending_boundary_order_' will be 
false.
+  bool ascending_boundary_order_ = true;
+
+  // If true, min/max values are descending.
+  // See description of 'ascending_boundary_order_'.
+  bool descending_boundary_order_ = true;
+};
+
+/// This class contains behavior specific to our in-memory formats for 
different types.
+template <typename T>
+class ColumnStats : public ColumnStatsBase {
+  friend class ColumnStatsBase;
+  // We explicitly require types to be listed here in order to support column 
statistics.
+  // When adding a type here, users of this class need to ensure that the 
statistics
+  // follow the ordering semantics of parquet's min/max statistics for the new 
type.
+  // Details on how the values should be ordered can be found in the 
'parquet-format'
+  // project in 'parquet.thrift' and 'LogicalTypes.md'.
+  using value_type = typename std::enable_if<
+      std::is_arithmetic<T>::value
+        || std::is_same<bool, T>::value
+        || std::is_same<StringValue, T>::value
+        || std::is_same<TimestampValue, T>::value
+        || std::is_same<Decimal4Value, T>::value
+        || std::is_same<Decimal8Value, T>::value
+        || std::is_same<Decimal16Value, T>::value,
+      T>::type;
+
+ public:
+  /// 'mem_pool' is used to materialize string values so that the user of this 
class can
+  /// free the memory of the original values.
+  /// 'plain_encoded_value_size' specifies the size of each encoded value in 
plain
+  /// encoding, -1 if the type is variable-length.
+  ColumnStats(MemPool* mem_pool, int plain_encoded_value_size)
+    : ColumnStatsBase(),
+      plain_encoded_value_size_(plain_encoded_value_size),
+      mem_pool_(mem_pool),
+      min_buffer_(mem_pool),
+      max_buffer_(mem_pool),
+      prev_page_min_buffer_(mem_pool),
+      prev_page_max_buffer_(mem_pool) {}
+
+  /// Updates the statistics based on the values min_value and max_value. If 
necessary,
+  /// initializes the statistics. It may keep a reference to either value until
+  /// MaterializeStringValuesToInternalBuffers() gets called.
+  void Update(const T& min_value, const T& max_value);
+
+  /// Wrapper to call the Update function which takes in the min_value and 
max_value.
+  void Update(const T& v) { Update(v, v); }
+
+  virtual void Merge(const ColumnStatsBase& other) override;
+  virtual Status MaterializeStringValuesToInternalBuffers() override {
+    return Status::OK();
+  }
+
+  virtual int64_t BytesNeeded() const override;
+  virtual void EncodeToThrift(parquet::Statistics* out) const override;
+
+  /// Decodes the plain encoded stats value from 'buffer' and writes the 
result into the
+  /// buffer pointed to by 'slot'. Returns true if decoding was successful, 
false
+  /// otherwise. For timestamps, an additional validation will be performed.
+  static bool DecodePlainValue(const std::string& buffer, void* slot,
+      parquet::Type::type parquet_type);
+
+ protected:
+  /// Encodes a single value using parquet's plain encoding and stores it into 
the binary
+  /// string 'out'. String values are stored without additional encoding. 
'bytes_needed'
+  /// must be positive.
+  static void EncodePlainValue(const T& v, int64_t bytes_needed, std::string* 
out);
+
+  /// Returns the number of bytes needed to encode value 'v'.
+  int64_t BytesNeeded(const T& v) const;
+
+  // Size of each encoded value in plain encoding, -1 if the type is 
variable-length.
+  int plain_encoded_value_size_;
+
+  // Minimum value since the last call to Reset().
+  T min_value_;
+
+  // Maximum value since the last call to Reset().
+  T max_value_;
+
+  // Minimum value of the previous page. Need to store that to calculate 
boundary order.
+  T prev_page_min_value_;
+
+  // Maximum value of the previous page. Need to store that to calculate 
boundary order.
+  T prev_page_max_value_;
+
+  // Memory pool to allocate from when making copies of the statistics data.
+  MemPool* mem_pool_;
+
+  // Local buffers to copy statistics data into.
+  StringBuffer min_buffer_;
+  StringBuffer max_buffer_;
+  StringBuffer prev_page_min_buffer_;
+  StringBuffer prev_page_max_buffer_;
+};
+
+/// Class that handles the decoding of Parquet stats (min/max/null_count) for 
a given
+/// column chunk.
+class ColumnStatsReader {
+public:
+  /// Enum to select whether to read minimum or maximum statistics. Values do 
not
+  /// correspond to fields in parquet::Statistics, but instead select between 
retrieving
+  /// the minimum or maximum value.
+  enum class StatsField { MIN, MAX };
+
+  ColumnStatsReader(const parquet::ColumnChunk& col_chunk,  const ColumnType& 
col_type,
+      const parquet::ColumnOrder* col_order, const parquet::SchemaElement& 
element)
+  : col_chunk_(col_chunk),
+    col_type_(col_type),
+    col_order_(col_order),
+    element_(element) {}
+
+  /// Sets extra information that is only needed for decoding TIMESTAMP stats.
+  void SetTimestampDecoder(ParquetTimestampDecoder timestamp_decoder) {
+    timestamp_decoder_ = timestamp_decoder;
+  }
+
+  /// Decodes the parquet::Statistics from 'col_chunk_' and writes the value 
selected by
+  /// 'stats_field' into the buffer pointed to by 'slot', based on 
'col_type_'. Returns
+  /// true if reading statistics for columns of type 'col_type_' is supported 
and decoding
+  /// was successful, false otherwise.
+  bool ReadFromThrift(StatsField stats_field, void* slot) const;
+
+  // Gets the null_count statistics from the column chunk's metadata and 
returns
+  // it via an output parameter.
+  // Returns true if the null_count stats were read successfully, false 
otherwise.
+  bool ReadNullCountStat(int64_t* null_count) const;
+
+private:
+  /// Returns true if we support reading statistics stored in the fields 
'min_value' and
+  /// 'max_value' in parquet::Statistics for the type 'col_type_' and the 
column order
+  /// 'col_order_'. Otherwise, returns false. If 'col_order_' is nullptr, only 
primitive
+  /// numeric types are supported.
+  bool CanUseStats() const;
+
+  /// Returns true if we consider statistics stored in the deprecated fields 
'min' and
+  /// 'max' in parquet::Statistics to be correct for the type 'col_type_' and 
the column
+  /// order 'col_order_'. Otherwise, returns false.
+  bool CanUseDeprecatedStats() const;
+
+  /// Decodes 'stat_value' and does INT64->TimestampValue and timezone 
conversions if
+  /// necessary. Returns true if the decoding and conversions were successful.
+  bool DecodeTimestamp(const std::string& stat_value,
+      ColumnStatsReader::StatsField stats_field,
+      TimestampValue* slot) const;
+
+  const parquet::ColumnChunk& col_chunk_;
+  const ColumnType& col_type_;
+  const parquet::ColumnOrder* col_order_;
+  const parquet::SchemaElement& element_;
+  ParquetTimestampDecoder timestamp_decoder_;
+};
+} // end ns impala
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-column-stats.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-column-stats.inline.h 
b/be/src/exec/parquet/parquet-column-stats.inline.h
new file mode 100644
index 0000000..933b3ae
--- /dev/null
+++ b/be/src/exec/parquet/parquet-column-stats.inline.h
@@ -0,0 +1,254 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_EXEC_PARQUET_COLUMN_STATS_INLINE_H
+#define IMPALA_EXEC_PARQUET_COLUMN_STATS_INLINE_H
+
+#include "exec/parquet/parquet-common.h"
+#include "gen-cpp/parquet_types.h"
+#include "parquet-column-stats.h"
+#include "runtime/string-value.inline.h"
+
+namespace impala {
+
+inline void ColumnStatsBase::Reset() {
+  has_min_max_values_ = false;
+  null_count_ = 0;
+  ascending_boundary_order_ = true;
+  descending_boundary_order_ = true;
+}
+
+template <typename T>
+inline void ColumnStats<T>::Update(const T& min_value, const T& max_value) {
+  if (!has_min_max_values_) {
+    has_min_max_values_ = true;
+    min_value_ = min_value;
+    max_value_ = max_value;
+  } else {
+    min_value_ = MinMaxTrait<T>::MinValue(min_value_, min_value);
+    max_value_ = MinMaxTrait<T>::MaxValue(max_value_, max_value);
+  }
+}
+
+template <typename T>
+inline void ColumnStats<T>::Merge(const ColumnStatsBase& other) {
+  DCHECK(dynamic_cast<const ColumnStats<T>*>(&other));
+  const ColumnStats<T>* cs = static_cast<const ColumnStats<T>*>(&other);
+  if (cs->has_min_max_values_) {
+    if (has_min_max_values_) {
+      if (ascending_boundary_order_) {
+        if (MinMaxTrait<T>::Compare(prev_page_max_value_, cs->max_value_) > 0 
||
+            MinMaxTrait<T>::Compare(prev_page_min_value_, cs->min_value_) > 0) 
{
+          ascending_boundary_order_ = false;
+        }
+      }
+      if (descending_boundary_order_) {
+        if (MinMaxTrait<T>::Compare(prev_page_max_value_, cs->max_value_) < 0 
||
+            MinMaxTrait<T>::Compare(prev_page_min_value_, cs->min_value_) < 0) 
{
+          descending_boundary_order_ = false;
+        }
+      }
+    }
+    Update(cs->min_value_, cs->max_value_);
+    prev_page_min_value_ = cs->min_value_;
+    prev_page_max_value_ = cs->max_value_;
+  }
+  IncrementNullCount(cs->null_count_);
+}
+
+template <typename T>
+inline int64_t ColumnStats<T>::BytesNeeded() const {
+  return BytesNeeded(min_value_) + BytesNeeded(max_value_)
+      + ParquetPlainEncoder::ByteSize(null_count_);
+}
+
+template <typename T>
+inline void ColumnStats<T>::EncodeToThrift(parquet::Statistics* out) const {
+  if (has_min_max_values_) {
+    std::string min_str;
+    EncodePlainValue(min_value_, BytesNeeded(min_value_), &min_str);
+    out->__set_min_value(move(min_str));
+    std::string max_str;
+    EncodePlainValue(max_value_, BytesNeeded(max_value_), &max_str);
+    out->__set_max_value(move(max_str));
+  }
+  out->__set_null_count(null_count_);
+}
+
+template <typename T>
+inline void ColumnStats<T>::EncodePlainValue(
+    const T& v, int64_t bytes_needed, std::string* out) {
+  DCHECK_GT(bytes_needed, 0);
+  out->resize(bytes_needed);
+  const int64_t bytes_written = ParquetPlainEncoder::Encode(
+      v, bytes_needed, reinterpret_cast<uint8_t*>(&(*out)[0]));
+  DCHECK_EQ(bytes_needed, bytes_written);
+}
+
+template <typename T>
+inline bool ColumnStats<T>::DecodePlainValue(const std::string& buffer, void* 
slot,
+    parquet::Type::type parquet_type) {
+  T* result = reinterpret_cast<T*>(slot);
+  int size = buffer.size();
+  const uint8_t* data = reinterpret_cast<const uint8_t*>(buffer.data());
+  if (ParquetPlainEncoder::DecodeByParquetType<T>(data, data + size, size, 
result,
+      parquet_type) == -1) {
+    return false;
+  }
+  return true;
+}
+
+template <typename T>
+inline int64_t ColumnStats<T>::BytesNeeded(const T& v) const {
+  return plain_encoded_value_size_ < 0 ? ParquetPlainEncoder::ByteSize<T>(v) :
+      plain_encoded_value_size_;
+}
+
+/// Plain encoding for Boolean values is not handled by the 
ParquetPlainEncoder and thus
+/// needs special handling here.
+template <>
+inline void ColumnStats<bool>::EncodePlainValue(
+    const bool& v, int64_t bytes_needed, std::string* out) {
+  char c = v;
+  out->assign(1, c);
+}
+
+template <>
+inline bool ColumnStats<bool>::DecodePlainValue(const std::string& buffer, 
void* slot,
+    parquet::Type::type parquet_type) {
+  bool* result = reinterpret_cast<bool*>(slot);
+  DCHECK(buffer.size() == 1);
+  *result = (buffer[0] != 0);
+  return true;
+}
+
+template <>
+inline int64_t ColumnStats<bool>::BytesNeeded(const bool& v) const {
+  return 1;
+}
+
+/// Timestamp values need validation.
+template <>
+inline bool ColumnStats<TimestampValue>::DecodePlainValue(
+    const std::string& buffer, void* slot, parquet::Type::type parquet_type) {
+  TimestampValue* result = reinterpret_cast<TimestampValue*>(slot);
+  int size = buffer.size();
+  const uint8_t* data = reinterpret_cast<const uint8_t*>(buffer.data());
+  if (parquet_type == parquet::Type::INT96) {
+    if (ParquetPlainEncoder::Decode<TimestampValue, parquet::Type::INT96>(data,
+        data + size, size, result) == -1) {
+      return false;
+    }
+  } else {
+    DCHECK(false);
+    return false;
+  }
+
+  // We don't need to convert the value here, because it is done by the caller.
+  // If this function were not static, then it would be possible to store the 
information
+  // needed for timezone conversion in the object and do the conversion here.
+  return TimestampValue::IsValidDate(result->date());
+}
+
+/// parquet::Statistics stores string values directly and does not use plain 
encoding.
+template <>
+inline void ColumnStats<StringValue>::EncodePlainValue(
+    const StringValue& v, int64_t bytes_needed, string* out) {
+  out->assign(v.ptr, v.len);
+}
+
+template <>
+inline bool ColumnStats<StringValue>::DecodePlainValue(
+    const std::string& buffer, void* slot, parquet::Type::type parquet_type) {
+  StringValue* result = reinterpret_cast<StringValue*>(slot);
+  result->ptr = const_cast<char*>(buffer.data());
+  result->len = buffer.size();
+  return true;
+}
+
+template <>
+inline void ColumnStats<StringValue>::Update(
+    const StringValue& min_value, const StringValue& max_value) {
+  if (!has_min_max_values_) {
+    has_min_max_values_ = true;
+    min_value_ = min_value;
+    min_buffer_.Clear();
+    max_value_ = max_value;
+    max_buffer_.Clear();
+  } else {
+    if (min_value < min_value_) {
+      min_value_ = min_value;
+      min_buffer_.Clear();
+    }
+    if (max_value > max_value_) {
+      max_value_ = max_value;
+      max_buffer_.Clear();
+    }
+  }
+}
+
+template <>
+inline void ColumnStats<StringValue>::Merge(const ColumnStatsBase& other) {
+  DCHECK(dynamic_cast<const ColumnStats<StringValue>*>(&other));
+  const ColumnStats<StringValue>* cs = static_cast<
+      const ColumnStats<StringValue>*>(&other);
+  if (cs->has_min_max_values_) {
+    if (has_min_max_values_) {
+      // Make sure that we copied the previous page's min/max values to their 
own buffer.
+      DCHECK_NE(static_cast<void*>(prev_page_min_value_.ptr),
+                static_cast<void*>(cs->min_value_.ptr));
+      DCHECK_NE(static_cast<void*>(prev_page_max_value_.ptr),
+                static_cast<void*>(cs->max_value_.ptr));
+      if (ascending_boundary_order_) {
+        if (prev_page_max_value_ > cs->max_value_ ||
+            prev_page_min_value_ > cs->min_value_) {
+          ascending_boundary_order_ = false;
+        }
+      }
+      if (descending_boundary_order_) {
+        if (prev_page_max_value_ < cs->max_value_ ||
+            prev_page_min_value_ < cs->min_value_) {
+          descending_boundary_order_ = false;
+        }
+      }
+    }
+    Update(cs->min_value_, cs->max_value_);
+    prev_page_min_value_ = cs->min_value_;
+    prev_page_max_value_ = cs->max_value_;
+    prev_page_min_buffer_.Clear();
+    prev_page_max_buffer_.Clear();
+  }
+  IncrementNullCount(cs->null_count_);
+}
+
+// StringValues need to be copied at the end of processing a row batch, since 
the batch
+// memory will be released.
+template <>
+inline Status 
ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() {
+  if (min_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&min_buffer_, 
&min_value_));
+  if (max_buffer_.IsEmpty()) RETURN_IF_ERROR(CopyToBuffer(&max_buffer_, 
&max_value_));
+  if (prev_page_min_buffer_.IsEmpty()) {
+    RETURN_IF_ERROR(CopyToBuffer(&prev_page_min_buffer_, 
&prev_page_min_value_));
+  }
+  if (prev_page_max_buffer_.IsEmpty()) {
+    RETURN_IF_ERROR(CopyToBuffer(&prev_page_max_buffer_, 
&prev_page_max_value_));
+  }
+  return Status::OK();
+}
+
+} // end ns impala
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-common.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-common.cc 
b/be/src/exec/parquet/parquet-common.cc
new file mode 100644
index 0000000..439d351
--- /dev/null
+++ b/be/src/exec/parquet/parquet-common.cc
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/parquet/parquet-common.h"
+
+namespace impala {
+
+/// Mapping of impala's internal types to parquet storage types. This is 
indexed by
+/// PrimitiveType enum
+const parquet::Type::type INTERNAL_TO_PARQUET_TYPES[] = {
+  parquet::Type::BOOLEAN,     // Invalid
+  parquet::Type::BOOLEAN,     // NULL type
+  parquet::Type::BOOLEAN,
+  parquet::Type::INT32,
+  parquet::Type::INT32,
+  parquet::Type::INT32,
+  parquet::Type::INT64,
+  parquet::Type::FLOAT,
+  parquet::Type::DOUBLE,
+  parquet::Type::INT96,       // Timestamp
+  parquet::Type::BYTE_ARRAY,  // String
+  parquet::Type::BYTE_ARRAY,  // Date, NYI
+  parquet::Type::BYTE_ARRAY,  // DateTime, NYI
+  parquet::Type::BYTE_ARRAY,  // Binary NYI
+  parquet::Type::FIXED_LEN_BYTE_ARRAY, // Decimal
+  parquet::Type::BYTE_ARRAY,  // VARCHAR(N)
+  parquet::Type::BYTE_ARRAY,  // CHAR(N)
+};
+
+const int INTERNAL_TO_PARQUET_TYPES_SIZE =
+  sizeof(INTERNAL_TO_PARQUET_TYPES) / sizeof(INTERNAL_TO_PARQUET_TYPES[0]);
+
+/// Mapping of Parquet codec enums to Impala enums
+const THdfsCompression::type PARQUET_TO_IMPALA_CODEC[] = {
+  THdfsCompression::NONE,
+  THdfsCompression::SNAPPY,
+  THdfsCompression::GZIP,
+  THdfsCompression::LZO
+};
+
+const int PARQUET_TO_IMPALA_CODEC_SIZE =
+    sizeof(PARQUET_TO_IMPALA_CODEC) / sizeof(PARQUET_TO_IMPALA_CODEC[0]);
+
+/// Mapping of Impala codec enums to Parquet enums
+const parquet::CompressionCodec::type IMPALA_TO_PARQUET_CODEC[] = {
+  parquet::CompressionCodec::UNCOMPRESSED,
+  parquet::CompressionCodec::SNAPPY,  // DEFAULT
+  parquet::CompressionCodec::GZIP,    // GZIP
+  parquet::CompressionCodec::GZIP,    // DEFLATE
+  parquet::CompressionCodec::SNAPPY,
+  parquet::CompressionCodec::SNAPPY,  // SNAPPY_BLOCKED
+  parquet::CompressionCodec::LZO,
+};
+
+const int IMPALA_TO_PARQUET_CODEC_SIZE =
+    sizeof(IMPALA_TO_PARQUET_CODEC) / sizeof(IMPALA_TO_PARQUET_CODEC[0]);
+
+parquet::Type::type ConvertInternalToParquetType(PrimitiveType type) {
+  DCHECK_GE(type, 0);
+  DCHECK_LT(type, INTERNAL_TO_PARQUET_TYPES_SIZE);
+  return INTERNAL_TO_PARQUET_TYPES[type];
+}
+
+THdfsCompression::type ConvertParquetToImpalaCodec(
+    parquet::CompressionCodec::type codec) {
+  DCHECK_GE(codec, 0);
+  DCHECK_LT(codec, PARQUET_TO_IMPALA_CODEC_SIZE);
+  return PARQUET_TO_IMPALA_CODEC[codec];
+}
+
+parquet::CompressionCodec::type ConvertImpalaToParquetCodec(
+    THdfsCompression::type codec) {
+  DCHECK_GE(codec, 0);
+  DCHECK_LT(codec, IMPALA_TO_PARQUET_CODEC_SIZE);
+  return IMPALA_TO_PARQUET_CODEC[codec];
+}
+
+ParquetTimestampDecoder::ParquetTimestampDecoder(const parquet::SchemaElement& 
e,
+    const Timezone* timezone, bool convert_int96_timestamps) {
+  bool needs_conversion = false;
+  if (e.__isset.logicalType) {
+    DCHECK(e.logicalType.__isset.TIMESTAMP);
+    needs_conversion = e.logicalType.TIMESTAMP.isAdjustedToUTC;
+    precision_ = e.logicalType.TIMESTAMP.unit.__isset.MILLIS
+        ? ParquetTimestampDecoder::MILLI : ParquetTimestampDecoder::MICRO;
+  } else {
+    if (e.__isset.converted_type) {
+      // Timestamp with converted type but without logical type are/were never 
written
+      // by Impala, so it is assumed that the writer is Parquet-mr and that 
timezone
+      // conversion is needed.
+      needs_conversion = true;
+      precision_ = e.converted_type == parquet::ConvertedType::TIMESTAMP_MILLIS
+          ? ParquetTimestampDecoder::MILLI : ParquetTimestampDecoder::MICRO;
+    } else {
+      // INT96 timestamps needs conversion depending on the writer.
+      needs_conversion = convert_int96_timestamps;
+      precision_ = ParquetTimestampDecoder::NANO;
+    }
+  }
+  if (needs_conversion) timezone_ = timezone;
+}
+
+void ParquetTimestampDecoder::ConvertMinStatToLocalTime(TimestampValue* v) 
const {
+  DCHECK(timezone_ != nullptr);
+  if (!v->HasDateAndTime()) return;
+  TimestampValue repeated_period_start;
+  v->UtcToLocal(*timezone_, &repeated_period_start);
+  if (repeated_period_start.HasDateAndTime()) *v = repeated_period_start;
+}
+
+void ParquetTimestampDecoder::ConvertMaxStatToLocalTime(TimestampValue* v) 
const {
+  DCHECK(timezone_ != nullptr);
+  if (!v->HasDateAndTime()) return;
+  TimestampValue repeated_period_end;
+  v->UtcToLocal(*timezone_, nullptr, &repeated_period_end);
+  if (repeated_period_end.HasDateAndTime()) *v = repeated_period_end;
+}
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-common.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-common.h 
b/be/src/exec/parquet/parquet-common.h
new file mode 100644
index 0000000..79baeda
--- /dev/null
+++ b/be/src/exec/parquet/parquet-common.h
@@ -0,0 +1,565 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_EXEC_PARQUET_COMMON_H
+#define IMPALA_EXEC_PARQUET_COMMON_H
+
+#include "common/compiler-util.h"
+#include "gen-cpp/Descriptors_types.h"
+#include "gen-cpp/parquet_types.h"
+#include "runtime/decimal-value.h"
+#include "runtime/string-value.h"
+#include "runtime/timestamp-value.inline.h"
+#include "util/bit-util.h"
+#include "util/decimal-util.h"
+#include "util/mem-util.h"
+
+/// This file contains common elements between the parquet Writer and Scanner.
+namespace impala {
+
+const uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
+const uint32_t PARQUET_CURRENT_VERSION = 1;
+
+/// Return the Parquet type corresponding to Impala's internal type. The 
caller must
+/// validate that the type is valid, otherwise this will DCHECK.
+parquet::Type::type ConvertInternalToParquetType(PrimitiveType type);
+
+/// Return the Impala compression type for the given Parquet codec. The caller 
must
+/// validate that the codec is a supported one, otherwise this will DCHECK.
+THdfsCompression::type 
ConvertParquetToImpalaCodec(parquet::CompressionCodec::type codec);
+
+/// Return the Parquet code for the given Impala compression type. The caller 
must
+/// validate that the codec is a supported one, otherwise this will DCHECK.
+parquet::CompressionCodec::type ConvertImpalaToParquetCodec(
+    THdfsCompression::type codec);
+
+/// The plain encoding does not maintain any state so all these functions
+/// are static helpers.
+/// TODO: we are using templates to provide a generic interface (over the
+/// types) to avoid performance penalties. This makes the code more complex
+/// and should be removed when we have codegen support to inline virtual
+/// calls.
+class ParquetPlainEncoder {
+ public:
+  /// Returns the byte size of 'v' where InternalType is the datatype that 
Impala uses
+  /// internally to store tuple data. Used in some template function 
implementations to
+  /// determine the encoded byte size for fixed-length types.
+  template <typename InternalType>
+  static int ByteSize(const InternalType& v) { return sizeof(InternalType); }
+
+  /// Returns the encoded size of values of type t. Returns -1 if it is 
variable
+  /// length. This can be different than the slot size of the types.
+  static int EncodedByteSize(const ColumnType& t) {
+    switch (t.type) {
+      case TYPE_STRING:
+      case TYPE_VARCHAR:
+      case TYPE_CHAR:
+        // CHAR is varlen here because we don't write the padding to the file
+        return -1;
+      case TYPE_TINYINT:
+      case TYPE_SMALLINT:
+      case TYPE_INT:
+      case TYPE_FLOAT:
+        return 4;
+      case TYPE_BIGINT:
+      case TYPE_DOUBLE:
+        return 8;
+      case TYPE_TIMESTAMP:
+        return 12;
+      case TYPE_DECIMAL:
+        return DecimalSize(t);
+      case TYPE_NULL:
+      case TYPE_BOOLEAN: // These types are not plain encoded.
+      default:
+        DCHECK(false);
+        return -1;
+    }
+  }
+
+  /// The minimum byte size to store decimals of with precision t.precision.
+  static int DecimalSize(const ColumnType& t) {
+    DCHECK(t.type == TYPE_DECIMAL);
+    // Numbers in the comment is the max positive value that can be represented
+    // with those number of bits (max negative is -(X + 1)).
+    // TODO: use closed form for this?
+    switch (t.precision) {
+      case 1: case 2:
+        return 1; // 127
+      case 3: case 4:
+        return 2; // 32,767
+      case 5: case 6:
+        return 3; // 8,388,607
+      case 7: case 8: case 9:
+        return 4; // 2,147,483,427
+      case 10: case 11:
+        return 5; // 549,755,813,887
+      case 12: case 13: case 14:
+        return 6; // 140,737,488,355,327
+      case 15: case 16:
+        return 7; // 36,028,797,018,963,967
+      case 17: case 18:
+        return 8; // 9,223,372,036,854,775,807
+      case 19: case 20: case 21:
+        return 9; // 2,361,183,241,434,822,606,847
+      case 22: case 23:
+        return 10; // 604,462,909,807,314,587,353,087
+      case 24: case 25: case 26:
+        return 11; // 154,742,504,910,672,534,362,390,527
+      case 27: case 28:
+        return 12; // 39,614,081,257,132,168,796,771,975,167
+      case 29: case 30: case 31:
+        return 13; // 10,141,204,801,825,835,211,973,625,643,007
+      case 32: case 33:
+        return 14; // 2,596,148,429,267,413,814,265,248,164,610,047
+      case 34: case 35:
+        return 15; // 664,613,997,892,457,936,451,903,530,140,172,287
+      case 36: case 37: case 38:
+        return 16; // 170,141,183,460,469,231,731,687,303,715,884,105,727
+      default:
+        DCHECK(false);
+        break;
+    }
+    return -1;
+  }
+
+  /// Encodes t into buffer. Returns the number of bytes added.  buffer must
+  /// be preallocated and big enough.  Buffer need not be aligned.
+  /// 'fixed_len_size' is only applicable for data encoded using 
FIXED_LEN_BYTE_ARRAY and
+  /// is the number of bytes the plain encoder should use.
+  template <typename InternalType>
+  static int Encode(const InternalType& t, int fixed_len_size, uint8_t* 
buffer) {
+    memcpy(buffer, &t, ByteSize(t));
+    return ByteSize(t);
+  }
+
+  template <typename InternalType>
+  static int DecodeByParquetType(const uint8_t* buffer, const uint8_t* 
buffer_end,
+      int fixed_len_size, InternalType* v, parquet::Type::type parquet_type) {
+    switch (parquet_type) {
+      case parquet::Type::BOOLEAN:
+        return ParquetPlainEncoder::Decode<InternalType, 
parquet::Type::BOOLEAN>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::INT32:
+        return ParquetPlainEncoder::Decode<InternalType, 
parquet::Type::INT32>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::INT64:
+        return ParquetPlainEncoder::Decode<InternalType, 
parquet::Type::INT64>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::INT96:
+        return ParquetPlainEncoder::Decode<InternalType, 
parquet::Type::INT96>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::FLOAT:
+        return ParquetPlainEncoder::Decode<InternalType, 
parquet::Type::FLOAT>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::DOUBLE:
+        return ParquetPlainEncoder::Decode<InternalType, 
parquet::Type::DOUBLE>(buffer,
+            buffer_end, fixed_len_size, v);
+      case parquet::Type::BYTE_ARRAY:
+        return ParquetPlainEncoder::Decode<InternalType,
+            parquet::Type::BYTE_ARRAY>(buffer, buffer_end, fixed_len_size, v);
+      case parquet::Type::FIXED_LEN_BYTE_ARRAY:
+        return ParquetPlainEncoder::Decode<InternalType,
+            parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer_end, 
fixed_len_size, v);
+      default:
+        DCHECK(false) << "Unexpected physical type";
+        return -1;
+    }
+  }
+
+  /// Decodes t from 'buffer', reading up to the byte before 'buffer_end'. 
'buffer'
+  /// need not be aligned. If PARQUET_TYPE is FIXED_LEN_BYTE_ARRAY then 
'fixed_len_size'
+  /// is the size of the object. Otherwise, it is unused.
+  /// Returns the number of bytes read or -1 if the value was not decoded 
successfully.
+  /// This generic template function is used with the following types:
+  /// =============================
+  /// InternalType   | PARQUET_TYPE
+  /// =============================
+  /// int32_t        | INT32
+  /// int64_t        | INT64
+  /// float          | FLOAT
+  /// double         | DOUBLE
+  /// Decimal4Value  | INT32
+  /// Decimal8Value  | INT64
+  /// TimestampValue | INT96
+  template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+  static int Decode(const uint8_t* buffer, const uint8_t* buffer_end, int 
fixed_len_size,
+      InternalType* v) {
+    int byte_size = ByteSize(*v);
+    if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
+    memcpy(v, buffer, byte_size);
+    return byte_size;
+  }
+
+  /// Batched version of Decode() that tries to decode 'num_values' values 
from the memory
+  /// range [buffer, buffer_end) and writes them to 'v' with a stride of 
'stride' bytes.
+  /// Returns the number of bytes read from 'buffer' or -1 if there was an 
error
+  /// decoding, e.g. invalid data or running out of input data before reading
+  /// 'num_values'.
+  template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+  static int64_t DecodeBatch(const uint8_t* buffer, const uint8_t* buffer_end,
+      int fixed_len_size, int64_t num_values, int64_t stride, InternalType* v);
+};
+
+/// Calling this with arguments of type ColumnType is certainly a programmer 
error, so we
+/// disallow it.
+template <> int ParquetPlainEncoder::ByteSize(const ColumnType& t);
+
+/// Disable for bools. Plain encoding is not used for booleans.
+template <> int ParquetPlainEncoder::ByteSize(const bool& b);
+template <> int ParquetPlainEncoder::Encode(const bool&, int fixed_len_size, 
uint8_t*);
+template <> int ParquetPlainEncoder::Decode<bool, 
parquet::Type::BOOLEAN>(const uint8_t*,
+    const uint8_t*, int fixed_len_size, bool* v);
+
+template <>
+inline int ParquetPlainEncoder::ByteSize(const Decimal4Value&) {
+  // Only used when the decimal is stored as INT32.
+  return sizeof(Decimal4Value::StorageType);
+}
+template <>
+inline int ParquetPlainEncoder::ByteSize(const Decimal8Value&) {
+  // Only used when the decimal is stored as INT64.
+  return sizeof(Decimal8Value::StorageType);
+}
+template <>
+inline int ParquetPlainEncoder::ByteSize(const Decimal16Value&) {
+  // Not used, since such big decimals can only be stored as BYTE_ARRAY or
+  // FIXED_LEN_BYTE_ARRAY.
+  DCHECK(false);
+  return -1;
+}
+
+/// Parquet doesn't have 8-bit or 16-bit ints. They are converted to 32-bit.
+template <>
+inline int ParquetPlainEncoder::ByteSize(const int8_t& v) { return 
sizeof(int32_t); }
+template <>
+inline int ParquetPlainEncoder::ByteSize(const int16_t& v) { return 
sizeof(int32_t); }
+
+template <>
+inline int ParquetPlainEncoder::ByteSize(const StringValue& v) {
+  return sizeof(int32_t) + v.len;
+}
+
+template <>
+inline int ParquetPlainEncoder::ByteSize(const TimestampValue& v) {
+  return 12;
+}
+
+template <typename From, typename To>
+inline int DecodeWithConversion(const uint8_t* buffer, const uint8_t* 
buffer_end, To* v) {
+  int byte_size = sizeof(From);
+  if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
+  From dest;
+  memcpy(&dest, buffer, byte_size);
+  *v = dest;
+  return byte_size;
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<int64_t, parquet::Type::INT32>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, 
int64_t* v) {
+  return DecodeWithConversion<int32_t, int64_t>(buffer, buffer_end, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<double, parquet::Type::INT32>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, 
double* v) {
+  return DecodeWithConversion<int32_t, double>(buffer, buffer_end, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<double, parquet::Type::FLOAT>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, 
double* v) {
+  return DecodeWithConversion<float, double>(buffer, buffer_end, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<int8_t, parquet::Type::INT32>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, 
int8_t* v) {
+  int byte_size = ByteSize(*v);
+  if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
+  *v = *buffer;
+  return byte_size;
+}
+template <>
+inline int ParquetPlainEncoder::Decode<int16_t, parquet::Type::INT32>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, 
int16_t* v) {
+  int byte_size = ByteSize(*v);
+  if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
+  memcpy(v, buffer, sizeof(int16_t));
+  return byte_size;
+}
+
+template<typename T>
+inline int EncodeToInt32(const T& v, int fixed_len_size, uint8_t* buffer) {
+  int32_t val = v;
+  memcpy(buffer, &val, sizeof(int32_t));
+  return ParquetPlainEncoder::ByteSize(v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Encode(
+    const int8_t& v, int fixed_len_size, uint8_t* buffer) {
+  return EncodeToInt32(v, fixed_len_size, buffer);
+}
+
+template <>
+inline int ParquetPlainEncoder::Encode(
+    const int16_t& v, int fixed_len_size, uint8_t* buffer) {
+  return EncodeToInt32(v, fixed_len_size, buffer);
+}
+
+template <>
+inline int ParquetPlainEncoder::Encode(
+    const StringValue& v, int fixed_len_size, uint8_t* buffer) {
+  memcpy(buffer, &v.len, sizeof(int32_t));
+  memcpy(buffer + sizeof(int32_t), v.ptr, v.len);
+  return ByteSize(v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<StringValue, parquet::Type::BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    StringValue* v) {
+  if (UNLIKELY(buffer_end - buffer < sizeof(int32_t))) return -1;
+  memcpy(&v->len, buffer, sizeof(int32_t));
+  int byte_size = ByteSize(*v);
+  if (UNLIKELY(v->len < 0 || buffer_end - buffer < byte_size)) return -1;
+  v->ptr = reinterpret_cast<char*>(const_cast<uint8_t*>(buffer)) + 
sizeof(int32_t);
+  if (fixed_len_size > 0) v->len = std::min(v->len, fixed_len_size);
+  // we still read byte_size bytes, even if we truncate
+  return byte_size;
+}
+
+/// Write decimals as big endian (byte comparable) to benefit from common 
prefixes.
+/// fixed_len_size can be less than sizeof(Decimal*Value) for space savings. 
This means
+/// that the value in the in-memory format has leading zeros or negative 1's.
+/// For example, precision 2 fits in 1 byte. All decimals stored as 
Decimal4Value
+/// will have 3 bytes of leading zeros, we will only store the interesting 
byte.
+template<typename T>
+inline int EncodeDecimal(const T& v, int fixed_len_size, uint8_t* buffer) {
+  DecimalUtil::EncodeToFixedLenByteArray(buffer, fixed_len_size, v);
+  return fixed_len_size;
+}
+
+template <>
+inline int ParquetPlainEncoder::Encode(
+    const Decimal4Value& v, int fixed_len_size, uint8_t* buffer) {
+  return EncodeDecimal(v, fixed_len_size, buffer);
+}
+
+template <>
+inline int ParquetPlainEncoder::Encode(
+    const Decimal8Value& v, int fixed_len_size, uint8_t* buffer) {
+  return EncodeDecimal(v, fixed_len_size, buffer);
+}
+
+template <>
+inline int ParquetPlainEncoder::Encode(
+    const Decimal16Value& v, int fixed_len_size, uint8_t* buffer) {
+  return EncodeDecimal(v, fixed_len_size, buffer);
+}
+
+template <typename InternalType, parquet::Type::type PARQUET_TYPE>
+inline int64_t ParquetPlainEncoder::DecodeBatch(const uint8_t* buffer,
+    const uint8_t* buffer_end, int fixed_len_size, int64_t num_values, int64_t 
stride,
+    InternalType* v) {
+  const uint8_t* buffer_pos = buffer;
+  StrideWriter<InternalType> out(v, stride);
+  for (int64_t i = 0; i < num_values; ++i) {
+    int encoded_len = Decode<InternalType, PARQUET_TYPE>(
+        buffer_pos, buffer_end, fixed_len_size, out.Advance());
+    if (UNLIKELY(encoded_len < 0)) return -1;
+    buffer_pos += encoded_len;
+  }
+  return buffer_pos - buffer;
+}
+
+template <typename T>
+inline int DecodeDecimalFixedLen(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, T* 
v) {
+  if (UNLIKELY(buffer_end - buffer < fixed_len_size)) return -1;
+  DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v);
+  return fixed_len_size;
+}
+
+template <>
+inline int ParquetPlainEncoder::
+Decode<bool, parquet::Type::BOOLEAN>(const uint8_t* buffer,
+    const uint8_t* buffer_end, int fixed_len_size, bool* v) {
+  DCHECK(false) << "Use ParquetBoolDecoder for decoding bools";
+  return -1;
+}
+
+template <>
+inline int ParquetPlainEncoder::
+Decode<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* 
buffer,
+    const uint8_t* buffer_end, int fixed_len_size, Decimal4Value* v) {
+  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::
+Decode<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* 
buffer,
+    const uint8_t* buffer_end, int fixed_len_size, Decimal8Value* v) {
+  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::
+Decode<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* 
buffer,
+    const uint8_t* buffer_end, int fixed_len_size, Decimal16Value* v) {
+  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
+}
+
+/// Helper method to decode Decimal type stored as variable length byte array.
+template<typename T>
+inline int DecodeDecimalByteArray(const uint8_t* buffer, const uint8_t* 
buffer_end,
+    int fixed_len_size, T* v) {
+  if (UNLIKELY(buffer_end - buffer < sizeof(int32_t))) return -1;
+  int encoded_byte_size;
+  memcpy(&encoded_byte_size, buffer, sizeof(int32_t));
+  int byte_size = sizeof(int32_t) + encoded_byte_size;
+  if (UNLIKELY(encoded_byte_size < 0 || buffer_end - buffer < byte_size)) 
return -1;
+  uint8_t* val_ptr = const_cast<uint8_t*>(buffer) + sizeof(int32_t);
+  DecimalUtil::DecodeFromFixedLenByteArray(val_ptr, encoded_byte_size, v);
+  return byte_size;
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<Decimal4Value, 
parquet::Type::BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    Decimal4Value* v) {
+  return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<Decimal8Value, 
parquet::Type::BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    Decimal8Value* v) {
+  return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
+}
+
+template <>
+inline int ParquetPlainEncoder::Decode<Decimal16Value, 
parquet::Type::BYTE_ARRAY>(
+    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
+    Decimal16Value* v) {
+  return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
+}
+
+/// Helper class that contains the parameters needed for Timestamp decoding.
+/// Can be safely passed by value.
+class ParquetTimestampDecoder {
+public:
+  ParquetTimestampDecoder() {}
+
+  ParquetTimestampDecoder( const parquet::SchemaElement& e, const Timezone* 
timezone,
+      bool convert_int96_timestamps);
+
+  bool NeedsConversion() const { return timezone_ != nullptr; }
+
+  /// Decodes next PARQUET_TYPE from 'buffer', reading up to the byte before 
'buffer_end'
+  /// and converts it TimestampValue. 'buffer' need not be aligned.
+  template <parquet::Type::type PARQUET_TYPE>
+  int Decode(const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* 
v) const;
+
+  /// Batched version of Decode() that tries to decode 'num_values' values 
from the memory
+  /// range [buffer, buffer_end) and writes them to 'v' with a stride of 
'stride' bytes.
+  /// Returns the number of bytes read from 'buffer' or -1 if there was an 
error
+  /// decoding, e.g. invalid data or running out of input data before reading
+  /// 'num_values'.
+  template <parquet::Type::type PARQUET_TYPE>
+  int64_t DecodeBatch(const uint8_t* buffer, const uint8_t* buffer_end,
+      int64_t num_values, int64_t stride, TimestampValue* v);
+
+  TimestampValue Int64ToTimestampValue(int64_t unix_time) const {
+    DCHECK(precision_ == MILLI || precision_ == MICRO);
+    return precision_ == MILLI ? 
TimestampValue::UtcFromUnixTimeMillis(unix_time) :
+                                 
TimestampValue::UtcFromUnixTimeMicros(unix_time);
+  }
+
+  void ConvertToLocalTime(TimestampValue* v) const {
+    DCHECK(timezone_ != nullptr);
+    if (v->HasDateAndTime()) v->UtcToLocal(*timezone_);
+  }
+
+  /// Timezone conversion of min/max stats need some extra logic because 
UTC->local
+  /// conversion can change ordering near timezone rule changes. The max value 
is
+  /// increased and min value is decreased to avoid incorrectly dropping 
column chunks
+  /// (or pages once IMPALA-5843 is ready).
+
+  /// If timestamp t >= v before conversion, then this function converts v in 
such a
+  /// way that the same will be true after t is converted.
+  void ConvertMinStatToLocalTime(TimestampValue* v) const;
+
+  /// If timestamp t <= v before conversion, then this function converts v in 
such a
+  /// way that the same will be true after t is converted.
+  void ConvertMaxStatToLocalTime(TimestampValue* v) const;
+
+private:
+  enum Precision { MILLI, MICRO, NANO };
+
+  /// Timezone used for UTC->Local conversions. If nullptr, no conversion is 
needed.
+  const Timezone* timezone_ = nullptr;
+
+  /// Unit of the encoded timestamp. Used to decide between milli and 
microseconds during
+  /// INT64 decoding. INT64 with nanosecond precision (and reduced range) is 
also planned
+  /// to be implemented once it is added in Parquet (PARQUET-1387).
+  Precision precision_ = NANO;
+};
+
+template <>
+inline int ParquetTimestampDecoder::Decode<parquet::Type::INT64>(
+    const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const 
{
+  DCHECK(precision_ == MILLI || precision_ == MICRO);
+  int64_t unix_time;
+  int bytes_read = ParquetPlainEncoder::Decode<int64_t, parquet::Type::INT64>(
+      buffer, buffer_end, 0, &unix_time);
+  if (UNLIKELY(bytes_read < 0)) {
+    return bytes_read;
+  }
+  *v = Int64ToTimestampValue(unix_time);
+  // TODO: It would be more efficient to do the timezone conversion in the 
same step
+  //       as the int64_t -> TimestampValue conversion. This would be also 
needed to
+  //       move conversion/validation to dictionary construction (IMPALA-4994) 
and to
+  //       implement dictionary filtering for TimestampValues.
+  return bytes_read;
+}
+
+template <>
+inline int ParquetTimestampDecoder::Decode<parquet::Type::INT96>(
+    const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const 
{
+  DCHECK_EQ(precision_, NANO);
+  return ParquetPlainEncoder::Decode<TimestampValue, parquet::Type::INT96>(
+      buffer, buffer_end, 0, v);
+}
+
+template <parquet::Type::type PARQUET_TYPE>
+inline int64_t ParquetTimestampDecoder::DecodeBatch(const uint8_t* buffer,
+    const uint8_t* buffer_end, int64_t num_values, int64_t stride,
+    TimestampValue* v) {
+  const uint8_t* buffer_pos = buffer;
+  StrideWriter<TimestampValue> out(v, stride);
+  for (int64_t i = 0; i < num_values; ++i) {
+    int encoded_len = Decode<PARQUET_TYPE>(buffer_pos, buffer_end, 
out.Advance());
+    if (UNLIKELY(encoded_len < 0)) return -1;
+    buffer_pos += encoded_len;
+  }
+  return buffer_pos - buffer;
+}
+}
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet/parquet-level-decoder.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet/parquet-level-decoder.cc 
b/be/src/exec/parquet/parquet-level-decoder.cc
new file mode 100644
index 0000000..166230c
--- /dev/null
+++ b/be/src/exec/parquet/parquet-level-decoder.cc
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/parquet/parquet-level-decoder.h"
+
+#include "exec/read-write-util.h"
+#include "runtime/mem-pool.h"
+#include "runtime/mem-tracker.h"
+#include "util/bit-util.h"
+
+#include "common/names.h"
+
+using parquet::Encoding;
+
+namespace impala {
+
+const int16_t ParquetLevel::ROW_GROUP_END;
+const int16_t ParquetLevel::INVALID_LEVEL;
+const int16_t ParquetLevel::INVALID_POS;
+
+Status ParquetLevelDecoder::Init(const string& filename, Encoding::type 
encoding,
+    MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* 
data_size) {
+  DCHECK(*data != nullptr);
+  DCHECK_GE(*data_size, 0);
+  DCHECK_GT(cache_size, 0);
+  cache_size = BitUtil::RoundUpToPowerOf2(cache_size, 32);
+  max_level_ = max_level;
+  filename_ = filename;
+  RETURN_IF_ERROR(InitCache(cache_pool, cache_size));
+
+  // Return because there is no level data to read, e.g., required field.
+  if (max_level == 0) return Status::OK();
+
+  int32_t num_bytes = 0;
+  switch (encoding) {
+    case Encoding::RLE: {
+      Status status;
+      if (!ReadWriteUtil::Read(data, data_size, &num_bytes, &status)) {
+        return status;
+      }
+      if (num_bytes < 0 || num_bytes > *data_size) {
+        return Status(TErrorCode::PARQUET_CORRUPT_RLE_BYTES, filename, 
num_bytes);
+      }
+      int bit_width = BitUtil::Log2Ceiling64(max_level + 1);
+      rle_decoder_.Reset(*data, num_bytes, bit_width);
+      break;
+    }
+    case parquet::Encoding::BIT_PACKED:
+      return Status(TErrorCode::PARQUET_BIT_PACKED_LEVELS, filename);
+    default: {
+      stringstream ss;
+      ss << "Unsupported encoding: " << encoding;
+      return Status(ss.str());
+    }
+  }
+  if (UNLIKELY(num_bytes < 0 || num_bytes > *data_size)) {
+    return Status(Substitute("Corrupt Parquet file '$0': $1 bytes of encoded 
levels but "
+                             "only $2 bytes left in page",
+        filename, num_bytes, *data_size));
+  }
+  *data += num_bytes;
+  *data_size -= num_bytes;
+  return Status::OK();
+}
+
+Status ParquetLevelDecoder::InitCache(MemPool* pool, int cache_size) {
+  num_cached_levels_ = 0;
+  cached_level_idx_ = 0;
+  // Memory has already been allocated.
+  if (cached_levels_ != nullptr) {
+    DCHECK_EQ(cache_size_, cache_size);
+    return Status::OK();
+  }
+
+  cached_levels_ = reinterpret_cast<uint8_t*>(pool->TryAllocate(cache_size));
+  if (cached_levels_ == nullptr) {
+    return pool->mem_tracker()->MemLimitExceeded(
+        nullptr, "Definition level cache", cache_size);
+  }
+  memset(cached_levels_, 0, cache_size);
+  cache_size_ = cache_size;
+  return Status::OK();
+}
+
+Status ParquetLevelDecoder::CacheNextBatch(int vals_remaining) {
+  /// Fill the cache completely if there are enough values remaining.
+  /// Otherwise don't try to read more values than are left.
+  int batch_size = min(vals_remaining, cache_size_);
+  if (max_level_ > 0) {
+    if (UNLIKELY(!FillCache(batch_size, &num_cached_levels_)
+            || num_cached_levels_ < batch_size)) {
+      return Status(decoding_error_code_, vals_remaining, filename_);
+    }
+  } else {
+    // No levels to read, e.g., because the field is required. The cache was
+    // already initialized with all zeros, so we can hand out those values.
+    DCHECK_EQ(max_level_, 0);
+    cached_level_idx_ = 0;
+    num_cached_levels_ = batch_size;
+  }
+  return Status::OK();
+}
+
+bool ParquetLevelDecoder::FillCache(int batch_size, int* num_cached_levels) {
+  DCHECK(!CacheHasNext());
+  DCHECK(num_cached_levels != nullptr);
+  DCHECK_GE(max_level_, 0);
+  DCHECK_GE(*num_cached_levels, 0);
+  cached_level_idx_ = 0;
+  if (max_level_ == 0) {
+    // No levels to read, e.g., because the field is required. The cache was
+    // already initialized with all zeros, so we can hand out those values.
+    *num_cached_levels = batch_size;
+    return true;
+  }
+  *num_cached_levels = rle_decoder_.GetValues(batch_size, cached_levels_);
+  return *num_cached_levels > 0;
+}
+
+} // namespace impala

Reply via email to