http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet-column-readers.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.h b/be/src/exec/parquet-column-readers.h
deleted file mode 100644
index d689aed..0000000
--- a/be/src/exec/parquet-column-readers.h
+++ /dev/null
@@ -1,607 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_PARQUET_COLUMN_READERS_H
-#define IMPALA_PARQUET_COLUMN_READERS_H
-
-#include <boost/scoped_ptr.hpp>
-
-#include "exec/hdfs-parquet-scanner.h"
-#include "util/codec.h"
-#include "util/bit-stream-utils.h"
-#include "util/dict-encoding.h"
-#include "util/rle-encoding.h"
-
-namespace impala {
-
-class Tuple;
-class MemPool;
-
-/// Decoder for all supported Parquet level encodings. Optionally reads, decodes, and
-/// caches level values in batches.
-/// Level values are unsigned 8-bit integers because we support a maximum nesting
-/// depth of 100, as enforced by the FE. Using a small type saves memory and speeds up
-/// populating the level cache (e.g., with RLE we can memset() repeated values).
-class ParquetLevelDecoder {
- public:
-  ParquetLevelDecoder(bool is_def_level_decoder)
-    : decoding_error_code_(is_def_level_decoder ? TErrorCode::PARQUET_DEF_LEVEL_ERROR :
-                                                  TErrorCode::PARQUET_REP_LEVEL_ERROR) {}
-
-  /// Initialize the LevelDecoder. Reads and advances the provided data buffer if the
-  /// encoding requires reading metadata from the page header. 'cache_size' will be
-  /// rounded up to a multiple of 32 internally.
-  Status Init(const string& filename, parquet::Encoding::type encoding,
-      MemPool* cache_pool, int cache_size, int max_level, uint8_t** data, int* data_size);
-
-  /// Returns the next level or INVALID_LEVEL if there was an error. Not as efficient
-  /// as batched methods.
-  inline int16_t ReadLevel();
-
-  /// If the next value is part of a repeated run and is not cached, return the length
-  /// of the repeated run. A max level of 0 is treated as an arbitrarily long run of
-  /// zeroes, so this returns numeric_limits<int32_t>::max(). Otherwise return 0.
-  int32_t NextRepeatedRunLength();
-
-  /// Get the value of the repeated run (if NextRepeatedRunLength() > 0) and consume
-  /// 'num_to_consume' items in the run. Not valid to call if there are cached levels
-  /// that have not been consumed.
-  uint8_t GetRepeatedValue(uint32_t num_to_consume);
-
-  /// Decodes and caches the next batch of levels given that there are 'vals_remaining'
-  /// values left to decode in the page. Resets members associated with the cache.
-  /// Returns a non-ok status if there was a problem decoding a level, if a level was
-  /// encountered with a value greater than max_level_, or if fewer than
-  /// min(CacheSize(), vals_remaining) levels could be read, which indicates that the
-  /// input did not have the expected number of values. Only valid to call when
-  /// the cache has been exhausted, i.e. CacheHasNext() is false.
-  Status CacheNextBatch(int vals_remaining);
-
-  /// Functions for working with the level cache.
-  inline bool CacheHasNext() const { return cached_level_idx_ < num_cached_levels_; }
-  inline uint8_t CacheGetNext() {
-    DCHECK_LT(cached_level_idx_, num_cached_levels_);
-    return cached_levels_[cached_level_idx_++];
-  }
-  inline void CacheSkipLevels(int num_levels) {
-    DCHECK_LE(cached_level_idx_ + num_levels, num_cached_levels_);
-    cached_level_idx_ += num_levels;
-  }
-  inline int CacheSize() const { return num_cached_levels_; }
-  inline int CacheRemaining() const { return num_cached_levels_ - cached_level_idx_; }
-  inline int CacheCurrIdx() const { return cached_level_idx_; }
- private:
-  /// Initializes members associated with the level cache. Allocates memory for
-  /// the cache from pool, if necessary.
-  Status InitCache(MemPool* pool, int cache_size);
-
-  /// Decodes and writes a batch of levels into the cache. Returns true and sets
-  /// the number of values written to the cache via *num_cached_levels if no errors
-  /// are encountered. *num_cached_levels is < 'batch_size' in this case iff the
-  /// end of input was hit without any other errors. Returns false if there was an
-  /// error decoding a level or if there was an invalid level value greater than
-  /// 'max_level_'. Only valid to call when the cache has been exhausted, i.e.
-  /// CacheHasNext() is false.
-  bool FillCache(int batch_size, int* num_cached_levels);
-
-  /// RLE decoder, used if 'encoding_' is RLE and max_level_ > 0.
-  RleBatchDecoder<uint8_t> rle_decoder_;
-
-  /// Buffer for a batch of levels. The memory is allocated and owned by a pool passed
-  /// in Init().
-  uint8_t* cached_levels_ = nullptr;
-
-  /// Number of valid level values in the cache.
-  int num_cached_levels_ = 0;
-
-  /// Current index into cached_levels_.
-  int cached_level_idx_ = 0;
-
-  /// The parquet encoding used for the levels. Only RLE is supported for now.
-  parquet::Encoding::type encoding_ = parquet::Encoding::PLAIN;
-
-  /// For error checking and reporting.
-  int max_level_ = 0;
-
-  /// Number of level values cached_levels_ has memory allocated for. Always
-  /// a multiple of 32 to allow reading directly from 'bit_reader_' in batches.
-  int cache_size_ = 0;
-
-  /// Name of the parquet file. Used for reporting level decoding errors.
-  string filename_;
-
-  /// Error code to use when reporting level decoding errors.
-  TErrorCode::type decoding_error_code_;
-};
-
-/// Base class for reading a Parquet column. Reads a logical column, not necessarily a
-/// column materialized in the file (e.g. collections). The two subclasses are
-/// BaseScalarColumnReader and CollectionColumnReader. Column readers read one def and rep
-/// level pair at a time. The current def and rep level are exposed to the user, and the
-/// corresponding value (if defined) can optionally be copied into a slot via
-/// ReadValue(). Can also write position slots.
-///
-/// The constructor adds the object to the obj_pool of the parent HdfsParquetScanner.
-class ParquetColumnReader {
- public:
-  /// Creates a column reader for 'node' and associates it with the given parent scanner.
-  /// The constructor of column readers add the new object to the parent's object pool.
-  /// 'slot_desc' may be NULL, in which case the returned column reader can only be used
-  /// to read def/rep levels.
-  /// 'is_collection_field' should be set to true if the returned reader is reading a
-  /// collection. This cannot be determined purely by 'node' because a repeated scalar
-  /// node represents both an array and the array's items (in this case
-  /// 'is_collection_field' should be true if the reader reads one value per array, and
-  /// false if it reads one value per item).  The reader is added to the runtime state's
-  /// object pool. Does not create child readers for collection readers; these must be
-  /// added by the caller.
-  ///
-  /// It supports the following primitive type widening that does not have any loss of
-  /// precision.
-  /// - tinyint (INT32) -> smallint (INT32), int (INT32), bigint (INT64), double (DOUBLE)
-  /// - smallint (INT32) -> int (INT32), bigint (INT64), double (DOUBLE)
-  /// - int (INT32) -> bigint (INT64), double (DOUBLE)
-  /// - float (FLOAT) -> double (DOUBLE)
-  static ParquetColumnReader* Create(const SchemaNode& node, bool is_collection_field,
-      const SlotDescriptor* slot_desc, HdfsParquetScanner* parent);
-
-  static ParquetColumnReader* CreateTimestampColumnReader(const SchemaNode& node,
-      const SlotDescriptor* slot_desc, HdfsParquetScanner* parent);
-
-  virtual ~ParquetColumnReader() { }
-
-  int def_level() const { return def_level_; }
-  int rep_level() const { return rep_level_; }
-
-  const SlotDescriptor* slot_desc() const { return slot_desc_; }
-  const parquet::SchemaElement& schema_element() const { return *node_.element; }
-  int16_t max_def_level() const { return max_def_level_; }
-  int16_t max_rep_level() const { return max_rep_level_; }
-  int def_level_of_immediate_repeated_ancestor() const {
-    return node_.def_level_of_immediate_repeated_ancestor;
-  }
-  const SlotDescriptor* pos_slot_desc() const { return pos_slot_desc_; }
-  void set_pos_slot_desc(const SlotDescriptor* pos_slot_desc) {
-    DCHECK(pos_slot_desc_ == NULL);
-    pos_slot_desc_ = pos_slot_desc;
-  }
-
-  /// Returns true if this reader materializes collections (i.e. CollectionValues).
-  virtual bool IsCollectionReader() const { return false; }
-
-  const char* filename() const { return parent_->filename(); }
-
-  /// Read the current value (or null) into 'tuple' for this column. This should only be
-  /// called when a value is defined, i.e., def_level() >=
-  /// def_level_of_immediate_repeated_ancestor() (since empty or NULL collections produce
-  /// no output values), otherwise NextLevels() should be called instead.
-  ///
-  /// Advances this column reader to the next value (i.e. NextLevels() doesn't need to be
-  /// called after calling ReadValue()).
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
-  /// true.
-  ///
-  /// NextLevels() must be called on this reader before calling ReadValue() for the first
-  /// time. This is to initialize the current value that ReadValue() will read.
-  ///
-  /// TODO: this is the function that needs to be codegen'd (e.g. CodegenReadValue())
-  /// The codegened functions from all the materialized cols will then be combined
-  /// into one function.
-  /// TODO: another option is to materialize col by col for the entire row batch in
-  /// one call.  e.g. MaterializeCol would write out 1024 values.  Our row batches
-  /// are currently dense so we'll need to figure out something there.
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple) = 0;
-
-  /// Same as ReadValue() but does not advance repetition level. Only valid for columns
-  /// not in collections.
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple) = 0;
-
-  /// Batched version of ReadValue() that reads up to max_values at once and materializes
-  /// them into tuples in tuple_mem. Returns the number of values actually materialized
-  /// in *num_values. The return value, error behavior and state changes are generally
-  /// the same as in ReadValue(). For example, if an error occurs in the middle of
-  /// materializing a batch then false is returned, and num_values, tuple_mem, as well as
-  /// this column reader are left in an undefined state, assuming that the caller will
-  /// immediately abort execution. NextLevels() does *not* need to be called before
-  /// ReadValueBatch(), unlike ReadValue().
-  virtual bool ReadValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values);
-
-  /// Batched version of ReadNonRepeatedValue() that reads up to max_values at once and
-  /// materializes them into tuples in tuple_mem.
-  /// The return value and error behavior are the same as in ReadValueBatch().
-  virtual bool ReadNonRepeatedValueBatch(MemPool* pool, int max_values, int tuple_size,
-      uint8_t* tuple_mem, int* num_values);
-
-  /// Advances this column reader's def and rep levels to the next logical value, i.e. to
-  /// the next scalar value or the beginning of the next collection, without attempting to
-  /// read the value. This is used to skip past def/rep levels that don't materialize a
-  /// value, such as the def/rep levels corresponding to an empty containing collection.
-  ///
-  /// NextLevels() must be called on this reader before calling ReadValue() for the first
-  /// time. This is to initialize the current value that ReadValue() will read.
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
-  /// true.
-  virtual bool NextLevels() = 0;
-
-  /// Writes pos_current_value_ (i.e. "reads" the synthetic position field of the
-  /// parent collection) to 'pos' and increments pos_current_value_. Only valid to
-  /// call when doing non-batched reading, i.e. NextLevels() must have been called
-  /// before each call to this function to advance to the next element in the
-  /// collection.
-  void ReadPositionNonBatched(int64_t* pos);
-
-  /// Returns true if this column reader has reached the end of the row group.
-  inline bool RowGroupAtEnd() {
-    DCHECK_EQ(rep_level_ == HdfsParquetScanner::ROW_GROUP_END,
-              def_level_ == HdfsParquetScanner::ROW_GROUP_END);
-    return rep_level_ == HdfsParquetScanner::ROW_GROUP_END;
-  }
-
-  /// If 'row_batch' is non-NULL, transfers the remaining resources backing tuples to it,
-  /// and frees up other resources. If 'row_batch' is NULL frees all resources instead.
-  virtual void Close(RowBatch* row_batch) = 0;
-
- protected:
-  HdfsParquetScanner* parent_;
-  const SchemaNode& node_;
-  const SlotDescriptor* const slot_desc_;
-
-  /// The slot descriptor for the position field of the tuple, if there is one. NULL if
-  /// there's not. Only one column reader for a given tuple desc will have this set.
-  const SlotDescriptor* pos_slot_desc_;
-
-  /// The next value to write into the position slot, if there is one. 64-bit int because
-  /// the pos slot is always a BIGINT Set to INVALID_POS when this column reader does not
-  /// have a current rep and def level (i.e. before the first NextLevels() call or after
-  /// the last value in the column has been read).
-  int64_t pos_current_value_;
-
-  /// The current repetition and definition levels of this reader. Advanced via
-  /// ReadValue() and NextLevels(). Set to INVALID_LEVEL before the first NextLevels()
-  /// call for a row group or if an error is encountered decoding a level. Set to
-  /// ROW_GROUP_END after the last value in the column has been read). If this is not
-  /// inside a collection, rep_level_ is always 0, INVALID_LEVEL or ROW_GROUP_END.
-  /// int16_t is large enough to hold the valid levels 0-255 and negative sentinel values
-  /// INVALID_LEVEL and ROW_GROUP_END. The maximum values are cached here because they
-  /// are accessed in inner loops.
-  int16_t rep_level_;
-  const int16_t max_rep_level_;
-  int16_t def_level_;
-  const int16_t max_def_level_;
-
-  // Cache frequently accessed members of slot_desc_ for perf.
-
-  /// slot_desc_->tuple_offset(). -1 if slot_desc_ is NULL.
-  const int tuple_offset_;
-
-  /// slot_desc_->null_indicator_offset(). Invalid if slot_desc_ is NULL.
-  const NullIndicatorOffset null_indicator_offset_;
-
-  ParquetColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : parent_(parent),
-      node_(node),
-      slot_desc_(slot_desc),
-      pos_slot_desc_(NULL),
-      pos_current_value_(HdfsParquetScanner::INVALID_POS),
-      rep_level_(HdfsParquetScanner::INVALID_LEVEL),
-      max_rep_level_(node_.max_rep_level),
-      def_level_(HdfsParquetScanner::INVALID_LEVEL),
-      max_def_level_(node_.max_def_level),
-      tuple_offset_(slot_desc == NULL ? -1 : slot_desc->tuple_offset()),
-      null_indicator_offset_(slot_desc == NULL ? NullIndicatorOffset() :
-          slot_desc->null_indicator_offset()) {
-    DCHECK(parent != nullptr);
-    parent->obj_pool_.Add(this);
-
-    DCHECK_GE(node_.max_rep_level, 0);
-    DCHECK_LE(node_.max_rep_level, std::numeric_limits<int16_t>::max());
-    DCHECK_GE(node_.max_def_level, 0);
-    DCHECK_LE(node_.max_def_level, std::numeric_limits<int16_t>::max());
-    // rep_level_ is always valid and equal to 0 if col not in collection.
-    if (max_rep_level() == 0) rep_level_ = 0;
-  }
-
-  /// Called in the middle of creating a scratch tuple batch to simulate failures
-  /// such as exceeding memory limit or cancellation. Returns false if the debug
-  /// action deems that the parquet column reader should halt execution. 'val_count'
-  /// is the counter which the column reader uses to track the number of tuples
-  /// produced so far. If the column reader should halt execution, 'parse_status_'
-  /// is updated with the error status and 'val_count' is set to 0.
-  bool ColReaderDebugAction(int* val_count);
-};
-
-/// Reader for a single column from the parquet file.  It's associated with a
-/// ScannerContext::Stream and is responsible for decoding the data.  Super class for
-/// per-type column readers. This contains most of the logic, the type specific functions
-/// must be implemented in the subclass.
-class BaseScalarColumnReader : public ParquetColumnReader {
- public:
-  BaseScalarColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : ParquetColumnReader(parent, node, slot_desc),
-      data_page_pool_(new MemPool(parent->scan_node_->mem_tracker())) {
-    DCHECK_GE(node_.col_idx, 0) << node_.DebugString();
-  }
-
-  virtual ~BaseScalarColumnReader() { }
-
-  /// Resets the reader for each row group in the file and creates the scan
-  /// range for the column, but does not start it. To start scanning,
-  /// set_io_reservation() must be called to assign reservation to this
-  /// column, followed by StartScan().
-  Status Reset(const HdfsFileDesc& file_desc, const parquet::ColumnChunk& col_chunk,
-    int row_group_idx);
-
-  /// Starts the column scan range. The reader must be Reset() and have a
-  /// reservation assigned via set_io_reservation(). This must be called
-  /// before any of the column data can be read (including dictionary and
-  /// data pages). Returns an error status if there was an error starting the
-  /// scan or allocating buffers for it.
-  Status StartScan();
-
-  /// Helper to start scans for multiple columns at once.
-  static Status StartScans(const std::vector<BaseScalarColumnReader*> readers) {
-    for (BaseScalarColumnReader* reader : readers) RETURN_IF_ERROR(reader->StartScan());
-    return Status::OK();
-  }
-
-  virtual void Close(RowBatch* row_batch) {
-    if (row_batch != nullptr && PageContainsTupleData(page_encoding_)) {
-      row_batch->tuple_data_pool()->AcquireData(data_page_pool_.get(), false);
-    } else {
-      data_page_pool_->FreeAll();
-    }
-    if (decompressor_ != nullptr) decompressor_->Close();
-    DictDecoderBase* dict_decoder = GetDictionaryDecoder();
-    if (dict_decoder != nullptr) dict_decoder->Close();
-  }
-
-  io::ScanRange* scan_range() const { return scan_range_; }
-  int64_t total_len() const { return metadata_->total_compressed_size; }
-  int col_idx() const { return node_.col_idx; }
-  THdfsCompression::type codec() const {
-    if (metadata_ == NULL) return THdfsCompression::NONE;
-    return ConvertParquetToImpalaCodec(metadata_->codec);
-  }
-  void set_io_reservation(int bytes) { io_reservation_ = bytes; }
-
-  /// Reads the next definition and repetition levels for this column. Initializes the
-  /// next data page if necessary.
-  virtual bool NextLevels() { return NextLevels<true>(); }
-
-  /// Check the data stream to see if there is a dictionary page. If there is,
-  /// use that page to initialize dict_decoder_ and advance the data stream
-  /// past the dictionary page.
-  Status InitDictionary();
-
-  /// Convenience function to initialize multiple dictionaries.
-  static Status InitDictionaries(const std::vector<BaseScalarColumnReader*> readers);
-
-  // Returns the dictionary or NULL if the dictionary doesn't exist
-  virtual DictDecoderBase* GetDictionaryDecoder() { return nullptr; }
-
-  // Returns whether the datatype for this column requires conversion from the on-disk
-  // format for correctness. For example, timestamps can require an offset to be
-  // applied.
-  virtual bool NeedsConversion() { return false; }
-
-  // Returns whether the datatype for this column requires validation. For example,
-  // the timestamp format has certain bit combinations that are invalid, and these
-  // need to be validated when read from disk.
-  virtual bool NeedsValidation() { return false; }
-
-  // TODO: Some encodings might benefit a lot from a SkipValues(int num_rows) if
-  // we know this row can be skipped. This could be very useful with stats and big
-  // sections can be skipped. Implement that when we can benefit from it.
-
- protected:
-  // Friend parent scanner so it can perform validation (e.g. ValidateEndOfRowGroup())
-  friend class HdfsParquetScanner;
-
-  // Class members that are accessed for every column should be included up here so they
-  // fit in as few cache lines as possible.
-
-  /// Pointer to start of next value in data page
-  uint8_t* data_ = nullptr;
-
-  /// End of the data page.
-  const uint8_t* data_end_ = nullptr;
-
-  /// Decoder for definition levels.
-  ParquetLevelDecoder def_levels_{true};
-
-  /// Decoder for repetition levels.
-  ParquetLevelDecoder rep_levels_{false};
-
-  /// Page encoding for values of the current data page. Cached here for perf. Set in
-  /// InitDataPage().
-  parquet::Encoding::type page_encoding_ = parquet::Encoding::PLAIN_DICTIONARY;
-
-  /// Num values remaining in the current data page
-  int num_buffered_values_ = 0;
-
-  // Less frequently used members that are not accessed in inner loop should go below
-  // here so they do not occupy precious cache line space.
-
-  /// The number of values seen so far. Updated per data page.
-  int64_t num_values_read_ = 0;
-
-  /// Metadata for the column for the current row group.
-  const parquet::ColumnMetaData* metadata_ = nullptr;
-
-  boost::scoped_ptr<Codec> decompressor_;
-
-  /// The scan range for the column's data. Initialized for each row group by Reset().
-  io::ScanRange* scan_range_ = nullptr;
-
-  // Stream used to read data from 'scan_range_'. Initialized by StartScan().
-  ScannerContext::Stream* stream_ = nullptr;
-
-  /// Reservation in bytes to use for I/O buffers in 'scan_range_'/'stream_'. Must be set
-  /// with set_io_reservation() before 'stream_' is initialized. Reset for each row group
-  /// by Reset().
-  int64_t io_reservation_ = 0;
-
-  /// Pool to allocate storage for data pages from - either decompression buffers for
-  /// compressed data pages or copies of the data page with var-len data to attach to
-  /// batches.
-  boost::scoped_ptr<MemPool> data_page_pool_;
-
-  /// Header for current data page.
-  parquet::PageHeader current_page_header_;
-
-  /// Reads the next page header into next_page_header/next_header_size.
-  /// If the stream reaches the end before reading a complete page header,
-  /// eos is set to true. If peek is false, the stream position is advanced
-  /// past the page header. If peek is true, the stream position is not moved.
-  /// Returns an error status if the next page header could not be read.
-  Status ReadPageHeader(bool peek, parquet::PageHeader* next_page_header,
-      uint32_t* next_header_size, bool* eos);
-
-  /// Read the next data page. If a dictionary page is encountered, that will be read and
-  /// this function will continue reading the next data page.
-  Status ReadDataPage();
-
-  /// Try to move the the next page and buffer more values. Return false and sets rep_level_,
-  /// def_level_ and pos_current_value_ to -1 if no more pages or an error encountered.
-  bool NextPage();
-
-  /// Implementation for NextLevels().
-  template <bool ADVANCE_REP_LEVEL>
-  bool NextLevels();
-
-  /// Creates a dictionary decoder from values/size. 'decoder' is set to point to a
-  /// dictionary decoder stored in this object. Subclass must implement this. Returns
-  /// an error status if the dictionary values could not be decoded successfully.
-  virtual Status CreateDictionaryDecoder(uint8_t* values, int size,
-      DictDecoderBase** decoder) = 0;
-
-  /// Return true if the column has a dictionary decoder. Subclass must implement this.
-  virtual bool HasDictionaryDecoder() = 0;
-
-  /// Clear the dictionary decoder so HasDictionaryDecoder() will return false. Subclass
-  /// must implement this.
-  virtual void ClearDictionaryDecoder() = 0;
-
-  /// Initializes the reader with the data contents. This is the content for the entire
-  /// decompressed data page. Decoders can initialize state from here. The caller must
-  /// validate the input such that 'size' is non-negative and that 'data' has at least
-  /// 'size' bytes remaining.
-  virtual Status InitDataPage(uint8_t* data, int size) = 0;
-
-  /// Allocate memory for the uncompressed contents of a data page of 'size' bytes from
-  /// 'data_page_pool_'. 'err_ctx' provides context for error messages. On success, 'buffer'
-  /// points to the allocated memory. Otherwise an error status is returned.
-  Status AllocateUncompressedDataPage(
-      int64_t size, const char* err_ctx, uint8_t** buffer);
-
-  /// Returns true if a data page for this column with the specified 'encoding' may
-  /// contain strings referenced by returned batches. Cases where this is not true are:
-  /// * Dictionary-compressed pages, where any string data lives in 'dictionary_pool_'.
-  /// * Fixed-length slots, where there is no string data.
-  bool PageContainsTupleData(parquet::Encoding::type page_encoding) {
-    return page_encoding != parquet::Encoding::PLAIN_DICTIONARY
-        && slot_desc_ != nullptr && slot_desc_->type().IsVarLenStringType();
-  }
-
-  /// Slow-path status construction code for def/rep decoding errors. 'level_name' is
-  /// either "rep" or "def", 'decoded_level' is the value returned from
-  /// ParquetLevelDecoder::ReadLevel() and 'max_level' is the maximum allowed value.
-  void __attribute__((noinline)) SetLevelDecodeError(const char* level_name,
-      int decoded_level, int max_level);
-
-  // Returns a detailed error message about unsupported encoding.
-  Status GetUnsupportedDecodingError();
-};
-
-/// Collections are not materialized directly in parquet files; only scalar values appear
-/// in the file. CollectionColumnReader uses the definition and repetition levels of child
-/// column readers to figure out the boundaries of each collection in this column.
-class CollectionColumnReader : public ParquetColumnReader {
- public:
-  CollectionColumnReader(HdfsParquetScanner* parent, const SchemaNode& node,
-      const SlotDescriptor* slot_desc)
-    : ParquetColumnReader(parent, node, slot_desc) {
-    DCHECK(node_.is_repeated());
-    if (slot_desc != NULL) DCHECK(slot_desc->type().IsCollectionType());
-  }
-
-  virtual ~CollectionColumnReader() { }
-
-  vector<ParquetColumnReader*>* children() { return &children_; }
-
-  virtual bool IsCollectionReader() const { return true; }
-
-  /// The repetition level indicating that the current value is the first in a new
-  /// collection (meaning the last value read was the final item in the previous
-  /// collection).
-  int new_collection_rep_level() const { return max_rep_level() - 1; }
-
-  /// Materializes CollectionValue into tuple slot (if materializing) and advances to next
-  /// value.
-  virtual bool ReadValue(MemPool* pool, Tuple* tuple);
-
-  /// Same as ReadValue but does not advance repetition level. Only valid for columns not
-  /// in collections.
-  virtual bool ReadNonRepeatedValue(MemPool* pool, Tuple* tuple);
-
-  /// Advances all child readers to the beginning of the next collection and updates this
-  /// reader's state.
-  virtual bool NextLevels();
-
-  /// This is called once for each row group in the file.
-  void Reset() {
-    def_level_ = HdfsParquetScanner::INVALID_LEVEL;
-    rep_level_ = HdfsParquetScanner::INVALID_LEVEL;
-    pos_current_value_ = HdfsParquetScanner::INVALID_POS;
-  }
-
-  virtual void Close(RowBatch* row_batch) {
-    for (ParquetColumnReader* child_reader: children_) {
-      child_reader->Close(row_batch);
-    }
-  }
-
- private:
-  /// Column readers of fields contained within this collection. There is at least one
-  /// child reader per collection reader. Child readers either materialize slots in the
-  /// collection item tuples, or there is a single child reader that does not materialize
-  /// any slot and is only used by this reader to read def and rep levels.
-  vector<ParquetColumnReader*> children_;
-
-  /// Updates this reader's def_level_, rep_level_, and pos_current_value_ based on child
-  /// reader's state.
-  void UpdateDerivedState();
-
-  /// Recursively reads from children_ to assemble a single CollectionValue into
-  /// 'slot'. Also advances rep_level_ and def_level_ via NextLevels().
-  ///
-  /// Returns false if execution should be aborted for some reason, e.g. parse_error_ is
-  /// set, the query is cancelled, or the scan node limit was reached. Otherwise returns
-  /// true.
-  inline bool ReadSlot(CollectionValue* slot, MemPool* pool);
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet-column-stats.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.cc b/be/src/exec/parquet-column-stats.cc
deleted file mode 100644
index 478bba4..0000000
--- a/be/src/exec/parquet-column-stats.cc
+++ /dev/null
@@ -1,193 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet-column-stats.inline.h"
-
-#include <algorithm>
-#include <cmath>
-#include <limits>
-
-#include "common/names.h"
-
-namespace impala {
-
-bool ColumnStatsReader::ReadFromThrift(StatsField stats_field, void* slot) const {
-  if (!(col_chunk_.__isset.meta_data && col_chunk_.meta_data.__isset.statistics)) {
-    return false;
-  }
-  const parquet::Statistics& stats = col_chunk_.meta_data.statistics;
-
-  // Try to read the requested stats field. If it is not set, we may fall back 
to reading
-  // the old stats, based on the column type.
-  const string* stat_value = nullptr;
-  switch (stats_field) {
-    case StatsField::MIN:
-      if (stats.__isset.min_value && CanUseStats()) {
-        stat_value = &stats.min_value;
-        break;
-      }
-      if (stats.__isset.min && CanUseDeprecatedStats()) {
-        stat_value = &stats.min;
-      }
-      break;
-    case StatsField::MAX:
-      if (stats.__isset.max_value && CanUseStats()) {
-        stat_value = &stats.max_value;
-        break;
-      }
-      if (stats.__isset.max && CanUseDeprecatedStats()) {
-        stat_value = &stats.max;
-      }
-      break;
-    default:
-      DCHECK(false) << "Unsupported statistics field requested";
-  }
-  if (stat_value == nullptr) return false;
-
-  switch (col_type_.type) {
-    case TYPE_BOOLEAN:
-      return ColumnStats<bool>::DecodePlainValue(*stat_value, slot,
-          parquet::Type::BOOLEAN);
-    case TYPE_TINYINT: {
-      // parquet::Statistics encodes INT_8 values using 4 bytes.
-      int32_t col_stats;
-      bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, 
&col_stats,
-          parquet::Type::INT32);
-      if (!ret || col_stats < std::numeric_limits<int8_t>::min() ||
-          col_stats > std::numeric_limits<int8_t>::max()) {
-        return false;
-      }
-      *static_cast<int8_t*>(slot) = col_stats;
-      return true;
-    }
-    case TYPE_SMALLINT: {
-      // parquet::Statistics encodes INT_16 values using 4 bytes.
-      int32_t col_stats;
-      bool ret = ColumnStats<int32_t>::DecodePlainValue(*stat_value, 
&col_stats,
-          parquet::Type::INT32);
-      if (!ret || col_stats < std::numeric_limits<int16_t>::min() ||
-          col_stats > std::numeric_limits<int16_t>::max()) {
-        return false;
-      }
-      *static_cast<int16_t*>(slot) = col_stats;
-      return true;
-    }
-    case TYPE_INT:
-      return ColumnStats<int32_t>::DecodePlainValue(*stat_value, slot, 
element_.type);
-    case TYPE_BIGINT:
-      return ColumnStats<int64_t>::DecodePlainValue(*stat_value, slot, 
element_.type);
-    case TYPE_FLOAT:
-      // IMPALA-6527, IMPALA-6538: ignore min/max stats if NaN
-      return ColumnStats<float>::DecodePlainValue(*stat_value, slot, 
element_.type)
-          && !std::isnan(*reinterpret_cast<float*>(slot));
-    case TYPE_DOUBLE:
-      // IMPALA-6527, IMPALA-6538: ignore min/max stats if NaN
-      return ColumnStats<double>::DecodePlainValue(*stat_value, slot, 
element_.type)
-          && !std::isnan(*reinterpret_cast<double*>(slot));
-    case TYPE_TIMESTAMP:
-      return DecodeTimestamp(*stat_value, stats_field,
-          static_cast<TimestampValue*>(slot));
-    case TYPE_STRING:
-    case TYPE_VARCHAR:
-      return ColumnStats<StringValue>::DecodePlainValue(*stat_value, slot, 
element_.type);
-    case TYPE_CHAR:
-      /// We don't read statistics for CHAR columns, since CHAR support is 
broken in
-      /// Impala (IMPALA-1652).
-      return false;
-    case TYPE_DECIMAL:
-      switch (col_type_.GetByteSize()) {
-        case 4:
-          return ColumnStats<Decimal4Value>::DecodePlainValue(*stat_value, 
slot,
-              element_.type);
-        case 8:
-          return ColumnStats<Decimal8Value>::DecodePlainValue(*stat_value, 
slot,
-              element_.type);
-        case 16:
-          return ColumnStats<Decimal16Value>::DecodePlainValue(*stat_value, 
slot,
-              element_.type);
-        }
-      DCHECK(false) << "Unknown decimal byte size: " << 
col_type_.GetByteSize();
-    default:
-      DCHECK(false) << col_type_.DebugString();
-  }
-  return false;
-}
-
-/// Decodes 'stat_value' into 'slot' as a TimestampValue. INT96 values are decoded
-/// directly; INT64 values are decoded as an integer and converted through
-/// 'timestamp_decoder_'. When the decoder reports that a conversion is needed, the MIN
-/// or MAX specific local-time conversion is applied. Returns true only if decoding
-/// succeeded and the resulting value has both a date and a time.
-bool ColumnStatsReader::DecodeTimestamp(const std::string& stat_value,
-    ColumnStatsReader::StatsField stats_field, TimestampValue* slot) const {
-  bool decoded = false;
-  switch (element_.type) {
-    case parquet::Type::INT96:
-      decoded = ColumnStats<TimestampValue>::DecodePlainValue(
-          stat_value, slot, element_.type);
-      break;
-    case parquet::Type::INT64: {
-      int64_t raw_value;
-      decoded =
-          ColumnStats<int64_t>::DecodePlainValue(stat_value, &raw_value, element_.type);
-      if (decoded) *slot = timestamp_decoder_.Int64ToTimestampValue(raw_value);
-      break;
-    }
-    default:
-      // Timestamps are only expected to be stored as INT96 or INT64.
-      DCHECK(false) << element_.name;
-      return false;
-  }
-  if (!decoded) return false;
-
-  if (timestamp_decoder_.NeedsConversion()) {
-    const bool is_min = stats_field == ColumnStatsReader::StatsField::MIN;
-    if (is_min) {
-      timestamp_decoder_.ConvertMinStatToLocalTime(slot);
-    } else {
-      timestamp_decoder_.ConvertMaxStatToLocalTime(slot);
-    }
-  }
-  return slot->HasDateAndTime();
-}
-
-/// Copies the 'null_count' statistic of the column chunk into '*null_count'. Returns
-/// false, leaving '*null_count' untouched, if the chunk has no metadata, no statistics,
-/// or the statistics do not carry a null count.
-bool ColumnStatsReader::ReadNullCountStat(int64_t* null_count) const {
-  const bool has_statistics =
-      col_chunk_.__isset.meta_data && col_chunk_.meta_data.__isset.statistics;
-  if (!has_statistics) return false;
-
-  const parquet::Statistics& chunk_stats = col_chunk_.meta_data.statistics;
-  if (!chunk_stats.__isset.null_count) return false;
-
-  *null_count = chunk_stats.null_count;
-  return true;
-}
-
-/// Copies the bytes referenced by 'value' into 'buffer' (clearing 'buffer' first) and
-/// repoints 'value' at the copy. Does nothing if 'value' already points at the start
-/// of 'buffer'. Returns the error from the append if it fails.
-Status ColumnStatsBase::CopyToBuffer(StringBuffer* buffer, StringValue* value) {
-  const bool already_in_buffer = (value->ptr == buffer->buffer());
-  if (!already_in_buffer) {
-    buffer->Clear();
-    RETURN_IF_ERROR(buffer->Append(value->ptr, value->len));
-    value->ptr = buffer->buffer();
-  }
-  return Status::OK();
-}
-
-/// Decides whether the 'min_value'/'max_value' statistics may be used for this column.
-bool ColumnStatsReader::CanUseStats() const {
-  // Stats can be used if the column order is TypeDefinedOrder (see parquet.thrift).
-  if (col_order_ != nullptr) return col_order_->__isset.TYPE_ORDER;
-
-  // If column order is not set, only statistics for numeric types can be trusted.
-  const bool is_numeric = col_type_.IsBooleanType() || col_type_.IsIntegerType()
-      || col_type_.IsFloatingPointType();
-  return is_numeric;
-}
-
-/// Decides whether the deprecated 'min'/'max' statistics may be used for this column.
-bool ColumnStatsReader::CanUseDeprecatedStats() const {
-  // If column order is set to something other than TypeDefinedOrder, we shall not use
-  // the stats (see parquet.thrift).
-  const bool order_allows_stats =
-      col_order_ == nullptr || col_order_->__isset.TYPE_ORDER;
-  if (!order_allows_stats) return false;
-
-  // Only boolean, integer and floating point columns are accepted.
-  if (col_type_.IsBooleanType()) return true;
-  if (col_type_.IsIntegerType()) return true;
-  return col_type_.IsFloatingPointType();
-}
-
-}  // end ns impala

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet-column-stats.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.h 
b/be/src/exec/parquet-column-stats.h
deleted file mode 100644
index fc880f9..0000000
--- a/be/src/exec/parquet-column-stats.h
+++ /dev/null
@@ -1,299 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_EXEC_PARQUET_COLUMN_STATS_H
-#define IMPALA_EXEC_PARQUET_COLUMN_STATS_H
-
-#include <string>
-#include <type_traits>
-
-#include "exec/parquet-common.h"
-#include "runtime/decimal-value.h"
-#include "runtime/string-buffer.h"
-#include "runtime/timestamp-value.h"
-#include "runtime/types.h"
-
-#include "gen-cpp/parquet_types.h"
-
-namespace impala {
-
-/// This class, together with its derivatives, is used to update column statistics when
-/// writing parquet files. It provides an interface to populate a parquet::Statistics
-/// object and attach it to an object supplied by the caller. It can also be used to
-/// decode parquet::Statistics into slots.
-///
-/// We currently support writing the 'min_value', 'max_value' and 'null_count' fields in
-/// parquet::Statistics. 'distinct_count' is not tracked or populated. We do not populate
-/// the deprecated 'min' and 'max' fields.
-///
-/// Regarding the ordering of values, we follow the parquet-format specification for
-/// logical types (LogicalTypes.md in parquet-format):
-///
-/// - Numeric values (BOOLEAN, INT, FLOAT, DOUBLE, DECIMAL) are ordered by their numeric
-///   value (as opposed to their binary representation).
-///
-/// - Strings are ordered using bytewise, unsigned comparison.
-///
-/// - Timestamps are compared by numerically comparing the points in time they
-///   represent.
-///
-/// NULL values are not considered for min/max statistics, and if a column consists only
-/// of NULL values, then no min/max statistics are written.
-///
-/// Updating the statistics is handled in derived classes to alleviate the need for
-/// virtual function calls.
-///
-/// TODO: Populate distinct_count.
-class ColumnStatsBase {
- public:
-  /// Min, max and three-way comparison for types that are not floating point numbers.
-  /// Relies on std::min/std::max and the type's '<' and '>' operators.
-  template <typename T, typename Enable = void>
-  struct MinMaxTrait {
-    static decltype(auto) MinValue(const T& a, const T& b) { return 
std::min(a, b); }
-    static decltype(auto) MaxValue(const T& a, const T& b) { return 
std::max(a, b); }
-    /// Returns -1 if a < b, 1 if a > b and 0 otherwise.
-    static int Compare(const T& a, const T& b) {
-      if (a < b) return -1;
-      if (a > b) return 1;
-      return 0;
-    }
-  };
-
-  /// Min, max and three-way comparison for floating point types. Uses
-  /// std::fmin/std::fmax instead of std::min/std::max.
-  template <typename T>
-  struct MinMaxTrait<T, std::enable_if_t<std::is_floating_point<T>::value>> {
-    static decltype(auto) MinValue(const T& a, const T& b) { return 
std::fmin(a, b); }
-    static decltype(auto) MaxValue(const T& a, const T& b) { return 
std::fmax(a, b); }
-    /// Returns 0 if the values compare equal or are both NaN, 1 if MaxValue(a, b)
-    /// equals 'a', and -1 otherwise.
-    static int Compare(const T& a, const T& b) {
-      //TODO: Should be aligned with PARQUET-1222, once resolved
-      if (a == b) return 0;
-      if (std::isnan(a) && std::isnan(b)) return 0;
-      if (MaxValue(a, b) == a) return 1;
-      return -1;
-    }
-  };
-
-  ColumnStatsBase() : has_min_max_values_(false), null_count_(0) {}
-  virtual ~ColumnStatsBase() {}
-
-  /// Merges this statistics object with values from 'other'. If 'other' has not been
-  /// initialized, then the min/max values of this object will not be changed. It
-  /// maintains internal state that tracks whether the min/max values are ordered.
-  virtual void Merge(const ColumnStatsBase& other) = 0;
-
-  /// Copies the contents of this object's statistics values to internal buffers. Some
-  /// data types (e.g. StringValue) need to be copied at the end of processing a row
-  /// batch, since the batch memory will be released. Override this method in derived
-  /// classes to provide the functionality. The default implementation does nothing.
-  virtual Status MaterializeStringValuesToInternalBuffers() WARN_UNUSED_RESULT 
{
-    return Status::OK();
-  }
-
-  /// Returns the number of bytes needed to encode the current statistics into a
-  /// parquet::Statistics object.
-  virtual int64_t BytesNeeded() const = 0;
-
-  /// Encodes the current values into a Statistics thrift message.
-  virtual void EncodeToThrift(parquet::Statistics* out) const = 0;
-
-  /// Resets the state of this object.
-  void Reset();
-
-  /// Updates the statistics by adding 'count' to the null count. It is called each
-  /// time a null value is appended to the column or the statistics are merged.
-  void IncrementNullCount(int64_t count) { null_count_ += count; }
-
-  /// Returns the boundary order of the pages. That is, whether the lists of min/max
-  /// elements inside the ColumnIndex are ordered and if so, in which direction.
-  /// If both 'ascending_boundary_order_' and 'descending_boundary_order_' are true,
-  /// it means all elements are equal; we choose ascending order in this case.
-  /// If only one flag is true, we return the identified ordering. If both of them are
-  /// false, we return unordered.
-  parquet::BoundaryOrder::type GetBoundaryOrder() const {
-    if (ascending_boundary_order_) return parquet::BoundaryOrder::ASCENDING;
-    if (descending_boundary_order_) return parquet::BoundaryOrder::DESCENDING;
-    return parquet::BoundaryOrder::UNORDERED;
-  }
-
- protected:
-  // Copies the memory of 'value' into 'buffer' and makes 'value' point to 'buffer'.
-  // 'buffer' is reset before making the copy.
-  static Status CopyToBuffer(StringBuffer* buffer, StringValue* value) 
WARN_UNUSED_RESULT;
-
-  /// Stores whether the min and max values of the current object have been
-  /// initialized.
-  bool has_min_max_values_;
-
-  // Number of null values since the last call to Reset().
-  int64_t null_count_;
-
-  // If true, min/max values are ascending.
-  // We assume the values are ascending, so start with true and only make it false when
-  // we find a descending value. If not all values are equal, then at least one of
-  // 'ascending_boundary_order_' and 'descending_boundary_order_' will be false.
-  bool ascending_boundary_order_ = true;
-
-  // If true, min/max values are descending.
-  // See description of 'ascending_boundary_order_'.
-  bool descending_boundary_order_ = true;
-};
-
-/// This class contains behavior specific to our in-memory formats for different types.
-/// It tracks the min/max values seen since the last Reset(), plus the min/max values of
-/// the previously merged page, which Merge() uses to maintain the boundary order.
-template <typename T>
-class ColumnStats : public ColumnStatsBase {
-  friend class ColumnStatsBase;
-  // We explicitly require types to be listed here in order to support column
-  // statistics. When adding a type here, users of this class need to ensure that the
-  // statistics follow the ordering semantics of parquet's min/max statistics for the
-  // new type. Details on how the values should be ordered can be found in the
-  // 'parquet-format' project in 'parquet.thrift' and 'LogicalTypes.md'.
-  using value_type = typename std::enable_if<
-      std::is_arithmetic<T>::value
-        || std::is_same<bool, T>::value
-        || std::is_same<StringValue, T>::value
-        || std::is_same<TimestampValue, T>::value
-        || std::is_same<Decimal4Value, T>::value
-        || std::is_same<Decimal8Value, T>::value
-        || std::is_same<Decimal16Value, T>::value,
-      T>::type;
-
- public:
-  /// 'mem_pool' is used to materialize string values so that the user of this class can
-  /// free the memory of the original values.
-  /// 'plain_encoded_value_size' specifies the size of each encoded value in plain
-  /// encoding, -1 if the type is variable-length.
-  ColumnStats(MemPool* mem_pool, int plain_encoded_value_size)
-    : ColumnStatsBase(),
-      plain_encoded_value_size_(plain_encoded_value_size),
-      mem_pool_(mem_pool),
-      min_buffer_(mem_pool),
-      max_buffer_(mem_pool),
-      prev_page_min_buffer_(mem_pool),
-      prev_page_max_buffer_(mem_pool) {}
-
-  /// Updates the statistics based on the values min_value and max_value. If necessary,
-  /// initializes the statistics. It may keep a reference to either value until
-  /// MaterializeStringValuesToInternalBuffers() gets called.
-  void Update(const T& min_value, const T& max_value);
-
-  /// Wrapper to call the Update function which takes in the min_value and max_value.
-  /// Uses 'v' as both the minimum and the maximum.
-  void Update(const T& v) { Update(v, v); }
-
-  virtual void Merge(const ColumnStatsBase& other) override;
-
-  /// Default implementation: nothing to copy, always returns Status::OK().
-  virtual Status MaterializeStringValuesToInternalBuffers() override {
-    return Status::OK();
-  }
-
-  virtual int64_t BytesNeeded() const override;
-  virtual void EncodeToThrift(parquet::Statistics* out) const override;
-
-  /// Decodes the plain encoded stats value from 'buffer' and writes the result into the
-  /// buffer pointed to by 'slot'. Returns true if decoding was successful, false
-  /// otherwise. For timestamps, an additional validation will be performed.
-  static bool DecodePlainValue(const std::string& buffer, void* slot,
-      parquet::Type::type parquet_type);
-
- protected:
-  /// Encodes a single value using parquet's plain encoding and stores it into the
-  /// binary string 'out'. String values are stored without additional encoding.
-  /// 'bytes_needed' must be positive.
-  static void EncodePlainValue(const T& v, int64_t bytes_needed, std::string* 
out);
-
-  /// Returns the number of bytes needed to encode value 'v'.
-  int64_t BytesNeeded(const T& v) const;
-
-  // Size of each encoded value in plain encoding, -1 if the type is variable-length.
-  int plain_encoded_value_size_;
-
-  // Minimum value since the last call to Reset().
-  T min_value_;
-
-  // Maximum value since the last call to Reset().
-  T max_value_;
-
-  // Minimum value of the previous page. Need to store that to calculate boundary order.
-  T prev_page_min_value_;
-
-  // Maximum value of the previous page. Need to store that to calculate boundary order.
-  T prev_page_max_value_;
-
-  // Memory pool to allocate from when making copies of the statistics data.
-  MemPool* mem_pool_;
-
-  // Local buffers to copy statistics data into.
-  StringBuffer min_buffer_;
-  StringBuffer max_buffer_;
-  StringBuffer prev_page_min_buffer_;
-  StringBuffer prev_page_max_buffer_;
-};
-
-/// Class that handles the decoding of Parquet stats (min/max/null_count) for a given
-/// column chunk.
-///
-/// The column chunk, column type and schema element are held by reference and the
-/// column order by pointer; none of them is copied by the constructor.
-class ColumnStatsReader {
-public:
-  /// Enum to select whether to read minimum or maximum statistics. Values do not
-  /// correspond to fields in parquet::Statistics, but instead select between retrieving
-  /// the minimum or maximum value.
-  enum class StatsField { MIN, MAX };
-
-  /// 'col_order' may be nullptr, in which case CanUseStats() only accepts boolean,
-  /// integer and floating point column types.
-  ColumnStatsReader(const parquet::ColumnChunk& col_chunk,  const ColumnType& 
col_type,
-      const parquet::ColumnOrder* col_order, const parquet::SchemaElement& 
element)
-  : col_chunk_(col_chunk),
-    col_type_(col_type),
-    col_order_(col_order),
-    element_(element) {}
-
-  /// Sets extra information that is only needed for decoding TIMESTAMP stats.
-  void SetTimestampDecoder(ParquetTimestampDecoder timestamp_decoder) {
-    timestamp_decoder_ = timestamp_decoder;
-  }
-
-  /// Decodes the parquet::Statistics from 'col_chunk_' and writes the value selected by
-  /// 'stats_field' into the buffer pointed to by 'slot', based on 'col_type_'. Returns
-  /// true if reading statistics for columns of type 'col_type_' is supported and
-  /// decoding was successful, false otherwise.
-  bool ReadFromThrift(StatsField stats_field, void* slot) const;
-
-  // Gets the null_count statistics from the column chunk's metadata and returns
-  // it via an output parameter.
-  // Returns true if the null_count stats were read successfully, false otherwise.
-  bool ReadNullCountStat(int64_t* null_count) const;
-
-private:
-  /// Returns true if we support reading statistics stored in the fields 'min_value' and
-  /// 'max_value' in parquet::Statistics for the type 'col_type_' and the column order
-  /// 'col_order_'. Otherwise, returns false. If 'col_order_' is nullptr, only primitive
-  /// numeric types are supported.
-  bool CanUseStats() const;
-
-  /// Returns true if we consider statistics stored in the deprecated fields 'min' and
-  /// 'max' in parquet::Statistics to be correct for the type 'col_type_' and the column
-  /// order 'col_order_'. Otherwise, returns false.
-  bool CanUseDeprecatedStats() const;
-
-  /// Decodes 'stat_value' and does INT64->TimestampValue and timezone conversions if
-  /// necessary. Returns true if the decoding and conversions were successful.
-  bool DecodeTimestamp(const std::string& stat_value,
-      ColumnStatsReader::StatsField stats_field,
-      TimestampValue* slot) const;
-
-  // Column chunk whose metadata holds the statistics to decode.
-  const parquet::ColumnChunk& col_chunk_;
-  // Column type that selects how the statistics bytes are interpreted.
-  const ColumnType& col_type_;
-  // Column order of the column; may be nullptr.
-  const parquet::ColumnOrder* col_order_;
-  // Schema element of the column; its 'type' is passed to the decoders.
-  const parquet::SchemaElement& element_;
-  // Only used when decoding TIMESTAMP stats, see SetTimestampDecoder().
-  ParquetTimestampDecoder timestamp_decoder_;
-};
-} // end ns impala
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet-column-stats.inline.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-stats.inline.h 
b/be/src/exec/parquet-column-stats.inline.h
deleted file mode 100644
index 6e78b82..0000000
--- a/be/src/exec/parquet-column-stats.inline.h
+++ /dev/null
@@ -1,254 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_EXEC_PARQUET_COLUMN_STATS_INLINE_H
-#define IMPALA_EXEC_PARQUET_COLUMN_STATS_INLINE_H
-
-#include "exec/parquet-common.h"
-#include "gen-cpp/parquet_types.h"
-#include "parquet-column-stats.h"
-#include "runtime/string-value.inline.h"
-
-namespace impala {
-
-/// Returns the object to its freshly constructed state: no min/max values, a null
-/// count of zero, and both boundary-order flags set.
-inline void ColumnStatsBase::Reset() {
-  descending_boundary_order_ = true;
-  ascending_boundary_order_ = true;
-  null_count_ = 0;
-  has_min_max_values_ = false;
-}
-
-/// Folds the range ['min_value', 'max_value'] into the tracked min/max. The first call
-/// after a reset simply adopts both values.
-template <typename T>
-inline void ColumnStats<T>::Update(const T& min_value, const T& max_value) {
-  if (has_min_max_values_) {
-    min_value_ = MinMaxTrait<T>::MinValue(min_value_, min_value);
-    max_value_ = MinMaxTrait<T>::MaxValue(max_value_, max_value);
-    return;
-  }
-  has_min_max_values_ = true;
-  min_value_ = min_value;
-  max_value_ = max_value;
-}
-
-/// Merges the statistics of 'other' (which must be a ColumnStats<T>) into this object.
-/// If both objects have min/max values, the min/max of 'other' are compared against
-/// those of the previously merged page to maintain the boundary-order flags. The null
-/// count of 'other' is always added.
-template <typename T>
-inline void ColumnStats<T>::Merge(const ColumnStatsBase& other) {
-  DCHECK(dynamic_cast<const ColumnStats<T>*>(&other));
-  const ColumnStats<T>& page_stats = *static_cast<const ColumnStats<T>*>(&other);
-  if (page_stats.has_min_max_values_) {
-    if (has_min_max_values_) {
-      const int max_cmp =
-          MinMaxTrait<T>::Compare(prev_page_max_value_, page_stats.max_value_);
-      const int min_cmp =
-          MinMaxTrait<T>::Compare(prev_page_min_value_, page_stats.min_value_);
-      // A previous value larger than the current one breaks ascending order; a smaller
-      // one breaks descending order. The flags are only ever cleared here.
-      if (max_cmp > 0 || min_cmp > 0) ascending_boundary_order_ = false;
-      if (max_cmp < 0 || min_cmp < 0) descending_boundary_order_ = false;
-    }
-    Update(page_stats.min_value_, page_stats.max_value_);
-    prev_page_min_value_ = page_stats.min_value_;
-    prev_page_max_value_ = page_stats.max_value_;
-  }
-  IncrementNullCount(page_stats.null_count_);
-}
-
-/// Returns the sum of the encoded sizes of the min value, the max value and the null
-/// count.
-template <typename T>
-inline int64_t ColumnStats<T>::BytesNeeded() const {
-  const int64_t min_bytes = BytesNeeded(min_value_);
-  const int64_t max_bytes = BytesNeeded(max_value_);
-  return min_bytes + max_bytes + ParquetPlainEncoder::ByteSize(null_count_);
-}
-
-/// Writes the tracked statistics into 'out'. 'min_value' and 'max_value' are only set
-/// if min/max values have been recorded; 'null_count' is always set.
-template <typename T>
-inline void ColumnStats<T>::EncodeToThrift(parquet::Statistics* out) const {
-  if (has_min_max_values_) {
-    std::string encoded_min;
-    EncodePlainValue(min_value_, BytesNeeded(min_value_), &encoded_min);
-    out->__set_min_value(std::move(encoded_min));
-
-    std::string encoded_max;
-    EncodePlainValue(max_value_, BytesNeeded(max_value_), &encoded_max);
-    out->__set_max_value(std::move(encoded_max));
-  }
-  out->__set_null_count(null_count_);
-}
-
-/// Plain-encodes 'v' into 'out', which is resized to exactly 'bytes_needed' bytes.
-/// 'bytes_needed' must be positive and must match the number of bytes the encoder
-/// writes (both are checked in debug builds only).
-template <typename T>
-inline void ColumnStats<T>::EncodePlainValue(
-    const T& v, int64_t bytes_needed, std::string* out) {
-  DCHECK_GT(bytes_needed, 0);
-  out->resize(bytes_needed);
-  uint8_t* dest = reinterpret_cast<uint8_t*>(&(*out)[0]);
-  const int64_t bytes_written = ParquetPlainEncoder::Encode(v, bytes_needed, dest);
-  DCHECK_EQ(bytes_needed, bytes_written);
-}
-
-/// Decodes the plain-encoded bytes in 'buffer' as a T and stores the result in 'slot'.
-/// Returns false if the decoder reports failure (-1), true otherwise.
-template <typename T>
-inline bool ColumnStats<T>::DecodePlainValue(const std::string& buffer, void* slot,
-    parquet::Type::type parquet_type) {
-  const uint8_t* begin = reinterpret_cast<const uint8_t*>(buffer.data());
-  int num_bytes = buffer.size();
-  T* typed_slot = reinterpret_cast<T*>(slot);
-  return ParquetPlainEncoder::DecodeByParquetType<T>(
-      begin, begin + num_bytes, num_bytes, typed_slot, parquet_type) != -1;
-}
-
-/// Returns the plain-encoded size of 'v': the fixed size given at construction if it
-/// is non-negative, otherwise the size the encoder reports for 'v'.
-template <typename T>
-inline int64_t ColumnStats<T>::BytesNeeded(const T& v) const {
-  if (plain_encoded_value_size_ >= 0) return plain_encoded_value_size_;
-  return ParquetPlainEncoder::ByteSize<T>(v);
-}
-
-/// Plain encoding for Boolean values is not handled by the ParquetPlainEncoder and thus
-/// needs special handling here. The value is stored as a single byte; 'bytes_needed' is
-/// not used.
-template <>
-inline void ColumnStats<bool>::EncodePlainValue(
-    const bool& v, int64_t bytes_needed, std::string* out) {
-  const char encoded = v;
-  out->assign(1, encoded);
-}
-
-/// Decodes a boolean statistic, which is expected to be exactly one byte long, into
-/// 'slot'. Any non-zero byte decodes to true. 'parquet_type' is not used.
-///
-/// Returns false without touching 'slot' if 'buffer' does not have the expected size.
-/// The bytes come from file metadata, so a malformed length is reported as a decoding
-/// failure rather than asserted on.
-template <>
-inline bool ColumnStats<bool>::DecodePlainValue(const std::string& buffer, void* slot,
-    parquet::Type::type parquet_type) {
-  if (buffer.size() != 1) return false;
-  bool* result = reinterpret_cast<bool*>(slot);
-  *result = (buffer[0] != 0);
-  return true;
-}
-
-/// A boolean statistic is always encoded as a single byte, whatever the value of 'v'.
-template <>
-inline int64_t ColumnStats<bool>::BytesNeeded(const bool& v) const {
-  constexpr int64_t kEncodedBoolSize = 1;
-  return kEncodedBoolSize;
-}
-
-/// Timestamp values need validation. Only INT96-encoded statistics are handled here;
-/// any other 'parquet_type' is treated as a programming error and yields false.
-/// Returns true only if decoding succeeds and the decoded date is valid.
-template <>
-inline bool ColumnStats<TimestampValue>::DecodePlainValue(
-    const std::string& buffer, void* slot, parquet::Type::type parquet_type) {
-  if (parquet_type != parquet::Type::INT96) {
-    DCHECK(false);
-    return false;
-  }
-
-  const uint8_t* begin = reinterpret_cast<const uint8_t*>(buffer.data());
-  int num_bytes = buffer.size();
-  TimestampValue* timestamp = reinterpret_cast<TimestampValue*>(slot);
-  if (ParquetPlainEncoder::Decode<TimestampValue, parquet::Type::INT96>(
-      begin, begin + num_bytes, num_bytes, timestamp) == -1) {
-    return false;
-  }
-
-  // We don't need to convert the value here, because it is done by the caller.
-  // If this function were not static, then it would be possible to store the
-  // information needed for timezone conversion in the object and do the conversion
-  // here.
-  return TimestampValue::IsValidDate(timestamp->date());
-}
-
-/// parquet::Statistics stores string values directly and does not use plain encoding.
-/// Copies the 'v.len' bytes starting at 'v.ptr' into 'out'; 'bytes_needed' is not used.
-template <>
-inline void ColumnStats<StringValue>::EncodePlainValue(
-    const StringValue& v, int64_t bytes_needed, string* out) {
-  out->assign(v.ptr, v.len);
-}
-
-/// Makes the StringValue at 'slot' reference the bytes of 'buffer'. No data is copied,
-/// so the result points into 'buffer'. 'parquet_type' is not used. Always returns true.
-template <>
-inline bool ColumnStats<StringValue>::DecodePlainValue(
-    const std::string& buffer, void* slot, parquet::Type::type parquet_type) {
-  StringValue* string_slot = reinterpret_cast<StringValue*>(slot);
-  string_slot->len = buffer.size();
-  string_slot->ptr = const_cast<char*>(buffer.data());
-  return true;
-}
-
-/// Folds ['min_value', 'max_value'] into the tracked string min/max. Whenever a tracked
-/// value is replaced, its buffer is cleared so that
-/// MaterializeStringValuesToInternalBuffers() copies the new value later.
-template <>
-inline void ColumnStats<StringValue>::Update(
-    const StringValue& min_value, const StringValue& max_value) {
-  const bool is_first_update = !has_min_max_values_;
-  has_min_max_values_ = true;
-  // On the first update both values are adopted without comparing.
-  if (is_first_update || min_value < min_value_) {
-    min_value_ = min_value;
-    min_buffer_.Clear();
-  }
-  if (is_first_update || max_value > max_value_) {
-    max_value_ = max_value;
-    max_buffer_.Clear();
-  }
-}
-
-/// String specialization of Merge(). Behaves like the generic version, but compares
-/// the values with the StringValue operators and clears the previous-page buffers once
-/// the previous-page values have been replaced, so that they get copied again by
-/// MaterializeStringValuesToInternalBuffers().
-template <>
-inline void ColumnStats<StringValue>::Merge(const ColumnStatsBase& other) {
-  DCHECK(dynamic_cast<const ColumnStats<StringValue>*>(&other));
-  const ColumnStats<StringValue>& page_stats =
-      *static_cast<const ColumnStats<StringValue>*>(&other);
-  if (page_stats.has_min_max_values_) {
-    if (has_min_max_values_) {
-      // Make sure that we copied the previous page's min/max values to their own
-      // buffer.
-      DCHECK_NE(static_cast<void*>(prev_page_min_value_.ptr),
-                static_cast<void*>(page_stats.min_value_.ptr));
-      DCHECK_NE(static_cast<void*>(prev_page_max_value_.ptr),
-                static_cast<void*>(page_stats.max_value_.ptr));
-      const bool prev_is_larger = prev_page_max_value_ > page_stats.max_value_
-          || prev_page_min_value_ > page_stats.min_value_;
-      const bool prev_is_smaller = prev_page_max_value_ < page_stats.max_value_
-          || prev_page_min_value_ < page_stats.min_value_;
-      // The flags are only ever cleared here, never set again.
-      if (prev_is_larger) ascending_boundary_order_ = false;
-      if (prev_is_smaller) descending_boundary_order_ = false;
-    }
-    Update(page_stats.min_value_, page_stats.max_value_);
-    prev_page_min_value_ = page_stats.min_value_;
-    prev_page_max_value_ = page_stats.max_value_;
-    prev_page_min_buffer_.Clear();
-    prev_page_max_buffer_.Clear();
-  }
-  IncrementNullCount(page_stats.null_count_);
-}
-
-// StringValues need to be copied at the end of processing a row batch, since the batch
-// memory will be released. Each of the four tracked values (min, max, previous page
-// min, previous page max) is copied into its own buffer, but only if that buffer is
-// currently empty. Returns the first error encountered while copying.
-template <>
-inline Status ColumnStats<StringValue>::MaterializeStringValuesToInternalBuffers() {
-  struct BufferAndValue {
-    StringBuffer* buffer;
-    StringValue* value;
-  };
-  const BufferAndValue pending_copies[] = {
-      {&min_buffer_, &min_value_},
-      {&max_buffer_, &max_value_},
-      {&prev_page_min_buffer_, &prev_page_min_value_},
-      {&prev_page_max_buffer_, &prev_page_max_value_},
-  };
-  for (const BufferAndValue& entry : pending_copies) {
-    if (!entry.buffer->IsEmpty()) continue;
-    RETURN_IF_ERROR(CopyToBuffer(entry.buffer, entry.value));
-  }
-  return Status::OK();
-}
-
-} // end ns impala
-#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet-common.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-common.cc b/be/src/exec/parquet-common.cc
deleted file mode 100644
index f967f9f..0000000
--- a/be/src/exec/parquet-common.cc
+++ /dev/null
@@ -1,132 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/parquet-common.h"
-
-namespace impala {
-
-/// Mapping of Impala's internal types to Parquet storage types. This is indexed by
-/// the PrimitiveType enum.
-const parquet::Type::type INTERNAL_TO_PARQUET_TYPES[] = {
-  parquet::Type::BOOLEAN,     // Invalid
-  parquet::Type::BOOLEAN,     // NULL type
-  parquet::Type::BOOLEAN,
-  parquet::Type::INT32,
-  parquet::Type::INT32,
-  parquet::Type::INT32,
-  parquet::Type::INT64,
-  parquet::Type::FLOAT,
-  parquet::Type::DOUBLE,
-  parquet::Type::INT96,       // Timestamp
-  parquet::Type::BYTE_ARRAY,  // String
-  parquet::Type::BYTE_ARRAY,  // Date, NYI
-  parquet::Type::BYTE_ARRAY,  // DateTime, NYI
-  parquet::Type::BYTE_ARRAY,  // Binary NYI
-  parquet::Type::FIXED_LEN_BYTE_ARRAY, // Decimal
-  parquet::Type::BYTE_ARRAY,  // VARCHAR(N)
-  parquet::Type::BYTE_ARRAY,  // CHAR(N)
-};
-
-const int INTERNAL_TO_PARQUET_TYPES_SIZE =
-  sizeof(INTERNAL_TO_PARQUET_TYPES) / sizeof(INTERNAL_TO_PARQUET_TYPES[0]);
-
-/// Mapping of Parquet codec enums to Impala enums
-const THdfsCompression::type PARQUET_TO_IMPALA_CODEC[] = {
-  THdfsCompression::NONE,
-  THdfsCompression::SNAPPY,
-  THdfsCompression::GZIP,
-  THdfsCompression::LZO
-};
-
-const int PARQUET_TO_IMPALA_CODEC_SIZE =
-    sizeof(PARQUET_TO_IMPALA_CODEC) / sizeof(PARQUET_TO_IMPALA_CODEC[0]);
-
-/// Mapping of Impala codec enums to Parquet enums
-const parquet::CompressionCodec::type IMPALA_TO_PARQUET_CODEC[] = {
-  parquet::CompressionCodec::UNCOMPRESSED,
-  parquet::CompressionCodec::SNAPPY,  // DEFAULT
-  parquet::CompressionCodec::GZIP,    // GZIP
-  parquet::CompressionCodec::GZIP,    // DEFLATE
-  parquet::CompressionCodec::SNAPPY,
-  parquet::CompressionCodec::SNAPPY,  // SNAPPY_BLOCKED
-  parquet::CompressionCodec::LZO,
-};
-
-const int IMPALA_TO_PARQUET_CODEC_SIZE =
-    sizeof(IMPALA_TO_PARQUET_CODEC) / sizeof(IMPALA_TO_PARQUET_CODEC[0]);
-
-parquet::Type::type ConvertInternalToParquetType(PrimitiveType type) {
-  DCHECK_GE(type, 0);
-  DCHECK_LT(type, INTERNAL_TO_PARQUET_TYPES_SIZE);
-  return INTERNAL_TO_PARQUET_TYPES[type];
-}
-
-THdfsCompression::type ConvertParquetToImpalaCodec(
-    parquet::CompressionCodec::type codec) {
-  DCHECK_GE(codec, 0);
-  DCHECK_LT(codec, PARQUET_TO_IMPALA_CODEC_SIZE);
-  return PARQUET_TO_IMPALA_CODEC[codec];
-}
-
-parquet::CompressionCodec::type ConvertImpalaToParquetCodec(
-    THdfsCompression::type codec) {
-  DCHECK_GE(codec, 0);
-  DCHECK_LT(codec, IMPALA_TO_PARQUET_CODEC_SIZE);
-  return IMPALA_TO_PARQUET_CODEC[codec];
-}
-
-ParquetTimestampDecoder::ParquetTimestampDecoder(const parquet::SchemaElement& 
e,
-    const Timezone* timezone, bool convert_int96_timestamps) {
-  bool needs_conversion = false;
-  if (e.__isset.logicalType) {
-    DCHECK(e.logicalType.__isset.TIMESTAMP);
-    needs_conversion = e.logicalType.TIMESTAMP.isAdjustedToUTC;
-    precision_ = e.logicalType.TIMESTAMP.unit.__isset.MILLIS
-        ? ParquetTimestampDecoder::MILLI : ParquetTimestampDecoder::MICRO;
-  } else {
-    if (e.__isset.converted_type) {
-      // Timestamps with converted type but without logical type are/were never written
-      // by Impala, so it is assumed that the writer is Parquet-mr and that timezone
-      // conversion is needed.
-      needs_conversion = true;
-      precision_ = e.converted_type == parquet::ConvertedType::TIMESTAMP_MILLIS
-          ? ParquetTimestampDecoder::MILLI : ParquetTimestampDecoder::MICRO;
-    } else {
-      // INT96 timestamps need conversion depending on the writer.
-      needs_conversion = convert_int96_timestamps;
-      precision_ = ParquetTimestampDecoder::NANO;
-    }
-  }
-  if (needs_conversion) timezone_ = timezone;
-}
-
-void ParquetTimestampDecoder::ConvertMinStatToLocalTime(TimestampValue* v) 
const {
-  DCHECK(timezone_ != nullptr);
-  if (!v->HasDateAndTime()) return;
-  TimestampValue repeated_period_start;
-  v->UtcToLocal(*timezone_, &repeated_period_start);
-  if (repeated_period_start.HasDateAndTime()) *v = repeated_period_start;
-}
-
-void ParquetTimestampDecoder::ConvertMaxStatToLocalTime(TimestampValue* v) 
const {
-  DCHECK(timezone_ != nullptr);
-  if (!v->HasDateAndTime()) return;
-  TimestampValue repeated_period_end;
-  v->UtcToLocal(*timezone_, nullptr, &repeated_period_end);
-  if (repeated_period_end.HasDateAndTime()) *v = repeated_period_end;
-}
-}

http://git-wip-us.apache.org/repos/asf/impala/blob/07fd3320/be/src/exec/parquet-common.h
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-common.h b/be/src/exec/parquet-common.h
deleted file mode 100644
index 806db45..0000000
--- a/be/src/exec/parquet-common.h
+++ /dev/null
@@ -1,535 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-
-#ifndef IMPALA_EXEC_PARQUET_COMMON_H
-#define IMPALA_EXEC_PARQUET_COMMON_H
-
-#include "common/compiler-util.h"
-#include "gen-cpp/Descriptors_types.h"
-#include "gen-cpp/parquet_types.h"
-#include "runtime/decimal-value.h"
-#include "runtime/string-value.h"
-#include "runtime/timestamp-value.inline.h"
-#include "util/bit-util.h"
-#include "util/decimal-util.h"
-#include "util/mem-util.h"
-
-/// This file contains common elements between the parquet Writer and Scanner.
-namespace impala {
-
-const uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
-const uint32_t PARQUET_CURRENT_VERSION = 1;
-
-/// Return the Parquet type corresponding to Impala's internal type. The caller must
-/// validate that the type is valid, otherwise this will DCHECK.
-parquet::Type::type ConvertInternalToParquetType(PrimitiveType type);
-
-/// Return the Impala compression type for the given Parquet codec. The caller must
-/// validate that the codec is a supported one, otherwise this will DCHECK.
-THdfsCompression::type 
ConvertParquetToImpalaCodec(parquet::CompressionCodec::type codec);
-
-/// Return the Parquet codec for the given Impala compression type. The caller must
-/// validate that the codec is a supported one, otherwise this will DCHECK.
-parquet::CompressionCodec::type ConvertImpalaToParquetCodec(
-    THdfsCompression::type codec);
-
-/// The plain encoding does not maintain any state so all these functions
-/// are static helpers.
-/// TODO: we are using templates to provide a generic interface (over the
-/// types) to avoid performance penalties. This makes the code more complex
-/// and should be removed when we have codegen support to inline virtual
-/// calls.
-class ParquetPlainEncoder {
- public:
-  /// Returns the byte size of 'v' where InternalType is the datatype that Impala uses
-  /// internally to store tuple data. Used in some template function implementations to
-  /// determine the encoded byte size for fixed-length types.
-  template <typename InternalType>
-  static int ByteSize(const InternalType& v) { return sizeof(InternalType); }
-
-  /// Returns the encoded size of values of type t. Returns -1 if it is variable
-  /// length. This can be different than the slot size of the types.
-  static int EncodedByteSize(const ColumnType& t) {
-    switch (t.type) {
-      case TYPE_STRING:
-      case TYPE_VARCHAR:
-      case TYPE_CHAR:
-        // CHAR is varlen here because we don't write the padding to the file
-        return -1;
-      case TYPE_TINYINT:
-      case TYPE_SMALLINT:
-      case TYPE_INT:
-      case TYPE_FLOAT:
-        return 4;
-      case TYPE_BIGINT:
-      case TYPE_DOUBLE:
-        return 8;
-      case TYPE_TIMESTAMP:
-        return 12;
-      case TYPE_DECIMAL:
-        return DecimalSize(t);
-      case TYPE_NULL:
-      case TYPE_BOOLEAN: // These types are not plain encoded.
-      default:
-        DCHECK(false);
-        return -1;
-    }
-  }
-
-  /// The minimum byte size to store decimals with precision t.precision.
-  static int DecimalSize(const ColumnType& t) {
-    DCHECK(t.type == TYPE_DECIMAL);
-    // The number in each comment is the max positive value that can be represented
-    // with that number of bytes (max negative is -(X + 1)).
-    // TODO: use closed form for this?
-    switch (t.precision) {
-      case 1: case 2:
-        return 1; // 127
-      case 3: case 4:
-        return 2; // 32,767
-      case 5: case 6:
-        return 3; // 8,388,607
-      case 7: case 8: case 9:
-        return 4; // 2,147,483,647
-      case 10: case 11:
-        return 5; // 549,755,813,887
-      case 12: case 13: case 14:
-        return 6; // 140,737,488,355,327
-      case 15: case 16:
-        return 7; // 36,028,797,018,963,967
-      case 17: case 18:
-        return 8; // 9,223,372,036,854,775,807
-      case 19: case 20: case 21:
-        return 9; // 2,361,183,241,434,822,606,847
-      case 22: case 23:
-        return 10; // 604,462,909,807,314,587,353,087
-      case 24: case 25: case 26:
-        return 11; // 154,742,504,910,672,534,362,390,527
-      case 27: case 28:
-        return 12; // 39,614,081,257,132,168,796,771,975,167
-      case 29: case 30: case 31:
-        return 13; // 10,141,204,801,825,835,211,973,625,643,007
-      case 32: case 33:
-        return 14; // 2,596,148,429,267,413,814,265,248,164,610,047
-      case 34: case 35:
-        return 15; // 664,613,997,892,457,936,451,903,530,140,172,287
-      case 36: case 37: case 38:
-        return 16; // 170,141,183,460,469,231,731,687,303,715,884,105,727
-      default:
-        DCHECK(false);
-        break;
-    }
-    return -1;
-  }
-
-  /// Encodes t into buffer. Returns the number of bytes added.  buffer must
-  /// be preallocated and big enough.  Buffer need not be aligned.
-  /// 'fixed_len_size' is only applicable for data encoded using FIXED_LEN_BYTE_ARRAY and
-  /// is the number of bytes the plain encoder should use.
-  template <typename InternalType>
-  static int Encode(const InternalType& t, int fixed_len_size, uint8_t* 
buffer) {
-    memcpy(buffer, &t, ByteSize(t));
-    return ByteSize(t);
-  }
-
-  template <typename InternalType>
-  static int DecodeByParquetType(const uint8_t* buffer, const uint8_t* 
buffer_end,
-      int fixed_len_size, InternalType* v, parquet::Type::type parquet_type) {
-    switch (parquet_type) {
-      case parquet::Type::BOOLEAN:
-        return ParquetPlainEncoder::Decode<InternalType, 
parquet::Type::BOOLEAN>(buffer,
-            buffer_end, fixed_len_size, v);
-      case parquet::Type::INT32:
-        return ParquetPlainEncoder::Decode<InternalType, 
parquet::Type::INT32>(buffer,
-            buffer_end, fixed_len_size, v);
-      case parquet::Type::INT64:
-        return ParquetPlainEncoder::Decode<InternalType, 
parquet::Type::INT64>(buffer,
-            buffer_end, fixed_len_size, v);
-      case parquet::Type::INT96:
-        return ParquetPlainEncoder::Decode<InternalType, 
parquet::Type::INT96>(buffer,
-            buffer_end, fixed_len_size, v);
-      case parquet::Type::FLOAT:
-        return ParquetPlainEncoder::Decode<InternalType, 
parquet::Type::FLOAT>(buffer,
-            buffer_end, fixed_len_size, v);
-      case parquet::Type::DOUBLE:
-        return ParquetPlainEncoder::Decode<InternalType, 
parquet::Type::DOUBLE>(buffer,
-            buffer_end, fixed_len_size, v);
-      case parquet::Type::BYTE_ARRAY:
-        return ParquetPlainEncoder::Decode<InternalType,
-            parquet::Type::BYTE_ARRAY>(buffer, buffer_end, fixed_len_size, v);
-      case parquet::Type::FIXED_LEN_BYTE_ARRAY:
-        return ParquetPlainEncoder::Decode<InternalType,
-            parquet::Type::FIXED_LEN_BYTE_ARRAY>(buffer, buffer_end, 
fixed_len_size, v);
-      default:
-        DCHECK(false) << "Unexpected physical type";
-        return -1;
-    }
-  }
-
-  /// Decodes t from 'buffer', reading up to the byte before 'buffer_end'. 'buffer'
-  /// need not be aligned. If PARQUET_TYPE is FIXED_LEN_BYTE_ARRAY then 'fixed_len_size'
-  /// is the size of the object. Otherwise, it is unused.
-  /// Returns the number of bytes read or -1 if the value was not decoded successfully.
-  /// This generic template function is used with the following types:
-  /// =============================
-  /// InternalType   | PARQUET_TYPE
-  /// =============================
-  /// int32_t        | INT32
-  /// int64_t        | INT64
-  /// float          | FLOAT
-  /// double         | DOUBLE
-  /// Decimal4Value  | INT32
-  /// Decimal8Value  | INT64
-  /// TimestampValue | INT96
-  template <typename InternalType, parquet::Type::type PARQUET_TYPE>
-  static int Decode(const uint8_t* buffer, const uint8_t* buffer_end, int 
fixed_len_size,
-      InternalType* v) {
-    int byte_size = ByteSize(*v);
-    if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
-    memcpy(v, buffer, byte_size);
-    return byte_size;
-  }
-
-  /// Batched version of Decode() that tries to decode 'num_values' values from the memory
-  /// range [buffer, buffer_end) and writes them to 'v' with a stride of 'stride' bytes.
-  /// Returns the number of bytes read from 'buffer' or -1 if there was an error
-  /// decoding, e.g. invalid data or running out of input data before reading
-  /// 'num_values'.
-  template <typename InternalType, parquet::Type::type PARQUET_TYPE>
-  static int64_t DecodeBatch(const uint8_t* buffer, const uint8_t* buffer_end,
-      int fixed_len_size, int64_t num_values, int64_t stride, InternalType* v);
-};
-
-/// Calling this with arguments of type ColumnType is certainly a programmer error, so we
-/// disallow it.
-template <> int ParquetPlainEncoder::ByteSize(const ColumnType& t);
-
-/// Disable for bools. Plain encoding is not used for booleans.
-template <> int ParquetPlainEncoder::ByteSize(const bool& b);
-template <> int ParquetPlainEncoder::Encode(const bool&, int fixed_len_size, 
uint8_t*);
-template <> int ParquetPlainEncoder::Decode<bool, 
parquet::Type::BOOLEAN>(const uint8_t*,
-    const uint8_t*, int fixed_len_size, bool* v);
-
-template <>
-inline int ParquetPlainEncoder::ByteSize(const Decimal4Value&) {
-  // Only used when the decimal is stored as INT32.
-  return sizeof(Decimal4Value::StorageType);
-}
-template <>
-inline int ParquetPlainEncoder::ByteSize(const Decimal8Value&) {
-  // Only used when the decimal is stored as INT64.
-  return sizeof(Decimal8Value::StorageType);
-}
-template <>
-inline int ParquetPlainEncoder::ByteSize(const Decimal16Value&) {
-  // Not used, since such big decimals can only be stored as BYTE_ARRAY or
-  // FIXED_LEN_BYTE_ARRAY.
-  DCHECK(false);
-  return -1;
-}
-
-/// Parquet doesn't have 8-bit or 16-bit ints. They are converted to 32-bit.
-template <>
-inline int ParquetPlainEncoder::ByteSize(const int8_t& v) { return 
sizeof(int32_t); }
-template <>
-inline int ParquetPlainEncoder::ByteSize(const int16_t& v) { return 
sizeof(int32_t); }
-
-template <>
-inline int ParquetPlainEncoder::ByteSize(const StringValue& v) {
-  return sizeof(int32_t) + v.len;
-}
-
-template <>
-inline int ParquetPlainEncoder::ByteSize(const TimestampValue& v) {
-  return 12;
-}
-
-template <typename From, typename To>
-inline int DecodeWithConversion(const uint8_t* buffer, const uint8_t* 
buffer_end, To* v) {
-  int byte_size = sizeof(From);
-  if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
-  From dest;
-  memcpy(&dest, buffer, byte_size);
-  *v = dest;
-  return byte_size;
-}
-
-template <>
-inline int ParquetPlainEncoder::Decode<int64_t, parquet::Type::INT32>(
-    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, 
int64_t* v) {
-  return DecodeWithConversion<int32_t, int64_t>(buffer, buffer_end, v);
-}
-
-template <>
-inline int ParquetPlainEncoder::Decode<double, parquet::Type::INT32>(
-    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, 
double* v) {
-  return DecodeWithConversion<int32_t, double>(buffer, buffer_end, v);
-}
-
-template <>
-inline int ParquetPlainEncoder::Decode<double, parquet::Type::FLOAT>(
-    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, 
double* v) {
-  return DecodeWithConversion<float, double>(buffer, buffer_end, v);
-}
-
-template <>
-inline int ParquetPlainEncoder::Decode<int8_t, parquet::Type::INT32>(
-    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, 
int8_t* v) {
-  int byte_size = ByteSize(*v);
-  if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
-  *v = *buffer;
-  return byte_size;
-}
-template <>
-inline int ParquetPlainEncoder::Decode<int16_t, parquet::Type::INT32>(
-    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, 
int16_t* v) {
-  int byte_size = ByteSize(*v);
-  if (UNLIKELY(buffer_end - buffer < byte_size)) return -1;
-  memcpy(v, buffer, sizeof(int16_t));
-  return byte_size;
-}
-
-template<typename T>
-inline int EncodeToInt32(const T& v, int fixed_len_size, uint8_t* buffer) {
-  int32_t val = v;
-  memcpy(buffer, &val, sizeof(int32_t));
-  return ParquetPlainEncoder::ByteSize(v);
-}
-
-template <>
-inline int ParquetPlainEncoder::Encode(
-    const int8_t& v, int fixed_len_size, uint8_t* buffer) {
-  return EncodeToInt32(v, fixed_len_size, buffer);
-}
-
-template <>
-inline int ParquetPlainEncoder::Encode(
-    const int16_t& v, int fixed_len_size, uint8_t* buffer) {
-  return EncodeToInt32(v, fixed_len_size, buffer);
-}
-
-template <>
-inline int ParquetPlainEncoder::Encode(
-    const StringValue& v, int fixed_len_size, uint8_t* buffer) {
-  memcpy(buffer, &v.len, sizeof(int32_t));
-  memcpy(buffer + sizeof(int32_t), v.ptr, v.len);
-  return ByteSize(v);
-}
-
-template <>
-inline int ParquetPlainEncoder::Decode<StringValue, parquet::Type::BYTE_ARRAY>(
-    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
-    StringValue* v) {
-  if (UNLIKELY(buffer_end - buffer < sizeof(int32_t))) return -1;
-  memcpy(&v->len, buffer, sizeof(int32_t));
-  int byte_size = ByteSize(*v);
-  if (UNLIKELY(v->len < 0 || buffer_end - buffer < byte_size)) return -1;
-  v->ptr = reinterpret_cast<char*>(const_cast<uint8_t*>(buffer)) + 
sizeof(int32_t);
-  if (fixed_len_size > 0) v->len = std::min(v->len, fixed_len_size);
-  // we still read byte_size bytes, even if we truncate
-  return byte_size;
-}
-
-/// Write decimals as big endian (byte comparable) to benefit from common prefixes.
-/// fixed_len_size can be less than sizeof(Decimal*Value) for space savings. This means
-/// that the value in the in-memory format has leading zeros or negative 1's.
-/// For example, precision 2 fits in 1 byte. All decimals stored as Decimal4Value
-/// will have 3 bytes of leading zeros, we will only store the interesting byte.
-template<typename T>
-inline int EncodeDecimal(const T& v, int fixed_len_size, uint8_t* buffer) {
-  DecimalUtil::EncodeToFixedLenByteArray(buffer, fixed_len_size, v);
-  return fixed_len_size;
-}
-
-template <>
-inline int ParquetPlainEncoder::Encode(
-    const Decimal4Value& v, int fixed_len_size, uint8_t* buffer) {
-  return EncodeDecimal(v, fixed_len_size, buffer);
-}
-
-template <>
-inline int ParquetPlainEncoder::Encode(
-    const Decimal8Value& v, int fixed_len_size, uint8_t* buffer) {
-  return EncodeDecimal(v, fixed_len_size, buffer);
-}
-
-template <>
-inline int ParquetPlainEncoder::Encode(
-    const Decimal16Value& v, int fixed_len_size, uint8_t* buffer) {
-  return EncodeDecimal(v, fixed_len_size, buffer);
-}
-
-template <typename InternalType, parquet::Type::type PARQUET_TYPE>
-inline int64_t ParquetPlainEncoder::DecodeBatch(const uint8_t* buffer,
-    const uint8_t* buffer_end, int fixed_len_size, int64_t num_values, int64_t 
stride,
-    InternalType* v) {
-  const uint8_t* buffer_pos = buffer;
-  StrideWriter<InternalType> out(v, stride);
-  for (int64_t i = 0; i < num_values; ++i) {
-    int encoded_len = Decode<InternalType, PARQUET_TYPE>(
-        buffer_pos, buffer_end, fixed_len_size, out.Advance());
-    if (UNLIKELY(encoded_len < 0)) return -1;
-    buffer_pos += encoded_len;
-  }
-  return buffer_pos - buffer;
-}
-
-template <typename T>
-inline int DecodeDecimalFixedLen(
-    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size, T* 
v) {
-  if (UNLIKELY(buffer_end - buffer < fixed_len_size)) return -1;
-  DecimalUtil::DecodeFromFixedLenByteArray(buffer, fixed_len_size, v);
-  return fixed_len_size;
-}
-
-template <>
-inline int ParquetPlainEncoder::
-Decode<Decimal4Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* 
buffer,
-    const uint8_t* buffer_end, int fixed_len_size, Decimal4Value* v) {
-  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
-}
-
-template <>
-inline int ParquetPlainEncoder::
-Decode<Decimal8Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* 
buffer,
-    const uint8_t* buffer_end, int fixed_len_size, Decimal8Value* v) {
-  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
-}
-
-template <>
-inline int ParquetPlainEncoder::
-Decode<Decimal16Value, parquet::Type::FIXED_LEN_BYTE_ARRAY>(const uint8_t* 
buffer,
-    const uint8_t* buffer_end, int fixed_len_size, Decimal16Value* v) {
-  return DecodeDecimalFixedLen(buffer, buffer_end, fixed_len_size, v);
-}
-
-/// Helper method to decode Decimal type stored as variable length byte array.
-template<typename T>
-inline int DecodeDecimalByteArray(const uint8_t* buffer, const uint8_t* 
buffer_end,
-    int fixed_len_size, T* v) {
-  if (UNLIKELY(buffer_end - buffer < sizeof(int32_t))) return -1;
-  int encoded_byte_size;
-  memcpy(&encoded_byte_size, buffer, sizeof(int32_t));
-  int byte_size = sizeof(int32_t) + encoded_byte_size;
-  if (UNLIKELY(encoded_byte_size < 0 || buffer_end - buffer < byte_size)) 
return -1;
-  uint8_t* val_ptr = const_cast<uint8_t*>(buffer) + sizeof(int32_t);
-  DecimalUtil::DecodeFromFixedLenByteArray(val_ptr, encoded_byte_size, v);
-  return byte_size;
-}
-
-template <>
-inline int ParquetPlainEncoder::Decode<Decimal4Value, 
parquet::Type::BYTE_ARRAY>(
-    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
-    Decimal4Value* v) {
-  return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
-}
-
-template <>
-inline int ParquetPlainEncoder::Decode<Decimal8Value, 
parquet::Type::BYTE_ARRAY>(
-    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
-    Decimal8Value* v) {
-  return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
-}
-
-template <>
-inline int ParquetPlainEncoder::Decode<Decimal16Value, 
parquet::Type::BYTE_ARRAY>(
-    const uint8_t* buffer, const uint8_t* buffer_end, int fixed_len_size,
-    Decimal16Value* v) {
-  return DecodeDecimalByteArray(buffer, buffer_end, fixed_len_size, v);
-}
-
-/// Helper class that contains the parameters needed for Timestamp decoding.
-/// Can be safely passed by value.
-class ParquetTimestampDecoder {
-public:
-  ParquetTimestampDecoder() {}
-
-  ParquetTimestampDecoder( const parquet::SchemaElement& e, const Timezone* 
timezone,
-      bool convert_int96_timestamps);
-
-  bool NeedsConversion() const { return timezone_ != nullptr; }
-
-  /// Decodes next PARQUET_TYPE from 'buffer', reading up to the byte before 'buffer_end'
-  /// and converts it to TimestampValue. 'buffer' need not be aligned.
-  template <parquet::Type::type PARQUET_TYPE>
-  int Decode(const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* 
v) const;
-
-  TimestampValue Int64ToTimestampValue(int64_t unix_time) const {
-    DCHECK(precision_ == MILLI || precision_ == MICRO);
-    return precision_ == MILLI
-        ? TimestampValue::UtcFromUnixTimeMillis(unix_time)
-        : TimestampValue::UtcFromUnixTimeMicros(unix_time);
-  }
-
-  void ConvertToLocalTime(TimestampValue* v) const {
-    DCHECK(timezone_ != nullptr);
-    if (v->HasDateAndTime()) v->UtcToLocal(*timezone_);
-  }
-
-  /// Timezone conversion of min/max stats needs some extra logic because UTC->local
-  /// conversion can change ordering near timezone rule changes. The max value is
-  /// increased and min value is decreased to avoid incorrectly dropping column chunks
-  /// (or pages once IMPALA-5843 is ready).
-
-  /// If timestamp t >= v before conversion, then this function converts v in such a
-  /// way that the same will be true after t is converted.
-  void ConvertMinStatToLocalTime(TimestampValue* v) const;
-
-  /// If timestamp t <= v before conversion, then this function converts v in such a
-  /// way that the same will be true after t is converted.
-  void ConvertMaxStatToLocalTime(TimestampValue* v) const;
-
-private:
-  enum Precision { MILLI, MICRO, NANO };
-
-  /// Timezone used for UTC->Local conversions. If nullptr, no conversion is needed.
-  const Timezone* timezone_ = nullptr;
-
-  /// Unit of the encoded timestamp. Used to decide between milli and microseconds during
-  /// INT64 decoding. INT64 with nanosecond precision (and reduced range) is also planned
-  /// to be implemented once it is added in Parquet (PARQUET-1387).
-  Precision precision_ = NANO;
-};
-
-template <>
-inline int ParquetTimestampDecoder::Decode<parquet::Type::INT64>(
-    const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const 
{
-  DCHECK(precision_ == MILLI || precision_ == MICRO);
-  int64_t unix_time;
-  int bytes_read = ParquetPlainEncoder::Decode<int64_t, parquet::Type::INT64>(
-      buffer, buffer_end, 0, &unix_time);
-  if (UNLIKELY(bytes_read < 0)) {
-    return bytes_read;
-  }
-  *v = Int64ToTimestampValue(unix_time);
-  // TODO: It would be more efficient to do the timezone conversion in the same step
-  //       as the int64_t -> TimestampValue conversion. This would also be needed to
-  //       move conversion/validation to dictionary construction (IMPALA-4994) and to
-  //       implement dictionary filtering for TimestampValues.
-  return bytes_read;
-}
-
-template <>
-inline int ParquetTimestampDecoder::Decode<parquet::Type::INT96>(
-    const uint8_t* buffer, const uint8_t* buffer_end, TimestampValue* v) const 
{
-  DCHECK_EQ(precision_, NANO);
-  return ParquetPlainEncoder::Decode<TimestampValue, parquet::Type::INT96>(
-      buffer, buffer_end, 0, v);
-}
-}
-#endif

Reply via email to