ORC-178 Implement Basic C++ Writer and Writer Option

1. Add basic Writer and WriterOption
2. Add StructColumnWriter and IntegerColumnWriter. With them, we will be
able to write a complete ORC file that contains only int columns. To
limit the scope of this change, we will add more column writers later.
3. Add a base class for the column statistics impl classes. It is used
by the base class of ColumnWriter so that we don't have to duplicate the
same logic everywhere.
4. Right now the UTs are pretty primitive. We will add more UTs
(especially for stats and index) as we add more column writers.
At this moment, it's really hard to extract more UTs from our code base
without introducing additional column writers.

Fixes #128

Signed-off-by: Owen O'Malley <omal...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/orc/repo
Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/b82e2f4f
Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/b82e2f4f
Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/b82e2f4f

Branch: refs/heads/master
Commit: b82e2f4f85afc3b0ca406afeb681e15e075de44c
Parents: 268fccb
Author: Xiening.Dai <xiening....@alibaba-inc.com>
Authored: Wed May 24 15:01:52 2017 -0700
Committer: Owen O'Malley <omal...@apache.org>
Committed: Tue Aug 8 10:37:53 2017 -0700

----------------------------------------------------------------------
 c++/include/orc/Common.hh        |  36 +++
 c++/include/orc/OrcFile.hh       |  12 +
 c++/include/orc/Reader.hh        |   6 +-
 c++/include/orc/Writer.hh        | 199 ++++++++++++
 c++/src/CMakeLists.txt           |   2 +
 c++/src/ColumnWriter.cc          | 495 ++++++++++++++++++++++++++++
 c++/src/ColumnWriter.hh          | 197 ++++++++++++
 c++/src/Common.cc                |   5 +
 c++/src/Compression.cc           |   7 +-
 c++/src/Compression.hh           |   9 +-
 c++/src/RLE.hh                   |  10 +-
 c++/src/Reader.cc                |  20 +-
 c++/src/Reader.hh                |   4 +-
 c++/src/Statistics.cc            |  65 ++--
 c++/src/Statistics.hh            | 276 +++++++++-------
 c++/src/Vector.cc                |   2 +-
 c++/src/Writer.cc                | 584 ++++++++++++++++++++++++++++++++++
 c++/src/io/OutputStream.cc       |   4 +
 c++/test/CMakeLists.txt          |   1 +
 c++/test/TestColumnStatistics.cc |  21 +-
 c++/test/TestWriter.cc           | 213 +++++++++++++
 tools/src/FileMetadata.cc        |   2 +-
 tools/test/TestMatch.cc          |   4 +-
 23 files changed, 1988 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/include/orc/Common.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/Common.hh b/c++/include/orc/Common.hh
index accc77d..afe075b 100644
--- a/c++/include/orc/Common.hh
+++ b/c++/include/orc/Common.hh
@@ -27,6 +27,42 @@
 #include <string>
 
 namespace orc {
+
+  class FileVersion {
+  private:
+    uint32_t majorVersion;
+    uint32_t minorVersion;
+  public:
+    FileVersion(uint32_t major, uint32_t minor) :
+                majorVersion(major), minorVersion(minor) {
+    }
+
+    /**
+     * Get major version
+     */
+    uint32_t getMajor() const {
+        return this->majorVersion;
+    }
+
+    /**
+     * Get minor version
+     */
+    uint32_t getMinor() const {
+        return this->minorVersion;
+    }
+
+    bool operator == (const FileVersion & right) const {
+      return this->majorVersion == right.getMajor() &&
+              this->minorVersion == right.getMinor();
+    }
+
+    bool operator != (const FileVersion & right) const {
+      return !(*this == right);
+    }
+
+    std::string toString() const;
+  };
+
   enum CompressionKind {
     CompressionKind_NONE = 0,
     CompressionKind_ZLIB = 1,

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/include/orc/OrcFile.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/OrcFile.hh b/c++/include/orc/OrcFile.hh
index 38e17e6..cb2f8e5 100644
--- a/c++/include/orc/OrcFile.hh
+++ b/c++/include/orc/OrcFile.hh
@@ -23,6 +23,7 @@
 
 #include "orc/orc-config.hh"
 #include "orc/Reader.hh"
+#include "orc/Writer.hh"
 
 /** /file orc/OrcFile.hh
     @brief The top level interface to ORC.
@@ -119,6 +120,17 @@ namespace orc {
    * @param path the name of the file in the local file system
    */
   ORC_UNIQUE_PTR<OutputStream> writeLocalFile(const std::string& path);
+
+  /**
+   * Create a writer to write the ORC file.
+   * @param type the type of data to be written
+   * @param stream the stream to write to
+   * @param options the options for writing the file
+   */
+  ORC_UNIQUE_PTR<Writer> createWriter(
+                                      const Type& type,
+                                      OutputStream* stream,
+                                      const WriterOptions& options);
 }
 
 #endif

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/include/orc/Reader.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/Reader.hh b/c++/include/orc/Reader.hh
index 077b4ce..a004e46 100644
--- a/c++/include/orc/Reader.hh
+++ b/c++/include/orc/Reader.hh
@@ -244,10 +244,10 @@ namespace orc {
 
     /**
      * Get the format version of the file. Currently known values are:
-     * "0.11" and "0.12"
-     * @return the version string
+     * 0.11 and 0.12
+     * @return the FileVersion object
      */
-    virtual std::string getFormatVersion() const = 0;
+    virtual FileVersion getFormatVersion() const = 0;
 
     /**
      * Get the number of rows in the file.

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/include/orc/Writer.hh
----------------------------------------------------------------------
diff --git a/c++/include/orc/Writer.hh b/c++/include/orc/Writer.hh
new file mode 100644
index 0000000..c91399a
--- /dev/null
+++ b/c++/include/orc/Writer.hh
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_WRITER_HH
+#define ORC_WRITER_HH
+
+#include "orc/Common.hh"
+#include "orc/orc-config.hh"
+#include "orc/Type.hh"
+#include "orc/Vector.hh"
+
+#include <memory>
+#include <string>
+#include <vector>
+
+namespace orc {
+
+  // classes that hold data members so we can maintain binary compatibility
+  struct WriterOptionsPrivate;
+
+  enum CompressionStrategy {
+    CompressionStrategy_SPEED = 0,
+    CompressionStrategy_COMPRESSION
+  };
+
+  class Timezone;
+
+  /**
+   * Options for creating a Writer.
+   */
+  class WriterOptions {
+  private:
+    ORC_UNIQUE_PTR<WriterOptionsPrivate> privateBits;
+
+  public:
+    WriterOptions();
+    WriterOptions(const WriterOptions&);
+    WriterOptions(WriterOptions&);
+    WriterOptions& operator=(const WriterOptions&);
+    virtual ~WriterOptions();
+
+    /**
+     * Set the stripe size.
+     */
+    WriterOptions& setStripeSize(uint64_t size);
+
+    /**
+     * Get the stripe size.
+     * @return if not set, return default value.
+     */
+    uint64_t getStripeSize() const;
+
+    /**
+     * Set the data compression block size.
+     */
+    WriterOptions& setCompressionBlockSize(uint64_t size);
+
+    /**
+     * Get the data compression block size.
+     * @return if not set, return default value.
+     */
+    uint64_t getCompressionBlockSize() const;
+
+    /**
+     * Set row index stride. Use value 0 to disable row index.
+     */
+    WriterOptions& setRowIndexStride(uint64_t stride);
+
+    /**
+     * Get the index stride size.
+     * @return if not set, return default value.
+     */
+    uint64_t getRowIndexStride() const;
+
+    /**
+     * Set the dictionary key size threshold.
+     * 0 to disable dictionary encoding.
+     * 1 to always enable dictionary encoding.
+     */
+    WriterOptions& setDictionaryKeySizeThreshold(double val);
+
+    /**
+     * Get the dictionary key size threshold.
+     */
+    double getDictionaryKeySizeThreshold() const;
+
+    /**
+     * Set Orc file version
+     */
+    WriterOptions& setFileVersion(const FileVersion& version);
+
+    /**
+     * Get Orc file version
+     */
+    FileVersion getFileVersion() const;
+
+    /**
+     * Set compression kind.
+     */
+    WriterOptions& setCompression(CompressionKind comp);
+
+    /**
+     * Get the compression kind.
+     * @return if not set, return default value which is ZLIB.
+     */
+    CompressionKind getCompression() const;
+
+    /**
+     * Set the compression strategy.
+     */
+    WriterOptions& setCompressionStrategy(CompressionStrategy strategy);
+
+    /**
+     * Get the compression strategy.
+     * @return if not set, return default value which is speed.
+     */
+    CompressionStrategy getCompressionStrategy() const;
+
+    /**
+     * Set the padding tolerance.
+     */
+    WriterOptions& setPaddingTolerance(double tolerance);
+
+    /**
+     * Get the padding tolerance.
+     * @return if not set, return default value which is zero.
+     */
+    double getPaddingTolerance() const;
+
+    /**
+     * Set the memory pool.
+     */
+    WriterOptions& setMemoryPool(MemoryPool * memoryPool);
+
+    /**
+     * Get the memory pool.
+     * @return if not set, return default memory pool.
+     */
+    MemoryPool * getMemoryPool() const;
+
+    /**
+     * Set the error stream.
+     */
+    WriterOptions& setErrorStream(std::ostream& errStream);
+
+    /**
+     * Get the error stream.
+     * @return if not set, return std::cerr.
+     */
+    std::ostream * getErrorStream() const;
+
+    /**
+     * Get whether or not to write row group index
+     * @return if not set, the default is false
+     */
+    bool getEnableIndex() const;
+  };
+
+  class Writer {
+  public:
+    virtual ~Writer();
+
+    /**
+     * Create a row batch for writing the columns into this file.
+     * @param size the number of rows to write
+     * @return a new ColumnVectorBatch to write into
+     */
+    virtual ORC_UNIQUE_PTR<ColumnVectorBatch> createRowBatch(uint64_t size
+                                                             ) const = 0;
+
+    /**
+     * Add a row batch into current writer.
+     * @param rowsToAdd the row batch data to write.
+     */
+    virtual void add(ColumnVectorBatch& rowsToAdd) = 0;
+
+    /**
+     * Close the writer and flush any pending data to the output stream.
+     */
+    virtual void close() = 0;
+  };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/src/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/c++/src/CMakeLists.txt b/c++/src/CMakeLists.txt
index 2427327..243efa6 100644
--- a/c++/src/CMakeLists.txt
+++ b/c++/src/CMakeLists.txt
@@ -141,6 +141,7 @@ add_library (orc STATIC
   ByteRLE.cc
   ColumnPrinter.cc
   ColumnReader.cc
+  ColumnWriter.cc
   Common.cc
   Compression.cc
   Exceptions.cc
@@ -157,6 +158,7 @@ add_library (orc STATIC
   Timezone.cc
   TypeImpl.cc
   Vector.cc
+  Writer.cc
   )
 
 install(TARGETS orc DESTINATION lib)

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/src/ColumnWriter.cc
----------------------------------------------------------------------
diff --git a/c++/src/ColumnWriter.cc b/c++/src/ColumnWriter.cc
new file mode 100644
index 0000000..ad18b0c
--- /dev/null
+++ b/c++/src/ColumnWriter.cc
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "orc/Int128.hh"
+#include "orc/Writer.hh"
+
+#include "ByteRLE.hh"
+#include "ColumnWriter.hh"
+#include "RLE.hh"
+#include "Statistics.hh"
+#include "Timezone.hh"
+
+namespace orc {
+  StreamsFactory::~StreamsFactory() {
+    //PASS
+  }
+
+  class StreamsFactoryImpl : public StreamsFactory {
+  public:
+    StreamsFactoryImpl(
+                       const WriterOptions& writerOptions,
+                       OutputStream* outputStream) :
+                       options(writerOptions),
+                       outStream(outputStream) {
+                       }
+
+    virtual std::unique_ptr<BufferedOutputStream>
+                    createStream(proto::Stream_Kind kind) const override;
+  private:
+    const WriterOptions& options;
+    OutputStream* outStream;
+  };
+
+  std::unique_ptr<BufferedOutputStream> StreamsFactoryImpl::createStream(
+                                                    proto::Stream_Kind) const {
+    // In the future, we can decide compression strategy and modifier
+    // based on stream kind. But for now we just use the setting from
+    // WriterOption
+    return createCompressor(
+                            options.getCompression(),
+                            outStream,
+                            options.getCompressionStrategy(),
+                            // BufferedOutputStream initial capacity
+                            1 * 1024 * 1024,
+                            options.getCompressionBlockSize(),
+                            *options.getMemoryPool());
+  }
+
+  std::unique_ptr<StreamsFactory> createStreamsFactory(
+                                        const WriterOptions& options,
+                                        OutputStream* outStream) {
+    return std::unique_ptr<StreamsFactory>(
+                                   new StreamsFactoryImpl(options, outStream));
+  }
+
+  RowIndexPositionRecorder::~RowIndexPositionRecorder() {
+    // PASS
+  }
+
+  ColumnWriter::ColumnWriter(
+                             const Type& type,
+                             const StreamsFactory& factory,
+                             const WriterOptions& options) :
+                                columnId(type.getColumnId()),
+                                colIndexStatistics(),
+                                colStripeStatistics(),
+                                colFileStatistics(),
+                                enableIndex(options.getEnableIndex()),
+                                rowIndex(),
+                                rowIndexEntry(),
+                                rowIndexPosition(),
+                                memPool(*options.getMemoryPool()),
+                                indexStream() {
+
+    std::unique_ptr<BufferedOutputStream> presentStream =
+        factory.createStream(proto::Stream_Kind_PRESENT);
+    notNullEncoder = createBooleanRleEncoder(std::move(presentStream));
+
+    colIndexStatistics = createColumnStatistics(type);
+    colStripeStatistics = createColumnStatistics(type);
+    colFileStatistics = createColumnStatistics(type);
+
+    if (enableIndex) {
+      rowIndex = std::unique_ptr<proto::RowIndex>(new proto::RowIndex());
+      rowIndexEntry =
+        std::unique_ptr<proto::RowIndexEntry>(new proto::RowIndexEntry());
+      rowIndexPosition = std::unique_ptr<RowIndexPositionRecorder>(
+                     new RowIndexPositionRecorder(*rowIndexEntry));
+      indexStream =
+        factory.createStream(proto::Stream_Kind_ROW_INDEX);
+    }
+  }
+
+  ColumnWriter::~ColumnWriter() {
+    // PASS
+  }
+
+  void ColumnWriter::add(ColumnVectorBatch& batch,
+                         uint64_t offset,
+                         uint64_t numValues) {
+    notNullEncoder->add(batch.notNull.data() + offset, numValues, nullptr);
+  }
+
+  void ColumnWriter::flush(std::vector<proto::Stream>& streams) {
+    proto::Stream stream;
+    stream.set_kind(proto::Stream_Kind_PRESENT);
+    stream.set_column(static_cast<uint32_t>(columnId));
+    stream.set_length(notNullEncoder->flush());
+    streams.push_back(stream);
+  }
+
+  uint64_t ColumnWriter::getEstimatedSize() const {
+    return notNullEncoder->getBufferSize();
+  }
+
+  void ColumnWriter::getStripeStatistics(
+    std::vector<proto::ColumnStatistics>& stats) const {
+    getProtoBufStatistics(stats, colStripeStatistics.get());
+  }
+
+  void ColumnWriter::mergeStripeStatsIntoFileStats() {
+    colFileStatistics->merge(*colStripeStatistics);
+    colStripeStatistics->reset();
+  }
+
+  void ColumnWriter::mergeRowGroupStatsIntoStripeStats() {
+    colStripeStatistics->merge(*colIndexStatistics);
+    colIndexStatistics->reset();
+  }
+
+  void ColumnWriter::getFileStatistics(
+    std::vector<proto::ColumnStatistics>& stats) const {
+    getProtoBufStatistics(stats, colFileStatistics.get());
+  }
+
+  void ColumnWriter::createRowIndexEntry() {
+    proto::ColumnStatistics *indexStats = rowIndexEntry->mutable_statistics();
+    colIndexStatistics->toProtoBuf(*indexStats);
+
+    *rowIndex->add_entry() = *rowIndexEntry;
+
+    rowIndexEntry->clear_positions();
+    rowIndexEntry->clear_statistics();
+
+    colStripeStatistics->merge(*colIndexStatistics);
+    colIndexStatistics->reset();
+
+    recordPosition();
+  }
+
+  void ColumnWriter::writeIndex(std::vector<proto::Stream> &streams) const {
+    // write row index to output stream
+    rowIndex->SerializeToZeroCopyStream(indexStream.get());
+
+    // construct row index stream
+    proto::Stream stream;
+    stream.set_kind(proto::Stream_Kind_ROW_INDEX);
+    stream.set_column(static_cast<uint32_t>(columnId));
+    stream.set_length(indexStream->flush());
+    streams.push_back(stream);
+  }
+
+  void ColumnWriter::recordPosition() const {
+    notNullEncoder->recordPosition(rowIndexPosition.get());
+  }
+
+  void ColumnWriter::resetIndex() {
+    // clear row index
+    rowIndex->clear_entry();
+    rowIndexEntry->clear_positions();
+    rowIndexEntry->clear_statistics();
+
+    // write current positions
+    recordPosition();
+  }
+
+  class StructColumnWriter : public ColumnWriter {
+  public:
+    StructColumnWriter(
+                       const Type& type,
+                       const StreamsFactory& factory,
+                       const WriterOptions& options);
+    ~StructColumnWriter();
+
+    virtual void add(ColumnVectorBatch& rowBatch,
+                     uint64_t offset,
+                     uint64_t numValues) override;
+
+    virtual void flush(std::vector<proto::Stream>& streams) override;
+
+    virtual uint64_t getEstimatedSize() const override;
+    virtual void getColumnEncoding(
+      std::vector<proto::ColumnEncoding>& encodings) const override;
+
+    virtual void getStripeStatistics(
+      std::vector<proto::ColumnStatistics>& stats) const override;
+
+    virtual void getFileStatistics(
+      std::vector<proto::ColumnStatistics>& stats) const override;
+
+    virtual void mergeStripeStatsIntoFileStats() override;
+
+    virtual void mergeRowGroupStatsIntoStripeStats() override;
+
+    virtual void createRowIndexEntry() override;
+
+    virtual void writeIndex(
+      std::vector<proto::Stream> &streams) const override;
+
+    virtual void resetIndex() override;
+
+  private:
+    std::vector<ColumnWriter *> children;
+  };
+
+  StructColumnWriter::StructColumnWriter(
+                                         const Type& type,
+                                         const StreamsFactory& factory,
+                                         const WriterOptions& options) :
+                                         ColumnWriter(type, factory, options) {
+    for(unsigned int i = 0; i < type.getSubtypeCount(); ++i) {
+      const Type& child = *type.getSubtype(i);
+      children.push_back(buildWriter(child, factory, options).release());
+    }
+
+    if (enableIndex) {
+      recordPosition();
+    }
+  }
+
+  StructColumnWriter::~StructColumnWriter() {
+    for (uint32_t i = 0; i < children.size(); ++i) {
+      delete children[i];
+    }
+  }
+
+  // Write rows [offset, offset + numValues) of a struct batch: the present
+  // stream, then each child column, then the row group statistics.
+  void StructColumnWriter::add(ColumnVectorBatch& rowBatch,
+                               uint64_t offset,
+                               uint64_t numValues) {
+    ColumnWriter::add(rowBatch, offset, numValues);
+
+    const StructVectorBatch & structBatch =
+          dynamic_cast<const StructVectorBatch &>(rowBatch);
+
+    for (uint32_t i = 0; i < children.size(); ++i) {
+      children[i]->add(*structBatch.fields[i], offset, numValues);
+    }
+
+    // update stats
+    uint64_t nonNullCount = numValues;
+    if (structBatch.hasNulls) {
+      const char* notNull = structBatch.notNull.data() + offset;
+      for (uint64_t i = 0; i < numValues; ++i) {
+        if (!notNull[i]) --nonNullCount;
+      }
+    }
+    colIndexStatistics->increase(nonNullCount);
+    // Statistics accumulate across add() calls, so only ever raise the flag;
+    // writing false here would erase a null recorded by an earlier batch.
+    if (nonNullCount != numValues) {
+      colIndexStatistics->setHasNull(true);
+    }
+  }
+
+  void StructColumnWriter::flush(std::vector<proto::Stream>& streams) {
+    ColumnWriter::flush(streams);
+    for (uint32_t i = 0; i < children.size(); ++i) {
+      children[i]->flush(streams);
+    }
+  }
+
+  void StructColumnWriter::writeIndex(
+                      std::vector<proto::Stream> &streams) const {
+    ColumnWriter::writeIndex(streams);
+    for (uint32_t i = 0; i < children.size(); ++i) {
+      children[i]->writeIndex(streams);
+    }
+  }
+
+  uint64_t StructColumnWriter::getEstimatedSize() const {
+    uint64_t size = ColumnWriter::getEstimatedSize();
+    for (uint32_t i = 0; i < children.size(); ++i) {
+      size += children[i]->getEstimatedSize();
+    }
+    return size;
+  }
+
+  void StructColumnWriter::getColumnEncoding(
+                      std::vector<proto::ColumnEncoding>& encodings) const {
+    proto::ColumnEncoding encoding;
+    encoding.set_kind(proto::ColumnEncoding_Kind_DIRECT);
+    encoding.set_dictionarysize(0);
+    encodings.push_back(encoding);
+    for (uint32_t i = 0; i < children.size(); ++i) {
+      children[i]->getColumnEncoding(encodings);
+    }
+  }
+
+  void StructColumnWriter::getStripeStatistics(
+    std::vector<proto::ColumnStatistics>& stats) const {
+    ColumnWriter::getStripeStatistics(stats);
+
+    for (uint32_t i = 0; i < children.size(); ++i) {
+      children[i]->getStripeStatistics(stats);
+    }
+  }
+
+  void StructColumnWriter::mergeStripeStatsIntoFileStats() {
+    ColumnWriter::mergeStripeStatsIntoFileStats();
+
+    for (uint32_t i = 0; i < children.size(); ++i) {
+      children[i]->mergeStripeStatsIntoFileStats();
+    }
+  }
+
+  void StructColumnWriter::getFileStatistics(
+    std::vector<proto::ColumnStatistics>& stats) const {
+    ColumnWriter::getFileStatistics(stats);
+
+    for (uint32_t i = 0; i < children.size(); ++i) {
+      children[i]->getFileStatistics(stats);
+    }
+  }
+
+  void StructColumnWriter::mergeRowGroupStatsIntoStripeStats()  {
+    ColumnWriter::mergeRowGroupStatsIntoStripeStats();
+
+    for (uint32_t i = 0; i < children.size(); ++i) {
+      children[i]->mergeRowGroupStatsIntoStripeStats();
+    }
+  }
+
+  void StructColumnWriter::createRowIndexEntry() {
+    ColumnWriter::createRowIndexEntry();
+
+    for (uint32_t i = 0; i < children.size(); ++i) {
+      children[i]->createRowIndexEntry();
+    }
+  }
+
+  void StructColumnWriter::resetIndex() {
+    ColumnWriter::resetIndex();
+
+    for (uint32_t i = 0; i < children.size(); ++i) {
+      children[i]->resetIndex();
+    }
+  }
+
+  class IntegerColumnWriter : public ColumnWriter {
+  public:
+    IntegerColumnWriter(
+                        const Type& type,
+                        const StreamsFactory& factory,
+                        const WriterOptions& options);
+
+    virtual void add(ColumnVectorBatch& rowBatch,
+                     uint64_t offset,
+                     uint64_t numValues) override;
+
+    virtual void flush(std::vector<proto::Stream>& streams) override;
+
+    virtual uint64_t getEstimatedSize() const override;
+
+    virtual void getColumnEncoding(
+              std::vector<proto::ColumnEncoding>& encodings) const override;
+
+    virtual void recordPosition() const override;
+
+  protected:
+    std::unique_ptr<RleEncoder> rleEncoder;
+
+  private:
+    RleVersion rleVersion;
+  };
+
+  IntegerColumnWriter::IntegerColumnWriter(
+                        const Type& type,
+                        const StreamsFactory& factory,
+                        const WriterOptions& options) :
+                          ColumnWriter(type, factory, options),
+                          rleVersion(RleVersion_1) {
+    std::unique_ptr<BufferedOutputStream> dataStream =
+      factory.createStream(proto::Stream_Kind_DATA);
+    rleEncoder = createRleEncoder(
+                                  std::move(dataStream),
+                                  true,
+                                  rleVersion,
+                                  memPool);
+
+    if (enableIndex) {
+      recordPosition();
+    }
+  }
+
+  // Encode rows [offset, offset + numValues) into the DATA stream and fold
+  // the non-null values into the row group statistics.
+  void IntegerColumnWriter::add(ColumnVectorBatch& rowBatch,
+                                uint64_t offset,
+                                uint64_t numValues) {
+    ColumnWriter::add(rowBatch, offset, numValues);
+
+    const LongVectorBatch & longBatch =
+                    dynamic_cast<const LongVectorBatch &>(rowBatch);
+    const int64_t* data = longBatch.data.data() + offset;
+    const char* notNull = longBatch.hasNulls ?
+                          longBatch.notNull.data() + offset : nullptr;
+    rleEncoder->add(data, numValues, notNull);
+
+    // update stats; the reference cast throws std::bad_cast on a type
+    // mismatch instead of handing back a null pointer to dereference
+    IntegerColumnStatisticsImpl& intStats =
+      dynamic_cast<IntegerColumnStatisticsImpl&>(*colIndexStatistics.get());
+    for (uint64_t i = 0; i < numValues; ++i) {
+      if (notNull == nullptr || notNull[i]) {
+        intStats.increase(1);
+        intStats.update(data[i], 1);
+      } else if (!intStats.hasNull()) {
+        intStats.setHasNull(true);
+      }
+    }
+  }
+
+  void IntegerColumnWriter::flush(std::vector<proto::Stream>& streams) {
+    ColumnWriter::flush(streams);
+
+    proto::Stream stream;
+    stream.set_kind(proto::Stream_Kind_DATA);
+    stream.set_column(static_cast<uint32_t>(columnId));
+    stream.set_length(rleEncoder->flush());
+    streams.push_back(stream);
+  }
+
+  uint64_t IntegerColumnWriter::getEstimatedSize() const {
+    uint64_t size = ColumnWriter::getEstimatedSize();
+    size += rleEncoder->getBufferSize();
+    return size;
+  }
+
+  void IntegerColumnWriter::getColumnEncoding(
+                       std::vector<proto::ColumnEncoding>& encodings) const {
+    proto::ColumnEncoding encoding;
+    encoding.set_kind(rleVersion == RleVersion_1 ?
+                                proto::ColumnEncoding_Kind_DIRECT :
+                                proto::ColumnEncoding_Kind_DIRECT_V2);
+    encoding.set_dictionarysize(0);
+    encodings.push_back(encoding);
+  }
+
+  void IntegerColumnWriter::recordPosition() const {
+    ColumnWriter::recordPosition();
+    rleEncoder->recordPosition(rowIndexPosition.get());
+  }
+
+  std::unique_ptr<ColumnWriter> buildWriter(
+                                            const Type& type,
+                                            const StreamsFactory& factory,
+                                            const WriterOptions& options) {
+    switch (static_cast<int64_t>(type.getKind())) {
+      case STRUCT:
+        return std::unique_ptr<ColumnWriter>(
+          new StructColumnWriter(
+                                 type,
+                                 factory,
+                                 options));
+      case INT:
+      case LONG:
+      case SHORT:
+        return std::unique_ptr<ColumnWriter>(
+          new IntegerColumnWriter(
+                                  type,
+                                  factory,
+                                  options));
+      default:
+        throw NotImplementedYet("Type is not supported yet for creating "
+                                  "ColumnWriter.");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/src/ColumnWriter.hh
----------------------------------------------------------------------
diff --git a/c++/src/ColumnWriter.hh b/c++/src/ColumnWriter.hh
new file mode 100644
index 0000000..774f9b5
--- /dev/null
+++ b/c++/src/ColumnWriter.hh
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ORC_COLUMN_WRITER_HH
+#define ORC_COLUMN_WRITER_HH
+
+#include "orc/Vector.hh"
+
+#include "ByteRLE.hh"
+#include "Compression.hh"
+#include "Exceptions.hh"
+#include "Statistics.hh"
+
+#include "wrap/orc-proto-wrapper.hh"
+
+namespace orc {
+
+  /**
+   * Interface that creates the buffered output streams used by column
+   * writers, one per requested stream kind.
+   */
+  class StreamsFactory {
+  public:
+    virtual ~StreamsFactory();
+
+    /**
+     * Get the stream for the given column/kind in this stripe.
+     * @param kind the kind of the stream
+     * @return the buffer output stream; ownership passes to the caller
+     */
+    virtual std::unique_ptr<BufferedOutputStream>
+                    createStream(proto::Stream_Kind kind) const = 0;
+  };
+
+  /**
+   * Create a StreamsFactory.
+   * @param options the writer options
+   * @param outStream the output stream; presumably the target that the
+   *        created streams write to and not owned by the factory
+   *        (TODO confirm against the implementation)
+   * @return the newly created factory, owned by the caller
+   */
+  std::unique_ptr<StreamsFactory> createStreamsFactory(
+                                        const WriterOptions& options,
+                                        OutputStream * outStream);
+
+  /**
+   * Record stream positions for row index.
+   *
+   * Every recorded position is appended to the RowIndexEntry given at
+   * construction. Only a reference to that entry is kept, so the entry
+   * must outlive the recorder.
+   */
+  class RowIndexPositionRecorder : public PositionRecorder {
+  public:
+    virtual ~RowIndexPositionRecorder();
+
+    /**
+     * @param entry the row index entry that receives the positions
+     */
+    explicit RowIndexPositionRecorder(proto::RowIndexEntry& entry):
+      rowIndexEntry(entry) {}
+
+    /**
+     * Append one position to the row index entry.
+     * @param pos the position to record
+     */
+    virtual void add(uint64_t pos) override {
+      rowIndexEntry.add_positions(pos);
+    }
+
+  private:
+    // not owned; see the lifetime note on the class
+    proto::RowIndexEntry& rowIndexEntry;
+  };
+
+  /**
+   * The interface for writing ORC data types.
+   */
+  class ColumnWriter {
+  protected:
+    std::unique_ptr<ByteRleEncoder> notNullEncoder;
+    uint64_t columnId;
+    // statistics kept at index, stripe and file level; see
+    // mergeRowGroupStatsIntoStripeStats() and mergeStripeStatsIntoFileStats()
+    std::unique_ptr<MutableColumnStatistics> colIndexStatistics;
+    std::unique_ptr<MutableColumnStatistics> colStripeStatistics;
+    std::unique_ptr<MutableColumnStatistics> colFileStatistics;
+
+    bool enableIndex;
+    // row index for this column, contains all RowIndexEntries in 1 stripe
+    std::unique_ptr<proto::RowIndex> rowIndex;
+    std::unique_ptr<proto::RowIndexEntry> rowIndexEntry;
+    std::unique_ptr<RowIndexPositionRecorder> rowIndexPosition;
+
+  public:
+    ColumnWriter(const Type& type, const StreamsFactory& factory,
+                 const WriterOptions& options);
+
+    virtual ~ColumnWriter();
+
+    /**
+     * Write the next group of values from this rowBatch.
+     * @param rowBatch the row batch data to write
+     * @param offset the starting point of row batch to write
+     * @param numValues the number of values to write
+     */
+    virtual void add(ColumnVectorBatch& rowBatch,
+                     uint64_t offset,
+                     uint64_t numValues);
+    /**
+     * Flush column writer output streams
+     * @param streams vector to store generated stream by flush()
+     */
+    virtual void flush(std::vector<proto::Stream>& streams);
+
+    /**
+     * Get estimated size of buffer used
+     */
+    virtual uint64_t getEstimatedSize() const;
+
+    /**
+     * Get the encoding used by the writer for this column.
+     * ColumnEncoding info is pushed into the vector
+     */
+    virtual void getColumnEncoding(
+      std::vector<proto::ColumnEncoding>& encodings) const = 0;
+
+    /**
+     * Get the stripe statistics for this column
+     */
+    virtual void getStripeStatistics(
+      std::vector<proto::ColumnStatistics>& stats) const;
+
+    /**
+     * Get the file statistics for this column
+     */
+    virtual void getFileStatistics(
+      std::vector<proto::ColumnStatistics>& stats) const;
+
+    /**
+     * Merge index stats into stripe stats and reset index stats
+     */
+    virtual void mergeRowGroupStatsIntoStripeStats();
+
+    /**
+     * Merge stripe stats into file stats and reset stripe stats
+     */
+    virtual void mergeStripeStatsIntoFileStats();
+
+    /**
+     * Create a row index entry with the previous location and the current
+     * index statistics. Also merges the index statistics into the stripe
+     * statistics before they are cleared. Finally, it records the start of the
+     * next index and ensures all of the children columns also create an entry.
+     */
+    virtual void createRowIndexEntry();
+
+    /**
+     * Write row index streams for this column
+     * @param streams output list of ROW_INDEX streams
+     */
+    virtual void writeIndex(std::vector<proto::Stream> &streams) const;
+
+    /**
+     * Record positions for index
+     *
+     * This function is called by createRowIndexEntry() and ColumnWriter's
+     * constructor. So base classes do not need to call inherited classes'
+     * recordPosition() function.
+     */
+    virtual void recordPosition() const;
+
+    /**
+     * Reset positions for index
+     */
+    virtual void resetIndex();
+
+  protected:
+    /**
+     * Utility function to translate ColumnStatistics into protobuf form and
+     * add it to output list
+     * @param statsList output list for protobuf stats
+     * @param stats ColumnStatistics to be transformed and added; it is
+     *        dereferenced unconditionally, so it must not be null
+     */
+    void getProtoBufStatistics(
+      std::vector<proto::ColumnStatistics>& statsList,
+      const MutableColumnStatistics* stats) const {
+      proto::ColumnStatistics pbStats;
+      stats->toProtoBuf(pbStats);
+      statsList.push_back(pbStats);
+    }
+
+  protected:
+    MemoryPool& memPool;
+    std::unique_ptr<BufferedOutputStream> indexStream;
+  };
+
+  /**
+   * Create a writer for the given type.
+   * @param type the column type to build a writer for
+   * @param factory passed on to the column writer's constructor
+   * @param options passed on to the column writer's constructor
+   * @return the column writer for this type, owned by the caller
+   * @throws NotImplementedYet if the kind of the type is not supported;
+   *         currently only STRUCT, INT, LONG and SHORT are handled
+   */
+  std::unique_ptr<ColumnWriter> buildWriter(
+                                            const Type& type,
+                                            const StreamsFactory& factory,
+                                            const WriterOptions& options);
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/src/Common.cc
----------------------------------------------------------------------
diff --git a/c++/src/Common.cc b/c++/src/Common.cc
index 7813612..0542188 100644
--- a/c++/src/Common.cc
+++ b/c++/src/Common.cc
@@ -104,4 +104,9 @@ namespace orc {
     return buffer.str();
   }
 
+  std::string FileVersion::toString() const {
+    // formatted as <major>.<minor>
+    std::stringstream out;
+    out << getMajor();
+    out << '.';
+    out << getMinor();
+    return out.str();
+  }
 }

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/src/Compression.cc
----------------------------------------------------------------------
diff --git a/c++/src/Compression.cc b/c++/src/Compression.cc
index fe61e31..a28bcf0 100644
--- a/c++/src/Compression.cc
+++ b/c++/src/Compression.cc
@@ -890,19 +890,20 @@ DIAGNOSTIC_POP
                       OutputStream * outStream,
                       CompressionStrategy strategy,
                       uint64_t bufferCapacity,
-                      uint64_t blockSize,
+                      uint64_t compressionBlockSize,
                       MemoryPool& pool) {
     switch (static_cast<int64_t>(kind)) {
     case CompressionKind_NONE: {
       return std::unique_ptr<BufferedOutputStream>
-        (new BufferedOutputStream(pool, outStream, bufferCapacity, blockSize));
+        (new BufferedOutputStream(
+                pool, outStream, bufferCapacity, compressionBlockSize));
     }
     case CompressionKind_ZLIB: {
       int level = (strategy == CompressionStrategy_SPEED) ?
               Z_BEST_SPEED + 1 : Z_DEFAULT_COMPRESSION;
       return std::unique_ptr<BufferedOutputStream>
         (new ZlibCompressionStream(
-                outStream, level, bufferCapacity, blockSize, pool));
+                outStream, level, bufferCapacity, compressionBlockSize, pool));
     }
     case CompressionKind_SNAPPY:
     case CompressionKind_LZO:

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/src/Compression.hh
----------------------------------------------------------------------
diff --git a/c++/src/Compression.hh b/c++/src/Compression.hh
index f557408..18f7cfd 100644
--- a/c++/src/Compression.hh
+++ b/c++/src/Compression.hh
@@ -24,11 +24,6 @@
 
 namespace orc {
 
-  enum CompressionStrategy {
-    CompressionStrategy_SPEED = 0,
-    CompressionStrategy_COMPRESSION
-  };
-
   /**
    * Create a decompressor for the given compression kind.
    * @param kind the compression type to implement
@@ -48,7 +43,7 @@ namespace orc {
    * @param outStream the output stream that is the underlying target
    * @param strategy compression strategy
    * @param bufferCapacity compression stream buffer total capacity
-   * @param blockSize compresssion buffer block size
+   * @param compressionBlockSize compression buffer block size
    * @param pool the memory pool
    */
   std::unique_ptr<BufferedOutputStream>
@@ -56,7 +51,7 @@ namespace orc {
                       OutputStream * outStream,
                       CompressionStrategy strategy,
                       uint64_t bufferCapacity,
-                      uint64_t blockSize,
+                      uint64_t compressionBlockSize,
                       MemoryPool& pool);
 }
 

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/src/RLE.hh
----------------------------------------------------------------------
diff --git a/c++/src/RLE.hh b/c++/src/RLE.hh
index 43f7aa7..b1d654c 100644
--- a/c++/src/RLE.hh
+++ b/c++/src/RLE.hh
@@ -26,6 +26,11 @@
 
 namespace orc {
 
+  enum RleVersion {
+    RleVersion_1,
+    RleVersion_2
+  };
+
   inline int64_t zigZag(int64_t value) {
     return (value << 1) ^ (value >> 63);
   }
@@ -92,11 +97,6 @@ namespace orc {
                       const char* notNull) = 0;
   };
 
-  enum RleVersion {
-    RleVersion_1,
-    RleVersion_2
-  };
-
   /**
    * Create an RLE encoder.
    * @param output the output stream to write to

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/src/Reader.cc
----------------------------------------------------------------------
diff --git a/c++/src/Reader.cc b/c++/src/Reader.cc
index 18a2910..113f759 100644
--- a/c++/src/Reader.cc
+++ b/c++/src/Reader.cc
@@ -404,15 +404,13 @@ namespace orc {
         contents->blockSize));
   }
 
-  std::string ReaderImpl::getFormatVersion() const {
-    std::stringstream result;
-    for(int i=0; i < contents->postscript->version_size(); ++i) {
-      if (i != 0) {
-        result << ".";
-      }
-      result << contents->postscript->version(i);
+  FileVersion ReaderImpl::getFormatVersion() const {
+    if (contents->postscript->version_size() != 2) {
+      throw std::logic_error("Unrecognized file version.");
     }
-    return result.str();
+    return FileVersion(
+                contents->postscript->version(0),
+                contents->postscript->version(1));
   }
 
   uint64_t ReaderImpl::getNumberOfRows() const {
@@ -593,12 +591,12 @@ namespace orc {
   }
 
   void ReaderImpl::checkOrcVersion() {
-    std::string version = getFormatVersion();
-    if (version != "0.11" && version != "0.12") {
+    FileVersion version = getFormatVersion();
+    if (version != FileVersion(0, 11) && version != FileVersion(0, 12)) {
       *(options.getErrorStream())
         << "Warning: ORC file " << contents->stream->getName()
         << " was written in an unknown format version "
-        << version << "\n";
+        << version.toString() << "\n";
     }
   }
 

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/src/Reader.hh
----------------------------------------------------------------------
diff --git a/c++/src/Reader.hh b/c++/src/Reader.hh
index 837d831..cd54cbb 100644
--- a/c++/src/Reader.hh
+++ b/c++/src/Reader.hh
@@ -178,7 +178,7 @@ namespace orc {
     std::shared_ptr<FileContents> contents;
 
     // inputs
-    const ReaderOptions& options;
+    const ReaderOptions options;
     const uint64_t fileLength;
     const uint64_t postscriptLength;
 
@@ -214,7 +214,7 @@ namespace orc {
 
     CompressionKind getCompression() const override;
 
-    std::string getFormatVersion() const override;
+    FileVersion getFormatVersion() const override;
 
     WriterVersion getWriterVersion() const override;
 

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/src/Statistics.cc
----------------------------------------------------------------------
diff --git a/c++/src/Statistics.cc b/c++/src/Statistics.cc
index 718275e..3bebe1d 100644
--- a/c++/src/Statistics.cc
+++ b/c++/src/Statistics.cc
@@ -50,13 +50,16 @@ namespace orc {
   StatisticsImpl::StatisticsImpl(const proto::StripeStatistics& stripeStats,
                                  const StatContext& statContext) {
     for(int i = 0; i < stripeStats.colstats_size(); i++) {
-      colStats.push_back(convertColumnStatistics(stripeStats.colstats(i), statContext));
+      colStats.push_back(
+                convertColumnStatistics(stripeStats.colstats(i), statContext));
     }
   }
 
-  StatisticsImpl::StatisticsImpl(const proto::Footer& footer, const StatContext& statContext) {
+  StatisticsImpl::StatisticsImpl(const proto::Footer& footer,
+                                 const StatContext& statContext) {
     for(int i = 0; i < footer.statistics_size(); i++) {
-      colStats.push_back(convertColumnStatistics(footer.statistics(i), statContext));
+      colStats.push_back(
+                convertColumnStatistics(footer.statistics(i), statContext));
     }
   }
 
@@ -80,9 +83,10 @@ namespace orc {
     // PASS
   }
 
-  StripeStatisticsImpl::StripeStatisticsImpl(const proto::StripeStatistics& stripeStats,
-                                 std::vector<std::vector<proto::ColumnStatistics> >& indexStats,
-                                 const StatContext& statContext) {
+  StripeStatisticsImpl::StripeStatisticsImpl(
+                const proto::StripeStatistics& stripeStats,
+                std::vector<std::vector<proto::ColumnStatistics> >& indexStats,
+                const StatContext& statContext) {
     columnStats.reset(new StatisticsImpl(stripeStats, statContext));
     rowIndexStats.resize(indexStats.size());
     for(size_t i = 0; i < rowIndexStats.size(); i++) {
@@ -131,6 +135,10 @@ namespace orc {
     // PASS
   }
 
+  MutableColumnStatistics::~MutableColumnStatistics() {
+    // PASS
+  }
+
   ColumnStatisticsImpl::~ColumnStatisticsImpl() {
     // PASS
   }
@@ -300,9 +308,11 @@ namespace orc {
     }else{
       const proto::TimestampStatistics& stats = pb.timestampstatistics();
       _stats.setHasMinimum(
-                  stats.has_minimumutc() || (stats.has_minimum() && (statContext.writerTimezone != NULL)));
+                stats.has_minimumutc() ||
+                (stats.has_minimum() && (statContext.writerTimezone != NULL)));
       _stats.setHasMaximum(
-                  stats.has_maximumutc() || (stats.has_maximum() && (statContext.writerTimezone != NULL)));
+                stats.has_maximumutc() ||
+                (stats.has_maximum() && (statContext.writerTimezone != NULL)));
       _hasLowerBound = stats.has_minimumutc() || stats.has_minimum();
       _hasUpperBound = stats.has_maximumutc() || stats.has_maximum();
 
@@ -314,12 +324,16 @@ namespace orc {
       } else if (statContext.writerTimezone) {
         int64_t writerTimeSec = stats.minimum() / 1000;
         // multiply the offset by 1000 to convert to millisecond
-        int64_t minimum = stats.minimum() + (statContext.writerTimezone->getVariant(writerTimeSec).gmtOffset) * 1000;
+        int64_t minimum =
+          stats.minimum() +
+            (statContext.writerTimezone->getVariant(writerTimeSec).gmtOffset)
+              * 1000;
         _stats.setMinimum(minimum);
         _lowerBound = minimum;
       } else {
         _stats.setMinimum(0);
-        // subtract 1 day 1 hour (25 hours) in milliseconds to handle unknown TZ and daylight savings
+        // subtract 1 day 1 hour (25 hours) in milliseconds to handle unknown
+        // TZ and daylight savings
         _lowerBound = stats.minimum() - (25 * SECONDS_PER_HOUR * 1000);
       }
 
@@ -331,12 +345,15 @@ namespace orc {
       } else if (statContext.writerTimezone) {
         int64_t writerTimeSec = stats.maximum() / 1000;
         // multiply the offset by 1000 to convert to millisecond
-        int64_t maximum = stats.maximum() + (statContext.writerTimezone->getVariant(writerTimeSec).gmtOffset) * 1000;
+        int64_t maximum = stats.maximum() +
+          (statContext.writerTimezone->getVariant(writerTimeSec).gmtOffset)
+            * 1000;
         _stats.setMaximum(maximum);
         _upperBound = maximum;
       } else {
         _stats.setMaximum(0);
-        // add 1 day 1 hour (25 hours) in milliseconds to handle unknown TZ and daylight savings
+        // add 1 day 1 hour (25 hours) in milliseconds to handle unknown
+        // TZ and daylight savings
         _upperBound = stats.maximum() +  (25 * SECONDS_PER_HOUR * 1000);
       }
       // Add 1 millisecond to account for microsecond precision of values
@@ -344,44 +361,44 @@ namespace orc {
     }
   }
 
-  std::unique_ptr<ColumnStatistics> createColumnStatistics(
-    const Type& type, bool enableStringComparison) {
+  std::unique_ptr<MutableColumnStatistics> createColumnStatistics(
+    const Type& type) {
     switch (static_cast<int64_t>(type.getKind())) {
       case BOOLEAN:
-        return std::unique_ptr<ColumnStatistics>(
+        return std::unique_ptr<MutableColumnStatistics>(
           new BooleanColumnStatisticsImpl());
       case BYTE:
       case INT:
       case LONG:
       case SHORT:
-        return std::unique_ptr<ColumnStatistics>(
+        return std::unique_ptr<MutableColumnStatistics>(
           new IntegerColumnStatisticsImpl());
       case STRUCT:
       case MAP:
       case LIST:
       case UNION:
-        return std::unique_ptr<ColumnStatistics>(
+        return std::unique_ptr<MutableColumnStatistics>(
           new ColumnStatisticsImpl());
       case FLOAT:
       case DOUBLE:
-        return std::unique_ptr<ColumnStatistics>(
+        return std::unique_ptr<MutableColumnStatistics>(
           new DoubleColumnStatisticsImpl());
       case BINARY:
-        return std::unique_ptr<ColumnStatistics>(
+        return std::unique_ptr<MutableColumnStatistics>(
           new BinaryColumnStatisticsImpl());
       case STRING:
       case CHAR:
       case VARCHAR:
-        return std::unique_ptr<ColumnStatistics>(
-          new StringColumnStatisticsImpl(enableStringComparison));
+        return std::unique_ptr<MutableColumnStatistics>(
+          new StringColumnStatisticsImpl());
       case DATE:
-        return std::unique_ptr<ColumnStatistics>(
+        return std::unique_ptr<MutableColumnStatistics>(
           new DateColumnStatisticsImpl());
       case TIMESTAMP:
-        return std::unique_ptr<ColumnStatistics>(
+        return std::unique_ptr<MutableColumnStatistics>(
           new TimestampColumnStatisticsImpl());
       case DECIMAL:
-        return std::unique_ptr<ColumnStatistics>(
+        return std::unique_ptr<MutableColumnStatistics>(
           new DecimalColumnStatisticsImpl());
       default:
         throw NotImplementedYet("Not supported type: " + type.toString());

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/src/Statistics.hh
----------------------------------------------------------------------
diff --git a/c++/src/Statistics.hh b/c++/src/Statistics.hh
index 5773c7e..e878d48 100644
--- a/c++/src/Statistics.hh
+++ b/c++/src/Statistics.hh
@@ -74,7 +74,9 @@ namespace orc {
     // GET / SET _totalLength
     bool hasTotalLength() const { return _hasTotalLength; }
 
-    void setHasTotalLength(bool hasTotalLength) { _hasTotalLength = hasTotalLength; }
+    void setHasTotalLength(bool hasTotalLength) {
+      _hasTotalLength = hasTotalLength;
+    }
 
     uint64_t getTotalLength() const { return _totalLength; }
 
@@ -172,11 +174,32 @@ namespace orc {
   typedef InternalStatisticsImpl<Decimal> InternalDecimalStatistics;
   typedef InternalStatisticsImpl<std::string> InternalStringStatistics;
 
+  /**
+   * Mutable column statistics for use by the writer.
+   *
+   * The column statistics implementation classes in this file derive from
+   * this interface in addition to their read-only statistics interface.
+   */
+  class MutableColumnStatistics {
+  public:
+    virtual ~MutableColumnStatistics();
+
+    /**
+     * Add count to the number of values.
+     */
+    virtual void increase(uint64_t count) = 0;
+
+    /**
+     * Overwrite the number of values.
+     */
+    virtual void setNumberOfValues(uint64_t value) = 0;
+
+    /**
+     * Set whether the column contains null values.
+     */
+    virtual void setHasNull(bool hasNull) = 0;
+
+    /**
+     * Merge other into these statistics. Implementations dynamic_cast
+     * other to their own concrete type, so other must have the same
+     * concrete type as this object; otherwise std::bad_cast is thrown.
+     */
+    virtual void merge(const MutableColumnStatistics& other) = 0;
+
+    /**
+     * Reset the accumulated statistics.
+     */
+    virtual void reset() = 0;
+
+    /**
+     * Write these statistics into the given protobuf message.
+     * @param pbStats the protobuf statistics message to fill in
+     */
+    virtual void toProtoBuf(proto::ColumnStatistics& pbStats) const = 0;
+  };
+
 /**
  * ColumnStatistics Implementation
  */
 
-  class ColumnStatisticsImpl: public ColumnStatistics {
+  class ColumnStatisticsImpl: public ColumnStatistics,
+                             public MutableColumnStatistics {
   private:
     InternalCharStatistics _stats;
   public:
@@ -188,11 +211,11 @@ namespace orc {
       return _stats.getNumberOfValues();
     }
 
-    void setNumberOfValues(uint64_t value) {
+    void setNumberOfValues(uint64_t value) override {
       _stats.setNumberOfValues(value);
     }
 
-    void increase(uint64_t count) {
+    void increase(uint64_t count) override {
       _stats.setNumberOfValues(_stats.getNumberOfValues() + count);
     }
 
@@ -200,19 +223,19 @@ namespace orc {
       return _stats.hasNull();
     }
 
-    void setHasNull(bool hasNull) {
+    void setHasNull(bool hasNull) override {
       _stats.setHasNull(hasNull);
     }
 
-    void merge(const ColumnStatisticsImpl& other) {
-      _stats.merge(other._stats);
+    void merge(const MutableColumnStatistics& other) override {
+      _stats.merge(dynamic_cast<const ColumnStatisticsImpl&>(other)._stats);
     }
 
-    void reset() {
+    void reset() override {
       _stats.reset();
     }
 
-    void toProtoBuf(proto::ColumnStatistics& pbStats) const {
+    void toProtoBuf(proto::ColumnStatistics& pbStats) const override {
       pbStats.set_hasnull(_stats.hasNull());
       pbStats.set_numberofvalues(_stats.getNumberOfValues());
     }
@@ -226,7 +249,8 @@ namespace orc {
     }
   };
 
-  class BinaryColumnStatisticsImpl: public BinaryColumnStatistics {
+  class BinaryColumnStatisticsImpl: public BinaryColumnStatistics,
+                                    public MutableColumnStatistics {
   private:
     InternalCharStatistics _stats;
   public:
@@ -239,11 +263,11 @@ namespace orc {
       return _stats.getNumberOfValues();
     }
 
-    void setNumberOfValues(uint64_t value) {
+    void setNumberOfValues(uint64_t value) override {
       _stats.setNumberOfValues(value);
     }
 
-    void increase(uint64_t count) {
+    void increase(uint64_t count) override {
       _stats.setNumberOfValues(_stats.getNumberOfValues() + count);
     }
 
@@ -251,7 +275,7 @@ namespace orc {
       return _stats.hasNull();
     }
 
-    void setHasNull(bool hasNull) {
+    void setHasNull(bool hasNull) override {
       _stats.setHasNull(hasNull);
     }
 
@@ -276,16 +300,18 @@ namespace orc {
       _stats.setTotalLength(_stats.getTotalLength() + length);
     }
 
-    void merge(const BinaryColumnStatisticsImpl& other) {
-      _stats.merge(other._stats);
+    void merge(const MutableColumnStatistics& other) override {
+      const BinaryColumnStatisticsImpl& binStats =
+        dynamic_cast<const BinaryColumnStatisticsImpl&>(other);
+      _stats.merge(binStats._stats);
     }
 
-    void reset() {
+    void reset() override {
       _stats.reset();
       setTotalLength(0);
     }
 
-    void toProtoBuf(proto::ColumnStatistics& pbStats) const {
+    void toProtoBuf(proto::ColumnStatistics& pbStats) const override {
       pbStats.set_hasnull(_stats.hasNull());
       pbStats.set_numberofvalues(_stats.getNumberOfValues());
 
@@ -307,7 +333,8 @@ namespace orc {
     }
   };
 
-  class BooleanColumnStatisticsImpl: public BooleanColumnStatistics {
+  class BooleanColumnStatisticsImpl: public BooleanColumnStatistics,
+                                     public MutableColumnStatistics {
   private:
     InternalBooleanStatistics _stats;
     bool _hasCount;
@@ -315,14 +342,15 @@ namespace orc {
 
   public:
     BooleanColumnStatisticsImpl() { reset(); }
-    BooleanColumnStatisticsImpl(const proto::ColumnStatistics& stats, const StatContext& statContext);
+    BooleanColumnStatisticsImpl(const proto::ColumnStatistics& stats,
+                                const StatContext& statContext);
     virtual ~BooleanColumnStatisticsImpl();
 
     bool hasCount() const override {
       return _hasCount;
     }
 
-    void increase(uint64_t count) {
+    void increase(uint64_t count) override {
       _stats.setNumberOfValues(_stats.getNumberOfValues() + count);
       _hasCount = true;
     }
@@ -331,7 +359,7 @@ namespace orc {
       return _stats.getNumberOfValues();
     }
 
-    void setNumberOfValues(uint64_t value) {
+    void setNumberOfValues(uint64_t value) override {
       _stats.setNumberOfValues(value);
     }
 
@@ -339,7 +367,7 @@ namespace orc {
       return _stats.hasNull();
     }
 
-    void setHasNull(bool hasNull) {
+    void setHasNull(bool hasNull) override {
       _stats.setHasNull(hasNull);
     }
 
@@ -370,18 +398,20 @@ namespace orc {
       }
     }
 
-    void merge(const BooleanColumnStatisticsImpl& other) {
-      _stats.merge(other._stats);
-      _hasCount = _hasCount && other._hasCount;
-      _trueCount += other._trueCount;
+    void merge(const MutableColumnStatistics& other) override {
+      const BooleanColumnStatisticsImpl& boolStats =
+        dynamic_cast<const BooleanColumnStatisticsImpl&>(other);
+      _stats.merge(boolStats._stats);
+      _hasCount = _hasCount && boolStats._hasCount;
+      _trueCount += boolStats._trueCount;
     }
 
-    void reset() {
+    void reset() override {
       _stats.reset();
       setTrueCount(0);
     }
 
-    void toProtoBuf(proto::ColumnStatistics& pbStats) const {
+    void toProtoBuf(proto::ColumnStatistics& pbStats) const override {
       pbStats.set_hasnull(_stats.hasNull());
       pbStats.set_numberofvalues(_stats.getNumberOfValues());
 
@@ -407,12 +437,14 @@ namespace orc {
     }
   };
 
-  class DateColumnStatisticsImpl: public DateColumnStatistics {
+  class DateColumnStatisticsImpl: public DateColumnStatistics,
+                                  public MutableColumnStatistics{
   private:
-    InternalDateStatistics _stats; 
+    InternalDateStatistics _stats;
   public:
     DateColumnStatisticsImpl() { reset(); }
-    DateColumnStatisticsImpl(const proto::ColumnStatistics& stats, const StatContext& statContext);
+    DateColumnStatisticsImpl(const proto::ColumnStatistics& stats,
+                             const StatContext& statContext);
     virtual ~DateColumnStatisticsImpl();
 
     bool hasMinimum() const override {
@@ -423,7 +455,7 @@ namespace orc {
       return _stats.hasMaximum();
     }
 
-    void increase(uint64_t count) {
+    void increase(uint64_t count) override {
       _stats.setNumberOfValues(_stats.getNumberOfValues() + count);
     }
 
@@ -431,7 +463,7 @@ namespace orc {
       return _stats.getNumberOfValues();
     }
 
-    void setNumberOfValues(uint64_t value) {
+    void setNumberOfValues(uint64_t value) override {
       _stats.setNumberOfValues(value);
     }
 
@@ -439,7 +471,7 @@ namespace orc {
       return _stats.hasNull();
     }
 
-    void setHasNull(bool hasNull) {
+    void setHasNull(bool hasNull) override {
       _stats.setHasNull(hasNull);
     }
 
@@ -473,20 +505,23 @@ namespace orc {
       _stats.updateMinMax(value);
     }
 
-    void merge(const DateColumnStatisticsImpl& other) {
-      _stats.merge(other._stats);
+    void merge(const MutableColumnStatistics& other) override {
+      const DateColumnStatisticsImpl& dateStats =
+        dynamic_cast<const DateColumnStatisticsImpl&>(other);
+      _stats.merge(dateStats._stats);
     }
 
-    void reset() {
+    void reset() override {
       _stats.reset();
     }
 
-    void toProtoBuf(proto::ColumnStatistics& pbStats) const {
+    void toProtoBuf(proto::ColumnStatistics& pbStats) const override {
       pbStats.set_hasnull(_stats.hasNull());
       pbStats.set_numberofvalues(_stats.getNumberOfValues());
 
       if (_stats.hasMinimum()) {
-        proto::DateStatistics* dateStatistics = pbStats.mutable_datestatistics();
+        proto::DateStatistics* dateStatistics =
+          pbStats.mutable_datestatistics();
         dateStatistics->set_maximum(_stats.getMaximum());
         dateStatistics->set_minimum(_stats.getMinimum());
       }
@@ -512,13 +547,15 @@ namespace orc {
     }
   };
 
-  class DecimalColumnStatisticsImpl: public DecimalColumnStatistics {
+  class DecimalColumnStatisticsImpl: public DecimalColumnStatistics,
+                                     public MutableColumnStatistics {
   private:
-    InternalDecimalStatistics _stats; 
+    InternalDecimalStatistics _stats;
 
   public:
     DecimalColumnStatisticsImpl() { reset(); }
-    DecimalColumnStatisticsImpl(const proto::ColumnStatistics& stats, const StatContext& statContext);
+    DecimalColumnStatisticsImpl(const proto::ColumnStatistics& stats,
+                                const StatContext& statContext);
     virtual ~DecimalColumnStatisticsImpl();
 
     bool hasMinimum() const override {
@@ -533,7 +570,7 @@ namespace orc {
       return _stats.hasSum();
     }
 
-    void increase(uint64_t count) {
+    void increase(uint64_t count) override {
       _stats.setNumberOfValues(_stats.getNumberOfValues() + count);
     }
 
@@ -541,7 +578,7 @@ namespace orc {
       return _stats.getNumberOfValues();
     }
 
-    void setNumberOfValues(uint64_t value) {
+    void setNumberOfValues(uint64_t value) override {
       _stats.setNumberOfValues(value);
     }
 
@@ -549,7 +586,7 @@ namespace orc {
       return _stats.hasNull();
     }
 
-    void setHasNull(bool hasNull) {
+    void setHasNull(bool hasNull) override {
       _stats.setHasNull(hasNull);
     }
 
@@ -600,21 +637,24 @@ namespace orc {
       }
     }
 
-    void merge(const DecimalColumnStatisticsImpl& other) {
-      _stats.merge(other._stats);
+    void merge(const MutableColumnStatistics& other) override {
+      const DecimalColumnStatisticsImpl& decStats =
+        dynamic_cast<const DecimalColumnStatisticsImpl&>(other);
 
-      _stats.setHasSum(_stats.hasSum() && other.hasSum());
+      _stats.merge(decStats._stats);
+
+      _stats.setHasSum(_stats.hasSum() && decStats.hasSum());
       if (_stats.hasSum()) {
-        updateSum(other.getSum());
+        updateSum(decStats.getSum());
       }
     }
 
-    void reset() {
+    void reset() override {
       _stats.reset();
       setSum(Decimal());
     }
 
-    void toProtoBuf(proto::ColumnStatistics& pbStats) const {
+    void toProtoBuf(proto::ColumnStatistics& pbStats) const override {
       pbStats.set_hasnull(_stats.hasNull());
       pbStats.set_numberofvalues(_stats.getNumberOfValues());
 
@@ -687,7 +727,8 @@ namespace orc {
     }
   };
 
-  class DoubleColumnStatisticsImpl: public DoubleColumnStatistics {
+  class DoubleColumnStatisticsImpl: public DoubleColumnStatistics,
+                                    public MutableColumnStatistics {
   private:
     InternalDoubleStatistics _stats;
   public:
@@ -707,7 +748,7 @@ namespace orc {
       return _stats.hasSum();
     }
 
-    void increase(uint64_t count) {
+    void increase(uint64_t count) override {
       _stats.setNumberOfValues(_stats.getNumberOfValues() + count);
     }
 
@@ -715,7 +756,7 @@ namespace orc {
       return _stats.getNumberOfValues();
     }
 
-    void setNumberOfValues(uint64_t value) {
+    void setNumberOfValues(uint64_t value) override {
       _stats.setNumberOfValues(value);
     }
 
@@ -723,7 +764,7 @@ namespace orc {
       return _stats.hasNull();
     }
 
-    void setHasNull(bool hasNull) {
+    void setHasNull(bool hasNull) override {
       _stats.setHasNull(hasNull);
     }
 
@@ -771,21 +812,23 @@ namespace orc {
       _stats.setSum(_stats.getSum() + value);
     }
 
-    void merge(const DoubleColumnStatisticsImpl& other) {
-      _stats.merge(other._stats);
+    void merge(const MutableColumnStatistics& other) override {
+      const DoubleColumnStatisticsImpl& doubleStats =
+        dynamic_cast<const DoubleColumnStatisticsImpl&>(other);
+      _stats.merge(doubleStats._stats);
 
-      _stats.setHasSum(_stats.hasSum() && other.hasSum());
+      _stats.setHasSum(_stats.hasSum() && doubleStats.hasSum());
       if (_stats.hasSum()) {
-        _stats.setSum(_stats.getSum() + other.getSum());
+        _stats.setSum(_stats.getSum() + doubleStats.getSum());
       }
     }
 
-    void reset() {
+    void reset() override {
       _stats.reset();
       setSum(0.0);
     }
 
-    void toProtoBuf(proto::ColumnStatistics& pbStats) const {
+    void toProtoBuf(proto::ColumnStatistics& pbStats) const override {
       pbStats.set_hasnull(_stats.hasNull());
       pbStats.set_numberofvalues(_stats.getNumberOfValues());
 
@@ -825,7 +868,8 @@ namespace orc {
     }
   };
 
-  class IntegerColumnStatisticsImpl: public IntegerColumnStatistics {
+  class IntegerColumnStatisticsImpl: public IntegerColumnStatistics,
+                                     public MutableColumnStatistics {
   private:
     InternalIntegerStatistics _stats;
   public:
@@ -845,7 +889,7 @@ namespace orc {
       return _stats.hasSum();
     }
 
-    void increase(uint64_t count) {
+    void increase(uint64_t count) override {
       _stats.setNumberOfValues(_stats.getNumberOfValues() + count);
     }
 
@@ -853,7 +897,7 @@ namespace orc {
       return _stats.getNumberOfValues();
     }
 
-    void setNumberOfValues(uint64_t value) {
+    void setNumberOfValues(uint64_t value) override {
       _stats.setNumberOfValues(value);
     }
 
@@ -861,7 +905,7 @@ namespace orc {
       return _stats.hasNull();
     }
 
-    void setHasNull(bool hasNull) {
+    void setHasNull(bool hasNull) override {
       _stats.setHasNull(hasNull);
     }
 
@@ -916,26 +960,29 @@ namespace orc {
       }
     }
 
-    void merge(const IntegerColumnStatisticsImpl& other) {
-      _stats.merge(other._stats);
+    void merge(const MutableColumnStatistics& other) override {
+      const IntegerColumnStatisticsImpl& intStats =
+        dynamic_cast<const IntegerColumnStatisticsImpl&>(other);
+
+      _stats.merge(intStats._stats);
 
       // update sum and check overflow
-      _stats.setHasSum(_stats.hasSum() && other.hasSum());
+      _stats.setHasSum(_stats.hasSum() && intStats.hasSum());
       if (_stats.hasSum()) {
         bool wasPositive = _stats.getSum() >= 0;
-        _stats.setSum(_stats.getSum() + other.getSum());
-        if ((other.getSum() >= 0) == wasPositive) {
+        _stats.setSum(_stats.getSum() + intStats.getSum());
+        if ((intStats.getSum() >= 0) == wasPositive) {
           _stats.setHasSum((_stats.getSum() >= 0) == wasPositive);
         }
       }
     }
 
-    void reset() {
+    void reset() override {
       _stats.reset();
       setSum(0);
     }
 
-    void toProtoBuf(proto::ColumnStatistics& pbStats) const {
+    void toProtoBuf(proto::ColumnStatistics& pbStats) const override {
       pbStats.set_hasnull(_stats.hasNull());
       pbStats.set_numberofvalues(_stats.getNumberOfValues());
 
@@ -975,17 +1022,17 @@ namespace orc {
     }
   };
 
-  class StringColumnStatisticsImpl: public StringColumnStatistics {
+  class StringColumnStatisticsImpl: public StringColumnStatistics,
+                                    public MutableColumnStatistics{
   private:
     InternalStringStatistics _stats;
-    bool _enableStringComparison;
 
   public:
-    StringColumnStatisticsImpl(bool enableStringComparision) {
-      _enableStringComparison = enableStringComparision;
+    StringColumnStatisticsImpl() {
       reset();
     }
-    StringColumnStatisticsImpl(const proto::ColumnStatistics& stats, const StatContext& statContext);
+    StringColumnStatisticsImpl(const proto::ColumnStatistics& stats,
+                               const StatContext& statContext);
     virtual ~StringColumnStatisticsImpl();
 
     bool hasMinimum() const override {
@@ -1000,7 +1047,7 @@ namespace orc {
       return _stats.hasTotalLength();
     }
 
-    void increase(uint64_t count) {
+    void increase(uint64_t count) override {
       _stats.setNumberOfValues(_stats.getNumberOfValues() + count);
     }
 
@@ -1008,7 +1055,7 @@ namespace orc {
       return _stats.getNumberOfValues();
     }
 
-    void setNumberOfValues(uint64_t value) {
+    void setNumberOfValues(uint64_t value) override {
       _stats.setNumberOfValues(value);
     }
 
@@ -1016,7 +1063,7 @@ namespace orc {
       return _stats.hasNull();
     }
 
-    void setHasNull(bool hasNull) {
+    void setHasNull(bool hasNull) override {
       _stats.setHasNull(hasNull);
     }
 
@@ -1060,7 +1107,7 @@ namespace orc {
     }
 
     void update(const char* value, size_t length) {
-      if (_enableStringComparison && value != nullptr) {
+      if (value != nullptr) {
         if (!_stats.hasMinimum()) {
           setMinimum(std::string(value, value + length));
           setMaximum(std::string(value, value + length));
@@ -1069,7 +1116,8 @@ namespace orc {
           int minCmp = strncmp(_stats.getMinimum().c_str(),
                                value,
                                std::min(_stats.getMinimum().length(), length));
-          if (minCmp > 0 || (minCmp == 0 && length < _stats.getMinimum().length())) {
+          if (minCmp > 0 ||
+                (minCmp == 0 && length < _stats.getMinimum().length())) {
             setMinimum(std::string(value, value + length));
           }
 
@@ -1077,7 +1125,8 @@ namespace orc {
           int maxCmp = strncmp(_stats.getMaximum().c_str(),
                                value,
                                std::min(_stats.getMaximum().length(), length));
-          if (maxCmp < 0 || (maxCmp == 0 && length > _stats.getMaximum().length())) {
+          if (maxCmp < 0 ||
+                (maxCmp == 0 && length > _stats.getMaximum().length())) {
             setMaximum(std::string(value, value + length));
           }
         }
@@ -1090,16 +1139,18 @@ namespace orc {
       update(value.c_str(), value.length());
     }
 
-    void merge(const StringColumnStatisticsImpl& other) {
-      _stats.merge(other._stats);
+    void merge(const MutableColumnStatistics& other) override {
+      const StringColumnStatisticsImpl& strStats =
+        dynamic_cast<const StringColumnStatisticsImpl&>(other);
+      _stats.merge(strStats._stats);
     }
 
-    void reset() {
+    void reset() override {
       _stats.reset();
       setTotalLength(0);
     }
 
-    void toProtoBuf(proto::ColumnStatistics& pbStats) const {
+    void toProtoBuf(proto::ColumnStatistics& pbStats) const override {
       pbStats.set_hasnull(_stats.hasNull());
       pbStats.set_numberofvalues(_stats.getNumberOfValues());
 
@@ -1139,7 +1190,8 @@ namespace orc {
     }
   };
 
-  class TimestampColumnStatisticsImpl: public TimestampColumnStatistics {
+  class TimestampColumnStatisticsImpl: public TimestampColumnStatistics,
+                                       public MutableColumnStatistics {
   private:
     InternalIntegerStatistics _stats;
     bool _hasLowerBound;
@@ -1165,11 +1217,11 @@ namespace orc {
       return _stats.getNumberOfValues();
     }
 
-    void setNumberOfValues(uint64_t value) {
+    void setNumberOfValues(uint64_t value) override {
       _stats.setNumberOfValues(value);
     }
 
-    void increase(uint64_t count) {
+    void increase(uint64_t count) override {
       _stats.setNumberOfValues(_stats.getNumberOfValues() + count);
     }
 
@@ -1177,7 +1229,7 @@ namespace orc {
       return _stats.hasNull();
     }
 
-    void setHasNull(bool hasNull) {
+    void setHasNull(bool hasNull) override {
       _stats.setHasNull(hasNull);
     }
 
@@ -1211,15 +1263,17 @@ namespace orc {
       _stats.updateMinMax(value);
     }
 
-    void merge(const TimestampColumnStatisticsImpl& other) {
-      _stats.merge(other._stats);
+    void merge(const MutableColumnStatistics& other) override {
+      const TimestampColumnStatisticsImpl& tsStats =
+        dynamic_cast<const TimestampColumnStatisticsImpl&>(other);
+      _stats.merge(tsStats._stats);
     }
 
-    void reset() {
+    void reset() override {
       _stats.reset();
     }
 
-    void toProtoBuf(proto::ColumnStatistics& pbStats) const {
+    void toProtoBuf(proto::ColumnStatistics& pbStats) const override {
       pbStats.set_hasnull(_stats.hasNull());
       pbStats.set_numberofvalues(_stats.getNumberOfValues());
 
@@ -1244,7 +1298,8 @@ namespace orc {
         secs = static_cast<time_t>(getMinimum() / 1000);
         gmtime_r(&secs, &tmValue);
         strftime(timeBuffer, sizeof(timeBuffer), "%Y-%m-%d %H:%M:%S", &tmValue);
-        buffer << "Minimum: " << timeBuffer << "." << (getMinimum() % 1000) << std::endl;
+        buffer << "Minimum: " << timeBuffer << "."
+               << (getMinimum() % 1000) << std::endl;
       }else{
         buffer << "Minimum is not defined" << std::endl;
       }
@@ -1253,7 +1308,8 @@ namespace orc {
         secs = static_cast<time_t>(getLowerBound() / 1000);
         gmtime_r(&secs, &tmValue);
         strftime(timeBuffer, sizeof(timeBuffer), "%Y-%m-%d %H:%M:%S", &tmValue);
-        buffer << "LowerBound: " << timeBuffer << "." << (getLowerBound() % 1000) << std::endl;
+        buffer << "LowerBound: " << timeBuffer << "."
+               << (getLowerBound() % 1000) << std::endl;
       }else{
         buffer << "LowerBound is not defined" << std::endl;
       }
@@ -1262,7 +1318,8 @@ namespace orc {
         secs = static_cast<time_t>(getMaximum()/1000);
         gmtime_r(&secs, &tmValue);
         strftime(timeBuffer, sizeof(timeBuffer), "%Y-%m-%d %H:%M:%S", &tmValue);
-        buffer << "Maximum: " << timeBuffer << "." << (getMaximum() % 1000) << std::endl;
+        buffer << "Maximum: " << timeBuffer << "."
+               << (getMaximum() % 1000) << std::endl;
       }else{
         buffer << "Maximum is not defined" << std::endl;
       }
@@ -1271,7 +1328,8 @@ namespace orc {
         secs = static_cast<time_t>(getUpperBound() / 1000);
         gmtime_r(&secs, &tmValue);
         strftime(timeBuffer, sizeof(timeBuffer), "%Y-%m-%d %H:%M:%S", &tmValue);
-        buffer << "UpperBound: " << timeBuffer << "." << (getUpperBound() % 1000) << std::endl;
+        buffer << "UpperBound: " << timeBuffer << "."
+               << (getUpperBound() % 1000) << std::endl;
       }else{
         buffer << "UpperBound is not defined" << std::endl;
       }
@@ -1338,16 +1396,18 @@ namespace orc {
   class StripeStatisticsImpl: public StripeStatistics {
   private:
     std::unique_ptr<StatisticsImpl> columnStats;
-    std::vector<std::vector<std::shared_ptr<const ColumnStatistics> > > rowIndexStats;
+    std::vector<std::vector<std::shared_ptr<const ColumnStatistics> > >
+                                                                  rowIndexStats;
 
     // DELIBERATELY NOT IMPLEMENTED
     StripeStatisticsImpl(const StripeStatisticsImpl&);
     StripeStatisticsImpl& operator=(const StripeStatisticsImpl&);
 
   public:
-    StripeStatisticsImpl(const proto::StripeStatistics& stripeStats,
-                   std::vector<std::vector<proto::ColumnStatistics> >& indexStats,
-                   const StatContext& statContext);
+    StripeStatisticsImpl(
+                const proto::StripeStatistics& stripeStats,
+                std::vector<std::vector<proto::ColumnStatistics> >& indexStats,
+                const StatContext& statContext);
 
     virtual const ColumnStatistics* getColumnStatistics(uint32_t columnId
                                                         ) const override {
@@ -1358,7 +1418,8 @@ namespace orc {
       return columnStats->getNumberOfColumns();
     }
 
-    virtual const ColumnStatistics* getRowIndexStatistics(uint32_t columnId, uint32_t rowIndex
+    virtual const ColumnStatistics* getRowIndexStatistics(uint32_t columnId,
+                                                          uint32_t rowIndex
                                                         ) const override {
       // check id indices are valid
       return rowIndexStats[columnId][rowIndex].get();
@@ -1374,11 +1435,10 @@ namespace orc {
   /**
    * Create ColumnStatistics for writers
    * @param type of column
-   * @param enableStringComparison whether enable string columns comparision
-   * @return ColumnStatistics instances
+   * @return MutableColumnStatistics instances
    */
-  std::unique_ptr<ColumnStatistics> createColumnStatistics(
-    const Type& type, bool enableStringComparison);
+  std::unique_ptr<MutableColumnStatistics> createColumnStatistics(
+                                                            const Type& type);
 
 }// namespace
 

http://git-wip-us.apache.org/repos/asf/orc/blob/b82e2f4f/c++/src/Vector.cc
----------------------------------------------------------------------
diff --git a/c++/src/Vector.cc b/c++/src/Vector.cc
index 2c7e2d3..811bcac 100644
--- a/c++/src/Vector.cc
+++ b/c++/src/Vector.cc
@@ -34,7 +34,7 @@ namespace orc {
                                           notNull(pool, cap),
                                           hasNulls(false),
                                           memoryPool(pool) {
-    // PASS
+    std::memset(notNull.data(), 1, capacity);
   }
 
   ColumnVectorBatch::~ColumnVectorBatch() {

Reply via email to